Commit 5f5cc293 authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-expose-ptr' into 'master'

added bake_get_data function to retrieve the raw pointer

See merge request !3
parents a2d31d86 777b77f4
......@@ -246,6 +246,31 @@ int bake_get_size(
bake_region_id_t rid,
uint64_t *size);
/**
* Gets the raw pointer of an existing BAKE region.
* This pointer is valid in the address space of the
* targeted BAKE provider. This function is meant to
* be used when the client is co-located with the
* BAKE provider and lives in the same address space
* (e.g. active storage scenarios where we want to
* access a piece of data without making a copy).
* This function will return an error if the caller
* is not running at the same address as the provider.
*
* Note that if the data pointed to is modified by the
* client, the client will need to call bake_persist
* to make the provider persist the changes.
*
* @param [in] provider provider handle
* @param [in] rid identifier for region
* @param [out] ptr pointer to the address of the data
* @returns 0 on success, -1 on failure
*/
int bake_get_data(
bake_provider_handle_t provider,
bake_region_id_t rid,
void** ptr);
/**
* Reads from a BAKE region that was previously persisted with bake_persist().
*
......
......@@ -30,6 +30,7 @@ struct bake_client
hg_id_t bake_persist_id;
hg_id_t bake_create_write_persist_id;
hg_id_t bake_get_size_id;
hg_id_t bake_get_data_id;
hg_id_t bake_read_id;
hg_id_t bake_noop_id;
hg_id_t bake_remove_id;
......@@ -64,6 +65,7 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
margo_registered_name(mid, "bake_persist_rpc", &client->bake_persist_id, &flag);
margo_registered_name(mid, "bake_create_write_persist_rpc", &client->bake_create_write_persist_id, &flag);
margo_registered_name(mid, "bake_get_size_rpc", &client->bake_get_size_id, &flag);
margo_registered_name(mid, "bake_get_data_rpc", &client->bake_get_data_id, &flag);
margo_registered_name(mid, "bake_read_rpc", &client->bake_read_id, &flag);
margo_registered_name(mid, "bake_noop_rpc", &client->bake_noop_id, &flag);
margo_registered_name(mid, "bake_remove_rpc", &client->bake_remove_id, &flag);
......@@ -94,6 +96,9 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
client->bake_get_size_id =
MARGO_REGISTER(mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t, NULL);
client->bake_get_data_id =
MARGO_REGISTER(mid, "bake_get_data_rpc",
bake_get_data_in_t, bake_get_data_out_t, NULL);
client->bake_read_id =
MARGO_REGISTER(mid, "bake_read_rpc",
bake_read_in_t, bake_read_out_t, NULL);
......@@ -648,6 +653,69 @@ int bake_get_size(
return(ret);
}
int bake_get_data(
bake_provider_handle_t provider,
bake_region_id_t rid,
void** ptr)
{
hg_return_t hret;
hg_handle_t handle;
bake_get_data_in_t in;
bake_get_data_out_t out;
int ret;
// make sure the target provider is on the same address space
hg_addr_t self_addr;
if(HG_SUCCESS != margo_addr_self(provider->client->mid, &self_addr)) return -1;
hg_addr_t trgt_addr = provider->addr;
hg_size_t addr_size = 128;
char self_addr_str[128];
char trgt_addr_str[128];
if(HG_SUCCESS != margo_addr_to_string(provider->client->mid, self_addr_str, &addr_size, self_addr)) {
margo_addr_free(provider->client->mid, self_addr);
return -1;
}
if(HG_SUCCESS != margo_addr_to_string(provider->client->mid, trgt_addr_str, &addr_size, trgt_addr)) {
margo_addr_free(provider->client->mid, self_addr);
return -1;
}
if(strcmp(self_addr_str, trgt_addr_str) != 0) {
margo_addr_free(provider->client->mid, self_addr);
return -1;
}
margo_addr_free(provider->client->mid, self_addr);
in.rid = rid;
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_get_data_id, &handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return(-1);
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return(-1);
}
ret = out.ret;
*ptr = (void*)out.ptr;
margo_free_output(handle, &out);
margo_destroy(handle);
return(ret);
}
int bake_noop(bake_provider_handle_t provider)
{
hg_return_t hret;
......
......@@ -82,6 +82,14 @@ MERCURY_GEN_PROC(bake_get_size_out_t,
((uint64_t)(size)))
DECLARE_MARGO_RPC_HANDLER(bake_get_size_ult)
/* BAKE get data */
MERCURY_GEN_PROC(bake_get_data_in_t,
((bake_region_id_t)(rid)))
MERCURY_GEN_PROC(bake_get_data_out_t,
((int32_t)(ret))\
((uint64_t)(ptr)))
DECLARE_MARGO_RPC_HANDLER(bake_get_data_ult)
/* BAKE read */
MERCURY_GEN_PROC(bake_read_in_t,
((bake_region_id_t)(rid))\
......
......@@ -129,6 +129,10 @@ int bake_provider_register(
bake_get_size_in_t, bake_get_size_out_t,
bake_get_size_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_get_data_rpc",
bake_get_data_in_t, bake_get_data_out_t,
bake_get_data_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_read_rpc",
bake_read_in_t, bake_read_out_t,
bake_read_ult, provider_id, abt_pool);
......@@ -708,6 +712,59 @@ static void bake_get_size_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bake_get_size_ult)
/* Get the raw pointer of a region */
static void bake_get_data_ult(hg_handle_t handle)
{
bake_get_data_out_t out;
bake_get_data_in_t in;
hg_return_t hret;
pmemobj_region_id_t* prid;
memset(&out, 0, sizeof(out));
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* hgi = margo_get_info(handle);
bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
if(!svr_ctx) {
out.ret = -1;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
char* buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
out.ptr = (uint64_t)buffer;
out.ret = 0;
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_get_data_ult)
/* service a remote RPC for a BAKE no-op */
static void bake_noop_ult(hg_handle_t handle)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment