Commit eb01864c authored by Shane Snyder's avatar Shane Snyder

add bake_remove functionality

parent cea2843a
......@@ -305,7 +305,8 @@ int bake_proxy_read(
* @returns 0 on success, -1 on failure
*/
int bake_shutdown_service(
bake_client_t client, hg_addr_t addr);
bake_client_t client,
hg_addr_t addr);
/**
* Issues a BAKE no-op operation.
......@@ -315,6 +316,13 @@ int bake_shutdown_service(
*/
int bake_noop(bake_provider_handle_t provider);
/**
* Removes a previously persisted BAKE region and frees its associated memory.
*/
int bake_remove(
bake_provider_handle_t provider,
bake_region_id_t rid);
#ifdef __cplusplus
}
#endif
......
......@@ -32,6 +32,7 @@ struct bake_client
hg_id_t bake_get_size_id;
hg_id_t bake_read_id;
hg_id_t bake_noop_id;
hg_id_t bake_remove_id;
uint64_t num_provider_handles;
};
......@@ -65,6 +66,7 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
margo_registered_name(mid, "bake_get_size_rpc", &client->bake_get_size_id, &flag);
margo_registered_name(mid, "bake_read_rpc", &client->bake_read_id, &flag);
margo_registered_name(mid, "bake_noop_rpc", &client->bake_noop_id, &flag);
margo_registered_name(mid, "bake_remove_rpc", &client->bake_remove_id, &flag);
} else { /* RPCs not already registered */
......@@ -98,6 +100,9 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
client->bake_noop_id =
MARGO_REGISTER(mid, "bake_noop_rpc",
void, void, NULL);
client->bake_remove_id =
MARGO_REGISTER(mid, "bake_remove_rpc",
bake_remove_in_t, bake_remove_out_t, NULL);
}
return(0);
......@@ -828,3 +833,41 @@ int bake_proxy_read(
return(ret);
}
int bake_remove(
bake_provider_handle_t provider,
bake_region_id_t rid)
{
hg_return_t hret;
hg_handle_t handle;
bake_remove_in_t in;
bake_remove_out_t out;
int ret;
in.rid = rid;
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_remove_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return(-1);
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return(-1);
}
ret = out.ret;
margo_destroy(handle);
return(ret);
}
......@@ -122,6 +122,12 @@ DECLARE_MARGO_RPC_HANDLER(bake_probe_ult)
/* BAKE noop */
DECLARE_MARGO_RPC_HANDLER(bake_noop_ult)
/* BAKE remove */
MERCURY_GEN_PROC(bake_remove_in_t,
((bake_region_id_t)(rid)))
MERCURY_GEN_PROC(bake_remove_out_t,
((int32_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bake_remove_ult)
static inline hg_return_t hg_proc_bake_region_id_t(hg_proc_t proc, bake_region_id_t *rid)
{
......
......@@ -140,6 +140,11 @@ int bake_provider_register(
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_noop_rpc",
void, void, bake_noop_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_remove_rpc",
bake_remove_in_t, bake_remove_out_t, bake_remove_ult,
provider_id, abt_pool);
margo_register_data(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);
......@@ -917,6 +922,49 @@ static void bake_probe_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bake_probe_ult)
static void bake_remove_ult(hg_handle_t handle)
{
bake_remove_in_t in;
bake_remove_out_t out;
hg_return_t hret;
pmemobj_region_id_t* prid;
memset(&out, 0, sizeof(out));
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* hgi = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
if(!svr_ctx) {
out.ret = -1;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
pmemobj_free(&prid->oid);
out.ret = 0;
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_remove_ult)
static void bake_server_finalize_cb(void *data)
{
bake_server_context_t *svr_ctx = (bake_server_context_t *)data;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment