Commit 8c2225b1 authored by Shane Snyder's avatar Shane Snyder
Browse files

add funcs for cleanly shutting down bake server

bake now exposes bake_server_shutdown and
bake_server_wait_for_shutdown (ala margo) to allow servers to
be shutdown more cleanly
parent 282c0154
...@@ -41,6 +41,16 @@ int bake_server_init( ...@@ -41,6 +41,16 @@ int bake_server_init(
margo_instance_id mid, margo_instance_id mid,
const char *pool_name); const char *pool_name);
/**
* Shuts down a bake server and frees all associated resources.
*/
void bake_server_shutdown(void);
/**
* Suspends the server process until some other entity calls bake_server_shutdown.
*/
void bake_server_wait_for_shutdown(void);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -125,23 +125,9 @@ int main(int argc, char **argv) ...@@ -125,23 +125,9 @@ int main(int argc, char **argv)
return(-1); return(-1);
} }
/* NOTE: at this point this server ULT has two options. It can wait on /* suspend until the bake server gets a shutdown signal from the client */
* whatever mechanism it wants to (however long the daemon should run and bake_server_wait_for_shutdown();
* then call margo_finalize(). Otherwise, it can call margo_finalize(mid);
* margo_wait_for_finalize() on the assumption that it should block until
* some other entity calls margo_finalize().
*
* This example does the latter. Margo will be finalized by a special
* RPC from the client.
*
* This approach will allow the server to idle gracefully even when
* executed in "single" mode, in which the main thread of the server
* daemon and the progress thread for Mercury are executing in the same
* ABT pool.
*/
margo_wait_for_finalize(mid);
/* XXX pmemobj_close(pool_info->bb_pmem_pool); */
return(0); return(0);
} }
......
...@@ -34,6 +34,8 @@ typedef struct ...@@ -34,6 +34,8 @@ typedef struct
int ref_count; int ref_count;
} bake_bulk_server_context_t; } bake_bulk_server_context_t;
static void bake_server_cleanup(bake_bulk_server_context_t *svr_ctx);
/* TODO: this should not be global in the long run; server may provide access /* TODO: this should not be global in the long run; server may provide access
* to multiple targets * to multiple targets
*/ */
...@@ -47,7 +49,6 @@ int bake_server_makepool( ...@@ -47,7 +49,6 @@ int bake_server_makepool(
PMEMobjpool *pool; PMEMobjpool *pool;
PMEMoid root_oid; PMEMoid root_oid;
bake_bulk_root_t *root; bake_bulk_root_t *root;
char target_string[64];
pool = pmemobj_create(pool_name, NULL, pool_size, pool_mode); pool = pmemobj_create(pool_name, NULL, pool_size, pool_mode);
if(!pool) if(!pool)
...@@ -63,8 +64,11 @@ int bake_server_makepool( ...@@ -63,8 +64,11 @@ int bake_server_makepool(
/* store the target id for this bake pool at the root */ /* store the target id for this bake pool at the root */
uuid_generate(root->target_id.id); uuid_generate(root->target_id.id);
pmemobj_persist(pool, root, sizeof(bake_bulk_root_t)); pmemobj_persist(pool, root, sizeof(bake_bulk_root_t));
#if 1
char target_string[64];
uuid_unparse(root->target_id.id, target_string); uuid_unparse(root->target_id.id, target_string);
fprintf(stderr, "created BAKE target ID: %s\n", target_string); fprintf(stderr, "created BAKE target ID: %s\n", target_string);
#endif
pmemobj_close(pool); pmemobj_close(pool);
...@@ -78,7 +82,6 @@ int bake_server_init( ...@@ -78,7 +82,6 @@ int bake_server_init(
PMEMobjpool *pool; PMEMobjpool *pool;
PMEMoid root_oid; PMEMoid root_oid;
bake_bulk_root_t *root; bake_bulk_root_t *root;
char target_string[64];
bake_bulk_server_context_t *tmp_svr_ctx; bake_bulk_server_context_t *tmp_svr_ctx;
/* make sure to initialize the server only once */ /* make sure to initialize the server only once */
...@@ -110,8 +113,11 @@ int bake_server_init( ...@@ -110,8 +113,11 @@ int bake_server_init(
pmemobj_close(pool); pmemobj_close(pool);
return(-1); return(-1);
} }
#if 1
char target_string[64];
uuid_unparse(root->target_id.id, target_string); uuid_unparse(root->target_id.id, target_string);
fprintf(stderr, "opened BAKE target ID: %s\n", target_string); fprintf(stderr, "opened BAKE target ID: %s\n", target_string);
#endif
/* register RPCs */ /* register RPCs */
MARGO_REGISTER(mid, "bake_bulk_shutdown_rpc", void, void, MARGO_REGISTER(mid, "bake_bulk_shutdown_rpc", void, void,
...@@ -155,6 +161,57 @@ int bake_server_init( ...@@ -155,6 +161,57 @@ int bake_server_init(
return(0); return(0);
} }
void bake_server_shutdown()
{
bake_bulk_server_context_t *svr_ctx = g_svr_ctx;
int do_cleanup;
assert(svr_ctx);
ABT_mutex_lock(svr_ctx->shutdown_mutex);
svr_ctx->shutdown_flag = 1;
ABT_cond_broadcast(svr_ctx->shutdown_cond);
svr_ctx->ref_count--;
do_cleanup = svr_ctx->ref_count == 0;
ABT_mutex_unlock(svr_ctx->shutdown_mutex);
if (do_cleanup)
{
bake_server_cleanup(svr_ctx);
g_svr_ctx = NULL;
}
return;
}
void bake_server_wait_for_shutdown()
{
bake_bulk_server_context_t *svr_ctx = g_svr_ctx;
int do_cleanup;
assert(svr_ctx);
ABT_mutex_lock(svr_ctx->shutdown_mutex);
svr_ctx->ref_count++;
while(!svr_ctx->shutdown_flag)
ABT_cond_wait(svr_ctx->shutdown_cond, svr_ctx->shutdown_mutex);
svr_ctx->ref_count--;
do_cleanup = svr_ctx->ref_count == 0;
ABT_mutex_unlock(svr_ctx->shutdown_mutex);
if (do_cleanup)
{
bake_server_cleanup(svr_ctx);
g_svr_ctx = NULL;
}
return;
}
/* service a remote RPC that instructs the server daemon to shut down */ /* service a remote RPC that instructs the server daemon to shut down */
static void bake_bulk_shutdown_ult(hg_handle_t handle) static void bake_bulk_shutdown_ult(hg_handle_t handle)
{ {
...@@ -171,10 +228,10 @@ static void bake_bulk_shutdown_ult(hg_handle_t handle) ...@@ -171,10 +228,10 @@ static void bake_bulk_shutdown_ult(hg_handle_t handle)
margo_destroy(handle); margo_destroy(handle);
/* NOTE: we assume that the server daemon is using /* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there * bake_server_wait_for_shutdown() to suspend until this RPC executes, so
* is no need to send any extra signal to notify it. * there is no need to send any extra signal to notify it.
*/ */
margo_finalize(mid); bake_server_shutdown();
return; return;
} }
...@@ -603,4 +660,13 @@ static void bake_bulk_probe_ult(hg_handle_t handle) ...@@ -603,4 +660,13 @@ static void bake_bulk_probe_ult(hg_handle_t handle)
} }
DEFINE_MARGO_RPC_HANDLER(bake_bulk_probe_ult) DEFINE_MARGO_RPC_HANDLER(bake_bulk_probe_ult)
static void bake_server_cleanup(bake_bulk_server_context_t *svr_ctx)
{
pmemobj_close(svr_ctx->pmem_pool);
ABT_mutex_free(&svr_ctx->shutdown_mutex);
ABT_cond_free(&svr_ctx->shutdown_cond);
free(svr_ctx);
return;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment