Commit 2d9df63e authored by Philip Carns's avatar Philip Carns

implement bulk read, doesn't work yet

parent 09d611fb
Pipeline #386 skipped
......@@ -242,3 +242,84 @@ static void bake_bulk_get_size_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult)
/* TODO consolidate with write handler; read and write are nearly identical */
/* service a remote RPC that reads to a bulk region */
static void bake_bulk_read_ult(hg_handle_t handle)
{
bake_bulk_read_out_t out;
bake_bulk_read_in_t in;
hg_return_t hret;
char* buffer;
hg_size_t size;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
pmemobj_region_id_t* prid;
printf("Got RPC request to read bulk region.\n");
memset(&out, 0, sizeof(out));
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
size = HG_Bulk_get_size(in.bulk_handle);
/* create bulk handle for local side of transfer */
hret = HG_Bulk_create(hgi->hg_class, 1, (void**)(&buffer), &size,
HG_BULK_READ_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle,
0, bulk_handle, 0, size);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
out.ret = 0;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
......@@ -53,6 +53,16 @@ MERCURY_GEN_PROC(bake_bulk_get_size_out_t,
((uint64_t)(size)))
DECLARE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult)
/* bulk read */
MERCURY_GEN_PROC(bake_bulk_read_in_t,
((bake_target_id_t)(bti))\
((bake_bulk_region_id_t)(rid))\
((uint64_t)(region_offset))\
((hg_bulk_t)(bulk_handle)))
MERCURY_GEN_PROC(bake_bulk_read_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
/* TODO: where should the encoder defs live? Not in bake-bulk-rpc.c because
......
......@@ -114,6 +114,9 @@ int main(int argc, char **argv)
MERCURY_REGISTER(hg_class, "bake_bulk_get_size_rpc", bake_bulk_get_size_in_t,
bake_bulk_get_size_out_t,
bake_bulk_get_size_ult_handler);
MERCURY_REGISTER(hg_class, "bake_bulk_read_rpc", bake_bulk_read_in_t,
bake_bulk_read_out_t,
bake_bulk_read_ult_handler);
/* NOTE: at this point this server ULT has two options. It can wait on
* whatever mechanism it wants to (however long the daemon should run and
......
......@@ -28,6 +28,7 @@ static hg_id_t g_bake_bulk_create_id;
static hg_id_t g_bake_bulk_write_id;
static hg_id_t g_bake_bulk_persist_id;
static hg_id_t g_bake_bulk_get_size_id;
static hg_id_t g_bake_bulk_read_id;
int bake_probe_instance(
const char *mercury_dest,
......@@ -91,6 +92,12 @@ int bake_probe_instance(
bake_bulk_get_size_in_t,
bake_bulk_get_size_out_t,
NULL);
g_bake_bulk_read_id =
MERCURY_REGISTER(g_binst.hg_class,
"bake_bulk_read_rpc",
bake_bulk_read_in_t,
bake_bulk_read_out_t,
NULL);
g_binst.mid = margo_init(0, 0, g_binst.hg_context);
if(!g_binst.mid)
......@@ -342,7 +349,54 @@ int bake_bulk_read(
void *buf,
uint64_t buf_size)
{
return(-1);
hg_return_t hret;
hg_handle_t handle;
bake_bulk_read_in_t in;
bake_bulk_read_out_t out;
int ret;
in.bti = bti;
in.rid = rid;
in.region_offset = region_offset;
hret = HG_Bulk_create(g_binst.hg_class, 1, (void**)(&buf), &buf_size,
HG_BULK_WRITE_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
/* create handle */
hret = HG_Create(g_binst.hg_context, g_binst.dest,
g_bake_bulk_read_id, &handle);
if(hret != HG_SUCCESS)
{
HG_Bulk_free(in.bulk_handle);
return(-1);
}
hret = margo_forward(g_binst.mid, handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
HG_Bulk_free(in.bulk_handle);
return(-1);
}
hret = HG_Get_output(handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
HG_Bulk_free(in.bulk_handle);
return(-1);
}
ret = out.ret;
HG_Free_output(handle, &out);
HG_Destroy(handle);
HG_Bulk_free(in.bulk_handle);
return(ret);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment