Commit 3fa30d81 authored by Philip Carns's avatar Philip Carns

"eager" read path

parent 7118521f
...@@ -388,6 +388,61 @@ static void bake_bulk_read_ult(hg_handle_t handle) ...@@ -388,6 +388,61 @@ static void bake_bulk_read_ult(hg_handle_t handle)
} }
DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult) DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
/* service a remote RPC that reads to a bulk region and eagerly sends
* response */
static void bake_bulk_eager_read_ult(hg_handle_t handle)
{
bake_bulk_eager_read_out_t out;
bake_bulk_eager_read_in_t in;
hg_return_t hret;
char* buffer;
hg_size_t size;
struct hg_info *hgi;
margo_instance_id mid;
pmemobj_region_id_t* prid;
// printf("Got RPC request to read bulk region.\n");
memset(&out, 0, sizeof(out));
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
out.ret = 0;
out.buffer = buffer;
out.size = in.size;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_read_ult)
/* service a remote RPC that probes for a target id */ /* service a remote RPC that probes for a target id */
static void bake_bulk_probe_ult(hg_handle_t handle) static void bake_bulk_probe_ult(hg_handle_t handle)
{ {
......
...@@ -77,6 +77,21 @@ MERCURY_GEN_PROC(bake_bulk_read_out_t, ...@@ -77,6 +77,21 @@ MERCURY_GEN_PROC(bake_bulk_read_out_t,
((int32_t)(ret))) ((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_bulk_read_ult) DECLARE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
/* bulk eager read */
MERCURY_GEN_PROC(bake_bulk_eager_read_in_t,
((bake_target_id_t)(bti))\
((bake_bulk_region_id_t)(rid))\
((uint64_t)(region_offset))\
((uint32_t)(size)))
typedef struct
{
int32_t ret;
uint32_t size;
char * buffer;
} bake_bulk_eager_read_out_t;
static inline hg_return_t hg_proc_bake_bulk_eager_read_out_t(hg_proc_t proc, void *v_out_p);
DECLARE_MARGO_RPC_HANDLER(bake_bulk_eager_read_ult)
/* bulk probe */ /* bulk probe */
MERCURY_GEN_PROC(bake_bulk_probe_out_t, MERCURY_GEN_PROC(bake_bulk_probe_out_t,
((int32_t)(ret))\ ((int32_t)(ret))\
...@@ -132,11 +147,27 @@ static inline hg_return_t hg_proc_bake_bulk_eager_write_in_t(hg_proc_t proc, voi ...@@ -132,11 +147,27 @@ static inline hg_return_t hg_proc_bake_bulk_eager_write_in_t(hg_proc_t proc, voi
hg_proc_uint64_t(proc, &in->region_offset); hg_proc_uint64_t(proc, &in->region_offset);
hg_proc_uint32_t(proc, &in->size); hg_proc_uint32_t(proc, &in->size);
if(hg_proc_get_op(proc) == HG_DECODE) if(hg_proc_get_op(proc) == HG_DECODE)
hg_proc_memcpy_decode_in_place(proc, &in->buffer, in->size); hg_proc_memcpy_decode_in_place(proc, (void**)(&in->buffer), in->size);
else else
hg_proc_memcpy(proc, in->buffer, in->size); hg_proc_memcpy(proc, in->buffer, in->size);
return(HG_SUCCESS); return(HG_SUCCESS);
} }
static inline hg_return_t hg_proc_bake_bulk_eager_read_out_t(hg_proc_t proc, void *v_out_p)
{
/* TODO: error checking */
bake_bulk_eager_read_out_t *out = v_out_p;
hg_proc_int32_t(proc, &out->ret);
hg_proc_uint32_t(proc, &out->size);
if(hg_proc_get_op(proc) == HG_DECODE)
hg_proc_memcpy_decode_in_place(proc, (void**)(&out->buffer), out->size);
else
hg_proc_memcpy(proc, out->buffer, out->size);
return(HG_SUCCESS);
}
#endif /* __BAKE_BULK_RPC */ #endif /* __BAKE_BULK_RPC */
...@@ -130,6 +130,9 @@ int main(int argc, char **argv) ...@@ -130,6 +130,9 @@ int main(int argc, char **argv)
MERCURY_REGISTER(hg_class, "bake_bulk_eager_write_rpc", bake_bulk_eager_write_in_t, MERCURY_REGISTER(hg_class, "bake_bulk_eager_write_rpc", bake_bulk_eager_write_in_t,
bake_bulk_eager_write_out_t, bake_bulk_eager_write_out_t,
bake_bulk_eager_write_ult_handler); bake_bulk_eager_write_ult_handler);
MERCURY_REGISTER(hg_class, "bake_bulk_eager_read_rpc", bake_bulk_eager_read_in_t,
bake_bulk_eager_read_out_t,
bake_bulk_eager_read_ult_handler);
MERCURY_REGISTER(hg_class, "bake_bulk_persist_rpc", bake_bulk_persist_in_t, MERCURY_REGISTER(hg_class, "bake_bulk_persist_rpc", bake_bulk_persist_in_t,
bake_bulk_persist_out_t, bake_bulk_persist_out_t,
bake_bulk_persist_ult_handler); bake_bulk_persist_ult_handler);
......
...@@ -25,6 +25,7 @@ struct hg_instance ...@@ -25,6 +25,7 @@ struct hg_instance
hg_id_t bake_bulk_shutdown_id; hg_id_t bake_bulk_shutdown_id;
hg_id_t bake_bulk_create_id; hg_id_t bake_bulk_create_id;
hg_id_t bake_bulk_eager_write_id; hg_id_t bake_bulk_eager_write_id;
hg_id_t bake_bulk_eager_read_id;
hg_id_t bake_bulk_write_id; hg_id_t bake_bulk_write_id;
hg_id_t bake_bulk_persist_id; hg_id_t bake_bulk_persist_id;
hg_id_t bake_bulk_get_size_id; hg_id_t bake_bulk_get_size_id;
...@@ -49,6 +50,13 @@ struct hg_instance g_hginst = { ...@@ -49,6 +50,13 @@ struct hg_instance g_hginst = {
.refct = 0, .refct = 0,
}; };
static int bake_bulk_eager_read(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
void *buf,
uint64_t buf_size);
static int hg_instance_init(const char *mercury_dest) static int hg_instance_init(const char *mercury_dest)
{ {
char hg_na[64] = {0}; char hg_na[64] = {0};
...@@ -109,6 +117,12 @@ static int hg_instance_init(const char *mercury_dest) ...@@ -109,6 +117,12 @@ static int hg_instance_init(const char *mercury_dest)
bake_bulk_eager_write_in_t, bake_bulk_eager_write_in_t,
bake_bulk_eager_write_out_t, bake_bulk_eager_write_out_t,
NULL); NULL);
g_hginst.bake_bulk_eager_read_id =
MERCURY_REGISTER(g_hginst.hg_class,
"bake_bulk_eager_read_rpc",
bake_bulk_eager_read_in_t,
bake_bulk_eager_read_out_t,
NULL);
g_hginst.bake_bulk_persist_id = g_hginst.bake_bulk_persist_id =
MERCURY_REGISTER(g_hginst.hg_class, MERCURY_REGISTER(g_hginst.hg_class,
"bake_bulk_persist_rpc", "bake_bulk_persist_rpc",
...@@ -284,7 +298,7 @@ int bake_shutdown_service(bake_target_id_t bti) ...@@ -284,7 +298,7 @@ int bake_shutdown_service(bake_target_id_t bti)
return(0); return(0);
} }
int bake_bulk_eager_write( static int bake_bulk_eager_write(
bake_target_id_t bti, bake_target_id_t bti,
bake_bulk_region_id_t rid, bake_bulk_region_id_t rid,
uint64_t region_offset, uint64_t region_offset,
...@@ -596,6 +610,11 @@ int bake_bulk_read( ...@@ -596,6 +610,11 @@ int bake_bulk_read(
int ret; int ret;
struct bake_instance *instance = NULL; struct bake_instance *instance = NULL;
if(buf_size <= BAKE_BULK_EAGER_LIMIT)
{
return(bake_bulk_eager_read(bti, rid, region_offset, buf, buf_size));
}
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance); HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance) if(!instance)
return(-1); return(-1);
...@@ -645,3 +664,58 @@ int bake_bulk_read( ...@@ -645,3 +664,58 @@ int bake_bulk_read(
} }
static int bake_bulk_eager_read(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
void *buf,
uint64_t buf_size)
{
hg_return_t hret;
hg_handle_t handle;
bake_bulk_eager_read_in_t in;
bake_bulk_eager_read_out_t out;
int ret;
struct bake_instance *instance = NULL;
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.rid = rid;
in.region_offset = region_offset;
in.size = buf_size;
/* create handle */
hret = HG_Create(g_hginst.hg_context, instance->dest,
g_hginst.bake_bulk_eager_read_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
hret = margo_forward(g_hginst.mid, handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
return(-1);
}
hret = HG_Get_output(handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
return(-1);
}
ret = out.ret;
if(ret == 0)
memcpy(buf, out.buffer, out.size);
HG_Free_output(handle, &out);
HG_Destroy(handle);
return(ret);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment