Commit 282c0154 authored by Shane Snyder's avatar Shane Snyder

add server context structure to server library

parent c69ec981
...@@ -10,22 +10,34 @@ ...@@ -10,22 +10,34 @@
#include "bake-bulk-rpc.h" #include "bake-bulk-rpc.h"
/* definition of bake root data structure (just a target_id for now) */ /* definition of bake root data structure (just a target_id for now) */
typedef struct bake_bulk_root typedef struct
{ {
bake_target_id_t target_id; bake_target_id_t target_id;
} bake_bulk_root_t; } bake_bulk_root_t;
/* definition of internal region_id_t identifier for libpmemobj back end */ /* definition of internal region_id_t identifier for libpmemobj back end */
typedef struct { typedef struct
{
PMEMoid oid; PMEMoid oid;
uint64_t size; uint64_t size;
} pmemobj_region_id_t; } pmemobj_region_id_t;
typedef struct
{
PMEMobjpool *pmem_pool;
bake_bulk_root_t *pmem_root;
/* server shutdown conditional logic */
ABT_mutex shutdown_mutex;
ABT_cond shutdown_cond;
int shutdown_flag;
int ref_count;
} bake_bulk_server_context_t;
/* TODO: this should not be global in the long run; server may provide access /* TODO: this should not be global in the long run; server may provide access
* to multiple targets * to multiple targets
*/ */
static PMEMobjpool *g_pmem_pool = NULL; static bake_bulk_server_context_t *g_svr_ctx = NULL;
static bake_bulk_root_t *g_pmem_root = NULL;
int bake_server_makepool( int bake_server_makepool(
const char *pool_name, const char *pool_name,
...@@ -67,14 +79,20 @@ int bake_server_init( ...@@ -67,14 +79,20 @@ int bake_server_init(
PMEMoid root_oid; PMEMoid root_oid;
bake_bulk_root_t *root; bake_bulk_root_t *root;
char target_string[64]; char target_string[64];
bake_bulk_server_context_t *tmp_svr_ctx;
/* make sure to initialize the server only once */ /* make sure to initialize the server only once */
if(g_pmem_pool || g_pmem_root) if(g_svr_ctx)
{ {
fprintf(stderr, "Error: bake-bulk server already initialized\n"); fprintf(stderr, "Error: bake-bulk server already initialized\n");
return(-1); return(-1);
} }
tmp_svr_ctx = malloc(sizeof(*tmp_svr_ctx));
if(!tmp_svr_ctx)
return(-1);
memset(tmp_svr_ctx, 0, sizeof(*tmp_svr_ctx));
/* open the given pmem pool */ /* open the given pmem pool */
pool = pmemobj_open(pool_name, NULL); pool = pmemobj_open(pool_name, NULL);
if(!pool) if(!pool)
...@@ -126,9 +144,13 @@ int bake_server_init( ...@@ -126,9 +144,13 @@ int bake_server_init(
void, void,
bake_bulk_noop_ult); bake_bulk_noop_ult);
/* set global pmem variables needed by the bake server */ /* set global server context */
g_pmem_pool = pool; tmp_svr_ctx->pmem_pool = pool;
g_pmem_root = root; tmp_svr_ctx->pmem_root = root;
tmp_svr_ctx->ref_count = 1;
ABT_mutex_create(&tmp_svr_ctx->shutdown_mutex);
ABT_cond_create(&tmp_svr_ctx->shutdown_cond);
g_svr_ctx = tmp_svr_ctx;
return(0); return(0);
} }
...@@ -139,6 +161,8 @@ static void bake_bulk_shutdown_ult(hg_handle_t handle) ...@@ -139,6 +161,8 @@ static void bake_bulk_shutdown_ult(hg_handle_t handle)
hg_return_t hret; hg_return_t hret;
margo_instance_id mid; margo_instance_id mid;
assert(g_svr_ctx);
mid = margo_hg_handle_get_instance(handle); mid = margo_hg_handle_get_instance(handle);
hret = margo_respond(handle, NULL); hret = margo_respond(handle, NULL);
...@@ -164,9 +188,10 @@ static void bake_bulk_create_ult(hg_handle_t handle) ...@@ -164,9 +188,10 @@ static void bake_bulk_create_ult(hg_handle_t handle)
hg_return_t hret; hg_return_t hret;
pmemobj_region_id_t* prid; pmemobj_region_id_t* prid;
assert(g_svr_ctx);
/* TODO: this check needs to be somewhere else */ /* TODO: this check needs to be somewhere else */
assert(sizeof(pmemobj_region_id_t) <= BAKE_BULK_REGION_ID_DATA_SIZE); assert(sizeof(pmemobj_region_id_t) <= BAKE_BULK_REGION_ID_DATA_SIZE);
// printf("Got RPC request to create bulk region.\n");
memset(&out, 0, sizeof(out)); memset(&out, 0, sizeof(out));
...@@ -181,7 +206,8 @@ static void bake_bulk_create_ult(hg_handle_t handle) ...@@ -181,7 +206,8 @@ static void bake_bulk_create_ult(hg_handle_t handle)
prid = (pmemobj_region_id_t*)out.rid.data; prid = (pmemobj_region_id_t*)out.rid.data;
prid->size = in.region_size; prid->size = in.region_size;
out.ret = pmemobj_alloc(g_pmem_pool, &prid->oid, in.region_size, 0, NULL, NULL); out.ret = pmemobj_alloc(g_svr_ctx->pmem_pool, &prid->oid,
in.region_size, 0, NULL, NULL);
margo_free_input(handle, &in); margo_free_input(handle, &in);
margo_respond(handle, &out); margo_respond(handle, &out);
...@@ -203,8 +229,8 @@ static void bake_bulk_write_ult(hg_handle_t handle) ...@@ -203,8 +229,8 @@ static void bake_bulk_write_ult(hg_handle_t handle)
margo_instance_id mid; margo_instance_id mid;
pmemobj_region_id_t* prid; pmemobj_region_id_t* prid;
// printf("Got RPC request to write bulk region.\n"); assert(g_svr_ctx);
memset(&out, 0, sizeof(out)); memset(&out, 0, sizeof(out));
hgi = margo_get_info(handle); hgi = margo_get_info(handle);
...@@ -302,8 +328,8 @@ static void bake_bulk_eager_write_ult(hg_handle_t handle) ...@@ -302,8 +328,8 @@ static void bake_bulk_eager_write_ult(hg_handle_t handle)
hg_bulk_t bulk_handle; hg_bulk_t bulk_handle;
pmemobj_region_id_t* prid; pmemobj_region_id_t* prid;
// printf("Got RPC request to write bulk region.\n"); assert(g_svr_ctx);
memset(&out, 0, sizeof(out)); memset(&out, 0, sizeof(out));
hret = margo_get_input(handle, &in); hret = margo_get_input(handle, &in);
...@@ -348,7 +374,7 @@ static void bake_bulk_persist_ult(hg_handle_t handle) ...@@ -348,7 +374,7 @@ static void bake_bulk_persist_ult(hg_handle_t handle)
char* buffer; char* buffer;
pmemobj_region_id_t* prid; pmemobj_region_id_t* prid;
// printf("Got RPC request to persist bulk region.\n"); assert(g_svr_ctx);
memset(&out, 0, sizeof(out)); memset(&out, 0, sizeof(out));
...@@ -375,7 +401,7 @@ static void bake_bulk_persist_ult(hg_handle_t handle) ...@@ -375,7 +401,7 @@ static void bake_bulk_persist_ult(hg_handle_t handle)
} }
/* TODO: should this have an abt shim in case it blocks? */ /* TODO: should this have an abt shim in case it blocks? */
pmemobj_persist(g_pmem_pool, buffer, prid->size); pmemobj_persist(g_svr_ctx->pmem_pool, buffer, prid->size);
out.ret = 0; out.ret = 0;
...@@ -394,7 +420,7 @@ static void bake_bulk_get_size_ult(hg_handle_t handle) ...@@ -394,7 +420,7 @@ static void bake_bulk_get_size_ult(hg_handle_t handle)
hg_return_t hret; hg_return_t hret;
pmemobj_region_id_t* prid; pmemobj_region_id_t* prid;
// printf("Got RPC request to get_size bulk region.\n"); assert(g_svr_ctx);
memset(&out, 0, sizeof(out)); memset(&out, 0, sizeof(out));
...@@ -423,8 +449,8 @@ DEFINE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult) ...@@ -423,8 +449,8 @@ DEFINE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult)
/* service a remote RPC for a no-op */ /* service a remote RPC for a no-op */
static void bake_bulk_noop_ult(hg_handle_t handle) static void bake_bulk_noop_ult(hg_handle_t handle)
{ {
// printf("Got RPC request to noop bulk region.\n"); assert(g_svr_ctx);
margo_respond(handle, NULL); margo_respond(handle, NULL);
margo_destroy(handle); margo_destroy(handle);
return; return;
...@@ -445,7 +471,7 @@ static void bake_bulk_read_ult(hg_handle_t handle) ...@@ -445,7 +471,7 @@ static void bake_bulk_read_ult(hg_handle_t handle)
margo_instance_id mid; margo_instance_id mid;
pmemobj_region_id_t* prid; pmemobj_region_id_t* prid;
// printf("Got RPC request to read bulk region.\n"); assert(g_svr_ctx);
memset(&out, 0, sizeof(out)); memset(&out, 0, sizeof(out));
...@@ -511,7 +537,6 @@ static void bake_bulk_read_ult(hg_handle_t handle) ...@@ -511,7 +537,6 @@ static void bake_bulk_read_ult(hg_handle_t handle)
} }
DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult) DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
/* service a remote RPC that reads to a bulk region and eagerly sends /* service a remote RPC that reads to a bulk region and eagerly sends
* response */ * response */
static void bake_bulk_eager_read_ult(hg_handle_t handle) static void bake_bulk_eager_read_ult(hg_handle_t handle)
...@@ -523,7 +548,7 @@ static void bake_bulk_eager_read_ult(hg_handle_t handle) ...@@ -523,7 +548,7 @@ static void bake_bulk_eager_read_ult(hg_handle_t handle)
hg_size_t size; hg_size_t size;
pmemobj_region_id_t* prid; pmemobj_region_id_t* prid;
// printf("Got RPC request to read bulk region.\n"); assert(g_svr_ctx);
memset(&out, 0, sizeof(out)); memset(&out, 0, sizeof(out));
...@@ -565,12 +590,12 @@ static void bake_bulk_probe_ult(hg_handle_t handle) ...@@ -565,12 +590,12 @@ static void bake_bulk_probe_ult(hg_handle_t handle)
{ {
bake_bulk_probe_out_t out; bake_bulk_probe_out_t out;
// printf("Got RPC request to probe bulk region.\n"); assert(g_svr_ctx);
memset(&out, 0, sizeof(out)); memset(&out, 0, sizeof(out));
out.ret = 0; out.ret = 0;
out.bti = g_pmem_root->target_id; out.bti = g_svr_ctx->pmem_root->target_id;
margo_respond(handle, &out); margo_respond(handle, &out);
margo_destroy(handle); margo_destroy(handle);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment