diff --git a/Makefile.am b/Makefile.am index bbbe5c3f7a2c5bc9d8f4811cf7cf361eb1493c5f..229032052d56461277157a8c4562a20a738601cc 100644 --- a/Makefile.am +++ b/Makefile.am @@ -40,6 +40,6 @@ include Make.rules include $(top_srcdir)/src/Makefile.subdir include $(top_srcdir)/examples/Makefile.subdir include $(top_srcdir)/examples/multiplex/Makefile.subdir -#include $(top_srcdir)/examples/composition/Makefile.subdir +include $(top_srcdir)/examples/composition/Makefile.subdir include $(top_srcdir)/tests/Makefile.subdir diff --git a/examples/composition/composed-benchmark.c b/examples/composition/composed-benchmark.c index 23b4d1f2c3443b9c7a8f3c91eba9ae029ee65923..0e00df25587568c344a1471474f36054a4e48ae8 100644 --- a/examples/composition/composed-benchmark.c +++ b/examples/composition/composed-benchmark.c @@ -8,7 +8,6 @@ #include #include #include -#include #include #include "composed-client-lib.h" @@ -22,8 +21,7 @@ int main(int argc, char **argv) int i; int ret; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; + hg_return_t hret; hg_addr_t delegator_svr_addr = HG_ADDR_NULL; hg_addr_t data_xfer_svr_addr = HG_ADDR_NULL; hg_handle_t handle; @@ -40,64 +38,36 @@ int main(int argc, char **argv) ret = sscanf(argv[3], "%d", &iterations); assert(ret == 1); - /* boilerplate HG initialization steps */ - /***************************************/ - /* initialize Mercury using the transport portion of the destination * address (i.e., the part before the first : character if present) */ for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++) proto[i] = argv[1][i]; + /* TODO: this is a hack for now; I don't really want this to operate in server mode, * but it seems like it needs to for now for sub-service to be able to get back to it */ - hg_class = HG_Init(proto, HG_TRUE); - if(!hg_class) - { - fprintf(stderr, "Error: HG_Init()\n"); - return(-1); - } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); - return(-1); - } - - /* set up argobots */ + /* actually start margo */ /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) + mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: ABT_init()\n"); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); - return(-1); - } - - /* actually start margo */ - /***************************************/ - mid = margo_init(0, 0, hg_context); - /* register core RPC */ - MARGO_REGISTER(hg_class, "my_shutdown_rpc", void, void, - NULL, &my_rpc_shutdown_id); + my_rpc_shutdown_id = MARGO_REGISTER(mid, "my_shutdown_rpc", + void, void, NULL); /* register service APIs */ data_xfer_register_client(mid); composed_register_client(mid); /* find addrs for servers */ - ret = margo_addr_lookup(mid, argv[2], &data_xfer_svr_addr); - assert(ret == 0); - ret = margo_addr_lookup(mid, argv[1], &delegator_svr_addr); - assert(ret == 0); + hret = margo_addr_lookup(mid, argv[2], &data_xfer_svr_addr); + assert(hret == HG_SUCCESS); + hret = margo_addr_lookup(mid, argv[1], &delegator_svr_addr); + assert(hret == HG_SUCCESS); buffer = calloc(1, buffer_sz); assert(buffer); @@ -150,33 +120,28 @@ int main(int argc, char **argv) /* send rpc(s) to shut down server(s) */ sleep(3); printf("Shutting down delegator server.\n"); - ret = HG_Create(hg_context, delegator_svr_addr, my_rpc_shutdown_id, &handle); - assert(ret == 0); - margo_forward(mid, handle, NULL); - HG_Destroy(handle); + hret = margo_create(mid, delegator_svr_addr, my_rpc_shutdown_id, &handle); + assert(hret == HG_SUCCESS); + hret = margo_forward(mid, handle, NULL); + assert(hret == HG_SUCCESS); + margo_destroy(handle); if(strcmp(argv[1], argv[2])) { sleep(3); printf("Shutting down data_xfer server.\n"); - ret = HG_Create(hg_context, data_xfer_svr_addr, my_rpc_shutdown_id, &handle); - assert(ret == 0); - margo_forward(mid, handle, NULL); - HG_Destroy(handle); + hret = margo_create(mid, data_xfer_svr_addr, my_rpc_shutdown_id, &handle); + assert(hret == HG_SUCCESS); + hret = margo_forward(mid, handle, NULL); + assert(hret == HG_SUCCESS); + margo_destroy(handle); } - HG_Addr_free(hg_class, delegator_svr_addr); - HG_Addr_free(hg_class, data_xfer_svr_addr); + margo_addr_free(mid, delegator_svr_addr); + margo_addr_free(mid, data_xfer_svr_addr); /* shut down everything */ margo_finalize(mid); - - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); free(buffer); return(0); } - - diff --git a/examples/composition/composed-client-lib.c b/examples/composition/composed-client-lib.c index bf84b0ca49f1e456c4154dac64a6a3b277016cb4..b4b374c39edec001ca73db1b27f56a8e95502540 100644 --- a/examples/composition/composed-client-lib.c +++ b/examples/composition/composed-client-lib.c @@ -22,8 +22,8 @@ static hg_id_t data_xfer_read_id = -1; int composed_register_client(margo_instance_id mid) { - MARGO_REGISTER(mid, "delegator_read", - delegator_read_in_t, delegator_read_out_t, NULL, &delegator_read_id); + delegator_read_id = MARGO_REGISTER(mid, "delegator_read", + delegator_read_in_t, delegator_read_out_t, NULL); return(0); } @@ -31,8 +31,8 @@ int composed_register_client(margo_instance_id mid) int data_xfer_register_client(margo_instance_id mid) { - MARGO_REGISTER(mid, "data_xfer_read", - data_xfer_read_in_t, data_xfer_read_out_t, NULL, &data_xfer_read_id); + data_xfer_read_id = MARGO_REGISTER(mid, "data_xfer_read", + data_xfer_read_in_t, data_xfer_read_out_t, NULL); return(0); } @@ -42,39 +42,37 @@ void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_s hg_handle_t handle; delegator_read_in_t in; delegator_read_out_t out; - int ret; - const struct hg_info *hgi; + hg_return_t hret; /* create handle */ - ret = HG_Create(margo_get_context(mid), svr_addr, delegator_read_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, delegator_read_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &buffer_sz, + hret = margo_bulk_create(mid, 1, &buffer, &buffer_sz, HG_BULK_WRITE_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); in.data_xfer_svc_addr = data_xfer_svc_addr_string; #if 0 - HG_Set_target_id(handle, mplex_id); + margo_set_target_id(handle, mplex_id); #endif /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ - margo_forward(mid, handle, &in); + hret = margo_forward(mid, handle, &in); + assert(hret == HG_SUCCESS); /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Free_output(handle, &out); - HG_Destroy(handle); + margo_free_output(handle, &out); + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); return; } @@ -84,51 +82,48 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_ hg_handle_t handle; data_xfer_read_in_t in; data_xfer_read_out_t out; - int ret; - const struct hg_info *hgi; + hg_return_t hret; hg_addr_t addr_self; char addr_self_string[128]; hg_size_t addr_self_string_sz = 128; /* create handle */ - ret = HG_Create(margo_get_context(mid), svr_addr, data_xfer_read_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, data_xfer_read_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &buffer_sz, + hret = margo_bulk_create(mid, 1, &buffer, &buffer_sz, HG_BULK_WRITE_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); /* figure out local address */ - ret = HG_Addr_self(margo_get_class(mid), &addr_self); - assert(ret == HG_SUCCESS); + hret = margo_addr_self(mid, &addr_self); + assert(hret == HG_SUCCESS); - ret = HG_Addr_to_string(margo_get_class(mid), addr_self_string, &addr_self_string_sz, addr_self); - assert(ret == HG_SUCCESS); + hret = margo_addr_to_string(mid, addr_self_string, &addr_self_string_sz, addr_self); + assert(hret == HG_SUCCESS); in.client_addr = addr_self_string; #if 0 - HG_Set_target_id(handle, mplex_id); + margo_set_target_id(handle, mplex_id); #endif /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ - margo_forward(mid, handle, &in); + hret = margo_forward(mid, handle, &in); + assert(hret == HG_SUCCESS); /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Free_output(handle, &out); - HG_Destroy(handle); - HG_Addr_free(margo_get_class(mid), addr_self); + margo_free_output(handle, &out); + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); + margo_addr_free(mid, addr_self); return; } - diff --git a/examples/composition/composed-svc-daemon.c b/examples/composition/composed-svc-daemon.c index bdd44ae7f3c2d27b62c5641f9d11602d5cb759e5..0c7ffd341946ddff74c940416aa937575b470630 100644 --- a/examples/composition/composed-svc-daemon.c +++ b/examples/composition/composed-svc-daemon.c @@ -11,7 +11,6 @@ #include #include -#include #include #include "data-xfer-service.h" @@ -27,19 +26,17 @@ static void my_rpc_shutdown_ult(hg_handle_t handle) { hg_return_t hret; - const struct hg_info *hgi; margo_instance_id mid; //printf("Got RPC request to shutdown\n"); - hgi = HG_Get_info(handle); - assert(hgi); mid = margo_hg_handle_get_instance(handle); + assert(mid != MARGO_INSTANCE_NULL); hret = margo_respond(mid, handle, NULL); assert(hret == HG_SUCCESS); - HG_Destroy(handle); + margo_destroy(handle); /* NOTE: we assume that the server daemon is using * margo_wait_for_finalize() to suspend until this RPC executes, so there @@ -54,10 +51,8 @@ DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult) int main(int argc, char **argv) { - int ret; + hg_return_t hret; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; hg_addr_t addr_self; char addr_self_string[128]; hg_size_t addr_self_string_sz = 128; @@ -75,73 +70,44 @@ int main(int argc, char **argv) svc_list = strdup(argv[2]); assert(svc_list); - /* boilerplate HG initialization steps */ + /* actually start margo -- this step encapsulates the Mercury and + * Argobots initialization and must precede their use */ + /* Use the calling xstream to drive progress and execute handlers. */ /***************************************/ - hg_class = HG_Init(argv[1], HG_TRUE); - if(!hg_class) + mid = margo_init(argv[1], MARGO_SERVER_MODE, 0, -1); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: HG_Init()\n"); - return(-1); - } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } /* figure out what address this server is listening on */ - ret = HG_Addr_self(hg_class, &addr_self); - if(ret != HG_SUCCESS) + hret = margo_addr_self(mid, &addr_self); + if(hret != HG_SUCCESS) { - fprintf(stderr, "Error: HG_Addr_self()\n"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); + fprintf(stderr, "Error: margo_addr_self()\n"); + margo_finalize(mid); return(-1); } - ret = HG_Addr_to_string(hg_class, addr_self_string, &addr_self_string_sz, addr_self); - if(ret != HG_SUCCESS) + hret = margo_addr_to_string(mid, addr_self_string, &addr_self_string_sz, addr_self); + if(hret != HG_SUCCESS) { - fprintf(stderr, "Error: HG_Addr_self()\n"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - HG_Addr_free(hg_class, addr_self); + fprintf(stderr, "Error: margo_addr_to_string()\n"); + margo_addr_free(mid, addr_self); + margo_finalize(mid); return(-1); } - HG_Addr_free(hg_class, addr_self); + margo_addr_free(mid, addr_self); printf("# accepting RPCs on address \"%s\"\n", addr_self_string); - /* set up argobots */ - /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_init()\n"); - return(-1); - } - - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); - return(-1); - } - - /* actually start margo */ - /***************************************/ - mid = margo_init(0, 0, hg_context); - assert(mid); - /* register RPCs and services */ /***************************************/ /* register a shutdown RPC as just a generic handler; not part of a * multiplexed service */ - MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult, MARGO_RPC_ID_IGNORE); + MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult); handler_pool = margo_get_handler_pool(mid); svc = strtok(svc_list, ","); @@ -178,11 +144,5 @@ int main(int argc, char **argv) svc2_deregister(mid, *handler_pool, 3); #endif - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - return(0); } - diff --git a/examples/composition/data-xfer-service.c b/examples/composition/data-xfer-service.c index 84684702cd41da87ecfecf7f525ebfe40bbdf45b..ae69566e4ec56b6a456ee30c83f5e435b09f032c 100644 --- a/examples/composition/data-xfer-service.c +++ b/examples/composition/data-xfer-service.c @@ -18,7 +18,6 @@ static void data_xfer_read_ult(hg_handle_t handle) hg_return_t hret; data_xfer_read_out_t out; data_xfer_read_in_t in; - int ret; const struct hg_info *hgi; margo_instance_id mid; hg_addr_t client_addr; @@ -28,10 +27,12 @@ static void data_xfer_read_ult(hg_handle_t handle) pthread_t my_tid; #endif - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); - hgi = HG_Get_info(handle); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); + hgi = margo_get_info(handle); assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); #if 0 ABT_xstream_self(&my_xstream); @@ -43,8 +44,6 @@ static void data_xfer_read_ult(hg_handle_t handle) out.ret = 0; - mid = margo_hg_handle_get_instance(handle); - if(!in.client_addr) client_addr = hgi->addr; else @@ -54,20 +53,20 @@ static void data_xfer_read_ult(hg_handle_t handle) } /* do bulk transfer from client to server */ - ret = margo_bulk_transfer(mid, HG_BULK_PUSH, + hret = margo_bulk_transfer(mid, HG_BULK_PUSH, client_addr, in.bulk_handle, 0, - g_buffer_bulk_handle, 0, g_buffer_size); - assert(ret == 0); + g_buffer_bulk_handle, 0, g_buffer_size, HG_OP_ID_IGNORE); + assert(hret == HG_SUCCESS); if(in.client_addr) - HG_Addr_free(margo_get_class(mid), client_addr); + margo_addr_free(mid, client_addr); - HG_Free_input(handle, &in); + margo_free_input(handle, &in); - hret = HG_Respond(handle, NULL, NULL, &out); + hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Destroy(handle); + margo_destroy(handle); return; } @@ -82,24 +81,23 @@ int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp assert(g_buffer); /* register local target buffer for bulk access */ - hret = HG_Bulk_create(margo_get_class(mid), 1, &g_buffer, + hret = margo_bulk_create(mid, 1, &g_buffer, &g_buffer_size, HG_BULK_READ_ONLY, &g_buffer_bulk_handle); assert(hret == HG_SUCCESS); /* register RPC handler */ MARGO_REGISTER_MPLEX(mid, "data_xfer_read", data_xfer_read_in_t, data_xfer_read_out_t, - data_xfer_read_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE); + data_xfer_read_ult, mplex_id, pool); return(0); } void data_xfer_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id) { - HG_Bulk_free(g_buffer_bulk_handle); + margo_bulk_free(g_buffer_bulk_handle); free(g_buffer); /* TODO: undo what was done in data_xfer_register() */ return; } - diff --git a/examples/composition/delegator-service.c b/examples/composition/delegator-service.c index eb1d49748b5432dd8d7cd80a33f5ca00cee9f7bb..8b5feaf804032e4f8c50ade629b61fd94d481eec 100644 --- a/examples/composition/delegator-service.c +++ b/examples/composition/delegator-service.c @@ -19,7 +19,6 @@ static void delegator_read_ult(hg_handle_t handle) delegator_read_in_t in; data_xfer_read_in_t in_relay; data_xfer_read_out_t out_relay; - int ret; const struct hg_info *hgi; margo_instance_id mid; hg_addr_t data_xfer_svc_addr; @@ -33,10 +32,12 @@ static void delegator_read_ult(hg_handle_t handle) pthread_t my_tid; #endif - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); - hgi = HG_Get_info(handle); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); + hgi = margo_get_info(handle); assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); #if 0 ABT_xstream_self(&my_xstream); @@ -48,34 +49,33 @@ static void delegator_read_ult(hg_handle_t handle) out.ret = 0; - mid = margo_hg_handle_get_instance(handle); - hret = margo_addr_lookup(mid, in.data_xfer_svc_addr, &data_xfer_svc_addr); assert(hret == HG_SUCCESS); /* relay to microservice */ - hret = HG_Create(margo_get_context(mid), data_xfer_svc_addr, g_data_xfer_read_id, &handle_relay); + hret = margo_create(mid, data_xfer_svc_addr, g_data_xfer_read_id, &handle_relay); assert(hret == HG_SUCCESS); /* pass through bulk handle */ in_relay.bulk_handle = in.bulk_handle; /* get addr of client to relay to data_xfer service */ - hret = HG_Addr_to_string(margo_get_class(mid), client_addr_string, &client_addr_string_sz, hgi->addr); + hret = margo_addr_to_string(mid, client_addr_string, &client_addr_string_sz, hgi->addr); assert(hret == HG_SUCCESS); in_relay.client_addr = client_addr_string; - margo_forward(mid, handle_relay, &in_relay); + hret = margo_forward(mid, handle_relay, &in_relay); + assert(hret == HG_SUCCESS); - hret = HG_Get_output(handle_relay, &out_relay); + hret = margo_get_output(handle_relay, &out_relay); assert(hret == HG_SUCCESS); - HG_Free_input(handle, &in); - HG_Free_output(handle_relay, &out); + margo_free_input(handle, &in); + margo_free_output(handle_relay, &out); - hret = HG_Respond(handle, NULL, NULL, &out); + hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Addr_free(margo_get_class(mid), data_xfer_svc_addr); - HG_Destroy(handle); - HG_Destroy(handle_relay); + margo_addr_free(mid, data_xfer_svc_addr); + margo_destroy(handle); + margo_destroy(handle_relay); return; } @@ -87,13 +87,13 @@ int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp /* NOTE: this RPC may already be registered if this process has already registered a * data-xfer service */ - MARGO_REGISTER(mid, "data_xfer_read", - data_xfer_read_in_t, data_xfer_read_out_t, NULL, &g_data_xfer_read_id); + g_data_xfer_read_id = MARGO_REGISTER(mid, "data_xfer_read", + data_xfer_read_in_t, data_xfer_read_out_t, NULL); /* register RPC handler */ MARGO_REGISTER_MPLEX(mid, "delegator_read", delegator_read_in_t, delegator_read_out_t, - delegator_read_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE); + delegator_read_ult, mplex_id, pool); return(0); } @@ -103,4 +103,3 @@ void delegator_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_i /* TODO: undo what was done in delegator_register() */ return; } -