Commit 59f693ae authored by Shane Snyder's avatar Shane Snyder

mobject srv shutdown, cleanup, wait_for_shutdown

parent 4d389cc8
......@@ -29,4 +29,9 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file);
*/
void mobject_server_shutdown(margo_instance_id mid);
/**
* Wait for a mobject server instance to get a shutdown request.
*/
void mobject_server_wait_for_shutdown(void);
#endif
......@@ -49,10 +49,9 @@ int main(int argc, char *argv[])
return -1;
}
/* shutdown */
margo_wait_for_finalize(mid); /* XXX: probably need a conditional here, so we can cleanup after blocking */
mobject_server_shutdown(mid);
//margo_finalize(mid);
/* wait for shutdown signal, then clean up */
mobject_server_wait_for_shutdown();
margo_finalize(mid);
MPI_Finalize();
return 0;
......
......@@ -6,6 +6,7 @@
#include <assert.h>
#include <mpi.h>
#include <abt.h>
#include <margo.h>
//#include <sds-keyval.h>
//#include <bake-bulk-server.h>
......@@ -20,12 +21,20 @@
typedef struct mobject_server_context
{
/* margo, bake, sds-keyval, ssg state */
margo_instance_id mid;
/* TODO bake, sds-keyval stuff */
ssg_group_id_t gid;
/* server shutdown conditional logic */
ABT_mutex shutdown_mutex;
ABT_cond shutdown_cond;
int shutdown_flag;
int ref_count;
} mobject_server_context_t;
static int mobject_server_register(mobject_server_context_t *srv_ctx);
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx);
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
......@@ -42,6 +51,7 @@ static mobject_server_context_t *g_srv_ctx = NULL;
int mobject_server_init(margo_instance_id mid, const char *cluster_file)
{
mobject_server_context_t *srv_ctx;
int my_id;
int ret;
......@@ -51,11 +61,13 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
return -1;
}
g_srv_ctx = malloc(sizeof(*g_srv_ctx));
if (!g_srv_ctx)
srv_ctx = calloc(1, sizeof(*srv_ctx));
if (!srv_ctx)
return -1;
memset(g_srv_ctx, 0, sizeof(*g_srv_ctx));
g_srv_ctx->mid = mid;
srv_ctx->mid = mid;
srv_ctx->ref_count = 1;
ABT_mutex_create(&srv_ctx->shutdown_mutex);
ABT_cond_create(&srv_ctx->shutdown_cond);
/* TODO bake-bulk */
/* TODO sds-keyval */
......@@ -68,28 +80,30 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
ret = ssg_init(mid);
if (ret != SSG_SUCCESS)
{
free(srv_ctx);
fprintf(stderr, "Error: Unable to initialize SSG\n");
return -1;
}
/* server group create */
g_srv_ctx->gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD,
srv_ctx->gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD,
NULL, NULL); /* XXX membership update callbacks unused currently */
if (g_srv_ctx->gid == SSG_GROUP_ID_NULL)
if (srv_ctx->gid == SSG_GROUP_ID_NULL)
{
fprintf(stderr, "Error: Unable to create the mobject server group\n");
ssg_finalize();
free(srv_ctx);
return -1;
}
my_id = ssg_get_group_self_id(g_srv_ctx->gid);
my_id = ssg_get_group_self_id(srv_ctx->gid);
/* register mobject & friends RPC handlers */
mobject_server_register(g_srv_ctx);
mobject_server_register(srv_ctx);
/* one proccess writes cluster connect info to file for clients to find later */
if (my_id == 0)
{
ret = ssg_group_id_store(cluster_file, g_srv_ctx->gid);
ret = ssg_group_id_store(cluster_file, srv_ctx->gid);
if (ret != 0)
{
fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
......@@ -97,26 +111,59 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
/* XXX: this call is performed by one process, and we do not currently
* have an easy way to propagate this error to the entire cluster group
*/
ssg_group_destroy(g_srv_ctx->gid);
ssg_group_destroy(srv_ctx->gid);
ssg_finalize();
free(srv_ctx);
return -1;
}
}
/* XXX cleanup? */
g_srv_ctx = srv_ctx;
return 0;
}
void mobject_server_shutdown(margo_instance_id mid)
{
assert(g_srv_ctx);
mobject_server_context_t *srv_ctx = g_srv_ctx;
int do_cleanup;
ssg_group_destroy(g_srv_ctx->gid);
ssg_finalize();
assert(srv_ctx);
//margo_wait_for_finalize(mid);
//pmemobj_close(NULL);
ABT_mutex_lock(srv_ctx->shutdown_mutex);
srv_ctx->shutdown_flag = 1;
ABT_cond_broadcast(srv_ctx->shutdown_cond);
srv_ctx->ref_count--;
do_cleanup = srv_ctx->ref_count == 0;
ABT_mutex_unlock(srv_ctx->shutdown_mutex);
if (do_cleanup)
mobject_server_cleanup(srv_ctx);
return;
}
void mobject_server_wait_for_shutdown()
{
mobject_server_context_t *srv_ctx = g_srv_ctx;
int do_cleanup;
assert(srv_ctx);
ABT_mutex_lock(srv_ctx->shutdown_mutex);
srv_ctx->ref_count++;
while(!srv_ctx->shutdown_flag)
ABT_cond_wait(srv_ctx->shutdown_cond, srv_ctx->shutdown_mutex);
srv_ctx->ref_count--;
do_cleanup = srv_ctx->ref_count == 0;
ABT_mutex_unlock(srv_ctx->shutdown_mutex);
if (do_cleanup)
mobject_server_cleanup(srv_ctx);
return;
}
......@@ -215,10 +262,36 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
}
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult)
static void mobject_shutdown_ult(hg_handle_t handle)
static void mobject_shutdown_ult(hg_handle_t h)
{
hg_return_t ret;
margo_instance_id mid = margo_hg_handle_get_instance(h);
printf("Got shutdown signal!\n");
ret = margo_respond(h, NULL);
assert(ret == HG_SUCCESS);
ret = margo_destroy(h);
assert(ret == HG_SUCCESS);
mobject_server_shutdown(mid);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx)
{
ssg_group_destroy(srv_ctx->gid);
ssg_finalize();
//pmemobj_close(NULL);
ABT_mutex_free(&srv_ctx->shutdown_mutex);
ABT_cond_free(&srv_ctx->shutdown_cond);
free(srv_ctx);
return;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment