Commit f7bdeaef authored by Shane Snyder's avatar Shane Snyder

update mobject_server_register, add shutdown rpc

parent 644064f8
......@@ -13,16 +13,20 @@
#define MOBJECT_SERVER_GROUP_NAME "mobject-store-servers"
int mobject_server_init(margo_instance_id mid, const char *cluster_file);
/**
* Start a mobject server instance
*
* @param[in] mid
* @param[in poolname
* @returns 0 on success, negative error code on failure */
int mobject_server_register(margo_instance_id mid, const char *poolname);
* @param[in] mid margo instance id
* @param[in] cluster_file file name to write cluster connect info to
* @returns 0 on success, negative error code on failure
*/
int mobject_server_init(margo_instance_id mid, const char *cluster_file);
/**
* Shutdown a mobject server instance
*
* @param[in] mid margo instance id
*/
void mobject_server_shutdown(margo_instance_id mid);
#endif
......@@ -50,9 +50,9 @@ int main(int argc, char *argv[])
}
/* shutdown */
//margo_wait_for_finalize(mid); /* XXX: probably need a conditional here, so we can cleanup after blocking */
margo_wait_for_finalize(mid); /* XXX: probably need a conditional here, so we can cleanup after blocking */
mobject_server_shutdown(mid);
margo_finalize(mid);
//margo_finalize(mid);
MPI_Finalize();
return 0;
......
......@@ -17,17 +17,25 @@
typedef struct mobject_server_context
{
/* XXX bake, sds-keyval stuff */
margo_instance_id mid;
/* TODO bake, sds-keyval stuff */
ssg_group_id_t gid;
} mobject_server_context_t;
static int mobject_server_store_cluster_info(const char *file_name);
static int mobject_server_register(mobject_server_context_t *srv_ctx);
DECLARE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
/* mobject RPC IDs */
static hg_id_t mobject_shutdown_rpc_id;
/* XXX one global mobject server state struct */
mobject_server_context_t *g_srv_ctx = NULL;
static mobject_server_context_t *g_srv_ctx = NULL;
int mobject_server_init(margo_instance_id mid, const char *cluster_file)
{
int my_id;
int ret;
if (g_srv_ctx)
......@@ -40,9 +48,15 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
if (!g_srv_ctx)
return -1;
memset(g_srv_ctx, 0, sizeof(*g_srv_ctx));
g_srv_ctx->mid = mid;
/* TODO bake-bulk */
/* TODO sds-keyval */
# if 0
kv_context *metadata;
struct bake_pool_info *pool_info;
pool_info = bake_server_makepool(poolname);
#endif
ret = ssg_init(mid);
if (ret != SSG_SUCCESS)
......@@ -53,26 +67,33 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
/* server group create */
g_srv_ctx->gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD,
NULL, NULL); /* XXX membership change callbacks unused currently */
NULL, NULL); /* XXX membership update callbacks unused currently */
if (g_srv_ctx->gid == SSG_GROUP_ID_NULL)
{
fprintf(stderr, "Error: Unable to create the mobject server group\n");
ssg_finalize();
return -1;
}
my_id = ssg_get_group_self_id(g_srv_ctx->gid);
/* register mobject & friends RPC handlers */
mobject_server_register(g_srv_ctx);
/* write cluster connect info to file for clients to find later */
ret = mobject_server_store_cluster_info(cluster_file);
if (ret != 0)
/* one proccess writes cluster connect info to file for clients to find later */
if (my_id == 0)
{
fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
cluster_file);
/* XXX: this call is performed by one process under the covers, and we
* don't currently have a way to propagate this error to the entire cluster
*/
ssg_group_destroy(g_srv_ctx->gid);
ssg_finalize();
return -1;
ret = ssg_group_id_store(cluster_file, g_srv_ctx->gid);
if (ret != 0)
{
fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
cluster_file);
/* XXX: this call is performed by one process, and we do not currently
* have an easy way to propagate this error to the entire cluster group
*/
ssg_group_destroy(g_srv_ctx->gid);
ssg_finalize();
return -1;
}
}
/* XXX cleanup? */
......@@ -80,20 +101,6 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
return 0;
}
int mobject_server_register(margo_instance_id mid, const char *poolname)
{
int ret=0;
kv_context *metadata;
struct bake_pool_info *pool_info;
pool_info = bake_server_makepool(poolname);
bake_server_register(mid, pool_info);
metadata = kv_server_register(mid);
return ret;
}
#endif
void mobject_server_shutdown(margo_instance_id mid)
{
assert(g_srv_ctx);
......@@ -107,18 +114,25 @@ void mobject_server_shutdown(margo_instance_id mid)
return;
}
static int mobject_server_store_cluster_info(const char *file_name)
static int mobject_server_register(mobject_server_context_t *srv_ctx)
{
int my_id;
int ret;
int ret=0;
assert(g_srv_ctx);
mobject_shutdown_rpc_id = MARGO_REGISTER(srv_ctx->mid, "mobject_shutdown",
void, void, mobject_shutdown_ult);
/* only have one group member do this */
my_id = ssg_get_group_self_id(g_srv_ctx->gid);
if (my_id != 0)
return 0;
#if 0
bake_server_register(mid, pool_info);
metadata = kv_server_register(mid);
#endif
ret = ssg_group_id_store(file_name, g_srv_ctx->gid);
return ret;
}
static void mobject_shutdown_ult(hg_handle_t handle)
{
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment