diff --git a/src/bake-bulk-rpc.c b/src/bake-bulk-rpc.c index 5a8c21a36886f8583f578ed9a6b371b902586310..e17b60e37d8f8073a4570036efbc40b83a651169 100644 --- a/src/bake-bulk-rpc.c +++ b/src/bake-bulk-rpc.c @@ -388,6 +388,61 @@ static void bake_bulk_read_ult(hg_handle_t handle) } DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult) + +/* service a remote RPC that reads to a bulk region and eagerly sends + * response */ +static void bake_bulk_eager_read_ult(hg_handle_t handle) +{ + bake_bulk_eager_read_out_t out; + bake_bulk_eager_read_in_t in; + hg_return_t hret; + char* buffer; + hg_size_t size; + struct hg_info *hgi; + margo_instance_id mid; + pmemobj_region_id_t* prid; + + // printf("Got RPC request to read bulk region.\n"); + + memset(&out, 0, sizeof(out)); + + hgi = HG_Get_info(handle); + assert(hgi); + mid = margo_hg_class_to_instance(hgi->hg_class); + + hret = HG_Get_input(handle, &in); + if(hret != HG_SUCCESS) + { + out.ret = -1; + HG_Respond(handle, NULL, NULL, &out); + HG_Destroy(handle); + return; + } + + prid = (pmemobj_region_id_t*)in.rid.data; + + /* find memory address for target object */ + buffer = pmemobj_direct(prid->oid); + if(!buffer) + { + out.ret = -1; + HG_Free_input(handle, &in); + HG_Respond(handle, NULL, NULL, &out); + HG_Destroy(handle); + return; + } + + out.ret = 0; + out.buffer = buffer; + out.size = in.size; + + HG_Free_input(handle, &in); + HG_Respond(handle, NULL, NULL, &out); + HG_Destroy(handle); + return; +} +DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_read_ult) + /* service a remote RPC that probes for a target id */ static void bake_bulk_probe_ult(hg_handle_t handle) { diff --git a/src/bake-bulk-rpc.h b/src/bake-bulk-rpc.h index f198fd60594641e759120d96281e3e97b940873a..3fb13484a99fedea52bba1439a5ee3a592184a48 100644 --- a/src/bake-bulk-rpc.h +++ b/src/bake-bulk-rpc.h @@ -77,6 +77,21 @@ MERCURY_GEN_PROC(bake_bulk_read_out_t, ((int32_t)(ret))) DECLARE_MARGO_RPC_HANDLER(bake_bulk_read_ult) +/* bulk eager read */ +MERCURY_GEN_PROC(bake_bulk_eager_read_in_t, + ((bake_target_id_t)(bti))\ + ((bake_bulk_region_id_t)(rid))\ + ((uint64_t)(region_offset))\ + ((uint32_t)(size))) +typedef struct +{ + int32_t ret; + uint32_t size; + char * buffer; +} bake_bulk_eager_read_out_t; +static inline hg_return_t hg_proc_bake_bulk_eager_read_out_t(hg_proc_t proc, void *v_out_p); +DECLARE_MARGO_RPC_HANDLER(bake_bulk_eager_read_ult) + /* bulk probe */ MERCURY_GEN_PROC(bake_bulk_probe_out_t, ((int32_t)(ret))\ @@ -132,11 +147,27 @@ static inline hg_return_t hg_proc_bake_bulk_eager_write_in_t(hg_proc_t proc, voi hg_proc_uint64_t(proc, &in->region_offset); hg_proc_uint32_t(proc, &in->size); if(hg_proc_get_op(proc) == HG_DECODE) - hg_proc_memcpy_decode_in_place(proc, &in->buffer, in->size); + hg_proc_memcpy_decode_in_place(proc, (void**)(&in->buffer), in->size); else hg_proc_memcpy(proc, in->buffer, in->size); return(HG_SUCCESS); } + +static inline hg_return_t hg_proc_bake_bulk_eager_read_out_t(hg_proc_t proc, void *v_out_p) +{ + /* TODO: error checking */ + bake_bulk_eager_read_out_t *out = v_out_p; + + hg_proc_int32_t(proc, &out->ret); + hg_proc_uint32_t(proc, &out->size); + if(hg_proc_get_op(proc) == HG_DECODE) + hg_proc_memcpy_decode_in_place(proc, (void**)(&out->buffer), out->size); + else + hg_proc_memcpy(proc, out->buffer, out->size); + + return(HG_SUCCESS); +} + #endif /* __BAKE_BULK_RPC */ diff --git a/src/bake-bulk-server.c b/src/bake-bulk-server.c index cac3fff2567ea99e28c32bcd112fb30f9f72aa1b..9b4bec769a56f382c7a108718b4c405f5137d858 100644 --- a/src/bake-bulk-server.c +++ b/src/bake-bulk-server.c @@ -130,6 +130,9 @@ int main(int argc, char **argv) MERCURY_REGISTER(hg_class, "bake_bulk_eager_write_rpc", bake_bulk_eager_write_in_t, bake_bulk_eager_write_out_t, bake_bulk_eager_write_ult_handler); + MERCURY_REGISTER(hg_class, "bake_bulk_eager_read_rpc", bake_bulk_eager_read_in_t, + bake_bulk_eager_read_out_t, + bake_bulk_eager_read_ult_handler); MERCURY_REGISTER(hg_class, "bake_bulk_persist_rpc", bake_bulk_persist_in_t, bake_bulk_persist_out_t, bake_bulk_persist_ult_handler); diff --git a/src/bake-bulk.c b/src/bake-bulk.c index a4ff49ed89fb08a871e2cdb60a2bd9da693377bc..f009da166e8d40ee328e0b649770a8110fbd7071 100644 --- a/src/bake-bulk.c +++ b/src/bake-bulk.c @@ -25,6 +25,7 @@ struct hg_instance hg_id_t bake_bulk_shutdown_id; hg_id_t bake_bulk_create_id; hg_id_t bake_bulk_eager_write_id; + hg_id_t bake_bulk_eager_read_id; hg_id_t bake_bulk_write_id; hg_id_t bake_bulk_persist_id; hg_id_t bake_bulk_get_size_id; @@ -49,6 +50,13 @@ struct hg_instance g_hginst = { .refct = 0, }; +static int bake_bulk_eager_read( + bake_target_id_t bti, + bake_bulk_region_id_t rid, + uint64_t region_offset, + void *buf, + uint64_t buf_size); + static int hg_instance_init(const char *mercury_dest) { char hg_na[64] = {0}; @@ -109,6 +117,12 @@ static int hg_instance_init(const char *mercury_dest) bake_bulk_eager_write_in_t, bake_bulk_eager_write_out_t, NULL); + g_hginst.bake_bulk_eager_read_id = + MERCURY_REGISTER(g_hginst.hg_class, + "bake_bulk_eager_read_rpc", + bake_bulk_eager_read_in_t, + bake_bulk_eager_read_out_t, + NULL); g_hginst.bake_bulk_persist_id = MERCURY_REGISTER(g_hginst.hg_class, "bake_bulk_persist_rpc", @@ -284,7 +298,7 @@ int bake_shutdown_service(bake_target_id_t bti) return(0); } -int bake_bulk_eager_write( +static int bake_bulk_eager_write( bake_target_id_t bti, bake_bulk_region_id_t rid, uint64_t region_offset, @@ -596,6 +610,11 @@ int bake_bulk_read( int ret; struct bake_instance *instance = NULL; + if(buf_size <= BAKE_BULK_EAGER_LIMIT) + { + return(bake_bulk_eager_read(bti, rid, region_offset, buf, buf_size)); + } + HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance); if(!instance) return(-1); @@ -645,3 +664,58 @@ int bake_bulk_read( } +static int bake_bulk_eager_read( + bake_target_id_t bti, + bake_bulk_region_id_t rid, + uint64_t region_offset, + void *buf, + uint64_t buf_size) +{ + hg_return_t hret; + hg_handle_t handle; + bake_bulk_eager_read_in_t in; + bake_bulk_eager_read_out_t out; + int ret; + struct bake_instance *instance = NULL; + + HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance); + if(!instance) + return(-1); + + in.bti = bti; + in.rid = rid; + in.region_offset = region_offset; + in.size = buf_size; + + /* create handle */ + hret = HG_Create(g_hginst.hg_context, instance->dest, + g_hginst.bake_bulk_eager_read_id, &handle); + if(hret != HG_SUCCESS) + { + return(-1); + } + + hret = margo_forward(g_hginst.mid, handle, &in); + if(hret != HG_SUCCESS) + { + HG_Destroy(handle); + return(-1); + } + + hret = HG_Get_output(handle, &out); + if(hret != HG_SUCCESS) + { + HG_Destroy(handle); + return(-1); + } + + ret = out.ret; + if(ret == 0) + memcpy(buf, out.buffer, out.size); + + HG_Free_output(handle, &out); + HG_Destroy(handle); + return(ret); +} + +