Commit aec05985 authored by Philip Carns's avatar Philip Carns

implement bulk write handler

parent 9bcec8fc
......@@ -70,6 +70,7 @@ static void bake_bulk_create_ult(hg_handle_t handle)
memcpy(&out.rid, &oid, sizeof(oid));
}
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
......@@ -82,11 +83,21 @@ static void bake_bulk_write_ult(hg_handle_t handle)
bake_bulk_write_out_t out;
bake_bulk_write_in_t in;
hg_return_t hret;
PMEMoid oid;
char* buffer;
hg_size_t size;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
printf("Got RPC request to write bulk region.\n");
memset(&out, 0, sizeof(out));
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
......@@ -96,9 +107,50 @@ static void bake_bulk_write_ult(hg_handle_t handle)
return;
}
/* TODO: implement */
out.ret = -1;
/* TODO: real translation functions for opaque type */
memcpy(&oid, &in.rid, sizeof(oid));
/* find memory address for target object */
buffer = pmemobj_direct(oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
size = HG_Bulk_get_size(in.bulk_handle);
/* create bulk handle for local side of transfer */
hret = HG_Bulk_create(hgi->hg_class, 1, (void**)(&buffer), &size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle,
0, bulk_handle, 0, size);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
out.ret = 0;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
......
......@@ -115,6 +115,8 @@ int main(int argc, char **argv)
fprintf(stderr, "Error: bake_bulk_write()\n");
return(-1);
}
/* TODO: persist */
munmap(local_region, statbuf.st_size);
close(fd);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment