diff --git a/src/client/libmobject-store.c b/src/client/libmobject-store.c index 0e038a0659b5dab997c9a9c4b50ae2eb1097a0ca..8cb77d30c456e68e8343300313e258b8a0e2543e 100644 --- a/src/client/libmobject-store.c +++ b/src/client/libmobject-store.c @@ -36,6 +36,8 @@ typedef struct mobject_store_handle int connected; } mobject_store_handle_t; +static void mobject_store_register(margo_instance_id mid); +static int mobject_store_shutdown_servers(mobject_store_handle_t *cluster_handle); int mobject_store_create(mobject_store_t *cluster, const char * const id) { @@ -79,7 +81,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id) int mobject_store_connect(mobject_store_t cluster) { mobject_store_handle_t *cluster_handle = (mobject_store_handle_t *)cluster; - char *srv_addr; + char *svr_addr_str; char proto[24] = {0}; int i; int ret; @@ -90,8 +92,8 @@ int mobject_store_connect(mobject_store_t cluster) /* figure out protocol to connect with using address information * associated with the SSG group ID */ - srv_addr = ssg_group_id_get_addr_str(cluster_handle->gid); - if (!srv_addr) + svr_addr_str = ssg_group_id_get_addr_str(cluster_handle->gid); + if (!svr_addr_str) { fprintf(stderr, "Error: Unable to obtain cluster group server address\n"); ssg_group_id_free(cluster_handle->gid); @@ -99,8 +101,8 @@ int mobject_store_connect(mobject_store_t cluster) return -1; } /* we only need to the proto portion of the address to initialize */ - for(i=0; i<24 && srv_addr[i] != '\0' && srv_addr[i] != ':'; i++) - proto[i] = srv_addr[i]; + for(i=0; i<24 && svr_addr_str[i] != '\0' && svr_addr_str[i] != ':'; i++) + proto[i] = svr_addr_str[i]; /* intialize margo */ /* XXX: probably want to expose some way of tweaking threading parameters */ @@ -108,19 +110,23 @@ int mobject_store_connect(mobject_store_t cluster) if (cluster_handle->mid == MARGO_INSTANCE_NULL) { fprintf(stderr, "Error: Unable to initialize margo\n"); - free(srv_addr); + free(svr_addr_str); ssg_group_id_free(cluster_handle->gid); free(cluster_handle); return -1; } + /* register mobject RPCs for this cluster */ + mobject_store_register(cluster_handle->mid); + /* initialize ssg */ + /* XXX: we need to think about how to do this once per-client... clients could connect to mult. clusters */ ret = ssg_init(cluster_handle->mid); if (ret != SSG_SUCCESS) { fprintf(stderr, "Error: Unable to initialize SSG\n"); margo_finalize(cluster_handle->mid); - free(srv_addr); + free(svr_addr_str); ssg_group_id_free(cluster_handle->gid); free(cluster_handle); return -1; @@ -133,14 +139,14 @@ int mobject_store_connect(mobject_store_t cluster) fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n"); ssg_finalize(); margo_finalize(cluster_handle->mid); - free(srv_addr); + free(svr_addr_str); ssg_group_id_free(cluster_handle->gid); free(cluster_handle); return -1; } cluster_handle->connected = 1; - free(srv_addr); + free(svr_addr_str); return 0; } @@ -149,11 +155,26 @@ void mobject_store_shutdown(mobject_store_t cluster) { mobject_store_handle_t *cluster_handle = (mobject_store_handle_t *)cluster; + char *svr_kill_env_str; + int ret; + assert(cluster_handle != NULL); if (!cluster_handle->connected) return; + svr_kill_env_str = getenv("MOBJECT_SHUTDOWN_KILL_SERVERS"); + if (svr_kill_env_str && !strcmp(svr_kill_env_str, "true")) + { + /* kill server cluster if requested */ + ret = mobject_store_shutdown_servers(cluster_handle); + if (ret != 0) + { + fprintf(stderr, "Warning: Unable to send shutdown signal \ + to mobject server cluster\n"); + } + } + ssg_group_detach(cluster_handle->gid); ssg_finalize(); margo_finalize(cluster_handle->mid); @@ -163,21 +184,6 @@ void mobject_store_shutdown(mobject_store_t cluster) return; } -void mobject_store_register(margo_instance_id mid) -{ - static int registered = 0; - - if(!registered) { - mobject_write_op_rpc_id = - MARGO_REGISTER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, NULL); - mobject_read_op_rpc_id = - MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, NULL); - mobject_shutdown_rpc_id = - MARGO_REGISTER(mid, "mobject_shutdown", void, void, NULL); - registered = 1; - } -} - int mobject_store_write_op_operate(mobject_store_write_op_t write_op, mobject_store_ioctx_t io, const char *oid, @@ -231,3 +237,54 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op, return 0; } + +/* internal helper routines */ + +// register mobject RPCs +static void mobject_store_register(margo_instance_id mid) +{ + /* XXX i think ultimately these need to be stored in per-mid containers instead of global... */ + mobject_write_op_rpc_id = + MARGO_REGISTER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, NULL); + mobject_read_op_rpc_id = + MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, NULL); + mobject_shutdown_rpc_id = + MARGO_REGISTER(mid, "mobject_shutdown", void, void, NULL); +} + +// send a shutdown signal to a server cluster +static int mobject_store_shutdown_servers(mobject_store_handle_t *cluster_handle) +{ + hg_addr_t svr_addr; + hg_handle_t h; + hg_return_t ret; + + /* get the address of the first server */ + svr_addr = ssg_get_addr(cluster_handle->gid, 0); + if (svr_addr == HG_ADDR_NULL) + { + fprintf(stderr, "Error: Unable to obtain address for mobject server\n"); + return -1; + } + + ret = margo_create(cluster_handle->mid, svr_addr, mobject_shutdown_rpc_id, &h); + if (ret != HG_SUCCESS) + { + fprintf(stderr, "Error: Unable to create margo handle\n"); + return -1; + } + + /* send shutdown signal */ + ret = margo_forward(h, NULL); + if (ret != HG_SUCCESS) + { + fprintf(stderr, "Error: Unable to forward margo handle\n"); + margo_destroy(h); + return -1; + } + + margo_destroy(h); + + return 0; +} + diff --git a/src/server/mobject-server.c b/src/server/mobject-server.c index b9fb6ae35e9ae40397d9a21b3ed5f508171fe8e7..63fb6642b9ca7dc8a535bf09649a6bb9214786b2 100644 --- a/src/server/mobject-server.c +++ b/src/server/mobject-server.c @@ -264,14 +264,13 @@ static void mobject_shutdown_ult(hg_handle_t h) margo_instance_id mid = margo_hg_handle_get_instance(h); - printf("Got shutdown signal!\n"); - ret = margo_respond(h, NULL); assert(ret == HG_SUCCESS); ret = margo_destroy(h); assert(ret == HG_SUCCESS); + /* TODO: propagate shutdown to other servers */ mobject_server_shutdown(mid); return;