Commit 92bf62fc authored by Shane Snyder's avatar Shane Snyder

implement the bake_create_write_persist call

parent e6192208
......@@ -101,6 +101,26 @@ int bake_persist(
bake_target_id_t bti,
bake_region_id_t rid);
/**
* Creates a bounded-size BAKE region, writes data into it, and persists
* the reason all in one call/RPC (and thus 1 RTT).
*
* @param [in] bti BAKE target identifier
* @param [in] region_size size of region to be created
* @param [in] region_offset offset into the target region to write
* @param [in] buf local memory buffer to write
* @param [in] buf_size size of local memory buffer to write
* @param [out] rid identifier for new region
* @returns 0 on success, -1 on failure
*/
int bake_create_write_persist(
bake_target_id_t bti,
uint64_t region_size,
uint64_t region_offset,
void const *buf,
uint64_t buf_size,
bake_region_id_t *rid);
/**
* Checks the size of an existing BAKE region.
*
......
......@@ -29,6 +29,7 @@ struct bake_margo_instance
hg_id_t bake_eager_read_id;
hg_id_t bake_write_id;
hg_id_t bake_persist_id;
hg_id_t bake_create_write_persist_id;
hg_id_t bake_get_size_id;
hg_id_t bake_read_id;
hg_id_t bake_noop_id;
......@@ -59,45 +60,38 @@ static int bake_margo_instance_init(margo_instance_id mid)
/* register RPCs */
g_margo_inst.bake_probe_id =
MARGO_REGISTER(g_margo_inst.mid,
"bake_probe_rpc", void, bake_probe_out_t,
NULL);
MARGO_REGISTER(g_margo_inst.mid, "bake_probe_rpc",
void, bake_probe_out_t, NULL);
g_margo_inst.bake_shutdown_id =
MARGO_REGISTER(g_margo_inst.mid,
"bake_shutdown_rpc", void, void,
NULL);
MARGO_REGISTER(g_margo_inst.mid, "bake_shutdown_rpc",
void, void, NULL);
g_margo_inst.bake_create_id =
MARGO_REGISTER(g_margo_inst.mid,
"bake_create_rpc", bake_create_in_t, bake_create_out_t,
NULL);
MARGO_REGISTER(g_margo_inst.mid, "bake_create_rpc",
bake_create_in_t, bake_create_out_t, NULL);
g_margo_inst.bake_write_id =
MARGO_REGISTER(g_margo_inst.mid,
"bake_write_rpc", bake_write_in_t, bake_write_out_t,
NULL);
MARGO_REGISTER(g_margo_inst.mid, "bake_write_rpc",
bake_write_in_t, bake_write_out_t, NULL);
g_margo_inst.bake_eager_write_id =
MARGO_REGISTER(g_margo_inst.mid,
"bake_eager_write_rpc", bake_eager_write_in_t, bake_eager_write_out_t,
NULL);
MARGO_REGISTER(g_margo_inst.mid, "bake_eager_write_rpc",
bake_eager_write_in_t, bake_eager_write_out_t, NULL);
g_margo_inst.bake_eager_read_id =
MARGO_REGISTER(g_margo_inst.mid,
"bake_eager_read_rpc", bake_eager_read_in_t, bake_eager_read_out_t,
NULL);
MARGO_REGISTER(g_margo_inst.mid, "bake_eager_read_rpc",
bake_eager_read_in_t, bake_eager_read_out_t, NULL);
g_margo_inst.bake_persist_id =
MARGO_REGISTER(g_margo_inst.mid,
"bake_persist_rpc", bake_persist_in_t, bake_persist_out_t,
NULL);
MARGO_REGISTER(g_margo_inst.mid, "bake_persist_rpc",
bake_persist_in_t, bake_persist_out_t, NULL);
g_margo_inst.bake_create_write_persist_id =
MARGO_REGISTER(g_margo_inst.mid, "bake_create_write_persist_rpc",
bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
g_margo_inst.bake_get_size_id =
MARGO_REGISTER(g_margo_inst.mid,
"bake_get_size_rpc", bake_get_size_in_t, bake_get_size_out_t,
NULL);
MARGO_REGISTER(g_margo_inst.mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t, NULL);
g_margo_inst.bake_read_id =
MARGO_REGISTER(g_margo_inst.mid,
"bake_read_rpc", bake_read_in_t, bake_read_out_t,
NULL);
MARGO_REGISTER(g_margo_inst.mid, "bake_read_rpc",
bake_read_in_t, bake_read_out_t, NULL);
g_margo_inst.bake_noop_id =
MARGO_REGISTER(g_margo_inst.mid,
"bake_noop_rpc", void, void,
NULL);
MARGO_REGISTER(g_margo_inst.mid, "bake_noop_rpc",
void, void, NULL);
return(0);
}
......@@ -480,6 +474,73 @@ int bake_persist(
return(ret);
}
int bake_create_write_persist(
bake_target_id_t bti,
uint64_t region_size,
uint64_t region_offset,
void const *buf,
uint64_t buf_size,
bake_region_id_t *rid)
{
hg_return_t hret;
hg_handle_t handle;
bake_create_write_persist_in_t in;
bake_create_write_persist_out_t out;
int ret;
struct bake_instance *instance = NULL;
/* XXX eager path? */
HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
if(!instance)
return(-1);
in.bti = bti;
in.region_size = region_size;
in.region_offset = region_offset;
in.bulk_offset = 0;
in.bulk_size = buf_size;
in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_create(g_margo_inst.mid, instance->dest,
g_margo_inst.bake_create_write_persist_id, &handle);
if(hret != HG_SUCCESS)
{
margo_bulk_free(in.bulk_handle);
return(-1);
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS)
{
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
return(-1);
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
return(-1);
}
ret = out.ret;
if(ret == 0)
*rid = out.rid;
margo_free_output(handle, &out);
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
return(ret);
}
int bake_get_size(
bake_target_id_t bti,
bake_region_id_t rid,
......
......@@ -62,6 +62,20 @@ MERCURY_GEN_PROC(bake_persist_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_persist_ult)
/* BAKE create/write/persist */
MERCURY_GEN_PROC(bake_create_write_persist_in_t,
((bake_target_id_t)(bti))\
((uint64_t)(region_size))\
((uint64_t)(region_offset))\
((hg_bulk_t)(bulk_handle))\
((uint64_t)(bulk_offset))\
((uint64_t)(bulk_size))\
((hg_string_t)(remote_addr_str)))
MERCURY_GEN_PROC(bake_create_write_persist_out_t,
((int32_t)(ret))\
((bake_region_id_t)(rid)))
DECLARE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
/* BAKE get size */
MERCURY_GEN_PROC(bake_get_size_in_t,
((bake_target_id_t)(bti))\
......
......@@ -134,6 +134,9 @@ int bake_server_init(
bake_eager_read_in_t, bake_eager_read_out_t, bake_eager_read_ult);
MARGO_REGISTER(mid, "bake_persist_rpc",
bake_persist_in_t, bake_persist_out_t, bake_persist_ult);
MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
bake_create_write_persist_in_t, bake_create_write_persist_out_t,
bake_create_write_persist_ult);
MARGO_REGISTER(mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t, bake_get_size_ult);
MARGO_REGISTER(mid, "bake_read_rpc",
......@@ -286,6 +289,7 @@ static void bake_write_ult(hg_handle_t handle)
hgi = margo_get_info(handle);
assert(hgi);
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS)
......@@ -461,6 +465,125 @@ static void bake_persist_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bake_persist_ult)
static void bake_create_write_persist_ult(hg_handle_t handle)
{
bake_create_write_persist_out_t out;
bake_create_write_persist_in_t in;
hg_addr_t src_addr;
char* buffer;
hg_bulk_t bulk_handle;
const struct hg_info *hgi;
margo_instance_id mid;
hg_return_t hret;
int ret;
pmemobj_region_id_t* prid;
assert(g_svr_ctx);
/* TODO: this check needs to be somewhere else */
assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);
memset(&out, 0, sizeof(out));
hgi = margo_get_info(handle);
assert(hgi);
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)out.rid.data;
prid->size = in.region_size;
ret = pmemobj_alloc(g_svr_ctx->pmem_pool, &prid->oid,
in.region_size, 0, NULL, NULL);
if(ret != 0)
{
out.ret = -1;
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
/* create bulk handle for local side of transfer */
hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = -1;
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
if(in.remote_addr_str)
{
/* a proxy address was provided to pull write data from */
hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
if(hret != HG_SUCCESS)
{
out.ret = -1;
margo_bulk_free(bulk_handle);
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
}
else
{
/* no proxy write, use the source of this request */
src_addr = hgi->addr;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
in.bulk_offset, bulk_handle, 0, in.bulk_size);
if(hret != HG_SUCCESS)
{
out.ret = -1;
if(in.remote_addr_str)
margo_addr_free(mid, src_addr);
margo_bulk_free(bulk_handle);
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
/* TODO: should this have an abt shim in case it blocks? */
pmemobj_persist(g_svr_ctx->pmem_pool, buffer, prid->size);
out.ret = 0;
if(in.remote_addr_str)
margo_addr_free(mid, src_addr);
margo_bulk_free(bulk_handle);
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
/* service a remote RPC that retrieves the size of a BAKE region */
static void bake_get_size_ult(hg_handle_t handle)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment