Commit c70b64f5 authored by Philip Carns's avatar Philip Carns

implement bulk persist rpc

- doesn't persist the correct number of bytes yet
parent ffc8f114
......@@ -157,3 +157,58 @@ static void bake_bulk_write_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_write_ult)
/* service a remote RPC that persists to a bulk region */
static void bake_bulk_persist_ult(hg_handle_t handle)
{
bake_bulk_persist_out_t out;
bake_bulk_persist_in_t in;
hg_return_t hret;
PMEMoid oid;
char* buffer;
hg_size_t size;
printf("Got RPC request to persist bulk region.\n");
memset(&out, 0, sizeof(out));
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
/* TODO: real translation functions for opaque type */
memcpy(&oid, &in.rid, sizeof(oid));
/* find memory address for target object */
buffer = pmemobj_direct(oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
/* TODO: how to get the size of the object? */
/* TODO: options:
* - do typed objects in the pmemobj api help?
* - can we store a header on each region?
*/
size = 1;
pmemobj_persist(pmem_pool, buffer, size);
out.ret = 0;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_persist_ult)
......@@ -36,6 +36,16 @@ MERCURY_GEN_PROC(bake_bulk_write_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_bulk_write_ult)
/* bulk persist */
MERCURY_GEN_PROC(bake_bulk_persist_in_t,
((bake_target_id_t)(bti))\
((bake_bulk_region_id_t)(rid)))
MERCURY_GEN_PROC(bake_bulk_persist_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_bulk_persist_ult)
/* TODO: where should the encoder defs live? Not in bake-bulk-rpc.c because
* we don't really need the rpc handlers to be linked into clients...
*/
......
......@@ -108,6 +108,9 @@ int main(int argc, char **argv)
MERCURY_REGISTER(hg_class, "bake_bulk_write_rpc", bake_bulk_write_in_t,
bake_bulk_write_out_t,
bake_bulk_write_ult_handler);
MERCURY_REGISTER(hg_class, "bake_bulk_persist_rpc", bake_bulk_persist_in_t,
bake_bulk_persist_out_t,
bake_bulk_persist_ult_handler);
/* NOTE: at this point this server ULT has two options. It can wait on
* whatever mechanism it wants to (however long the daemon should run and
......
......@@ -22,12 +22,11 @@ struct bake_instance
/* TODO: replace later, hard coded global instance */
struct bake_instance g_binst;
/* TODO: probably not global in the long run either */
/* TODO: these IDs are probably not global in the long run either */
static hg_id_t g_bake_bulk_shutdown_id;
/* TODO: probably not global in the long run either */
static hg_id_t g_bake_bulk_create_id;
/* TODO: probably not global in the long run either */
static hg_id_t g_bake_bulk_write_id;
static hg_id_t g_bake_bulk_persist_id;
int bake_probe_instance(
const char *mercury_dest,
......@@ -79,6 +78,13 @@ int bake_probe_instance(
bake_bulk_write_in_t,
bake_bulk_write_out_t,
NULL);
g_bake_bulk_persist_id =
MERCURY_REGISTER(g_binst.hg_class,
"bake_bulk_persist_rpc",
bake_bulk_persist_in_t,
bake_bulk_persist_out_t,
NULL);
g_binst.mid = margo_init(0, 0, g_binst.hg_context);
if(!g_binst.mid)
......@@ -241,6 +247,40 @@ int bake_bulk_persist(
bake_target_id_t bti,
bake_bulk_region_id_t rid)
{
/* TODO: implement */
return(-1);
hg_return_t hret;
hg_handle_t handle;
bake_bulk_persist_in_t in;
bake_bulk_persist_out_t out;
int ret;
in.bti = bti;
in.rid = rid;
/* create handle */
hret = HG_Create(g_binst.hg_context, g_binst.dest,
g_bake_bulk_persist_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
hret = margo_forward(g_binst.mid, handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
return(-1);
}
hret = HG_Get_output(handle, &out);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
return(-1);
}
ret = out.ret;
HG_Free_output(handle, &out);
HG_Destroy(handle);
return(ret);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment