Commit b6798044 authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Add support for cleanly shutting down server.

parent b55e11bb
......@@ -90,7 +90,10 @@ int kv_open(kv_context *context, char * server, char *name,
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &(context->bench_handle) );
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->shutdown_id, &(context->shutdown_handle) );
assert(ret == HG_SUCCESS);
margo_free_output(handle, &open_out);
margo_destroy(handle);
return ret;
......@@ -240,3 +243,12 @@ int kv_client_deregister(kv_context *context) {
return HG_SUCCESS;
}
int kv_shutdown_server(kv_context *context) {
int ret;
ret = margo_forward(context->shutdown_handle, NULL);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
}
......@@ -222,7 +222,7 @@ static hg_return_t close_handler(hg_handle_t h)
}
DEFINE_MARGO_RPC_HANDLER(close_handler)
static hg_return_t put_handler(hg_handle_t h)
static hg_return_t put_handler(hg_handle_t h)
{
hg_return_t ret;
put_in_t in;
......@@ -286,7 +286,7 @@ static hg_return_t bulk_put_handler(hg_handle_t h)
}
DEFINE_MARGO_RPC_HANDLER(bulk_put_handler)
static hg_return_t get_handler(hg_handle_t h)
static hg_return_t get_handler(hg_handle_t h)
{
hg_return_t ret;
get_in_t in;
......@@ -385,6 +385,32 @@ static hg_return_t bulk_get_handler(hg_handle_t h)
}
DEFINE_MARGO_RPC_HANDLER(bulk_get_handler)
static void shutdown_handler(hg_handle_t handle)
{
hg_return_t ret;
margo_instance_id mid;
printf("Got RPC request to shutdown!\n");
/* get handle info and margo instance */
mid = margo_hg_handle_get_instance(handle);
assert(mid != MARGO_INSTANCE_NULL);
ret = margo_respond(handle, NULL);
assert(ret == HG_SUCCESS);
margo_destroy(handle);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
*/
margo_finalize(mid);
return;
}
DEFINE_MARGO_RPC_HANDLER(shutdown_handler)
/*
* from BwTree tests:
* RandomInsertSpeedTest() - Tests how fast it is to insert keys randomly
......@@ -469,7 +495,7 @@ static void RandomInsertSpeedTest(size_t key_num, bench_result *results)
}
static hg_return_t bench_handler(hg_handle_t h)
static hg_return_t bench_handler(hg_handle_t h)
{
hg_return_t ret = HG_SUCCESS;
bench_in_t bench_in;
......@@ -516,7 +542,7 @@ kv_context *kv_server_register(margo_instance_id mid);
return(NULL);
}
ret = margo_addr_to_string(context->mid, addr_self_string,
&addr_self_string_sz, addr_self);
&addr_self_string_sz, addr_self);
if(ret != HG_SUCCESS)
{
fprintf(stderr, "Error: HG_Addr_self()\n");
......@@ -527,34 +553,41 @@ kv_context *kv_server_register(margo_instance_id mid);
printf("# accepting RPCs on address \"%s\"\n", addr_self_string);
context->open_id = MARGO_REGISTER(context->mid, "open",
open_in_t, open_out_t, open_handler);
open_in_t, open_out_t, open_handler);
context->close_id = MARGO_REGISTER(context->mid, "close",
close_in_t, close_out_t, close_handler);
close_in_t, close_out_t, close_handler);
context->put_id = MARGO_REGISTER(context->mid, "put",
put_in_t, put_out_t, put_handler);
put_in_t, put_out_t, put_handler);
context->put_id = MARGO_REGISTER(context->mid, "bulk_put",
bulk_put_in_t, bulk_put_out_t, bulk_put_handler);
bulk_put_in_t, bulk_put_out_t, bulk_put_handler);
context->get_id = MARGO_REGISTER(context->mid, "get",
get_in_t, get_out_t, get_handler);
get_in_t, get_out_t, get_handler);
context->get_id = MARGO_REGISTER(context->mid, "bulk_get",
bulk_get_in_t, bulk_get_out_t, bulk_get_handler);
bulk_get_in_t, bulk_get_out_t, bulk_get_handler);
context->bench_id = MARGO_REGISTER(context->mid, "bench",
bench_in_t, bench_out_t, bench_handler);
bench_in_t, bench_out_t, bench_handler);
context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
void, void, shutdown_handler);
return context;
}
int kv_server_wait_for_shutdown(kv_context *context) {
margo_wait_for_finalize(context->mid);
return HG_SUCCESS;
}
/* this is the same as client. should be moved to common utility library */
int kv_server_deregister(kv_context *context) {
margo_wait_for_finalize(context->mid);
margo_finalize(context->mid);
free(context);
delete(TREE);
return 0;
free(context);
delete(TREE);
return HG_SUCCESS;
}
......@@ -38,11 +38,13 @@ typedef struct kv_context_s {
hg_id_t open_id;
hg_id_t close_id;
hg_id_t bench_id;
hg_id_t shutdown_id;
hg_handle_t put_handle;
hg_handle_t bulk_put_handle; // necessary?
hg_handle_t get_handle;
hg_handle_t bulk_get_handle; // necessary?
hg_handle_t bench_handle;
hg_handle_t shutdown_handle;
/* some keyval dodad goes here so the server can discriminate. Seems
* like it should be some universal identifier we can share with other
* clients */
......@@ -123,6 +125,9 @@ DECLARE_MARGO_RPC_HANDLER(bulk_get_handler)
kv_context *kv_client_register(char *addr_str=0);
kv_context * kv_server_register(margo_instance_id mid);
DECLARE_MARGO_RPC_HANDLER(shutdown_handler)
/* both the same: should probably move to common */
int kv_client_deregister(kv_context *context);
int kv_server_deregister(kv_context *context);
......
......@@ -50,7 +50,10 @@ int main(int argc, char *argv[])
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm clientComm;
if (rank == 0) {
MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, rank, &clientComm);
// kv-server
kv_context *context = kv_server_register(argv[1]);
hgctx = margo_get_context(context->mid);
......@@ -68,14 +71,14 @@ int main(int argc, char *argv[])
MPI_Bcast(server_addr_str, 128, MPI_BYTE, 0, MPI_COMM_WORLD);
// process requests until finalized
margo_wait_for_finalize(context->mid);
kv_server_wait_for_shutdown(context);
// now shutdown
printf("rank %d: server deregistering\n", rank);
// now finish cleaning up
kv_server_deregister(context);
printf("rank %d: server deregistered\n", rank);
}
else {
MPI_Comm_split(MPI_COMM_WORLD, 1, rank, &clientComm);
// broadcast (recv) server address
MPI_Bcast(server_addr_str, 128, MPI_BYTE, 0, MPI_COMM_WORLD);
printf("client (rank %d): server add_str: %s\n", rank, server_addr_str);
......@@ -110,7 +113,13 @@ int main(int argc, char *argv[])
// close
ret = kv_close(context);
printf("rank %d: client deregistering\n", rank);
// once all clients are done, one client can signal server
MPI_Barrier(clientComm);
if (rank==1) {
printf("rank %d: sending server a shutdown request\n", rank);
kv_shutdown_server(context);
}
kv_client_deregister(context);
printf("rank %d: client deregistered\n", rank);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment