From bb756ca2bb1c42d580fbaeb21bb5dfa40791cdf3 Mon Sep 17 00:00:00 2001 From: Phil Carns Date: Sat, 20 May 2017 10:27:02 +0200 Subject: [PATCH] begin stubbing composed service example --- Makefile.am | 1 + examples/composition/Makefile.subdir | 6 + examples/composition/data-xfer-proto.h | 16 ++ examples/composition/data-xfer-service.c | 91 +++++++++++ examples/composition/data-xfer-service.h | 15 ++ examples/composition/svc-daemon.c | 197 +++++++++++++++++++++++ 6 files changed, 326 insertions(+) create mode 100644 examples/composition/Makefile.subdir create mode 100644 examples/composition/data-xfer-proto.h create mode 100644 examples/composition/data-xfer-service.c create mode 100644 examples/composition/data-xfer-service.h create mode 100644 examples/composition/svc-daemon.c diff --git a/Makefile.am b/Makefile.am index b5121ea..2290320 100644 --- a/Makefile.am +++ b/Makefile.am @@ -40,5 +40,6 @@ include Make.rules include $(top_srcdir)/src/Makefile.subdir include $(top_srcdir)/examples/Makefile.subdir include $(top_srcdir)/examples/multiplex/Makefile.subdir +include $(top_srcdir)/examples/composition/Makefile.subdir include $(top_srcdir)/tests/Makefile.subdir diff --git a/examples/composition/Makefile.subdir b/examples/composition/Makefile.subdir new file mode 100644 index 0000000..0c028e2 --- /dev/null +++ b/examples/composition/Makefile.subdir @@ -0,0 +1,6 @@ +noinst_PROGRAMS += examples/composition/svc-daemon + +examples_composition_svc_daemon_SOURCES = \ + examples/composition/svc-daemon.c \ + examples/composition/data-xfer-service.c + diff --git a/examples/composition/data-xfer-proto.h b/examples/composition/data-xfer-proto.h new file mode 100644 index 0000000..5792c1a --- /dev/null +++ b/examples/composition/data-xfer-proto.h @@ -0,0 +1,16 @@ +/* + * (C) 2015 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ + +#ifndef __DATA_XFER_PROTO +#define __DATA_XFER_PROTO + +#include + +MERCURY_GEN_PROC(data_xfer_read_out_t, ((int32_t)(ret))) +MERCURY_GEN_PROC(data_xfer_read_in_t, + ((hg_bulk_t)(bulk_handle))) + +#endif /* __DATA_XFER_PROTO */ diff --git a/examples/composition/data-xfer-service.c b/examples/composition/data-xfer-service.c new file mode 100644 index 0000000..8537286 --- /dev/null +++ b/examples/composition/data-xfer-service.c @@ -0,0 +1,91 @@ +/* + * (C) 2015 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ + +#include +#include +#include "data-xfer-proto.h" +#include "data-xfer-service.h" + +static hg_size_t g_buffer_size = 8*1024*1024; +static void *g_buffer; +static hg_bulk_t g_buffer_bulk_handle; + +static void data_xfer_read_ult(hg_handle_t handle) +{ + hg_return_t hret; + data_xfer_read_out_t out; + data_xfer_read_in_t in; + int ret; + const struct hg_info *hgi; + margo_instance_id mid; +#if 0 + ABT_thread my_ult; + ABT_xstream my_xstream; + pthread_t my_tid; +#endif + + ret = HG_Get_input(handle, &in); + assert(ret == HG_SUCCESS); + hgi = HG_Get_info(handle); + assert(hgi); + +#if 0 + ABT_xstream_self(&my_xstream); + ABT_thread_self(&my_ult); + my_tid = pthread_self(); + printf("svc1: do_thing: mplex_id: %u, ult: %p, xstream %p, tid: %lu\n", + hgi->target_id, my_ult, my_xstream, my_tid); +#endif + + out.ret = 0; + + mid = margo_hg_class_to_instance(hgi->hg_class); + + /* do bulk transfer from client to server */ + ret = margo_bulk_transfer(mid, HG_BULK_PULL, + hgi->addr, in.bulk_handle, 0, + g_buffer_bulk_handle, 0, g_buffer_size); + assert(ret == 0); + + HG_Free_input(handle, &in); + + hret = HG_Respond(handle, NULL, NULL, &out); + assert(hret == HG_SUCCESS); + + HG_Destroy(handle); + + return; +} +DEFINE_MARGO_RPC_HANDLER(data_xfer_read_ult) + +int data_xfer_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id) +{ + hg_return_t hret; + + /* set up global target buffer for bulk transfer */ + g_buffer = calloc(1, g_buffer_size); + assert(g_buffer); + + /* register local target buffer for bulk access */ + hret = HG_Bulk_create(margo_get_class(mid), 1, &g_buffer, + &g_buffer_size, HG_BULK_READ_ONLY, &g_buffer_bulk_handle); + assert(hret == HG_SUCCESS); + + /* register RPC handler */ + MARGO_REGISTER(mid, "data_xfer_read", data_xfer_read_in_t, data_xfer_read_out_t, data_xfer_read_ult_handler, mplex_id, pool); + + return(0); +} + +void data_xfer_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id) +{ + HG_Bulk_free(g_buffer_bulk_handle); + free(g_buffer); + + /* TODO: undo what was done in data_xfer_register() */ + return; +} + diff --git a/examples/composition/data-xfer-service.h b/examples/composition/data-xfer-service.h new file mode 100644 index 0000000..4a18ede --- /dev/null +++ b/examples/composition/data-xfer-service.h @@ -0,0 +1,15 @@ +/* + * (C) 2015 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ + +#ifndef __DATA_XFER_SERVICE +#define __DATA_XFER_SERVICE + +#include + +int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id); +void data_xfer_service_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id); + +#endif /* __DATA_XFER_SERVICE */ diff --git a/examples/composition/svc-daemon.c b/examples/composition/svc-daemon.c new file mode 100644 index 0000000..0f97887 --- /dev/null +++ b/examples/composition/svc-daemon.c @@ -0,0 +1,197 @@ +/* + * (C) 2015 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ + +#include +#include +#include +#include +#include +#include + +#include "data-xfer-service.h" + +/* server program that starts a skeleton for sub-services within + * this process to register with + */ + +/* this is a "common" rpc that is handled by the core daemon + */ + +static void my_rpc_shutdown_ult(hg_handle_t handle) +{ + hg_return_t hret; + const struct hg_info *hgi; + margo_instance_id mid; + + //printf("Got RPC request to shutdown\n"); + + hgi = HG_Get_info(handle); + assert(hgi); + mid = margo_hg_class_to_instance(hgi->hg_class); + + hret = margo_respond(mid, handle, NULL); + assert(hret == HG_SUCCESS); + + HG_Destroy(handle); + + /* NOTE: we assume that the server daemon is using + * margo_wait_for_finalize() to suspend until this RPC executes, so there + * is no need to send any extra signal to notify it. + */ + margo_finalize(mid); + + return; +} +DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult) + + +int main(int argc, char **argv) +{ + int ret; + margo_instance_id mid; + hg_context_t *hg_context; + hg_class_t *hg_class; + char proto[12] = {0}; + int i; + hg_addr_t addr_self; + char addr_self_string[128]; + hg_size_t addr_self_string_sz = 128; + ABT_xstream svc1_xstream2; + ABT_pool svc1_pool2; + ABT_pool *handler_pool; + + if(argc != 2) + { + fprintf(stderr, "Usage: ./server \n"); + fprintf(stderr, "Example: ./server delegator,data-xfer na+sm://\n"); + return(-1); + } + + /* boilerplate HG initialization steps */ + /***************************************/ + hg_class = HG_Init(argv[1], HG_TRUE); + if(!hg_class) + { + fprintf(stderr, "Error: HG_Init()\n"); + return(-1); + } + hg_context = HG_Context_create(hg_class); + if(!hg_context) + { + fprintf(stderr, "Error: HG_Context_create()\n"); + HG_Finalize(hg_class); + return(-1); + } + + /* figure out what address this server is listening on */ + ret = HG_Addr_self(hg_class, &addr_self); + if(ret != HG_SUCCESS) + { + fprintf(stderr, "Error: HG_Addr_self()\n"); + HG_Context_destroy(hg_context); + HG_Finalize(hg_class); + return(-1); + } + ret = HG_Addr_to_string(hg_class, addr_self_string, &addr_self_string_sz, addr_self); + if(ret != HG_SUCCESS) + { + fprintf(stderr, "Error: HG_Addr_self()\n"); + HG_Context_destroy(hg_context); + HG_Finalize(hg_class); + HG_Addr_free(hg_class, addr_self); + return(-1); + } + HG_Addr_free(hg_class, addr_self); + + for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++) + proto[i] = argv[1][i]; + printf("# accepting RPCs on address \"%s://%s\"\n", proto, addr_self_string); + + /* set up argobots */ + /***************************************/ + ret = ABT_init(argc, argv); + if(ret != 0) + { + fprintf(stderr, "Error: ABT_init()\n"); + return(-1); + } + + /* set primary ES to idle without polling */ + ret = ABT_snoozer_xstream_self_set(); + if(ret != 0) + { + fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); + return(-1); + } + + /* actually start margo */ + /***************************************/ + mid = margo_init(0, 0, hg_context); + assert(mid); + + /* register RPCs and services */ + /***************************************/ + + /* register a shutdown RPC as just a generic handler; not part of a + * multiplexed service + */ + MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void, + my_rpc_shutdown_ult_handler); + +#if 0 + /* register svc1, with mplex_id 1, to execute on the default handler pool + * used by Margo + */ + handler_pool = margo_get_handler_pool(mid); + ret = svc1_register(mid, *handler_pool, 1); + assert(ret == 0); + + /* create a dedicated and pool for another instance of svc1 */ + ret = ABT_snoozer_xstream_create(1, &svc1_pool2, &svc1_xstream2); + assert(ret == 0); + /* register svc1, with mplex_id 2, to execute on a separate pool. This + * will result in svc1 being registered twice, with the client being able + * to dictate which instance they want to target + */ + ret = svc1_register(mid, svc1_pool2, 2); + assert(ret == 0); + + /* register svc2, with mplex_id 3, to execute on the default handler pool + * used by Margo + */ + handler_pool = margo_get_handler_pool(mid); + ret = svc2_register(mid, *handler_pool, 3); + assert(ret == 0); +#endif + + /* shut things down */ + /****************************************/ + + /* NOTE: there isn't anything else for the server to do at this point + * except wait for itself to be shut down. The + * margo_wait_for_finalize() call here yields to let Margo drive + * progress until that happens. + */ + margo_wait_for_finalize(mid); + + /* TODO: rethink this; can't touch mid after wait for finalize */ +#if 0 + svc1_deregister(mid, *handler_pool, 1); + svc1_deregister(mid, svc1_pool2, 2); + svc2_deregister(mid, *handler_pool, 3); +#endif + + ABT_xstream_join(svc1_xstream2); + ABT_xstream_free(&svc1_xstream2); + + ABT_finalize(); + + HG_Context_destroy(hg_context); + HG_Finalize(hg_class); + + return(0); +} + -- 2.26.2