Commit ec622820 authored by Matthieu Dorier's avatar Matthieu Dorier

removed global context

parent bc5e8250
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
#define MOBJECT_SERVER_GROUP_NAME "mobject-store-servers" #define MOBJECT_SERVER_GROUP_NAME "mobject-store-servers"
typedef struct mobject_server_context mobject_server_context_t;
/** /**
* Start a mobject server instance * Start a mobject server instance
* *
...@@ -20,18 +22,18 @@ ...@@ -20,18 +22,18 @@
* @param[in] cluster_file file name to write cluster connect info to * @param[in] cluster_file file name to write cluster connect info to
* @returns 0 on success, negative error code on failure * @returns 0 on success, negative error code on failure
*/ */
int mobject_server_init(margo_instance_id mid, const char *cluster_file); mobject_server_context_t* mobject_server_init(margo_instance_id mid, const char *cluster_file);
/** /**
* Shutdown a mobject server instance * Shutdown a mobject server instance
* *
* @param[in] mid margo instance id * @param[in] mid margo instance id
*/ */
void mobject_server_shutdown(margo_instance_id mid); void mobject_server_shutdown(mobject_server_context_t* svr_ctx);
/** /**
* Wait for a mobject server instance to get a shutdown request. * Wait for a mobject server instance to get a shutdown request.
*/ */
void mobject_server_wait_for_shutdown(void); void mobject_server_wait_for_shutdown(mobject_server_context_t* srv_ctx);
#endif #endif
...@@ -24,6 +24,7 @@ int main(int argc, char *argv[]) ...@@ -24,6 +24,7 @@ int main(int argc, char *argv[])
char *cluster_file; char *cluster_file;
margo_instance_id mid; margo_instance_id mid;
int ret; int ret;
mobject_server_context_t* mobject_svr_ctx;
/* check args */ /* check args */
if (argc != 3) if (argc != 3)
...@@ -41,8 +42,8 @@ int main(int argc, char *argv[]) ...@@ -41,8 +42,8 @@ int main(int argc, char *argv[])
return -1; return -1;
} }
ret = mobject_server_init(mid, cluster_file); mobject_svr_ctx = mobject_server_init(mid, cluster_file);
if (ret != 0) if (mobject_svr_ctx == NULL)
{ {
fprintf(stderr, "Error: Unable to initialize mobject server\n"); fprintf(stderr, "Error: Unable to initialize mobject server\n");
margo_finalize(mid); margo_finalize(mid);
...@@ -50,7 +51,7 @@ int main(int argc, char *argv[]) ...@@ -50,7 +51,7 @@ int main(int argc, char *argv[])
} }
/* wait for shutdown signal, then clean up */ /* wait for shutdown signal, then clean up */
mobject_server_wait_for_shutdown(); mobject_server_wait_for_shutdown(mobject_svr_ctx);
margo_finalize(mid); margo_finalize(mid);
MPI_Finalize(); MPI_Finalize();
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#include "src/server/core/core-read-op.h" #include "src/server/core/core-read-op.h"
#include "src/server/core/core-write-op.h" #include "src/server/core/core-write-op.h"
typedef struct mobject_server_context struct mobject_server_context
{ {
/* margo, bake, sds-keyval, ssg state */ /* margo, bake, sds-keyval, ssg state */
margo_instance_id mid; margo_instance_id mid;
...@@ -39,7 +39,7 @@ typedef struct mobject_server_context ...@@ -39,7 +39,7 @@ typedef struct mobject_server_context
ABT_cond shutdown_cond; ABT_cond shutdown_cond;
int shutdown_flag; int shutdown_flag;
int ref_count; int ref_count;
} mobject_server_context_t; } ;
static int mobject_server_register(mobject_server_context_t *srv_ctx); static int mobject_server_register(mobject_server_context_t *srv_ctx);
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx); static void mobject_server_cleanup(mobject_server_context_t *srv_ctx);
...@@ -53,25 +53,23 @@ static hg_id_t mobject_write_op_rpc_id; ...@@ -53,25 +53,23 @@ static hg_id_t mobject_write_op_rpc_id;
static hg_id_t mobject_read_op_rpc_id; static hg_id_t mobject_read_op_rpc_id;
static hg_id_t mobject_shutdown_rpc_id; static hg_id_t mobject_shutdown_rpc_id;
/* XXX one global mobject server state struct */ static int mobject_server_is_initialized = 0;
static mobject_server_context_t *g_srv_ctx = NULL;
mobject_server_context_t* mobject_server_init(margo_instance_id mid, const char *cluster_file)
int mobject_server_init(margo_instance_id mid, const char *cluster_file)
{ {
mobject_server_context_t *srv_ctx; mobject_server_context_t *srv_ctx;
int my_id; int my_id;
int ret; int ret;
if (g_srv_ctx) if (mobject_server_is_initialized)
{ {
fprintf(stderr, "Error: mobject server has already been initialized\n"); fprintf(stderr, "Error: mobject server has already been initialized\n");
return -1; return NULL;
} }
srv_ctx = calloc(1, sizeof(*srv_ctx)); srv_ctx = calloc(1, sizeof(*srv_ctx));
if (!srv_ctx) if (!srv_ctx)
return -1; return NULL;
srv_ctx->mid = mid; srv_ctx->mid = mid;
srv_ctx->ref_count = 1; srv_ctx->ref_count = 1;
ABT_mutex_create(&srv_ctx->shutdown_mutex); ABT_mutex_create(&srv_ctx->shutdown_mutex);
...@@ -89,7 +87,7 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file) ...@@ -89,7 +87,7 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
{ {
free(srv_ctx); free(srv_ctx);
fprintf(stderr, "Error: Unable to initialize SSG\n"); fprintf(stderr, "Error: Unable to initialize SSG\n");
return -1; return NULL;
} }
/* server group create */ /* server group create */
...@@ -100,7 +98,7 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file) ...@@ -100,7 +98,7 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
fprintf(stderr, "Error: Unable to create the mobject server group\n"); fprintf(stderr, "Error: Unable to create the mobject server group\n");
ssg_finalize(); ssg_finalize();
free(srv_ctx); free(srv_ctx);
return -1; return NULL;
} }
my_id = ssg_get_group_self_id(srv_ctx->gid); my_id = ssg_get_group_self_id(srv_ctx->gid);
...@@ -121,7 +119,7 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file) ...@@ -121,7 +119,7 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
ssg_group_destroy(srv_ctx->gid); ssg_group_destroy(srv_ctx->gid);
ssg_finalize(); ssg_finalize();
free(srv_ctx); free(srv_ctx);
return -1; return NULL;
} }
} }
...@@ -135,14 +133,13 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file) ...@@ -135,14 +133,13 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
bake_probe_instance(mid, self_addr, &(srv_ctx->bake_id)); bake_probe_instance(mid, self_addr, &(srv_ctx->bake_id));
// XXX: check return value of the above calls // XXX: check return value of the above calls
g_srv_ctx = srv_ctx; mobject_server_is_initialized = 1;
return 0; return srv_ctx;
} }
void mobject_server_shutdown(margo_instance_id mid) void mobject_server_shutdown(mobject_server_context_t *srv_ctx)
{ {
mobject_server_context_t *srv_ctx = g_srv_ctx;
int do_cleanup; int do_cleanup;
assert(srv_ctx); assert(srv_ctx);
...@@ -162,9 +159,8 @@ void mobject_server_shutdown(margo_instance_id mid) ...@@ -162,9 +159,8 @@ void mobject_server_shutdown(margo_instance_id mid)
return; return;
} }
void mobject_server_wait_for_shutdown() void mobject_server_wait_for_shutdown(mobject_server_context_t* srv_ctx)
{ {
mobject_server_context_t *srv_ctx = g_srv_ctx;
int do_cleanup; int do_cleanup;
assert(srv_ctx); assert(srv_ctx);
...@@ -193,12 +189,18 @@ static int mobject_server_register(mobject_server_context_t *srv_ctx) ...@@ -193,12 +189,18 @@ static int mobject_server_register(mobject_server_context_t *srv_ctx)
mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op", mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
write_op_in_t, write_op_out_t, mobject_write_op_ult); write_op_in_t, write_op_out_t, mobject_write_op_ult);
margo_register_data(mid, mobject_write_op_rpc_id, srv_ctx, NULL);
mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op", mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op",
read_op_in_t, read_op_out_t, mobject_read_op_ult); read_op_in_t, read_op_out_t, mobject_read_op_ult);
margo_register_data(mid, mobject_read_op_rpc_id, srv_ctx, NULL);
mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown", mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
void, void, mobject_shutdown_ult); void, void, mobject_shutdown_ult);
margo_register_data(mid, mobject_shutdown_rpc_id, srv_ctx, NULL);
#if 0 #if 0
bake_server_register(mid, pool_info); bake_server_register(mid, pool_info);
metadata = kv_server_register(mid); metadata = kv_server_register(mid);
...@@ -301,7 +303,9 @@ static void mobject_shutdown_ult(hg_handle_t h) ...@@ -301,7 +303,9 @@ static void mobject_shutdown_ult(hg_handle_t h)
{ {
hg_return_t ret; hg_return_t ret;
const struct hg_info *info = margo_get_info(h);
margo_instance_id mid = margo_hg_handle_get_instance(h); margo_instance_id mid = margo_hg_handle_get_instance(h);
mobject_server_context_t* srv_ctx = margo_registered_data(mid, info->id);
ret = margo_respond(h, NULL); ret = margo_respond(h, NULL);
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
...@@ -310,7 +314,7 @@ static void mobject_shutdown_ult(hg_handle_t h) ...@@ -310,7 +314,7 @@ static void mobject_shutdown_ult(hg_handle_t h)
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
/* TODO: propagate shutdown to other servers */ /* TODO: propagate shutdown to other servers */
mobject_server_shutdown(mid); mobject_server_shutdown(srv_ctx);
return; return;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment