Commit 081a52d7 authored by Shane Snyder's avatar Shane Snyder

refactor & cleanup bake server api

parent 555b7d7a
...@@ -15,38 +15,31 @@ ...@@ -15,38 +15,31 @@
extern "C" { extern "C" {
#endif #endif
struct bake_bulk_root
{
bake_target_id_t target_id;
};
struct bake_pool_info
{
PMEMobjpool *bb_pmem_pool;
struct bake_bulk_root *bb_pmem_root;
};
/** /**
* Register a bake server instance for a given Margo instance. * Creates a bake-bulk pool to use for backend PMEM storage.
* NOTE: this function must be called on a pool before the pool
* can be passed to 'bake_server_init'
* *
* @param[in] mid Margo instance identifier * @param[in] pool_name path to pmem backend file
* @param[in] bb_pmem_pool libpmem pool to use for the bake storage service * @param[in] pool_size size of the created pool
* @param[in] bb_pmem_root libpmem root for the bake pool * @param[in] pool_mode mode of the created pool
* @returns 0 on success, -1 otherwise
*/ */
void bake_server_register( int bake_server_makepool(
margo_instance_id mid, const char *pool_name,
struct bake_pool_info *pool_info); size_t pool_size,
mode_t pool_mode);
/** /**
* Convienence function to set up a PMEM backend. * Register a bake server instance for a given Margo instance.
*
* @param[in] poolname path to pmem backend file
* *
* returns a pointer to an initialized `struct bake_pool_info` on sucess, * @param[in] mid Margo instance identifier
* NULL if anything goes wrong * @param[in] pool_name path to pmem backend file
* @returns 0 on success, -1 otherwise
*/ */
struct bake_pool_info *bake_server_makepool( int bake_server_init(
const char *poolname); margo_instance_id mid,
const char *pool_name);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -7,7 +7,6 @@ ...@@ -7,7 +7,6 @@
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>
#include <unistd.h> #include <unistd.h>
#include <uuid/uuid.h>
#include <margo.h> #include <margo.h>
#include <libpmemobj.h> #include <libpmemobj.h>
#include <bake-bulk-server.h> #include <bake-bulk-server.h>
...@@ -64,13 +63,11 @@ static void parse_args(int argc, char **argv, struct options *opts) ...@@ -64,13 +63,11 @@ static void parse_args(int argc, char **argv, struct options *opts)
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
struct options opts; struct options opts;
struct bake_pool_info * pool_info;
margo_instance_id mid; margo_instance_id mid;
int ret;
parse_args(argc, argv, &opts); parse_args(argc, argv, &opts);
pool_info = bake_server_makepool(opts.pmem_pool);
/* start margo */ /* start margo */
/* use the main xstream for driving progress and executing rpc handlers */ /* use the main xstream for driving progress and executing rpc handlers */
mid = margo_init(opts.listen_addr, MARGO_SERVER_MODE, 0, -1); mid = margo_init(opts.listen_addr, MARGO_SERVER_MODE, 0, -1);
...@@ -120,7 +117,13 @@ int main(int argc, char **argv) ...@@ -120,7 +117,13 @@ int main(int argc, char **argv)
} }
/* register the bake bulk server */ /* register the bake bulk server */
bake_server_register(mid, pool_info); ret = bake_server_init(mid, opts.pmem_pool);
if(ret != 0)
{
fprintf(stderr, "Error: bake_server_init()\n");
margo_finalize(mid);
return(-1);
}
/* NOTE: at this point this server ULT has two options. It can wait on /* NOTE: at this point this server ULT has two options. It can wait on
* whatever mechanism it wants to (however long the daemon should run and * whatever mechanism it wants to (however long the daemon should run and
...@@ -138,7 +141,7 @@ int main(int argc, char **argv) ...@@ -138,7 +141,7 @@ int main(int argc, char **argv)
*/ */
margo_wait_for_finalize(mid); margo_wait_for_finalize(mid);
pmemobj_close(pool_info->bb_pmem_pool); /* XXX pmemobj_close(pool_info->bb_pmem_pool); */
return(0); return(0);
} }
......
...@@ -5,10 +5,16 @@ ...@@ -5,10 +5,16 @@
*/ */
#include <assert.h> #include <assert.h>
#include <bake-bulk-server.h>
#include <libpmemobj.h> #include <libpmemobj.h>
#include <bake-bulk-server.h>
#include "bake-bulk-rpc.h" #include "bake-bulk-rpc.h"
/* definition of bake root data structure (just a target_id for now) */
typedef struct bake_bulk_root
{
bake_target_id_t target_id;
} bake_bulk_root_t;
/* definition of internal region_id_t identifier for libpmemobj back end */ /* definition of internal region_id_t identifier for libpmemobj back end */
typedef struct { typedef struct {
PMEMoid oid; PMEMoid oid;
...@@ -19,45 +25,76 @@ typedef struct { ...@@ -19,45 +25,76 @@ typedef struct {
* to multiple targets * to multiple targets
*/ */
static PMEMobjpool *g_pmem_pool = NULL; static PMEMobjpool *g_pmem_pool = NULL;
static struct bake_bulk_root *g_pmem_root = NULL; static bake_bulk_root_t *g_pmem_root = NULL;
struct bake_pool_info * bake_server_makepool( int bake_server_makepool(
const char *poolname) const char *pool_name,
size_t pool_size,
mode_t pool_mode)
{ {
PMEMobjpool *pool;
PMEMoid root_oid; PMEMoid root_oid;
bake_bulk_root_t *root;
char target_string[64]; char target_string[64];
struct bake_pool_info *pool_info;
pool_info = malloc(sizeof(*pool_info));
/* open pmem pool */ pool = pmemobj_create(pool_name, NULL, pool_size, pool_mode);
pool_info->bb_pmem_pool = pmemobj_open(poolname, NULL); if(!pool)
if(!pool_info->bb_pmem_pool)
{ {
fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg()); fprintf(stderr, "pmemobj_create: %s\n", pmemobj_errormsg());
return(NULL); return(-1);
} }
/* find root */ /* find root */
root_oid = pmemobj_root(pool_info->bb_pmem_pool, root_oid = pmemobj_root(pool, sizeof(bake_bulk_root_t));
sizeof(*(pool_info->bb_pmem_root)) ); root = pmemobj_direct(root_oid);
pool_info->bb_pmem_root = pmemobj_direct(root_oid);
if(uuid_is_null(pool_info->bb_pmem_root->target_id.id)) /* store the target id for this bake pool at the root */
uuid_generate(root->target_id.id);
pmemobj_persist(pool, root, sizeof(bake_bulk_root_t));
uuid_unparse(root->target_id.id, target_string);
fprintf(stderr, "created BAKE target ID: %s\n", target_string);
pmemobj_close(pool);
return(0);
}
int bake_server_init(
margo_instance_id mid,
const char *pool_name)
{
PMEMobjpool *pool;
PMEMoid root_oid;
bake_bulk_root_t *root;
char target_string[64];
/* make sure to initialize the server only once */
if(g_pmem_pool || g_pmem_root)
{ {
uuid_generate(pool_info->bb_pmem_root->target_id.id); fprintf(stderr, "Error: bake-bulk server already initialized\n");
pmemobj_persist(pool_info->bb_pmem_pool, return(-1);
pool_info->bb_pmem_root, sizeof(*(pool_info->bb_pmem_root)) );
} }
uuid_unparse(pool_info->bb_pmem_root->target_id.id, target_string);
fprintf(stderr, "BAKE target ID: %s\n", target_string);
return pool_info; /* open the given pmem pool */
} pool = pmemobj_open(pool_name, NULL);
if(!pool)
{
fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
return(-1);
}
/* check to make sure the root is properly set */
root_oid = pmemobj_root(pool, sizeof(bake_bulk_root_t));
root = pmemobj_direct(root_oid);
if(uuid_is_null(root->target_id.id))
{
fprintf(stderr, "Error: bake-bulk pool is not properly initialized\n");
pmemobj_close(pool);
return(-1);
}
uuid_unparse(root->target_id.id, target_string);
fprintf(stderr, "opened BAKE target ID: %s\n", target_string);
void bake_server_register(margo_instance_id mid,
struct bake_pool_info *pool_info)
{
/* register RPCs */ /* register RPCs */
MARGO_REGISTER(mid, "bake_bulk_shutdown_rpc", void, void, MARGO_REGISTER(mid, "bake_bulk_shutdown_rpc", void, void,
bake_bulk_shutdown_ult); bake_bulk_shutdown_ult);
...@@ -90,10 +127,10 @@ void bake_server_register(margo_instance_id mid, ...@@ -90,10 +127,10 @@ void bake_server_register(margo_instance_id mid,
bake_bulk_noop_ult); bake_bulk_noop_ult);
/* set global pmem variables needed by the bake server */ /* set global pmem variables needed by the bake server */
g_pmem_pool = pool_info->bb_pmem_pool; g_pmem_pool = pool;
g_pmem_root = pool_info->bb_pmem_root; g_pmem_root = root;
return; return(0);
} }
/* service a remote RPC that instructs the server daemon to shut down */ /* service a remote RPC that instructs the server daemon to shut down */
...@@ -102,8 +139,6 @@ static void bake_bulk_shutdown_ult(hg_handle_t handle) ...@@ -102,8 +139,6 @@ static void bake_bulk_shutdown_ult(hg_handle_t handle)
hg_return_t hret; hg_return_t hret;
margo_instance_id mid; margo_instance_id mid;
// printf("Got RPC request to shutdown.\n");
mid = margo_hg_handle_get_instance(handle); mid = margo_hg_handle_get_instance(handle);
hret = margo_respond(handle, NULL); hret = margo_respond(handle, NULL);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment