Commit f41344c4 authored by Shane Snyder's avatar Shane Snyder

add env var to allow clients to shutdown servers

parent 1457bd14
...@@ -36,6 +36,8 @@ typedef struct mobject_store_handle ...@@ -36,6 +36,8 @@ typedef struct mobject_store_handle
int connected; int connected;
} mobject_store_handle_t; } mobject_store_handle_t;
static void mobject_store_register(margo_instance_id mid);
static int mobject_store_shutdown_servers(mobject_store_handle_t *cluster_handle);
int mobject_store_create(mobject_store_t *cluster, const char * const id) int mobject_store_create(mobject_store_t *cluster, const char * const id)
{ {
...@@ -79,7 +81,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id) ...@@ -79,7 +81,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
int mobject_store_connect(mobject_store_t cluster) int mobject_store_connect(mobject_store_t cluster)
{ {
mobject_store_handle_t *cluster_handle = (mobject_store_handle_t *)cluster; mobject_store_handle_t *cluster_handle = (mobject_store_handle_t *)cluster;
char *srv_addr; char *svr_addr_str;
char proto[24] = {0}; char proto[24] = {0};
int i; int i;
int ret; int ret;
...@@ -90,8 +92,8 @@ int mobject_store_connect(mobject_store_t cluster) ...@@ -90,8 +92,8 @@ int mobject_store_connect(mobject_store_t cluster)
/* figure out protocol to connect with using address information /* figure out protocol to connect with using address information
* associated with the SSG group ID * associated with the SSG group ID
*/ */
srv_addr = ssg_group_id_get_addr_str(cluster_handle->gid); svr_addr_str = ssg_group_id_get_addr_str(cluster_handle->gid);
if (!srv_addr) if (!svr_addr_str)
{ {
fprintf(stderr, "Error: Unable to obtain cluster group server address\n"); fprintf(stderr, "Error: Unable to obtain cluster group server address\n");
ssg_group_id_free(cluster_handle->gid); ssg_group_id_free(cluster_handle->gid);
...@@ -99,8 +101,8 @@ int mobject_store_connect(mobject_store_t cluster) ...@@ -99,8 +101,8 @@ int mobject_store_connect(mobject_store_t cluster)
return -1; return -1;
} }
/* we only need to the proto portion of the address to initialize */ /* we only need to the proto portion of the address to initialize */
for(i=0; i<24 && srv_addr[i] != '\0' && srv_addr[i] != ':'; i++) for(i=0; i<24 && svr_addr_str[i] != '\0' && svr_addr_str[i] != ':'; i++)
proto[i] = srv_addr[i]; proto[i] = svr_addr_str[i];
/* intialize margo */ /* intialize margo */
/* XXX: probably want to expose some way of tweaking threading parameters */ /* XXX: probably want to expose some way of tweaking threading parameters */
...@@ -108,19 +110,23 @@ int mobject_store_connect(mobject_store_t cluster) ...@@ -108,19 +110,23 @@ int mobject_store_connect(mobject_store_t cluster)
if (cluster_handle->mid == MARGO_INSTANCE_NULL) if (cluster_handle->mid == MARGO_INSTANCE_NULL)
{ {
fprintf(stderr, "Error: Unable to initialize margo\n"); fprintf(stderr, "Error: Unable to initialize margo\n");
free(srv_addr); free(svr_addr_str);
ssg_group_id_free(cluster_handle->gid); ssg_group_id_free(cluster_handle->gid);
free(cluster_handle); free(cluster_handle);
return -1; return -1;
} }
/* register mobject RPCs for this cluster */
mobject_store_register(cluster_handle->mid);
/* initialize ssg */ /* initialize ssg */
/* XXX: we need to think about how to do this once per-client... clients could connect to mult. clusters */
ret = ssg_init(cluster_handle->mid); ret = ssg_init(cluster_handle->mid);
if (ret != SSG_SUCCESS) if (ret != SSG_SUCCESS)
{ {
fprintf(stderr, "Error: Unable to initialize SSG\n"); fprintf(stderr, "Error: Unable to initialize SSG\n");
margo_finalize(cluster_handle->mid); margo_finalize(cluster_handle->mid);
free(srv_addr); free(svr_addr_str);
ssg_group_id_free(cluster_handle->gid); ssg_group_id_free(cluster_handle->gid);
free(cluster_handle); free(cluster_handle);
return -1; return -1;
...@@ -133,14 +139,14 @@ int mobject_store_connect(mobject_store_t cluster) ...@@ -133,14 +139,14 @@ int mobject_store_connect(mobject_store_t cluster)
fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n"); fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n");
ssg_finalize(); ssg_finalize();
margo_finalize(cluster_handle->mid); margo_finalize(cluster_handle->mid);
free(srv_addr); free(svr_addr_str);
ssg_group_id_free(cluster_handle->gid); ssg_group_id_free(cluster_handle->gid);
free(cluster_handle); free(cluster_handle);
return -1; return -1;
} }
cluster_handle->connected = 1; cluster_handle->connected = 1;
free(srv_addr); free(svr_addr_str);
return 0; return 0;
} }
...@@ -149,11 +155,26 @@ void mobject_store_shutdown(mobject_store_t cluster) ...@@ -149,11 +155,26 @@ void mobject_store_shutdown(mobject_store_t cluster)
{ {
mobject_store_handle_t *cluster_handle = mobject_store_handle_t *cluster_handle =
(mobject_store_handle_t *)cluster; (mobject_store_handle_t *)cluster;
char *svr_kill_env_str;
int ret;
assert(cluster_handle != NULL); assert(cluster_handle != NULL);
if (!cluster_handle->connected) if (!cluster_handle->connected)
return; return;
svr_kill_env_str = getenv("MOBJECT_SHUTDOWN_KILL_SERVERS");
if (svr_kill_env_str && !strcmp(svr_kill_env_str, "true"))
{
/* kill server cluster if requested */
ret = mobject_store_shutdown_servers(cluster_handle);
if (ret != 0)
{
fprintf(stderr, "Warning: Unable to send shutdown signal \
to mobject server cluster\n");
}
}
ssg_group_detach(cluster_handle->gid); ssg_group_detach(cluster_handle->gid);
ssg_finalize(); ssg_finalize();
margo_finalize(cluster_handle->mid); margo_finalize(cluster_handle->mid);
...@@ -163,21 +184,6 @@ void mobject_store_shutdown(mobject_store_t cluster) ...@@ -163,21 +184,6 @@ void mobject_store_shutdown(mobject_store_t cluster)
return; return;
} }
void mobject_store_register(margo_instance_id mid)
{
static int registered = 0;
if(!registered) {
mobject_write_op_rpc_id =
MARGO_REGISTER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, NULL);
mobject_read_op_rpc_id =
MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, NULL);
mobject_shutdown_rpc_id =
MARGO_REGISTER(mid, "mobject_shutdown", void, void, NULL);
registered = 1;
}
}
int mobject_store_write_op_operate(mobject_store_write_op_t write_op, int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io, mobject_store_ioctx_t io,
const char *oid, const char *oid,
...@@ -231,3 +237,54 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op, ...@@ -231,3 +237,54 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
return 0; return 0;
} }
/* internal helper routines */
// register mobject RPCs
static void mobject_store_register(margo_instance_id mid)
{
/* XXX i think ultimately these need to be stored in per-mid containers instead of global... */
mobject_write_op_rpc_id =
MARGO_REGISTER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, NULL);
mobject_read_op_rpc_id =
MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, NULL);
mobject_shutdown_rpc_id =
MARGO_REGISTER(mid, "mobject_shutdown", void, void, NULL);
}
// send a shutdown signal to a server cluster
static int mobject_store_shutdown_servers(mobject_store_handle_t *cluster_handle)
{
hg_addr_t svr_addr;
hg_handle_t h;
hg_return_t ret;
/* get the address of the first server */
svr_addr = ssg_get_addr(cluster_handle->gid, 0);
if (svr_addr == HG_ADDR_NULL)
{
fprintf(stderr, "Error: Unable to obtain address for mobject server\n");
return -1;
}
ret = margo_create(cluster_handle->mid, svr_addr, mobject_shutdown_rpc_id, &h);
if (ret != HG_SUCCESS)
{
fprintf(stderr, "Error: Unable to create margo handle\n");
return -1;
}
/* send shutdown signal */
ret = margo_forward(h, NULL);
if (ret != HG_SUCCESS)
{
fprintf(stderr, "Error: Unable to forward margo handle\n");
margo_destroy(h);
return -1;
}
margo_destroy(h);
return 0;
}
...@@ -264,14 +264,13 @@ static void mobject_shutdown_ult(hg_handle_t h) ...@@ -264,14 +264,13 @@ static void mobject_shutdown_ult(hg_handle_t h)
margo_instance_id mid = margo_hg_handle_get_instance(h); margo_instance_id mid = margo_hg_handle_get_instance(h);
printf("Got shutdown signal!\n");
ret = margo_respond(h, NULL); ret = margo_respond(h, NULL);
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
ret = margo_destroy(h); ret = margo_destroy(h);
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
/* TODO: propagate shutdown to other servers */
mobject_server_shutdown(mid); mobject_server_shutdown(mid);
return; return;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment