Commit e88a74ef authored by Philip Carns's avatar Philip Carns

api, rpc, and test for retrieving size of region

parent 446c68fc
Pipeline #380 skipped
......@@ -63,7 +63,7 @@ int bake_bulk_create(
* processes) perform overlapping writes.
*
* @param [in] bti BAKE target identifier
* @param [in] rid identifier for new region
* @param [in] rid identifier for region
* @param [in] region_offset offset into the target region to write
* @param [in] buf local memory buffer to write
* @param [in] buf_size size of local memory buffer to write
......@@ -81,13 +81,26 @@ int bake_bulk_write(
* and reads may be performed on the region.
*
* @param [in] bti BAKE target identifier
* @param [in] rid identifier for new region
* @param [in] rid identifier for region
* @returns 0 on success, -1 on failure
*/
int bake_bulk_persist(
bake_target_id_t bti,
bake_bulk_region_id_t rid);
/**
* Check the size of an existing region.
*
* @param [in] bti BAKE target identifier
* @param [in] rid identifier for region
* @param [out] size sizes of region
* @returns 0 on success, -1 on failure
*/
int bake_bulk_get_size(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t *region_size);
/**
* Reads from a region that was previously persisted with bake_bulk_persist().
*
......
......@@ -207,3 +207,37 @@ static void bake_bulk_persist_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_persist_ult)
/* service a remote RPC that retrieves the size of a bulk region */
static void bake_bulk_get_size_ult(hg_handle_t handle)
{
bake_bulk_get_size_out_t out;
bake_bulk_get_size_in_t in;
hg_return_t hret;
pmemobj_region_id_t* prid;
printf("Got RPC request to get_size bulk region.\n");
memset(&out, 0, sizeof(out));
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* kind of cheating here; the size is encoded in the RID */
out.size = prid->size;
out.ret = 0;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult)
......@@ -44,6 +44,15 @@ MERCURY_GEN_PROC(bake_bulk_persist_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_bulk_persist_ult)
/* bulk get size */
MERCURY_GEN_PROC(bake_bulk_get_size_in_t,
((bake_target_id_t)(bti))\
((bake_bulk_region_id_t)(rid)))
MERCURY_GEN_PROC(bake_bulk_get_size_out_t,
((int32_t)(ret))\
((uint64_t)(size)))
DECLARE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult)
/* TODO: where should the encoder defs live? Not in bake-bulk-rpc.c because
......
......@@ -111,6 +111,9 @@ int main(int argc, char **argv)
MERCURY_REGISTER(hg_class, "bake_bulk_persist_rpc", bake_bulk_persist_in_t,
bake_bulk_persist_out_t,
bake_bulk_persist_ult_handler);
MERCURY_REGISTER(hg_class, "bake_bulk_get_size_rpc", bake_bulk_get_size_in_t,
bake_bulk_get_size_out_t,
bake_bulk_get_size_ult_handler);
/* NOTE: at this point this server ULT has two options. It can wait on
* whatever mechanism it wants to (however long the daemon should run and
......
......@@ -27,6 +27,7 @@ static hg_id_t g_bake_bulk_shutdown_id;
static hg_id_t g_bake_bulk_create_id;
static hg_id_t g_bake_bulk_write_id;
static hg_id_t g_bake_bulk_persist_id;
static hg_id_t g_bake_bulk_get_size_id;
int bake_probe_instance(
const char *mercury_dest,
......@@ -84,7 +85,12 @@ int bake_probe_instance(
bake_bulk_persist_in_t,
bake_bulk_persist_out_t,
NULL);
g_bake_bulk_get_size_id =
MERCURY_REGISTER(g_binst.hg_class,
"bake_bulk_get_size_rpc",
bake_bulk_get_size_in_t,
bake_bulk_get_size_out_t,
NULL);
g_binst.mid = margo_init(0, 0, g_binst.hg_context);
if(!g_binst.mid)
......@@ -285,6 +291,50 @@ int bake_bulk_persist(
return(ret);
}
int bake_bulk_get_size(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t *region_size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_get_size_in_t in;
bake_bulk_get_size_out_t out;
int ret;
in.bti = bti;
in.rid = rid;
/* create handle */
hret = HG_Create(g_binst.hg_context, g_binst.dest,
g_bake_bulk_get_size_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
hret = margo_forward(g_binst.mid, handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
return(-1);
}
hret = HG_Get_output(handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
return(-1);
}
ret = out.ret;
*region_size = out.size;
HG_Free_output(handle, &out);
HG_Destroy(handle);
return(ret);
}
int bake_bulk_read(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
......
......@@ -29,6 +29,7 @@ int main(int argc, char **argv)
char* local_region;
int region_fd;
char region_file[128];
uint64_t check_size;
if(argc != 3)
{
......@@ -131,6 +132,25 @@ int main(int argc, char **argv)
return(-1);
}
/* safety check size */
ret = bake_bulk_get_size(bti, rid, &check_size);
if(ret != 0)
{
bake_release_instance(bti);
ABT_finalize();
fprintf(stderr, "Error: bake_bulk_get_size()\n");
return(-1);
}
bake_release_instance(bti);
if(check_size != statbuf.st_size)
{
ABT_finalize();
fprintf(stderr, "Error: size mismatch!\n");
return(-1);
}
sprintf(region_file, "/tmp/bb-copy-rid.XXXXXX");
region_fd = mkstemp(region_file);
if(region_fd < 0)
......@@ -150,7 +170,6 @@ int main(int argc, char **argv)
}
}
bake_release_instance(bti);
ABT_finalize();
return(0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment