Commit 2bc6ec83 authored by Philip Carns's avatar Philip Carns

reorg examples to use rpc for shutdown

parent 34637d8d
bin_PROGRAMS += examples/client examples/server bin_PROGRAMS += examples/client examples/server
examples_client_SOURCES = \ examples_client_SOURCES = \
examples/client.c \ examples/client.c
examples/my-rpc.c
examples_client_LDADD = src/libmargo.a examples_client_LDADD = src/libmargo.a
examples_server_SOURCES = \ examples_server_SOURCES = \
......
...@@ -33,6 +33,7 @@ struct run_my_rpc_args ...@@ -33,6 +33,7 @@ struct run_my_rpc_args
static void run_my_rpc(void *_arg); static void run_my_rpc(void *_arg);
static hg_id_t my_rpc_id; static hg_id_t my_rpc_id;
static hg_id_t my_rpc_shutdown_id;
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
...@@ -49,6 +50,8 @@ int main(int argc, char **argv) ...@@ -49,6 +50,8 @@ int main(int argc, char **argv)
na_context_t *na_context; na_context_t *na_context;
hg_context_t *hg_context; hg_context_t *hg_context;
hg_class_t *hg_class; hg_class_t *hg_class;
na_addr_t svr_addr = NA_ADDR_NULL;
hg_handle_t handle;
/* boilerplate HG initialization steps */ /* boilerplate HG initialization steps */
/***************************************/ /***************************************/
...@@ -133,7 +136,9 @@ int main(int argc, char **argv) ...@@ -133,7 +136,9 @@ int main(int argc, char **argv)
/* register RPC */ /* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t, my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_ult_handler); NULL);
my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
NULL);
for(i=0; i<4; i++) for(i=0; i<4; i++)
{ {
...@@ -176,6 +181,17 @@ int main(int argc, char **argv) ...@@ -176,6 +181,17 @@ int main(int argc, char **argv)
} }
} }
/* send one rpc to server to shut it down */
/* find addr for server */
ret = margo_na_addr_lookup(mid, network_class, na_context, "tcp://localhost:1234", &svr_addr);
assert(ret == 0);
/* create handle */
ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle);
assert(ret == 0);
margo_forward(mid, handle, NULL);
/* shut down everything */ /* shut down everything */
margo_finalize(mid); margo_finalize(mid);
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
* close. * close.
*/ */
extern ABT_eventual* shutdown_eventual;
static void my_rpc_ult(void *_arg) static void my_rpc_ult(void *_arg)
{ {
hg_handle_t *handle = _arg; hg_handle_t *handle = _arg;
...@@ -84,3 +86,30 @@ static void my_rpc_ult(void *_arg) ...@@ -84,3 +86,30 @@ static void my_rpc_ult(void *_arg)
return; return;
} }
DEFINE_MARGO_RPC_HANDLER(my_rpc_ult) DEFINE_MARGO_RPC_HANDLER(my_rpc_ult)
static void my_rpc_shutdown_ult(void *_arg)
{
hg_handle_t *handle = _arg;
hg_return_t hret;
struct hg_info *hgi;
margo_instance_id mid;
printf("Got RPC request to shutdown\n");
hgi = HG_Get_info(*handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Respond(*handle, NULL, NULL, NULL);
assert(hret == HG_SUCCESS);
HG_Destroy(*handle);
margo_finalize(mid);
ABT_eventual_set(*shutdown_eventual, NULL, 0);
return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
...@@ -17,4 +17,6 @@ MERCURY_GEN_PROC(my_rpc_in_t, ...@@ -17,4 +17,6 @@ MERCURY_GEN_PROC(my_rpc_in_t,
((hg_bulk_t)(bulk_handle))) ((hg_bulk_t)(bulk_handle)))
DECLARE_MARGO_RPC_HANDLER(my_rpc_ult) DECLARE_MARGO_RPC_HANDLER(my_rpc_ult)
DECLARE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
#endif /* __MY_RPC */ #endif /* __MY_RPC */
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
#include "my-rpc.h" #include "my-rpc.h"
ABT_eventual* shutdown_eventual;
/* example server program. Starts HG engine, registers the example RPC type, /* example server program. Starts HG engine, registers the example RPC type,
* and then executes indefinitely. * and then executes indefinitely.
*/ */
...@@ -32,6 +34,8 @@ int main(int argc, char **argv) ...@@ -32,6 +34,8 @@ int main(int argc, char **argv)
hg_class_t *hg_class; hg_class_t *hg_class;
int single_pool_mode = 0; int single_pool_mode = 0;
shutdown_eventual = &eventual;
if(argc > 2) if(argc > 2)
{ {
fprintf(stderr, "Usage: ./server <single>\n"); fprintf(stderr, "Usage: ./server <single>\n");
...@@ -136,10 +140,13 @@ int main(int argc, char **argv) ...@@ -136,10 +140,13 @@ int main(int argc, char **argv)
mid = margo_init(handler_pool, handler_pool, hg_context, hg_class); mid = margo_init(handler_pool, handler_pool, hg_context, hg_class);
else else
mid = margo_init(progress_pool, handler_pool, hg_context, hg_class); mid = margo_init(progress_pool, handler_pool, hg_context, hg_class);
assert(mid);
/* register RPC */ /* register RPC */
MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t, MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_ult_handler); my_rpc_ult_handler);
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler);
/* suspend this ULT until someone tells us to shut down */ /* suspend this ULT until someone tells us to shut down */
ret = ABT_eventual_create(0, &eventual); ret = ABT_eventual_create(0, &eventual);
...@@ -149,11 +156,9 @@ int main(int argc, char **argv) ...@@ -149,11 +156,9 @@ int main(int argc, char **argv)
return(-1); return(-1);
} }
/* wait for shutdown (assume that margo will be finalized by an RPC) */
ABT_eventual_wait(eventual, NULL); ABT_eventual_wait(eventual, NULL);
/* shut down everything */
margo_finalize(mid);
if(!single_pool_mode) if(!single_pool_mode)
{ {
ABT_xstream_join(progress_xstream); ABT_xstream_join(progress_xstream);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment