diff --git a/examples/composition/composed-benchmark.c b/examples/composition/composed-benchmark.c index 23b4d1f2c3443b9c7a8f3c91eba9ae029ee65923..0e00df25587568c344a1471474f36054a4e48ae8 100644 --- a/examples/composition/composed-benchmark.c +++ b/examples/composition/composed-benchmark.c @@ -8,7 +8,6 @@ #include #include #include -#include #include #include "composed-client-lib.h" @@ -22,8 +21,7 @@ int main(int argc, char **argv) int i; int ret; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; + hg_return_t hret; hg_addr_t delegator_svr_addr = HG_ADDR_NULL; hg_addr_t data_xfer_svr_addr = HG_ADDR_NULL; hg_handle_t handle; @@ -40,64 +38,36 @@ int main(int argc, char **argv) ret = sscanf(argv[3], "%d", &iterations); assert(ret == 1); - /* boilerplate HG initialization steps */ - /***************************************/ - /* initialize Mercury using the transport portion of the destination * address (i.e., the part before the first : character if present) */ for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++) proto[i] = argv[1][i]; + /* TODO: this is a hack for now; I don't really want this to operate in server mode, * but it seems like it needs to for now for sub-service to be able to get back to it */ - hg_class = HG_Init(proto, HG_TRUE); - if(!hg_class) - { - fprintf(stderr, "Error: HG_Init()\n"); - return(-1); - } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); - return(-1); - } - - /* set up argobots */ + /* actually start margo */ /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) + mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: ABT_init()\n"); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); - return(-1); - } - - /* actually start margo */ - /***************************************/ - mid = margo_init(0, 0, hg_context); - /* register core RPC */ - MARGO_REGISTER(hg_class, "my_shutdown_rpc", void, void, - NULL, &my_rpc_shutdown_id); + my_rpc_shutdown_id = MARGO_REGISTER(mid, "my_shutdown_rpc", + void, void, NULL); /* register service APIs */ data_xfer_register_client(mid); composed_register_client(mid); /* find addrs for servers */ - ret = margo_addr_lookup(mid, argv[2], &data_xfer_svr_addr); - assert(ret == 0); - ret = margo_addr_lookup(mid, argv[1], &delegator_svr_addr); - assert(ret == 0); + hret = margo_addr_lookup(mid, argv[2], &data_xfer_svr_addr); + assert(hret == HG_SUCCESS); + hret = margo_addr_lookup(mid, argv[1], &delegator_svr_addr); + assert(hret == HG_SUCCESS); buffer = calloc(1, buffer_sz); assert(buffer); @@ -150,33 +120,28 @@ int main(int argc, char **argv) /* send rpc(s) to shut down server(s) */ sleep(3); printf("Shutting down delegator server.\n"); - ret = HG_Create(hg_context, delegator_svr_addr, my_rpc_shutdown_id, &handle); - assert(ret == 0); - margo_forward(mid, handle, NULL); - HG_Destroy(handle); + hret = margo_create(mid, delegator_svr_addr, my_rpc_shutdown_id, &handle); + assert(hret == HG_SUCCESS); + hret = margo_forward(mid, handle, NULL); + assert(hret == HG_SUCCESS); + margo_destroy(handle); if(strcmp(argv[1], argv[2])) { sleep(3); printf("Shutting down data_xfer server.\n"); - ret = HG_Create(hg_context, data_xfer_svr_addr, my_rpc_shutdown_id, &handle); - assert(ret == 0); - margo_forward(mid, handle, NULL); - HG_Destroy(handle); + hret = margo_create(mid, data_xfer_svr_addr, my_rpc_shutdown_id, &handle); + assert(hret == HG_SUCCESS); + hret = margo_forward(mid, handle, NULL); + assert(hret == HG_SUCCESS); + margo_destroy(handle); } - HG_Addr_free(hg_class, delegator_svr_addr); - HG_Addr_free(hg_class, data_xfer_svr_addr); + margo_addr_free(mid, delegator_svr_addr); + margo_addr_free(mid, data_xfer_svr_addr); /* shut down everything */ margo_finalize(mid); - - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); free(buffer); return(0); } - - diff --git a/examples/composition/composed-client-lib.c b/examples/composition/composed-client-lib.c index bf84b0ca49f1e456c4154dac64a6a3b277016cb4..b4b374c39edec001ca73db1b27f56a8e95502540 100644 --- a/examples/composition/composed-client-lib.c +++ b/examples/composition/composed-client-lib.c @@ -22,8 +22,8 @@ static hg_id_t data_xfer_read_id = -1; int composed_register_client(margo_instance_id mid) { - MARGO_REGISTER(mid, "delegator_read", - delegator_read_in_t, delegator_read_out_t, NULL, &delegator_read_id); + delegator_read_id = MARGO_REGISTER(mid, "delegator_read", + delegator_read_in_t, delegator_read_out_t, NULL); return(0); } @@ -31,8 +31,8 @@ int composed_register_client(margo_instance_id mid) int data_xfer_register_client(margo_instance_id mid) { - MARGO_REGISTER(mid, "data_xfer_read", - data_xfer_read_in_t, data_xfer_read_out_t, NULL, &data_xfer_read_id); + data_xfer_read_id = MARGO_REGISTER(mid, "data_xfer_read", + data_xfer_read_in_t, data_xfer_read_out_t, NULL); return(0); } @@ -42,39 +42,37 @@ void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_s hg_handle_t handle; delegator_read_in_t in; delegator_read_out_t out; - int ret; - const struct hg_info *hgi; + hg_return_t hret; /* create handle */ - ret = HG_Create(margo_get_context(mid), svr_addr, delegator_read_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, delegator_read_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &buffer_sz, + hret = margo_bulk_create(mid, 1, &buffer, &buffer_sz, HG_BULK_WRITE_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); in.data_xfer_svc_addr = data_xfer_svc_addr_string; #if 0 - HG_Set_target_id(handle, mplex_id); + margo_set_target_id(handle, mplex_id); #endif /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ - margo_forward(mid, handle, &in); + hret = margo_forward(mid, handle, &in); + assert(hret == HG_SUCCESS); /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Free_output(handle, &out); - HG_Destroy(handle); + margo_free_output(handle, &out); + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); return; } @@ -84,51 +82,48 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_ hg_handle_t handle; data_xfer_read_in_t in; data_xfer_read_out_t out; - int ret; - const struct hg_info *hgi; + hg_return_t hret; hg_addr_t addr_self; char addr_self_string[128]; hg_size_t addr_self_string_sz = 128; /* create handle */ - ret = HG_Create(margo_get_context(mid), svr_addr, data_xfer_read_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, data_xfer_read_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &buffer_sz, + hret = margo_bulk_create(mid, 1, &buffer, &buffer_sz, HG_BULK_WRITE_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); /* figure out local address */ - ret = HG_Addr_self(margo_get_class(mid), &addr_self); - assert(ret == HG_SUCCESS); + hret = margo_addr_self(mid, &addr_self); + assert(hret == HG_SUCCESS); - ret = HG_Addr_to_string(margo_get_class(mid), addr_self_string, &addr_self_string_sz, addr_self); - assert(ret == HG_SUCCESS); + hret = margo_addr_to_string(mid, addr_self_string, &addr_self_string_sz, addr_self); + assert(hret == HG_SUCCESS); in.client_addr = addr_self_string; #if 0 - HG_Set_target_id(handle, mplex_id); + margo_set_target_id(handle, mplex_id); #endif /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ - margo_forward(mid, handle, &in); + hret = margo_forward(mid, handle, &in); + assert(hret == HG_SUCCESS); /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Free_output(handle, &out); - HG_Destroy(handle); - HG_Addr_free(margo_get_class(mid), addr_self); + margo_free_output(handle, &out); + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); + margo_addr_free(mid, addr_self); return; } - diff --git a/examples/composition/composed-svc-daemon.c b/examples/composition/composed-svc-daemon.c index bdd44ae7f3c2d27b62c5641f9d11602d5cb759e5..0c7ffd341946ddff74c940416aa937575b470630 100644 --- a/examples/composition/composed-svc-daemon.c +++ b/examples/composition/composed-svc-daemon.c @@ -11,7 +11,6 @@ #include #include -#include #include #include "data-xfer-service.h" @@ -27,19 +26,17 @@ static void my_rpc_shutdown_ult(hg_handle_t handle) { hg_return_t hret; - const struct hg_info *hgi; margo_instance_id mid; //printf("Got RPC request to shutdown\n"); - hgi = HG_Get_info(handle); - assert(hgi); mid = margo_hg_handle_get_instance(handle); + assert(mid != MARGO_INSTANCE_NULL); hret = margo_respond(mid, handle, NULL); assert(hret == HG_SUCCESS); - HG_Destroy(handle); + margo_destroy(handle); /* NOTE: we assume that the server daemon is using * margo_wait_for_finalize() to suspend until this RPC executes, so there @@ -54,10 +51,8 @@ DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult) int main(int argc, char **argv) { - int ret; + hg_return_t hret; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; hg_addr_t addr_self; char addr_self_string[128]; hg_size_t addr_self_string_sz = 128; @@ -75,73 +70,44 @@ int main(int argc, char **argv) svc_list = strdup(argv[2]); assert(svc_list); - /* boilerplate HG initialization steps */ + /* actually start margo -- this step encapsulates the Mercury and + * Argobots initialization and must precede their use */ + /* Use the calling xstream to drive progress and execute handlers. */ /***************************************/ - hg_class = HG_Init(argv[1], HG_TRUE); - if(!hg_class) + mid = margo_init(argv[1], MARGO_SERVER_MODE, 0, -1); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: HG_Init()\n"); - return(-1); - } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } /* figure out what address this server is listening on */ - ret = HG_Addr_self(hg_class, &addr_self); - if(ret != HG_SUCCESS) + hret = margo_addr_self(mid, &addr_self); + if(hret != HG_SUCCESS) { - fprintf(stderr, "Error: HG_Addr_self()\n"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); + fprintf(stderr, "Error: margo_addr_self()\n"); + margo_finalize(mid); return(-1); } - ret = HG_Addr_to_string(hg_class, addr_self_string, &addr_self_string_sz, addr_self); - if(ret != HG_SUCCESS) + hret = margo_addr_to_string(mid, addr_self_string, &addr_self_string_sz, addr_self); + if(hret != HG_SUCCESS) { - fprintf(stderr, "Error: HG_Addr_self()\n"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - HG_Addr_free(hg_class, addr_self); + fprintf(stderr, "Error: margo_addr_to_string()\n"); + margo_addr_free(mid, addr_self); + margo_finalize(mid); return(-1); } - HG_Addr_free(hg_class, addr_self); + margo_addr_free(mid, addr_self); printf("# accepting RPCs on address \"%s\"\n", addr_self_string); - /* set up argobots */ - /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_init()\n"); - return(-1); - } - - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); - return(-1); - } - - /* actually start margo */ - /***************************************/ - mid = margo_init(0, 0, hg_context); - assert(mid); - /* register RPCs and services */ /***************************************/ /* register a shutdown RPC as just a generic handler; not part of a * multiplexed service */ - MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult, MARGO_RPC_ID_IGNORE); + MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult); handler_pool = margo_get_handler_pool(mid); svc = strtok(svc_list, ","); @@ -178,11 +144,5 @@ int main(int argc, char **argv) svc2_deregister(mid, *handler_pool, 3); #endif - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - return(0); } - diff --git a/examples/composition/data-xfer-service.c b/examples/composition/data-xfer-service.c index 84684702cd41da87ecfecf7f525ebfe40bbdf45b..a6297d1fae58a5eb4ed4465a05a8ec8c4bdfd0a8 100644 --- a/examples/composition/data-xfer-service.c +++ b/examples/composition/data-xfer-service.c @@ -18,7 +18,6 @@ static void data_xfer_read_ult(hg_handle_t handle) hg_return_t hret; data_xfer_read_out_t out; data_xfer_read_in_t in; - int ret; const struct hg_info *hgi; margo_instance_id mid; hg_addr_t client_addr; @@ -28,10 +27,12 @@ static void data_xfer_read_ult(hg_handle_t handle) pthread_t my_tid; #endif - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); - hgi = HG_Get_info(handle); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); + hgi = margo_get_info(handle); assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); #if 0 ABT_xstream_self(&my_xstream); @@ -43,8 +44,6 @@ static void data_xfer_read_ult(hg_handle_t handle) out.ret = 0; - mid = margo_hg_handle_get_instance(handle); - if(!in.client_addr) client_addr = hgi->addr; else @@ -54,20 +53,20 @@ static void data_xfer_read_ult(hg_handle_t handle) } /* do bulk transfer from client to server */ - ret = margo_bulk_transfer(mid, HG_BULK_PUSH, + hret = margo_bulk_transfer(mid, HG_BULK_PUSH, client_addr, in.bulk_handle, 0, g_buffer_bulk_handle, 0, g_buffer_size); - assert(ret == 0); + assert(hret == HG_SUCCESS); if(in.client_addr) - HG_Addr_free(margo_get_class(mid), client_addr); + margo_addr_free(mid, client_addr); - HG_Free_input(handle, &in); + margo_free_input(handle, &in); - hret = HG_Respond(handle, NULL, NULL, &out); + hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Destroy(handle); + margo_destroy(handle); return; } @@ -82,24 +81,23 @@ int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp assert(g_buffer); /* register local target buffer for bulk access */ - hret = HG_Bulk_create(margo_get_class(mid), 1, &g_buffer, + hret = margo_bulk_create(mid, 1, &g_buffer, &g_buffer_size, HG_BULK_READ_ONLY, &g_buffer_bulk_handle); assert(hret == HG_SUCCESS); /* register RPC handler */ MARGO_REGISTER_MPLEX(mid, "data_xfer_read", data_xfer_read_in_t, data_xfer_read_out_t, - data_xfer_read_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE); + data_xfer_read_ult, mplex_id, pool); return(0); } void data_xfer_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id) { - HG_Bulk_free(g_buffer_bulk_handle); + margo_bulk_free(g_buffer_bulk_handle); free(g_buffer); /* TODO: undo what was done in data_xfer_register() */ return; } - diff --git a/examples/composition/delegator-service.c b/examples/composition/delegator-service.c index eb1d49748b5432dd8d7cd80a33f5ca00cee9f7bb..8b5feaf804032e4f8c50ade629b61fd94d481eec 100644 --- a/examples/composition/delegator-service.c +++ b/examples/composition/delegator-service.c @@ -19,7 +19,6 @@ static void delegator_read_ult(hg_handle_t handle) delegator_read_in_t in; data_xfer_read_in_t in_relay; data_xfer_read_out_t out_relay; - int ret; const struct hg_info *hgi; margo_instance_id mid; hg_addr_t data_xfer_svc_addr; @@ -33,10 +32,12 @@ static void delegator_read_ult(hg_handle_t handle) pthread_t my_tid; #endif - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); - hgi = HG_Get_info(handle); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); + hgi = margo_get_info(handle); assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); #if 0 ABT_xstream_self(&my_xstream); @@ -48,34 +49,33 @@ static void delegator_read_ult(hg_handle_t handle) out.ret = 0; - mid = margo_hg_handle_get_instance(handle); - hret = margo_addr_lookup(mid, in.data_xfer_svc_addr, &data_xfer_svc_addr); assert(hret == HG_SUCCESS); /* relay to microservice */ - hret = HG_Create(margo_get_context(mid), data_xfer_svc_addr, g_data_xfer_read_id, &handle_relay); + hret = margo_create(mid, data_xfer_svc_addr, g_data_xfer_read_id, &handle_relay); assert(hret == HG_SUCCESS); /* pass through bulk handle */ in_relay.bulk_handle = in.bulk_handle; /* get addr of client to relay to data_xfer service */ - hret = HG_Addr_to_string(margo_get_class(mid), client_addr_string, &client_addr_string_sz, hgi->addr); + hret = margo_addr_to_string(mid, client_addr_string, &client_addr_string_sz, hgi->addr); assert(hret == HG_SUCCESS); in_relay.client_addr = client_addr_string; - margo_forward(mid, handle_relay, &in_relay); + hret = margo_forward(mid, handle_relay, &in_relay); + assert(hret == HG_SUCCESS); - hret = HG_Get_output(handle_relay, &out_relay); + hret = margo_get_output(handle_relay, &out_relay); assert(hret == HG_SUCCESS); - HG_Free_input(handle, &in); - HG_Free_output(handle_relay, &out); + margo_free_input(handle, &in); + margo_free_output(handle_relay, &out); - hret = HG_Respond(handle, NULL, NULL, &out); + hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Addr_free(margo_get_class(mid), data_xfer_svc_addr); - HG_Destroy(handle); - HG_Destroy(handle_relay); + margo_addr_free(mid, data_xfer_svc_addr); + margo_destroy(handle); + margo_destroy(handle_relay); return; } @@ -87,13 +87,13 @@ int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp /* NOTE: this RPC may already be registered if this process has already registered a * data-xfer service */ - MARGO_REGISTER(mid, "data_xfer_read", - data_xfer_read_in_t, data_xfer_read_out_t, NULL, &g_data_xfer_read_id); + g_data_xfer_read_id = MARGO_REGISTER(mid, "data_xfer_read", + data_xfer_read_in_t, data_xfer_read_out_t, NULL); /* register RPC handler */ MARGO_REGISTER_MPLEX(mid, "delegator_read", delegator_read_in_t, delegator_read_out_t, - delegator_read_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE); + delegator_read_ult, mplex_id, pool); return(0); } @@ -103,4 +103,3 @@ void delegator_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_i /* TODO: undo what was done in delegator_register() */ return; } - diff --git a/examples/margo-example-client.c b/examples/margo-example-client.c index d064ce738be2149aaefaf9ad6ff0c21610284fca..a2e48f1117b9be8c2269134d43143e1bf0070845 100644 --- a/examples/margo-example-client.c +++ b/examples/margo-example-client.c @@ -7,8 +7,8 @@ #include #include #include +#include #include -#include #include #include "my-rpc.h" @@ -24,8 +24,6 @@ struct run_my_rpc_args { int val; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; hg_addr_t svr_addr; }; @@ -40,11 +38,10 @@ int main(int argc, char **argv) ABT_thread threads[4]; int i; int ret; + hg_return_t hret; ABT_xstream xstream; ABT_pool pool; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; hg_addr_t svr_addr = HG_ADDR_NULL; hg_handle_t handle; char proto[12] = {0}; @@ -55,44 +52,27 @@ int main(int argc, char **argv) return(-1); } - /* boilerplate HG initialization steps */ - /***************************************/ - /* initialize Mercury using the transport portion of the destination * address (i.e., the part before the first : character if present) */ for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++) proto[i] = argv[1][i]; - hg_class = HG_Init(proto, HG_FALSE); - if(!hg_class) - { - fprintf(stderr, "Error: HG_Init()\n"); - return(-1); - } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); - return(-1); - } - /* set up argobots */ + /* actually start margo -- margo_init() encapsulates the Mercury & + * Argobots initialization, so this step must precede their use. */ + /* Use main process to drive progress (it will relinquish control to + * Mercury during blocking communication calls). No RPC threads are + * used because this is a pure client that will not be servicing + * rpc requests. + */ /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) + mid = margo_init(proto, MARGO_CLIENT_MODE, 0, 0); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: ABT_init()\n"); - return(-1); - } - - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } + margo_diag_start(mid); /* retrieve current pool to use for ULT creation */ ret = ABT_xstream_self(&xstream); @@ -108,26 +88,18 @@ int main(int argc, char **argv) return(-1); } - /* actually start margo */ - /***************************************/ - mid = margo_init(0, 0, hg_context); - assert(mid); - margo_diag_start(mid); - /* register RPC */ - MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, NULL, &my_rpc_id); - MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, NULL, &my_rpc_shutdown_id); + my_rpc_id = MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, NULL); + my_rpc_shutdown_id = MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, NULL); /* find addr for server */ - ret = margo_addr_lookup(mid, argv[1], &svr_addr); - assert(ret == 0); + hret = margo_addr_lookup(mid, argv[1], &svr_addr); + assert(hret == HG_SUCCESS); for(i=0; i<4; i++) { args[i].val = i; args[i].mid = mid; - args[i].hg_class = hg_class; - args[i].hg_context = hg_context; args[i].svr_addr = svr_addr; /* Each ult gets a pointer to an element of the array to use @@ -165,22 +137,18 @@ int main(int argc, char **argv) /* send one rpc to server to shut it down */ /* create handle */ - ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, my_rpc_shutdown_id, &handle); + assert(hret == HG_SUCCESS); - margo_forward(mid, handle, NULL); + hret = margo_forward(mid, handle, NULL); + assert(hret == HG_SUCCESS); - HG_Destroy(handle); - HG_Addr_free(hg_class, svr_addr); + margo_destroy(handle); + margo_addr_free(mid, svr_addr); /* shut down everything */ margo_diag_dump(mid, "-", 0); margo_finalize(mid); - - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); return(0); } @@ -191,10 +159,9 @@ static void run_my_rpc(void *_arg) hg_handle_t handle; my_rpc_in_t in; my_rpc_out_t out; - int ret; + hg_return_t hret; hg_size_t size; void* buffer; - const struct hg_info *hgi; printf("ULT [%d] running.\n", arg->val); @@ -205,32 +172,31 @@ static void run_my_rpc(void *_arg) sprintf((char*)buffer, "Hello world!\n"); /* create handle */ - ret = HG_Create(arg->hg_context, arg->svr_addr, my_rpc_id, &handle); - assert(ret == 0); + hret = margo_create(arg->mid, arg->svr_addr, my_rpc_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size, + hret = margo_bulk_create(arg->mid, 1, &buffer, &size, HG_BULK_READ_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ in.input_val = arg->val; - margo_forward(arg->mid, handle, &in); + hret = margo_forward(arg->mid, handle, &in); + assert(hret == HG_SUCCESS); /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); printf("Got response ret: %d\n", out.ret); /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Free_output(handle, &out); - HG_Destroy(handle); + margo_bulk_free(in.bulk_handle); + margo_free_output(handle, &out); + margo_destroy(handle); free(buffer); printf("ULT [%d] done.\n", arg->val); diff --git a/examples/margo-example-server.c b/examples/margo-example-server.c index 64fcdd5cd938eb30cf2dbad63b2662c6d1c799b6..b605ef2cf79997bf796ae9912c971b8fcc95479b 100644 --- a/examples/margo-example-server.c +++ b/examples/margo-example-server.c @@ -7,8 +7,8 @@ #include #include #include +#include #include -#include #include #include "my-rpc.h" @@ -19,10 +19,8 @@ int main(int argc, char **argv) { - int ret; + hg_return_t hret; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; hg_addr_t addr_self; char addr_self_string[128]; hg_size_t addr_self_string_sz = 128; @@ -34,70 +32,41 @@ int main(int argc, char **argv) return(-1); } - /* boilerplate HG initialization steps */ + /* actually start margo -- this step encapsulates the Mercury and + * Argobots initialization and must precede their use */ + /* Use the calling xstream to drive progress and execute handlers. */ /***************************************/ - hg_class = HG_Init(argv[1], HG_TRUE); - if(!hg_class) + mid = margo_init(argv[1], MARGO_SERVER_MODE, 0, -1); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: HG_Init()\n"); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); - return(-1); - } - + margo_diag_start(mid); /* figure out what address this server is listening on */ - ret = HG_Addr_self(hg_class, &addr_self); - if(ret != HG_SUCCESS) + hret = margo_addr_self(mid, &addr_self); + if(hret != HG_SUCCESS) { - fprintf(stderr, "Error: HG_Addr_self()\n"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); + fprintf(stderr, "Error: margo_addr_self()\n"); + margo_finalize(mid); return(-1); } - ret = HG_Addr_to_string(hg_class, addr_self_string, &addr_self_string_sz, addr_self); - if(ret != HG_SUCCESS) + hret = margo_addr_to_string(mid, addr_self_string, &addr_self_string_sz, addr_self); + if(hret != HG_SUCCESS) { - fprintf(stderr, "Error: HG_Addr_self()\n"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - HG_Addr_free(hg_class, addr_self); + fprintf(stderr, "Error: margo_addr_to_string()\n"); + margo_addr_free(mid, addr_self); + margo_finalize(mid); return(-1); } - HG_Addr_free(hg_class, addr_self); + margo_addr_free(mid, addr_self); printf("# accepting RPCs on address \"%s\"\n", addr_self_string); - /* set up argobots */ - /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_init()\n"); - return(-1); - } - - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); - return(-1); - } - - /* actually start margo */ - mid = margo_init(0, 0, hg_context); - assert(mid); - margo_diag_start(mid); - /* register RPC */ - MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, my_rpc_ult, MARGO_RPC_ID_IGNORE); - MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult, MARGO_RPC_ID_IGNORE); + MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, my_rpc_ult); + MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult); /* NOTE: there isn't anything else for the server to do at this point * except wait for itself to be shut down. The @@ -106,11 +75,5 @@ int main(int argc, char **argv) */ margo_wait_for_finalize(mid); - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - return(0); } - diff --git a/examples/multiplex/margo-example-mp-client.c b/examples/multiplex/margo-example-mp-client.c index 0a5180f3718ab727e29ea176db9223a3f9504d4b..bf8e2f2270a966ef757daab9b8dd32d435c9eb51 100644 --- a/examples/multiplex/margo-example-mp-client.c +++ b/examples/multiplex/margo-example-mp-client.c @@ -8,7 +8,6 @@ #include #include #include -#include #include #include "svc1-client.h" @@ -19,10 +18,8 @@ static hg_id_t my_rpc_shutdown_id; int main(int argc, char **argv) { int i; - int ret; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; + hg_return_t hret; hg_addr_t svr_addr = HG_ADDR_NULL; hg_handle_t handle; char proto[12] = {0}; @@ -33,58 +30,36 @@ int main(int argc, char **argv) return(-1); } - /* boilerplate HG initialization steps */ - /***************************************/ - /* initialize Mercury using the transport portion of the destination * address (i.e., the part before the first : character if present) */ for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++) proto[i] = argv[1][i]; - hg_class = HG_Init(proto, HG_FALSE); - if(!hg_class) - { - fprintf(stderr, "Error: HG_Init()\n"); - return(-1); - } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); - return(-1); - } - /* set up argobots */ + /* actually start margo -- margo_init() encapsulates the Mercury & + * Argobots initialization, so this step must precede their use. */ + /* Use main process to drive progress (it will relinquish control to + * Mercury during blocking communication calls). No RPC threads are + * used because this is a pure client that will not be servicing + * rpc requests. + */ /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) + mid = margo_init(proto, MARGO_CLIENT_MODE, 0, 0); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: ABT_init()\n"); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); - return(-1); - } - - /* actually start margo */ - /***************************************/ - mid = margo_init(0, 0, hg_context); - /* register core RPC */ - MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, NULL, &my_rpc_shutdown_id); + my_rpc_shutdown_id = MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, NULL); /* register service APIs */ svc1_register_client(mid); svc2_register_client(mid); /* find addr for server */ - ret = margo_addr_lookup(mid, argv[1], &svr_addr); - assert(ret == 0); + hret = margo_addr_lookup(mid, argv[1], &svr_addr); + assert(hret == HG_SUCCESS); svc1_do_thing(mid, svr_addr, 1); svc1_do_other_thing(mid, svr_addr, 1); @@ -95,22 +70,17 @@ int main(int argc, char **argv) /* send one rpc to server to shut it down */ /* create handle */ - ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, my_rpc_shutdown_id, &handle); + assert(hret == HG_SUCCESS); - margo_forward(mid, handle, NULL); + hret = margo_forward(mid, handle, NULL); + assert(hret == HG_SUCCESS); - HG_Addr_free(hg_class, svr_addr); + margo_destroy(handle); + margo_addr_free(mid, svr_addr); /* shut down everything */ margo_finalize(mid); - - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); return(0); } - - diff --git a/examples/multiplex/margo-example-mp-server.c b/examples/multiplex/margo-example-mp-server.c index cad67e9e9c96f217eeead44db9079be2264b5918..c24107769403d6558e44ead47f17d2331394890a 100644 --- a/examples/multiplex/margo-example-mp-server.c +++ b/examples/multiplex/margo-example-mp-server.c @@ -24,19 +24,17 @@ static void my_rpc_shutdown_ult(hg_handle_t handle) { hg_return_t hret; - const struct hg_info *hgi; margo_instance_id mid; //printf("Got RPC request to shutdown\n"); - hgi = HG_Get_info(handle); - assert(hgi); mid = margo_hg_handle_get_instance(handle); + assert(mid != MARGO_INSTANCE_NULL); hret = margo_respond(mid, handle, NULL); assert(hret == HG_SUCCESS); - HG_Destroy(handle); + margo_destroy(handle); /* NOTE: we assume that the server daemon is using * margo_wait_for_finalize() to suspend until this RPC executes, so there @@ -53,8 +51,7 @@ int main(int argc, char **argv) { int ret; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; + hg_return_t hret; hg_addr_t addr_self; char addr_self_string[128]; hg_size_t addr_self_string_sz = 128; @@ -69,73 +66,44 @@ int main(int argc, char **argv) return(-1); } - /* boilerplate HG initialization steps */ + /* actually start margo -- this step encapsulates the Mercury and + * Argobots initialization and must precede their use */ + /* Use the calling xstream to drive progress and execute handlers. */ /***************************************/ - hg_class = HG_Init(argv[1], HG_TRUE); - if(!hg_class) + mid = margo_init(argv[1], MARGO_SERVER_MODE, 0, -1); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: HG_Init()\n"); - return(-1); - } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } /* figure out what address this server is listening on */ - ret = HG_Addr_self(hg_class, &addr_self); - if(ret != HG_SUCCESS) + hret = margo_addr_self(mid, &addr_self); + if(hret != HG_SUCCESS) { - fprintf(stderr, "Error: HG_Addr_self()\n"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); + fprintf(stderr, "Error: margo_addr_self()\n"); + margo_finalize(mid); return(-1); } - ret = HG_Addr_to_string(hg_class, addr_self_string, &addr_self_string_sz, addr_self); - if(ret != HG_SUCCESS) + hret = margo_addr_to_string(mid, addr_self_string, &addr_self_string_sz, addr_self); + if(hret != HG_SUCCESS) { - fprintf(stderr, "Error: HG_Addr_self()\n"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - HG_Addr_free(hg_class, addr_self); + fprintf(stderr, "Error: margo_addr_to_string()\n"); + margo_addr_free(mid, addr_self); + margo_finalize(mid); return(-1); } - HG_Addr_free(hg_class, addr_self); + margo_addr_free(mid, addr_self); printf("# accepting RPCs on address \"%s\"\n", addr_self_string); - /* set up argobots */ - /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_init()\n"); - return(-1); - } - - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); - return(-1); - } - - /* actually start margo */ - /***************************************/ - mid = margo_init(0, 0, hg_context); - assert(mid); - /* register RPCs and services */ /***************************************/ /* register a shutdown RPC as just a generic handler; not part of a * multiplexed service */ - MARGO_REGISTER(hg_class, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult, MARGO_RPC_ID_IGNORE); + MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult); /* register svc1, with mplex_id 1, to execute on the default handler pool * used by Margo @@ -144,7 +112,7 @@ int main(int argc, char **argv) ret = svc1_register(mid, *handler_pool, 1); assert(ret == 0); - /* create a dedicated and pool for another instance of svc1 */ + /* create a dedicated xstream and pool for another instance of svc1 */ ret = ABT_snoozer_xstream_create(1, &svc1_pool2, &svc1_xstream2); assert(ret == 0); /* register svc1, with mplex_id 2, to execute on a separate pool. This @@ -161,7 +129,6 @@ int main(int argc, char **argv) ret = svc2_register(mid, *handler_pool, 3); assert(ret == 0); - /* shut things down */ /****************************************/ @@ -172,20 +139,15 @@ int main(int argc, char **argv) */ margo_wait_for_finalize(mid); - /* TODO: rethink this; can't touch mid after wait for finalize */ + /* TODO: rethink this; can't touch mid or use ABT after wait for finalize */ #if 0 svc1_deregister(mid, *handler_pool, 1); svc1_deregister(mid, svc1_pool2, 2); svc2_deregister(mid, *handler_pool, 3); -#endif ABT_xstream_join(svc1_xstream2); ABT_xstream_free(&svc1_xstream2); - - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); +#endif return(0); } diff --git a/examples/multiplex/svc1-client.c b/examples/multiplex/svc1-client.c index ecc4529445a3ac9e6fe798f75ffa8791fbe1c078..b1405a177d10b9c99f7b67fa9d6737c5980c8978 100644 --- a/examples/multiplex/svc1-client.c +++ b/examples/multiplex/svc1-client.c @@ -21,11 +21,11 @@ static hg_id_t svc1_do_other_thing_id = -1; int svc1_register_client(margo_instance_id mid) { - MARGO_REGISTER(mid, "svc1_do_thing", - svc1_do_thing_in_t, svc1_do_thing_out_t, NULL, &svc1_do_thing_id); + svc1_do_thing_id = MARGO_REGISTER(mid, "svc1_do_thing", + svc1_do_thing_in_t, svc1_do_thing_out_t, NULL); - MARGO_REGISTER(mid, "svc1_do_other_thing", - svc1_do_other_thing_in_t, svc1_do_other_thing_out_t, NULL, &svc1_do_other_thing_id); + svc1_do_other_thing_id = MARGO_REGISTER(mid, "svc1_do_other_thing", + svc1_do_other_thing_in_t, svc1_do_other_thing_out_t, NULL); return(0); } @@ -35,10 +35,9 @@ void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id) hg_handle_t handle; svc1_do_thing_in_t in; svc1_do_thing_out_t out; - int ret; + hg_return_t hret; hg_size_t size; void* buffer; - const struct hg_info *hgi; /* allocate buffer for bulk transfer */ size = 512; @@ -47,32 +46,31 @@ void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id) sprintf((char*)buffer, "Hello world!\n"); /* create handle */ - ret = HG_Create(margo_get_context(mid), svr_addr, svc1_do_thing_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, svc1_do_thing_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size, + hret = margo_bulk_create(mid, 1, &buffer, &size, HG_BULK_READ_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); - HG_Set_target_id(handle, mplex_id); + margo_set_target_id(handle, mplex_id); /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ in.input_val = 0; - margo_forward(mid, handle, &in); + hret = margo_forward(mid, handle, &in); + assert(hret == HG_SUCCESS); /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Free_output(handle, &out); - HG_Destroy(handle); + margo_free_output(handle, &out); + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); free(buffer); return; @@ -83,10 +81,9 @@ void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl hg_handle_t handle; svc1_do_other_thing_in_t in; svc1_do_other_thing_out_t out; - int ret; + hg_return_t hret; hg_size_t size; void* buffer; - const struct hg_info *hgi; /* allocate buffer for bulk transfer */ size = 512; @@ -95,34 +92,32 @@ void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl sprintf((char*)buffer, "Hello world!\n"); /* create handle */ - ret = HG_Create(margo_get_context(mid), svr_addr, svc1_do_other_thing_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, svc1_do_other_thing_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size, + hret = margo_bulk_create(mid, 1, &buffer, &size, HG_BULK_READ_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); - HG_Set_target_id(handle, mplex_id); + margo_set_target_id(handle, mplex_id); /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ in.input_val = 0; - margo_forward(mid, handle, &in); + hret = margo_forward(mid, handle, &in); + assert(hret == HG_SUCCESS); /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Free_output(handle, &out); - HG_Destroy(handle); + margo_free_output(handle, &out); + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); free(buffer); return; } - diff --git a/examples/multiplex/svc1-server.c b/examples/multiplex/svc1-server.c index aeccf2c4c5771d3fd54011140249f988b04d9b10..0cadc810c9e1b628cd39c0b73de2cf5760e0e74e 100644 --- a/examples/multiplex/svc1-server.c +++ b/examples/multiplex/svc1-server.c @@ -14,7 +14,6 @@ static void svc1_do_thing_ult(hg_handle_t handle) hg_return_t hret; svc1_do_thing_out_t out; svc1_do_thing_in_t in; - int ret; hg_size_t size; void *buffer; hg_bulk_t bulk_handle; @@ -24,10 +23,12 @@ static void svc1_do_thing_ult(hg_handle_t handle) ABT_xstream my_xstream; pthread_t my_tid; - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); - hgi = HG_Get_info(handle); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); + hgi = margo_get_info(handle); assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); ABT_xstream_self(&my_xstream); ABT_thread_self(&my_ult); @@ -43,25 +44,23 @@ static void svc1_do_thing_ult(hg_handle_t handle) assert(buffer); /* register local target buffer for bulk access */ - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, + hret = margo_bulk_create(mid, 1, &buffer, &size, HG_BULK_WRITE_ONLY, &bulk_handle); - assert(ret == 0); - - mid = margo_hg_handle_get_instance(handle); + assert(hret == HG_SUCCESS); /* do bulk transfer from client to server */ - ret = margo_bulk_transfer(mid, HG_BULK_PULL, + hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, size); - assert(ret == 0); + assert(hret == HG_SUCCESS); - HG_Free_input(handle, &in); + margo_free_input(handle, &in); - hret = HG_Respond(handle, NULL, NULL, &out); + hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Bulk_free(bulk_handle); - HG_Destroy(handle); + margo_bulk_free(bulk_handle); + margo_destroy(handle); free(buffer); return; @@ -73,7 +72,6 @@ static void svc1_do_other_thing_ult(hg_handle_t handle) hg_return_t hret; svc1_do_other_thing_out_t out; svc1_do_other_thing_in_t in; - int ret; hg_size_t size; void *buffer; hg_bulk_t bulk_handle; @@ -83,10 +81,12 @@ static void svc1_do_other_thing_ult(hg_handle_t handle) ABT_xstream my_xstream; pthread_t my_tid; - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); - hgi = HG_Get_info(handle); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); + hgi = margo_get_info(handle); assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); ABT_xstream_self(&my_xstream); ABT_thread_self(&my_ult); @@ -102,25 +102,23 @@ static void svc1_do_other_thing_ult(hg_handle_t handle) assert(buffer); /* register local target buffer for bulk access */ - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, + hret = margo_bulk_create(mid, 1, &buffer, &size, HG_BULK_WRITE_ONLY, &bulk_handle); - assert(ret == 0); - - mid = margo_hg_handle_get_instance(handle); + assert(hret == HG_SUCCESS); /* do bulk transfer from client to server */ - ret = margo_bulk_transfer(mid, HG_BULK_PULL, + hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, size); - assert(ret == 0); + assert(hret == HG_SUCCESS); - HG_Free_input(handle, &in); + margo_free_input(handle, &in); - hret = HG_Respond(handle, NULL, NULL, &out); + hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Bulk_free(bulk_handle); - HG_Destroy(handle); + margo_bulk_free(bulk_handle); + margo_destroy(handle); free(buffer); return; @@ -131,10 +129,10 @@ int svc1_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id) { MARGO_REGISTER_MPLEX(mid, "svc1_do_thing", svc1_do_thing_in_t, svc1_do_thing_out_t, - svc1_do_thing_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE); + svc1_do_thing_ult, mplex_id, pool); MARGO_REGISTER_MPLEX(mid, "svc1_do_other_thing", svc1_do_other_thing_in_t, svc1_do_other_thing_out_t, - svc1_do_other_thing_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE); + svc1_do_other_thing_ult, mplex_id, pool); return(0); } @@ -144,4 +142,3 @@ void svc1_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id) /* TODO: undo what was done in svc1_register() */ return; } - diff --git a/examples/multiplex/svc2-client.c b/examples/multiplex/svc2-client.c index 7f00ac6aa943548ba29ea4a1f947d950601b1d33..9b234e81e78c4b4acd1980d4cf8a7acf56788fb6 100644 --- a/examples/multiplex/svc2-client.c +++ b/examples/multiplex/svc2-client.c @@ -21,10 +21,10 @@ static hg_id_t svc2_do_other_thing_id = -1; int svc2_register_client(margo_instance_id mid) { - MARGO_REGISTER(mid, "svc2_do_thing", - svc2_do_thing_in_t, svc2_do_thing_out_t, NULL, &svc2_do_thing_id); - MARGO_REGISTER(mid, "svc2_do_other_thing", - svc2_do_other_thing_in_t, svc2_do_other_thing_out_t, NULL, &svc2_do_other_thing_id); + svc2_do_thing_id = MARGO_REGISTER(mid, "svc2_do_thing", + svc2_do_thing_in_t, svc2_do_thing_out_t, NULL); + svc2_do_other_thing_id = MARGO_REGISTER(mid, "svc2_do_other_thing", + svc2_do_other_thing_in_t, svc2_do_other_thing_out_t, NULL); return(0); } @@ -34,10 +34,9 @@ void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id) hg_handle_t handle; svc2_do_thing_in_t in; svc2_do_thing_out_t out; - int ret; + hg_return_t hret; hg_size_t size; void* buffer; - const struct hg_info *hgi; /* allocate buffer for bulk transfer */ size = 512; @@ -46,32 +45,31 @@ void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id) sprintf((char*)buffer, "Hello world!\n"); /* create handle */ - ret = HG_Create(margo_get_context(mid), svr_addr, svc2_do_thing_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, svc2_do_thing_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size, + hret = margo_bulk_create(mid, 1, &buffer, &size, HG_BULK_READ_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); - HG_Set_target_id(handle, mplex_id); + margo_set_target_id(handle, mplex_id); /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ in.input_val = 0; - margo_forward(mid, handle, &in); + hret = margo_forward(mid, handle, &in); + assert(hret == HG_SUCCESS); /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Free_output(handle, &out); - HG_Destroy(handle); + margo_free_output(handle, &out); + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); free(buffer); return; @@ -82,10 +80,9 @@ void svc2_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl hg_handle_t handle; svc2_do_other_thing_in_t in; svc2_do_other_thing_out_t out; - int ret; + hg_return_t hret; hg_size_t size; void* buffer; - const struct hg_info *hgi; /* allocate buffer for bulk transfer */ size = 512; @@ -94,34 +91,32 @@ void svc2_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl sprintf((char*)buffer, "Hello world!\n"); /* create handle */ - ret = HG_Create(margo_get_context(mid), svr_addr, svc2_do_other_thing_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, svc2_do_other_thing_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size, + hret = margo_bulk_create(mid, 1, &buffer, &size, HG_BULK_READ_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); - HG_Set_target_id(handle, mplex_id); + margo_set_target_id(handle, mplex_id); /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ in.input_val = 0; - margo_forward(mid, handle, &in); + hret = margo_forward(mid, handle, &in); + assert(hret == HG_SUCCESS); /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Free_output(handle, &out); - HG_Destroy(handle); + margo_free_output(handle, &out); + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); free(buffer); return; } - diff --git a/examples/multiplex/svc2-server.c b/examples/multiplex/svc2-server.c index 8a558a3a07fea55b01a375ec6938bcd35112ba04..26a3aac03aef00758560b472fa884d8dafa30ee7 100644 --- a/examples/multiplex/svc2-server.c +++ b/examples/multiplex/svc2-server.c @@ -14,7 +14,6 @@ static void svc2_do_thing_ult(hg_handle_t handle) hg_return_t hret; svc2_do_thing_out_t out; svc2_do_thing_in_t in; - int ret; hg_size_t size; void *buffer; hg_bulk_t bulk_handle; @@ -24,10 +23,12 @@ static void svc2_do_thing_ult(hg_handle_t handle) ABT_xstream my_xstream; pthread_t my_tid; - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); - hgi = HG_Get_info(handle); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); + hgi = margo_get_info(handle); assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); ABT_xstream_self(&my_xstream); ABT_thread_self(&my_ult); @@ -43,25 +44,23 @@ static void svc2_do_thing_ult(hg_handle_t handle) assert(buffer); /* register local target buffer for bulk access */ - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, + hret = margo_bulk_create(mid, 1, &buffer, &size, HG_BULK_WRITE_ONLY, &bulk_handle); - assert(ret == 0); - - mid = margo_hg_handle_get_instance(handle); + assert(hret == HG_SUCCESS); /* do bulk transfer from client to server */ - ret = margo_bulk_transfer(mid, HG_BULK_PULL, + hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, size); - assert(ret == 0); + assert(hret == HG_SUCCESS); - HG_Free_input(handle, &in); + margo_free_input(handle, &in); - hret = HG_Respond(handle, NULL, NULL, &out); + hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Bulk_free(bulk_handle); - HG_Destroy(handle); + margo_bulk_free(bulk_handle); + margo_destroy(handle); free(buffer); return; @@ -73,7 +72,6 @@ static void svc2_do_other_thing_ult(hg_handle_t handle) hg_return_t hret; svc2_do_other_thing_out_t out; svc2_do_other_thing_in_t in; - int ret; hg_size_t size; void *buffer; hg_bulk_t bulk_handle; @@ -83,10 +81,12 @@ static void svc2_do_other_thing_ult(hg_handle_t handle) ABT_xstream my_xstream; pthread_t my_tid; - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); - hgi = HG_Get_info(handle); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); + hgi = margo_get_info(handle); assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); ABT_xstream_self(&my_xstream); ABT_thread_self(&my_ult); @@ -102,25 +102,23 @@ static void svc2_do_other_thing_ult(hg_handle_t handle) assert(buffer); /* register local target buffer for bulk access */ - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, + hret = margo_bulk_create(mid, 1, &buffer, &size, HG_BULK_WRITE_ONLY, &bulk_handle); - assert(ret == 0); - - mid = margo_hg_handle_get_instance(handle); + assert(hret == HG_SUCCESS); /* do bulk transfer from client to server */ - ret = margo_bulk_transfer(mid, HG_BULK_PULL, + hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, size); - assert(ret == 0); + assert(hret == HG_SUCCESS); - HG_Free_input(handle, &in); + margo_free_input(handle, &in); - hret = HG_Respond(handle, NULL, NULL, &out); + hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Bulk_free(bulk_handle); - HG_Destroy(handle); + margo_bulk_free(bulk_handle); + margo_destroy(handle); free(buffer); return; @@ -131,10 +129,10 @@ int svc2_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id) { MARGO_REGISTER_MPLEX(mid, "svc2_do_thing", svc2_do_thing_in_t, svc2_do_thing_out_t, - svc2_do_thing_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE); + svc2_do_thing_ult, mplex_id, pool); MARGO_REGISTER_MPLEX(mid, "svc2_do_other_thing", svc2_do_other_thing_in_t, svc2_do_other_thing_out_t, - svc2_do_other_thing_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE); + svc2_do_other_thing_ult, mplex_id, pool); return(0); } @@ -144,4 +142,3 @@ void svc2_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id) /* TODO: undo what was done in svc2_register() */ return; } - diff --git a/examples/my-rpc.c b/examples/my-rpc.c index 0d5ecd50116dbc51e0303235e895945b080c77d2..47f315c378ca6f6f9f8feae28a55c919112907e6 100644 --- a/examples/my-rpc.c +++ b/examples/my-rpc.c @@ -23,19 +23,19 @@ static void my_rpc_ult(hg_handle_t handle) hg_return_t hret; my_rpc_out_t out; my_rpc_in_t in; - int ret; hg_size_t size; void *buffer; hg_bulk_t bulk_handle; const struct hg_info *hgi; #if 0 + int ret; int fd; char filename[256]; #endif margo_instance_id mid; - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); printf("Got RPC request with input_val: %d\n", in.input_val); out.ret = 0; @@ -45,20 +45,22 @@ static void my_rpc_ult(hg_handle_t handle) buffer = calloc(1, 512); assert(buffer); - /* register local target buffer for bulk access */ - hgi = HG_Get_info(handle); + /* get handle info and margo instance */ + hgi = margo_get_info(handle); assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, - &size, HG_BULK_WRITE_ONLY, &bulk_handle); - assert(ret == 0); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); - mid = margo_hg_handle_get_instance(handle); + /* register local target buffer for bulk access */ + hret = margo_bulk_create(mid, 1, &buffer, + &size, HG_BULK_WRITE_ONLY, &bulk_handle); + assert(hret == HG_SUCCESS); /* do bulk transfer from client to server */ - ret = margo_bulk_transfer(mid, HG_BULK_PULL, + hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, size); - assert(ret == 0); + assert(hret == HG_SUCCESS); /* write to a file; would be done with abt-io if we enabled it */ #if 0 @@ -72,13 +74,13 @@ static void my_rpc_ult(hg_handle_t handle) abt_io_close(aid, fd); #endif - HG_Free_input(handle, &in); + margo_free_input(handle, &in); - hret = HG_Respond(handle, NULL, NULL, &out); + hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Bulk_free(bulk_handle); - HG_Destroy(handle); + margo_bulk_free(bulk_handle); + margo_destroy(handle); free(buffer); return; @@ -93,14 +95,16 @@ static void my_rpc_shutdown_ult(hg_handle_t handle) printf("Got RPC request to shutdown\n"); - hgi = HG_Get_info(handle); + /* get handle info and margo instance */ + hgi = margo_get_info(handle); assert(hgi); - mid = margo_hg_handle_get_instance(handle); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); hret = margo_respond(mid, handle, NULL); assert(hret == HG_SUCCESS); - HG_Destroy(handle); + margo_destroy(handle); margo_diag_dump(mid, "-", 0); diff --git a/include/margo.h b/include/margo.h index c614a47b12752c0fc16cfefb8e2f71e4a0850c3b..906489f89486a1788df5575ff9a269cdc0ec5c3f 100644 --- a/include/margo.h +++ b/include/margo.h @@ -17,8 +17,9 @@ extern "C" { * respecively. */ -#include #include +#include +#include #include #include @@ -29,13 +30,18 @@ typedef struct margo_instance* margo_instance_id; typedef struct margo_data* margo_data_ptr; #define MARGO_INSTANCE_NULL ((margo_instance_id)NULL) +#define MARGO_CLIENT_MODE 0 +#define MARGO_SERVER_MODE 1 #define MARGO_DEFAULT_MPLEX_ID 0 -#define MARGO_RPC_ID_IGNORE ((hg_id_t*)NULL) -#define MARGO_INFO_PROGRESS_TIMEOUT_UB 1 +#define MARGO_PARAM_PROGRESS_TIMEOUT_UB 1 /** * Initializes margo library. + * @param [in] addr_str Mercury host address with port number + * @param [in] mode Mode to run Margo in: + * - MARGO_CLIENT_MODE + * - MARGO_SERVER_MODE * @param [in] use_progress_thread Boolean flag to use a dedicated thread for * running Mercury's progress loop. If false, * it will run in the caller's thread context. @@ -47,7 +53,6 @@ typedef struct margo_data* margo_data_ptr; * of 0. A value of -1 directs Margo to use * the same execution context as that used * for Mercury progress. - * @param [in] hg_context * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error * * NOTE: Servers (processes expecting to service incoming RPC requests) must @@ -55,8 +60,11 @@ typedef struct margo_data* margo_data_ptr; * call margo_wait_for_finalize() after margo_init() to relinguish control to * Margo. */ -margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count, - hg_context_t *hg_context); +margo_instance_id margo_init( + const char *addr_str, + int mode, + int use_progress_thread, + int rpc_thread_count); /** * Initializes margo library from given argobots and Mercury instances. @@ -65,14 +73,17 @@ margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count, * @param [in] hg_context Mercury context * @returns margo instance id on success, MARGO_INSTANCE_NULL upon error */ -margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, +margo_instance_id margo_init_pool( + ABT_pool progress_pool, + ABT_pool handler_pool, hg_context_t *hg_context); /** * Shuts down margo library and its underlying abt and mercury resources * @param [in] mid Margo instance */ -void margo_finalize(margo_instance_id mid); +void margo_finalize( + margo_instance_id mid); /** * Suspends the caller until some other entity (e.g. an RPC, thread, or @@ -84,30 +95,65 @@ void margo_finalize(margo_instance_id mid); * * @param [in] mid Margo instance */ -void margo_wait_for_finalize(margo_instance_id mid); +void margo_wait_for_finalize( + margo_instance_id mid); -/** - * Retrieve the abt_handler pool that was associated with the instance at - * initialization time - * @param [in] mid Margo instance +/** + * Registers an RPC with margo + * + * \param [in] mid Margo instance + * \param [in] func_name unique function name for RPC + * \param [in] in_proc_cb pointer to input proc callback + * \param [in] out_proc_cb pointer to output proc callback + * \param [in] rpc_cb RPC callback + * + * \return unique ID associated to the registered function */ -ABT_pool* margo_get_handler_pool(margo_instance_id mid); +hg_id_t margo_register_name( + margo_instance_id mid, + const char *func_name, + hg_proc_cb_t in_proc_cb, + hg_proc_cb_t out_proc_cb, + hg_rpc_cb_t rpc_cb); -/** - * Retrieve the Mercury context that was associated with this instance at - * initialization time - * @param [in] mid Margo instance - * @return the Mercury context used in margo_init +/** + * Registers an RPC with margo that is associated with a multiplexed service + * + * \param [in] mid Margo instance + * \param [in] func_name unique function name for RPC + * \param [in] in_proc_cb pointer to input proc callback + * \param [in] out_proc_cb pointer to output proc callback + * \param [in] rpc_cb RPC callback + * \param [in] mplex_id multiplexing identifier + * \param [in] pool Argobots pool the handler will execute in + * + * \return unique ID associated to the registered function */ -hg_context_t* margo_get_context(margo_instance_id mid); +hg_id_t margo_register_name_mplex( + margo_instance_id mid, + const char *func_name, + hg_proc_cb_t in_proc_cb, + hg_proc_cb_t out_proc_cb, + hg_rpc_cb_t rpc_cb, + uint32_t mplex_id, + ABT_pool pool); -/** - * Retrieve the Mercury class that was associated with this instance at - * initialization time - * @param [in] mid Margo instance - * @return the Mercury class used in margo_init +/* + * Indicate whether margo_register_name() has been called for the RPC specified by + * func_name. + * + * \param [in] mid Margo instance + * \param [in] func_name function name + * \param [out] id registered RPC ID + * \param [out] flag pointer to boolean + * + * \return HG_SUCCESS or corresponding HG error code */ -hg_class_t* margo_get_class(margo_instance_id mid); +hg_return_t margo_registered_name( + margo_instance_id mid, + const char *func_name, + hg_id_t *id, + hg_bool_t *flag); /** * Register and associate user data to registered function. @@ -136,16 +182,192 @@ hg_return_t margo_register_data( * * \return Pointer to data or NULL */ -void* margo_registered_data(margo_instance_id mid, hg_id_t id); +void* margo_registered_data( + margo_instance_id mid, + hg_id_t id); /** - * Get the margo_instance_id from a received RPC handle. + * Disable response for a given RPC ID. * - * \param [in] h RPC handle - * - * \return Margo instance + * \param [in] mid Margo instance + * \param [in] id registered function ID + * \param [in] disable_flag flag to disable (1) or re-enable (0) responses + * + * \return HG_SUCCESS or corresponding HG error code */ -margo_instance_id margo_hg_handle_get_instance(hg_handle_t h); +hg_return_t margo_registered_disable_response( + margo_instance_id mid, + hg_id_t id, + int disable_flag); + +/** + * Lookup an addr from a peer address/name. + * \param [in] name lookup name + * \param [out] addr return address + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_addr_lookup( + margo_instance_id mid, + const char *name, + hg_addr_t *addr); + +/** + * Free the given Mercury addr. + * + * \param [in] mid Margo instance + * \param [in] addr Mercury address + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_addr_free( + margo_instance_id mid, + hg_addr_t addr); + +/** + * Access self address. Address must be freed with margo_addr_free(). + * + * \param [in] mid Margo instance + * \param [in] addr pointer to abstract Mercury address + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_addr_self( + margo_instance_id mid, + hg_addr_t *addr); + +/** + * Duplicate an existing Mercury address. + * + * \param [in] mid Margo instance + * \param [in] addr abstract Mercury address to duplicate + * \param [in] new_addr pointer to newly allocated abstract Mercury address + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_addr_dup( + margo_instance_id mid, + hg_addr_t addr, + hg_addr_t *new_addr); + +/** + * Convert a Mercury addr to a string (returned string includes the + * terminating null byte '\0'). If buf is NULL, the address is not + * converted and only the required size of the buffer is returned. + * If the input value passed through buf_size is too small, HG_SIZE_ERROR + * is returned and the buf_size output is set to the minimum size required. + * + * \param [in] mid Margo instance + * \param [in/out] buf pointer to destination buffer + * \param [in/out] buf_size pointer to buffer size + * \param [in] addr abstract Mercury address + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_addr_to_string( + margo_instance_id mid, + char *buf, + hg_size_t *buf_size, + hg_addr_t addr); + +/** + * Initiate a new Mercury RPC using the specified function ID and the + * local/remote target defined by addr. The handle created can be used to + * query input and output, as well as issuing the RPC by calling + * HG_Forward(). After completion the handle must be freed using HG_Destroy(). + * + * \param [in] mid Margo instance + * \param [in] addr abstract Mercury address of destination + * \param [in] id registered function ID + * \param [out] handle pointer to HG handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_create( + margo_instance_id mid, + hg_addr_t addr, + hg_id_t id, + hg_handle_t *handle); + +/** + * Destroy Mercury handle. + * + * \param [in] handle Mercury handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_destroy( + hg_handle_t handle); + +/** + * Increment ref count on a Mercury handle. + * + * \param [in] handle Mercury handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +#define margo_ref_incr HG_Ref_incr + +/** + * Get info from handle. + * + * \param [in] handle Mercury handle + * + * \return Pointer to info or NULL in case of failure + */ +#define margo_get_info HG_Get_info + +/** + * Get input from handle (requires registration of input proc to deserialize + * parameters). Input must be freed using margo_free_input(). + * + * \param [in] handle Mercury handle + * \param [in/out] in_struct pointer to input structure + * + * \return HG_SUCCESS or corresponding HG error code + */ +#define margo_get_input HG_Get_input + +/** + * Free resources allocated when deserializing the input. + * + * \param [in] handle Mercury handle + * \param [in/out] in_struct pointer to input structure + * + * \return HG_SUCCESS or corresponding HG error code + */ +#define margo_free_input HG_Free_input + +/** + * Get output from handle (requires registration of output proc to deserialize + * parameters). Output must be freed using margo_free_output(). + * + * \param [in] handle Mercury handle + * \param [in/out] out_struct pointer to output structure + * + * \return HG_SUCCESS or corresponding HG error code + */ +#define margo_get_output HG_Get_output + +/** + * Free resources allocated when deserializing the output. + * + * \param [in] handle Mercury handle + * \param [in/out] out_struct pointer to output structure + * + * \return HG_SUCCESS or corresponding HG error code + */ +#define margo_free_output HG_Free_output + +/** + * Set target ID that will receive and process RPC request. + * + * \param [in] handle Mercury handle + * \param [in] target_id user-defined target ID + * + * \return HG_SUCCESS or corresponding HG error code + */ +#define margo_set_target_id HG_Set_target_id /** * Forward an RPC request to a remote host @@ -191,6 +413,131 @@ hg_return_t margo_respond( hg_handle_t handle, void *out_struct); +/** + * Create an abstract bulk handle from specified memory segments. + * Memory allocated is then freed when margo_bulk_free() is called. + * \remark If NULL is passed to buf_ptrs, i.e., + * \verbatim margo_bulk_create(mid, count, NULL, buf_sizes, flags, &handle) \endverbatim + * memory for the missing buf_ptrs array will be internally allocated. + * + * \param [in] mid Margo instance + * \param [in] count number of segments + * \param [in] buf_ptrs array of pointers + * \param [in] buf_sizes array of sizes + * \param [in] flags permission flag: + * - HG_BULK_READWRITE + * - HG_BULK_READ_ONLY + * - HG_BULK_WRITE_ONLY + * \param [out] handle pointer to returned abstract bulk handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_bulk_create( + margo_instance_id mid, + hg_uint32_t count, + void **buf_ptrs, + const hg_size_t *buf_sizes, + hg_uint8_t flags, + hg_bulk_t *handle); + +/** + * Free bulk handle. + * + * \param [in/out] handle abstract bulk handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_bulk_free( + hg_bulk_t handle); + +/** + * Increment ref count on bulk handle. + * + * \param handle [IN] abstract bulk handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +#define margo_bulk_ref_incr HG_Bulk_ref_incr + +/** + * Access bulk handle to retrieve memory segments abstracted by handle. + * + * \param [in] handle abstract bulk handle + * \param [in] offset bulk offset + * \param [in] size bulk size + * \param [in] flags permission flag: + * - HG_BULK_READWRITE + * - HG_BULK_READ_ONLY + * \param [in] max_count maximum number of segments to be returned + * \param [in/out] buf_ptrs array of buffer pointers + * \param [in/out] buf_sizes array of buffer sizes + * \param [out] actual_count actual number of segments returned + * + * \return HG_SUCCESS or corresponding HG error code + */ +#define margo_bulk_access HG_Bulk_access + +/** + * Get total size of data abstracted by bulk handle. + * + * \param [in] handle abstract bulk handle + * + * \return Non-negative value + */ +#define margo_bulk_get_size HG_Bulk_get_size + +/** + * Get total number of segments abstracted by bulk handle. + * + * \param [in] handle abstract bulk handle + * + * \return Non-negative value + */ +#define margo_bulk_get_segment_count HG_Bulk_get_segment_count + +/** + * Get size required to serialize bulk handle. + * + * \param [in] handle abstract bulk handle + * \param [in] request_eager boolean (passing HG_TRUE adds size of encoding + * actual data along the handle if handle meets + * HG_BULK_READ_ONLY flag condition) + * + * \return Non-negative value + */ +#define margo_bulk_get_serialize_size HG_Bulk_get_serialize_size + +/** + * Serialize bulk handle into a buffer. + * + * \param [in/out] buf pointer to buffer + * \param [in] buf_size buffer size + * \param [in] request_eager boolean (passing HG_TRUE encodes actual data + * along the handle, which is more efficient for + * small data, this is only valid if bulk handle + * has HG_BULK_READ_ONLY permission) + * \param [in] handle abstract bulk handle + * + * \return HG_SUCCESS or corresponding HG error code + */ +#define margo_bulk_serialize HG_Bulk_serialize + +/** + * Deserialize bulk handle from an existing buffer. + * + * \param [in] mid Margo instance + * \param [out] handle abstract bulk handle + * \param [in] buf pointer to buffer + * \param [in] buf_size buffer size + * + * \return HG_SUCCESS or corresponding HG error code + */ +hg_return_t margo_bulk_deserialize( + margo_instance_id mid, + hg_bulk_t *handle, + const void *buf, + hg_size_t buf_size); + /** * Perform a bulk transfer * @param [in] mid Margo instance @@ -213,17 +560,6 @@ hg_return_t margo_bulk_transfer( size_t local_offset, size_t size); -/** - * address lookup - * @param [in] name lookup name - * @param [out] addr return address - * @returns HG_SUCCESS on success - */ -hg_return_t margo_addr_lookup( - margo_instance_id mid, - const char *name, - hg_addr_t *addr); - /** * Suspends the calling ULT for a specified time duration * @param [in] mid Margo instance @@ -233,21 +569,46 @@ void margo_thread_sleep( margo_instance_id mid, double timeout_ms); -/** - * Registers an RPC with margo +/** + * Retrieve the abt_handler pool that was associated with the instance at + * initialization time * @param [in] mid Margo instance - * @param [in] id Mercury RPC identifier */ -int margo_register(margo_instance_id mid, hg_id_t id); +ABT_pool* margo_get_handler_pool(margo_instance_id mid); -/** - * Registers an RPC with margo that is associated with a multiplexed service +/** + * Retrieve the Mercury context that was associated with this instance at + * initialization time * @param [in] mid Margo instance - * @param [in] id Mercury RPC identifier - * @param [in] mplex_id multiplexing identifier - * @param [in] pool Argobots pool the handler will execute in + * @return the Mercury context used in margo_init */ -int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool pool); +hg_context_t* margo_get_context(margo_instance_id mid); + +/** + * Retrieve the Mercury class that was associated with this instance at + * initialization time + * @param [in] mid Margo instance + * @return the Mercury class used in margo_init + */ +hg_class_t* margo_get_class(margo_instance_id mid); + +/** + * Get the margo_instance_id from a received RPC handle. + * + * \param [in] h RPC handle + * + * \return Margo instance + */ +margo_instance_id margo_hg_handle_get_instance(hg_handle_t h); + +/** + * Get the margo_instance_id from a received RPC handle. + * + * \param [in] info RPC info structure pointer + * + * \return Margo instance + */ +margo_instance_id margo_hg_info_get_instance(const struct hg_info *info); /** * Maps an RPC id and mplex id to the pool that it should execute on @@ -286,7 +647,7 @@ void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify); * @param [out] inout_param used to pass in values * @returns void */ -void margo_set_info(margo_instance_id mid, int option, const void *param); +void margo_set_param(margo_instance_id mid, int option, const void *param); /** * Retrieves configurable parameters/hints @@ -296,49 +657,24 @@ void margo_set_info(margo_instance_id mid, int option, const void *param); * @param [out] param used to pass out values * @returns void */ -void margo_get_info(margo_instance_id mid, int option, void *param); +void margo_get_param(margo_instance_id mid, int option, void *param); /** * macro that registers a function as an RPC. */ -#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler, __rpc_id_ptr) do { \ - hg_return_t __hret; \ - hg_id_t __id; \ - hg_bool_t __flag; \ - int __ret; \ - __hret = HG_Registered_name(margo_get_class(__mid), __func_name, &__id, &__flag); \ - assert(__hret == HG_SUCCESS); \ - if(!__flag) \ - __id = HG_Register_name(margo_get_class(__mid), __func_name,\ - BOOST_PP_CAT(hg_proc_, __in_t),\ - BOOST_PP_CAT(hg_proc_, __out_t),\ - __handler##_handler); \ - __ret = margo_register(__mid, __id); \ - assert(__ret == 0); \ - if(__rpc_id_ptr != MARGO_RPC_ID_IGNORE) { \ - *(__rpc_id_ptr) = __id; \ - } \ -} while(0) - -#define MARGO_REGISTER_MPLEX(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool, __rpc_id_ptr) do { \ - hg_return_t __hret; \ - hg_id_t __id; \ - hg_bool_t __flag; \ - int __ret; \ - __hret = HG_Registered_name(margo_get_class(__mid), __func_name, &__id, &__flag); \ - assert(__hret == HG_SUCCESS); \ - if(!__flag) \ - __id = HG_Register_name(margo_get_class(__mid), __func_name,\ - BOOST_PP_CAT(hg_proc_, __in_t),\ - BOOST_PP_CAT(hg_proc_, __out_t),\ - __handler##_handler); \ - __ret = margo_register_mplex(__mid, __id, __mplex_id, __pool); \ - assert(__ret == 0); \ - if(__rpc_id_ptr != MARGO_RPC_ID_IGNORE) { \ - *(__rpc_id_ptr) = __id; \ - } \ -} while(0) +#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler) \ + margo_register_name(__mid, __func_name, \ + BOOST_PP_CAT(hg_proc_, __in_t), \ + BOOST_PP_CAT(hg_proc_, __out_t), \ + __handler##_handler); + +#define MARGO_REGISTER_MPLEX(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool) \ + margo_register_name_mplex(__mid, __func_name, \ + BOOST_PP_CAT(hg_proc_, __in_t), \ + BOOST_PP_CAT(hg_proc_, __out_t), \ + __handler##_handler, \ + __mplex_id, __pool); #define NULL_handler NULL diff --git a/src/margo.c b/src/margo.c index bf19925be91371d8f38d3cf24913e3817964e9da..d9f1e16bb133603d50dbf23b50b3b6a0965ea918 100644 --- a/src/margo.c +++ b/src/margo.c @@ -58,6 +58,7 @@ struct margo_instance ABT_pool progress_pool; /* internal to margo for this particular instance */ + int margo_init; ABT_thread hg_progress_tid; int hg_progress_shutdown_flag; ABT_xstream progress_xstream; @@ -102,27 +103,31 @@ struct margo_rpc_data }; static void hg_progress_fn(void* foo); -static int margo_xstream_is_in_progress_pool(margo_instance_id mid); static void margo_rpc_data_free(void* ptr); -struct handler_entry +margo_instance_id margo_init(const char *addr_str, int mode, + int use_progress_thread, int rpc_thread_count) { - void* fn; - hg_handle_t handle; - struct handler_entry *next; -}; - -margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count, - hg_context_t *hg_context) -{ - struct margo_instance *mid = MARGO_INSTANCE_NULL; ABT_xstream progress_xstream = ABT_XSTREAM_NULL; ABT_pool progress_pool = ABT_POOL_NULL; ABT_xstream *rpc_xstreams = NULL; ABT_xstream rpc_xstream = ABT_XSTREAM_NULL; ABT_pool rpc_pool = ABT_POOL_NULL; - int ret; + hg_class_t *hg_class = NULL; + hg_context_t *hg_context = NULL; + int listen_flag = (mode == MARGO_CLIENT_MODE) ? HG_FALSE : HG_TRUE; int i; + int ret; + struct margo_instance *mid = MARGO_INSTANCE_NULL; + + if(mode != MARGO_CLIENT_MODE && mode != MARGO_SERVER_MODE) goto err; + + ret = ABT_init(0, NULL); /* XXX: argc/argv not currently used by ABT ... */ + if(ret != 0) goto err; + + /* set caller (self) ES to idle without polling */ + ret = ABT_snoozer_xstream_self_set(); + if(ret != 0) goto err; if (use_progress_thread) { @@ -137,37 +142,54 @@ margo_instance_id margo_init(int use_progress_thread, int rpc_thread_count, if (ret != ABT_SUCCESS) goto err; } - if (rpc_thread_count > 0) - { - rpc_xstreams = malloc(rpc_thread_count * sizeof(*rpc_xstreams)); - if (rpc_xstreams == NULL) goto err; - ret = ABT_snoozer_xstream_create(rpc_thread_count, &rpc_pool, - rpc_xstreams); - if (ret != ABT_SUCCESS) goto err; - } - else if (rpc_thread_count == 0) - { - ret = ABT_xstream_self(&rpc_xstream); - if (ret != ABT_SUCCESS) goto err; - ret = ABT_xstream_get_main_pools(rpc_xstream, 1, &rpc_pool); - if (ret != ABT_SUCCESS) goto err; - } - else + if (mode == MARGO_SERVER_MODE) { - rpc_pool = progress_pool; + if (rpc_thread_count > 0) + { + rpc_xstreams = malloc(rpc_thread_count * sizeof(*rpc_xstreams)); + if (rpc_xstreams == NULL) goto err; + ret = ABT_snoozer_xstream_create(rpc_thread_count, &rpc_pool, + rpc_xstreams); + if (ret != ABT_SUCCESS) goto err; + } + else if (rpc_thread_count == 0) + { + ret = ABT_xstream_self(&rpc_xstream); + if (ret != ABT_SUCCESS) goto err; + ret = ABT_xstream_get_main_pools(rpc_xstream, 1, &rpc_pool); + if (ret != ABT_SUCCESS) goto err; + } + else + { + rpc_pool = progress_pool; + } } + hg_class = HG_Init(addr_str, listen_flag); + if(!hg_class) goto err; + + hg_context = HG_Context_create(hg_class); + if(!hg_context) goto err; + mid = margo_init_pool(progress_pool, rpc_pool, hg_context); if (mid == MARGO_INSTANCE_NULL) goto err; + mid->margo_init = 1; mid->owns_progress_pool = use_progress_thread; mid->progress_xstream = progress_xstream; mid->num_handler_pool_threads = rpc_thread_count < 0 ? 0 : rpc_thread_count; mid->rpc_xstreams = rpc_xstreams; - mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB; + return mid; err: + if(mid) + { + margo_timer_instance_finalize(mid); + ABT_mutex_free(&mid->finalize_mutex); + ABT_cond_free(&mid->finalize_cond); + free(mid); + } if (use_progress_thread && progress_xstream != ABT_XSTREAM_NULL) { ABT_xstream_join(progress_xstream); @@ -182,6 +204,11 @@ err: } free(rpc_xstreams); } + if(hg_context) + HG_Context_destroy(hg_context); + if(hg_class) + HG_Finalize(hg_class); + ABT_finalize(); return MARGO_INSTANCE_NULL; } @@ -192,8 +219,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, struct margo_instance *mid; mid = malloc(sizeof(*mid)); - if(!mid) - return(MARGO_INSTANCE_NULL); + if(!mid) goto err; memset(mid, 0, sizeof(*mid)); ABT_mutex_create(&mid->finalize_mutex); @@ -203,26 +229,27 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, mid->handler_pool = handler_pool; mid->hg_class = HG_Context_get_class(hg_context); mid->hg_context = hg_context; + mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB; mid->refcount = 1; ret = margo_timer_instance_init(mid); - if(ret != 0) - { - fprintf(stderr, "Error: margo_timer_instance_init()\n"); - free(mid); - return(MARGO_INSTANCE_NULL); - } + if(ret != 0) goto err; ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid, ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid); - if(ret != 0) + if(ret != 0) goto err; + + return mid; + +err: + if(mid) { - fprintf(stderr, "Error: ABT_thread_create()\n"); + margo_timer_instance_finalize(mid); + ABT_mutex_free(&mid->finalize_mutex); + ABT_cond_free(&mid->finalize_cond); free(mid); - return(MARGO_INSTANCE_NULL); } - - return mid; + return MARGO_INSTANCE_NULL; } static void margo_cleanup(margo_instance_id mid) @@ -250,6 +277,15 @@ static void margo_cleanup(margo_instance_id mid) free(mid->rpc_xstreams); } + if (mid->margo_init) + { + if (mid->hg_context) + HG_Context_destroy(mid->hg_context); + if (mid->hg_class) + HG_Finalize(mid->hg_class); + ABT_finalize(); + } + free(mid); } @@ -304,143 +340,77 @@ void margo_wait_for_finalize(margo_instance_id mid) return; } -/* dedicated thread function to drive Mercury progress */ -static void hg_progress_fn(void* foo) +hg_id_t margo_register_name(margo_instance_id mid, const char *func_name, + hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb) { - int ret; - unsigned int actual_count; - struct margo_instance *mid = (struct margo_instance *)foo; - size_t size; - unsigned int hg_progress_timeout = mid->hg_progress_timeout_ub; - double next_timer_exp; - int trigger_happened; - double tm1, tm2; - int diag_enabled = 0; - - while(!mid->hg_progress_shutdown_flag) - { - trigger_happened = 0; - do { - /* save value of instance diag variable, in case it is modified - * while we are in loop - */ - diag_enabled = mid->diag_enabled; - - if(diag_enabled) tm1 = ABT_get_wtime(); - ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count); - if(diag_enabled) - { - tm2 = ABT_get_wtime(); - __DIAG_UPDATE(mid->diag_trigger_elapsed, (tm2-tm1)); - } - - if(ret == HG_SUCCESS && actual_count > 0) - trigger_happened = 1; - } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag); + struct margo_rpc_data* margo_data; + hg_return_t hret; + hg_id_t id; - if(trigger_happened) - ABT_thread_yield(); + id = HG_Register_name(mid->hg_class, func_name, in_proc_cb, out_proc_cb, rpc_cb); + if(id <= 0) + return(0); - ABT_pool_get_size(mid->progress_pool, &size); - /* Are there any other threads executing in this pool that are *not* - * blocked ? If so then, we can't sleep here or else those threads - * will not get a chance to execute. - * TODO: check is ABT_pool_get_size returns the number of ULT/tasks - * that can be executed including this one, or not including this one. - */ - if(size > 0) + /* register the margo data with the RPC */ + margo_data = (struct margo_rpc_data*)HG_Registered_data(mid->hg_class, id); + if(!margo_data) + { + margo_data = (struct margo_rpc_data*)malloc(sizeof(struct margo_rpc_data)); + if(!margo_data) + return(0); + margo_data->mid = mid; + margo_data->user_data = NULL; + margo_data->user_free_callback = NULL; + hret = HG_Register_data(mid->hg_class, id, margo_data, margo_rpc_data_free); + if(hret != HG_SUCCESS) { - /* TODO: this is being executed more than is necessary (i.e. - * in cases where there are other legitimate ULTs eligible - * for execution that are not blocking on any events, Margo - * or otherwise). Maybe we need an abt scheduling tweak here - * to make sure that this ULT is the lowest priority in that - * scenario. - */ - if(diag_enabled) tm1 = ABT_get_wtime(); - ret = HG_Progress(mid->hg_context, 0); - if(diag_enabled) - { - tm2 = ABT_get_wtime(); - __DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1)); - __DIAG_UPDATE(mid->diag_progress_timeout_value, 0); - } - if(ret == HG_SUCCESS) - { - /* Mercury completed something; loop around to trigger - * callbacks - */ - } - else if(ret == HG_TIMEOUT) - { - /* No completion; yield here to allow other ULTs to run */ - ABT_thread_yield(); - } - else - { - /* TODO: error handling */ - fprintf(stderr, "WARNING: unexpected return code (%d) from HG_Progress()\n", ret); - } + free(margo_data); + return(0); } - else - { - hg_progress_timeout = mid->hg_progress_timeout_ub; - ret = margo_timer_get_next_expiration(mid, &next_timer_exp); - if(ret == 0) - { - /* there is a queued timer, don't block long enough - * to keep this timer waiting - */ - if(next_timer_exp >= 0.0) - { - next_timer_exp *= 1000; /* convert to milliseconds */ - if(next_timer_exp < mid->hg_progress_timeout_ub) - hg_progress_timeout = (unsigned int)next_timer_exp; - } - else - { - hg_progress_timeout = 0; - } - } - if(diag_enabled) tm1 = ABT_get_wtime(); - ret = HG_Progress(mid->hg_context, hg_progress_timeout); - if(diag_enabled) - { - tm2 = ABT_get_wtime(); - if(hg_progress_timeout == 0) - __DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1)); - else - __DIAG_UPDATE(mid->diag_progress_elapsed_nonzero_timeout, (tm2-tm1)); - - __DIAG_UPDATE(mid->diag_progress_timeout_value, hg_progress_timeout); - } - if(ret != HG_SUCCESS && ret != HG_TIMEOUT) - { - /* TODO: error handling */ - fprintf(stderr, "WARNING: unexpected return code (%d) from HG_Progress()\n", ret); - } - } - - /* check for any expired timers */ - margo_check_timers(mid); } - return; + return(id); } -ABT_pool* margo_get_handler_pool(margo_instance_id mid) +hg_id_t margo_register_name_mplex(margo_instance_id mid, const char *func_name, + hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb, + uint32_t mplex_id, ABT_pool pool) { - return(&mid->handler_pool); -} + struct mplex_key key; + struct mplex_element *element; + hg_id_t id; -hg_context_t* margo_get_context(margo_instance_id mid) -{ - return(mid->hg_context); + id = margo_register_name(mid, func_name, in_proc_cb, out_proc_cb, rpc_cb); + if(id <= 0) + return(0); + + /* nothing to do, we'll let the handler pool take this directly */ + if(mplex_id == MARGO_DEFAULT_MPLEX_ID) + return(id); + + memset(&key, 0, sizeof(key)); + key.id = id; + key.mplex_id = mplex_id; + + HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element); + if(element) + return(id); + + element = malloc(sizeof(*element)); + if(!element) + return(0); + element->key = key; + element->pool = pool; + + HASH_ADD(hh, mid->mplex_table, key, sizeof(key), element); + + return(id); } -hg_class_t* margo_get_class(margo_instance_id mid) +hg_return_t margo_registered_name(margo_instance_id mid, const char *func_name, + hg_id_t *id, hg_bool_t *flag) { - return(mid->hg_class); + return(HG_Registered_name(mid->hg_class, func_name, id, flag)); } hg_return_t margo_register_data( @@ -450,7 +420,7 @@ hg_return_t margo_register_data( void (*free_callback)(void *)) { struct margo_rpc_data* margo_data - = (struct margo_rpc_data*) HG_Registered_data(margo_get_class(mid), id); + = (struct margo_rpc_data*) HG_Registered_data(mid->hg_class, id); if(!margo_data) return HG_OTHER_ERROR; margo_data->user_data = data; margo_data->user_free_callback = free_callback; @@ -465,54 +435,133 @@ void* margo_registered_data(margo_instance_id mid, hg_id_t id) else return data->user_data; } -margo_instance_id margo_hg_handle_get_instance(hg_handle_t h) +hg_return_t margo_registered_disable_response( + margo_instance_id mid, + hg_id_t id, + int disable_flag) { - const struct hg_info* info = HG_Get_info(h); - if(!info) return MARGO_INSTANCE_NULL; - struct margo_rpc_data* data = - (struct margo_rpc_data*) HG_Registered_data(info->hg_class, info->id); - if(!data) return MARGO_INSTANCE_NULL; - return data->mid; + return(HG_Registered_disable_response(mid->hg_class, id, disable_flag)); } -static hg_return_t margo_cb(const struct hg_cb_info *info) +struct lookup_cb_evt { - hg_return_t hret = info->ret; + hg_return_t hret; + hg_addr_t addr; +}; + +static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info) +{ + struct lookup_cb_evt evt; + evt.hret = info->ret; + evt.addr = info->info.lookup.addr; struct margo_cb_arg* arg = info->arg; /* propagate return code out through eventual */ - ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); - + ABT_eventual_set(*(arg->eventual), &evt, sizeof(evt)); + return(HG_SUCCESS); } -typedef struct +hg_return_t margo_addr_lookup( + margo_instance_id mid, + const char *name, + hg_addr_t *addr) { - hg_handle_t handle; -} margo_forward_timeout_cb_dat; + hg_return_t hret; + struct lookup_cb_evt *evt; + ABT_eventual eventual; + int ret; + struct margo_cb_arg arg; -static void margo_forward_timeout_cb(void *arg) + ret = ABT_eventual_create(sizeof(*evt), &eventual); + if(ret != 0) + { + return(HG_NOMEM_ERROR); + } + + arg.eventual = &eventual; + arg.mid = mid; + + hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb, + &arg, name, HG_OP_ID_IGNORE); + if(hret == HG_SUCCESS) + { + ABT_eventual_wait(eventual, (void**)&evt); + *addr = evt->addr; + hret = evt->hret; + } + + ABT_eventual_free(&eventual); + + return(hret); +} + +hg_return_t margo_addr_free( + margo_instance_id mid, + hg_addr_t addr) { - margo_forward_timeout_cb_dat *timeout_cb_dat = - (margo_forward_timeout_cb_dat *)arg; + return(HG_Addr_free(mid->hg_class, addr)); +} - /* cancel the Mercury op if the forward timed out */ - HG_Cancel(timeout_cb_dat->handle); - return; +hg_return_t margo_addr_self( + margo_instance_id mid, + hg_addr_t *addr) +{ + return(HG_Addr_self(mid->hg_class, addr)); } -hg_return_t margo_forward_timed( +hg_return_t margo_addr_dup( + margo_instance_id mid, + hg_addr_t addr, + hg_addr_t *new_addr) +{ + return(HG_Addr_dup(mid->hg_class, addr, new_addr)); +} + +hg_return_t margo_addr_to_string( + margo_instance_id mid, + char *buf, + hg_size_t *buf_size, + hg_addr_t addr) +{ + return(HG_Addr_to_string(mid->hg_class, buf, buf_size, addr)); +} + +hg_return_t margo_create(margo_instance_id mid, hg_addr_t addr, + hg_id_t id, hg_handle_t *handle) +{ + /* TODO: handle caching logic? */ + + return(HG_Create(mid->hg_context, addr, id, handle)); +} + +hg_return_t margo_destroy(hg_handle_t handle) +{ + /* TODO handle caching logic? */ + + return(HG_Destroy(handle)); +} + +static hg_return_t margo_cb(const struct hg_cb_info *info) +{ + hg_return_t hret = info->ret; + struct margo_cb_arg* arg = info->arg; + + /* propagate return code out through eventual */ + ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); + + return(HG_SUCCESS); +} + +hg_return_t margo_forward( margo_instance_id mid, hg_handle_t handle, - void *in_struct, - double timeout_ms) + void *in_struct) { - int ret; - hg_return_t hret; + hg_return_t hret = HG_TIMEOUT; ABT_eventual eventual; + int ret; hg_return_t* waited_hret; - margo_timer_t forward_timer; - margo_forward_timeout_cb_dat timeout_cb_dat; struct margo_cb_arg arg; ret = ABT_eventual_create(sizeof(hret), &eventual); @@ -521,44 +570,48 @@ hg_return_t margo_forward_timed( return(HG_NOMEM_ERROR); } - /* set a timer object to expire when this forward times out */ - timeout_cb_dat.handle = handle; - margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb, - &timeout_cb_dat, timeout_ms); - arg.eventual = &eventual; arg.mid = mid; hret = HG_Forward(handle, margo_cb, &arg, in_struct); - if(hret == 0) + if(hret == HG_SUCCESS) { ABT_eventual_wait(eventual, (void**)&waited_hret); hret = *waited_hret; } - /* convert HG_CANCELED to HG_TIMEOUT to indicate op timed out */ - if(hret == HG_CANCELED) - hret = HG_TIMEOUT; - - /* remove timer if it is still in place (i.e., not timed out) */ - if(hret != HG_TIMEOUT) - margo_timer_destroy(mid, &forward_timer); - ABT_eventual_free(&eventual); return(hret); } +typedef struct +{ + hg_handle_t handle; +} margo_forward_timeout_cb_dat; + +static void margo_forward_timeout_cb(void *arg) +{ + margo_forward_timeout_cb_dat *timeout_cb_dat = + (margo_forward_timeout_cb_dat *)arg; + + /* cancel the Mercury op if the forward timed out */ + HG_Cancel(timeout_cb_dat->handle); + return; +} -hg_return_t margo_forward( +hg_return_t margo_forward_timed( margo_instance_id mid, hg_handle_t handle, - void *in_struct) + void *in_struct, + double timeout_ms) { - hg_return_t hret = HG_TIMEOUT; - ABT_eventual eventual; int ret; + hg_return_t hret; + ABT_eventual eventual; hg_return_t* waited_hret; + margo_timer_t forward_timer; + margo_forward_timeout_cb_dat timeout_cb_dat; struct margo_cb_arg arg; ret = ABT_eventual_create(sizeof(hret), &eventual); @@ -567,16 +620,29 @@ hg_return_t margo_forward( return(HG_NOMEM_ERROR); } + /* set a timer object to expire when this forward times out */ + timeout_cb_dat.handle = handle; + margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb, + &timeout_cb_dat, timeout_ms); + arg.eventual = &eventual; arg.mid = mid; hret = HG_Forward(handle, margo_cb, &arg, in_struct); - if(hret == 0) + if(hret == HG_SUCCESS) { ABT_eventual_wait(eventual, (void**)&waited_hret); hret = *waited_hret; } + /* convert HG_CANCELED to HG_TIMEOUT to indicate op timed out */ + if(hret == HG_CANCELED) + hret = HG_TIMEOUT; + + /* remove timer if it is still in place (i.e., not timed out) */ + if(hret != HG_TIMEOUT) + margo_timer_destroy(mid, &forward_timer); + ABT_eventual_free(&eventual); return(hret); @@ -603,7 +669,7 @@ hg_return_t margo_respond( arg.mid = mid; hret = HG_Respond(handle, margo_cb, &arg, out_struct); - if(hret == 0) + if(hret == HG_SUCCESS) { ABT_eventual_wait(eventual, (void**)&waited_hret); hret = *waited_hret; @@ -614,70 +680,46 @@ hg_return_t margo_respond( return(hret); } - -static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info) +hg_return_t margo_bulk_create( + margo_instance_id mid, + hg_uint32_t count, + void **buf_ptrs, + const hg_size_t *buf_sizes, + hg_uint8_t flags, + hg_bulk_t *handle) { - hg_return_t hret = info->ret; - struct margo_cb_arg* arg = info->arg; + /* XXX: handle caching logic? */ - /* propagate return code out through eventual */ - ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); - - return(HG_SUCCESS); + return(HG_Bulk_create(mid->hg_class, count, + buf_ptrs, buf_sizes, flags, handle)); } -struct lookup_cb_evt -{ - hg_return_t nret; - hg_addr_t addr; -}; - -static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info) +hg_return_t margo_bulk_free( + hg_bulk_t handle) { - struct lookup_cb_evt evt; - evt.nret = info->ret; - evt.addr = info->info.lookup.addr; - struct margo_cb_arg* arg = info->arg; - - /* propagate return code out through eventual */ - ABT_eventual_set(*(arg->eventual), &evt, sizeof(evt)); + /* XXX: handle caching logic? */ - return(HG_SUCCESS); + return(HG_Bulk_free(handle)); } - -hg_return_t margo_addr_lookup( +hg_return_t margo_bulk_deserialize( margo_instance_id mid, - const char *name, - hg_addr_t *addr) + hg_bulk_t *handle, + const void *buf, + hg_size_t buf_size) { - hg_return_t nret; - struct lookup_cb_evt *evt; - ABT_eventual eventual; - int ret; - struct margo_cb_arg arg; - - ret = ABT_eventual_create(sizeof(*evt), &eventual); - if(ret != 0) - { - return(HG_NOMEM_ERROR); - } - - arg.eventual = &eventual; - arg.mid = mid; - - nret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb, - &arg, name, HG_OP_ID_IGNORE); - if(nret == 0) - { - ABT_eventual_wait(eventual, (void**)&evt); - *addr = evt->addr; - nret = evt->nret; - } + return(HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size)); +} - ABT_eventual_free(&eventual); +static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info) +{ + hg_return_t hret = info->ret; + struct margo_cb_arg* arg = info->arg; - return(nret); + /* propagate return code out through eventual */ + ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); + + return(HG_SUCCESS); } hg_return_t margo_bulk_transfer( @@ -708,7 +750,7 @@ hg_return_t margo_bulk_transfer( hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb, &arg, op, origin_addr, origin_handle, origin_offset, local_handle, local_offset, size, HG_OP_ID_IGNORE); - if(hret == 0) + if(hret == HG_SUCCESS) { ABT_eventual_wait(eventual, (void**)&waited_hret); hret = *waited_hret; @@ -721,7 +763,6 @@ hg_return_t margo_bulk_transfer( typedef struct { - margo_instance_id mid; ABT_mutex mutex; ABT_cond cond; char is_asleep; @@ -749,7 +790,6 @@ void margo_thread_sleep( margo_thread_sleep_cb_dat sleep_cb_dat; /* set data needed for sleep callback */ - sleep_cb_dat.mid = mid; ABT_mutex_create(&(sleep_cb_dat.mutex)); ABT_cond_create(&(sleep_cb_dat.cond)); sleep_cb_dat.is_asleep = 1; @@ -771,31 +811,34 @@ void margo_thread_sleep( return; } -/* returns 1 if current xstream is in the progress pool, 0 if not */ -static int margo_xstream_is_in_progress_pool(margo_instance_id mid) +ABT_pool* margo_get_handler_pool(margo_instance_id mid) { - int ret; - ABT_xstream xstream; - ABT_pool pool; + return(&mid->handler_pool); +} - ret = ABT_xstream_self(&xstream); - assert(ret == ABT_SUCCESS); - ret = ABT_xstream_get_main_pools(xstream, 1, &pool); - assert(ret == ABT_SUCCESS); +hg_context_t* margo_get_context(margo_instance_id mid) +{ + return(mid->hg_context); +} - if(pool == mid->progress_pool) - return(1); - else - return(0); +hg_class_t* margo_get_class(margo_instance_id mid) +{ + return(mid->hg_class); } -static void margo_rpc_data_free(void* ptr) +margo_instance_id margo_hg_handle_get_instance(hg_handle_t h) { - struct margo_rpc_data* data = (struct margo_rpc_data*) ptr; - if(data->user_data && data->user_free_callback) { - data->user_free_callback(data->user_data); - } - free(ptr); + const struct hg_info* info = HG_Get_info(h); + if(!info) return MARGO_INSTANCE_NULL; + return margo_hg_info_get_instance(info); +} + +margo_instance_id margo_hg_info_get_instance(const struct hg_info *info) +{ + struct margo_rpc_data* data = + (struct margo_rpc_data*) HG_Registered_data(info->hg_class, info->id); + if(!data) return MARGO_INSTANCE_NULL; + return data->mid; } int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool) @@ -824,52 +867,137 @@ int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT return(0); } -int margo_register(margo_instance_id mid, hg_id_t id) +static void margo_rpc_data_free(void* ptr) { - /* register the margo data with the RPC */ - struct margo_rpc_data* margo_data = (struct margo_rpc_data*)malloc(sizeof(struct margo_rpc_data)); - margo_data->mid = mid; - margo_data->user_data = NULL; - margo_data->user_free_callback = NULL; - hg_return_t ret = HG_Register_data(margo_get_class(mid), id, margo_data, margo_rpc_data_free); - return ret; + struct margo_rpc_data* data = (struct margo_rpc_data*) ptr; + if(data->user_data && data->user_free_callback) { + data->user_free_callback(data->user_data); + } + free(ptr); } -int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool pool) +/* dedicated thread function to drive Mercury progress */ +static void hg_progress_fn(void* foo) { - struct mplex_key key; - struct mplex_element *element; + int ret; + unsigned int actual_count; + struct margo_instance *mid = (struct margo_instance *)foo; + size_t size; + unsigned int hg_progress_timeout = mid->hg_progress_timeout_ub; + double next_timer_exp; + int trigger_happened; + double tm1, tm2; + int diag_enabled = 0; - /* register the margo data with the RPC */ - struct margo_rpc_data* margo_data = (struct margo_rpc_data*)malloc(sizeof(struct margo_rpc_data)); - margo_data->mid = mid; - margo_data->user_data = NULL; - margo_data->user_free_callback = NULL; - hg_return_t ret = HG_Register_data(margo_get_class(mid), id, margo_data, margo_rpc_data_free); - if(ret != HG_SUCCESS) - return ret; + while(!mid->hg_progress_shutdown_flag) + { + trigger_happened = 0; + do { + /* save value of instance diag variable, in case it is modified + * while we are in loop + */ + diag_enabled = mid->diag_enabled; - /* nothing to do, we'll let the handler pool take this directly */ - if(mplex_id == MARGO_DEFAULT_MPLEX_ID) - return(0); + if(diag_enabled) tm1 = ABT_get_wtime(); + ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count); + if(diag_enabled) + { + tm2 = ABT_get_wtime(); + __DIAG_UPDATE(mid->diag_trigger_elapsed, (tm2-tm1)); + } - memset(&key, 0, sizeof(key)); - key.id = id; - key.mplex_id = mplex_id; + if(ret == HG_SUCCESS && actual_count > 0) + trigger_happened = 1; + } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag); - HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element); - if(element) - return(0); + if(trigger_happened) + ABT_thread_yield(); - element = malloc(sizeof(*element)); - if(!element) - return(-1); - element->key = key; - element->pool = pool; + ABT_pool_get_size(mid->progress_pool, &size); + /* Are there any other threads executing in this pool that are *not* + * blocked ? If so then, we can't sleep here or else those threads + * will not get a chance to execute. + * TODO: check is ABT_pool_get_size returns the number of ULT/tasks + * that can be executed including this one, or not including this one. + */ + if(size > 0) + { + /* TODO: this is being executed more than is necessary (i.e. + * in cases where there are other legitimate ULTs eligible + * for execution that are not blocking on any events, Margo + * or otherwise). Maybe we need an abt scheduling tweak here + * to make sure that this ULT is the lowest priority in that + * scenario. + */ + if(diag_enabled) tm1 = ABT_get_wtime(); + ret = HG_Progress(mid->hg_context, 0); + if(diag_enabled) + { + tm2 = ABT_get_wtime(); + __DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1)); + __DIAG_UPDATE(mid->diag_progress_timeout_value, 0); + } + if(ret == HG_SUCCESS) + { + /* Mercury completed something; loop around to trigger + * callbacks + */ + } + else if(ret == HG_TIMEOUT) + { + /* No completion; yield here to allow other ULTs to run */ + ABT_thread_yield(); + } + else + { + /* TODO: error handling */ + fprintf(stderr, "WARNING: unexpected return code (%d) from HG_Progress()\n", ret); + } + } + else + { + hg_progress_timeout = mid->hg_progress_timeout_ub; + ret = margo_timer_get_next_expiration(mid, &next_timer_exp); + if(ret == 0) + { + /* there is a queued timer, don't block long enough + * to keep this timer waiting + */ + if(next_timer_exp >= 0.0) + { + next_timer_exp *= 1000; /* convert to milliseconds */ + if(next_timer_exp < mid->hg_progress_timeout_ub) + hg_progress_timeout = (unsigned int)next_timer_exp; + } + else + { + hg_progress_timeout = 0; + } + } + if(diag_enabled) tm1 = ABT_get_wtime(); + ret = HG_Progress(mid->hg_context, hg_progress_timeout); + if(diag_enabled) + { + tm2 = ABT_get_wtime(); + if(hg_progress_timeout == 0) + __DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1)); + else + __DIAG_UPDATE(mid->diag_progress_elapsed_nonzero_timeout, (tm2-tm1)); + + __DIAG_UPDATE(mid->diag_progress_timeout_value, hg_progress_timeout); + } + if(ret != HG_SUCCESS && ret != HG_TIMEOUT) + { + /* TODO: error handling */ + fprintf(stderr, "WARNING: unexpected return code (%d) from HG_Progress()\n", ret); + } + } - HASH_ADD(hh, mid->mplex_table, key, sizeof(key), element); + /* check for any expired timers */ + margo_check_timers(mid); + } - return(0); + return; } @@ -956,11 +1084,11 @@ void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify) return; } -void margo_set_info(margo_instance_id mid, int option, const void *param) +void margo_set_param(margo_instance_id mid, int option, const void *param) { switch(option) { - case MARGO_INFO_PROGRESS_TIMEOUT_UB: + case MARGO_PARAM_PROGRESS_TIMEOUT_UB: mid->hg_progress_timeout_ub = (*((const unsigned int*)param)); break; } @@ -968,17 +1096,15 @@ void margo_set_info(margo_instance_id mid, int option, const void *param) return; } -void margo_get_info(margo_instance_id mid, int option, void *param) +void margo_get_param(margo_instance_id mid, int option, void *param) { switch(option) { - case MARGO_INFO_PROGRESS_TIMEOUT_UB: + case MARGO_PARAM_PROGRESS_TIMEOUT_UB: (*((unsigned int*)param)) = mid->hg_progress_timeout_ub; break; } return; } - - diff --git a/tests/margo-test-client-timeout.c b/tests/margo-test-client-timeout.c index 23e0f141fa88daa0896c696b1e9a0c07d320304e..cc7cd6a3392fc2c730ea66182e1c83e90b45e660 100644 --- a/tests/margo-test-client-timeout.c +++ b/tests/margo-test-client-timeout.c @@ -7,8 +7,8 @@ #include #include #include +#include #include -#include #include #include "my-rpc.h" @@ -24,8 +24,6 @@ struct run_my_rpc_args { int val; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; hg_addr_t svr_addr; }; @@ -40,11 +38,10 @@ int main(int argc, char **argv) ABT_thread threads[4]; int i; int ret; + hg_return_t hret; ABT_xstream xstream; ABT_pool pool; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; hg_addr_t svr_addr = HG_ADDR_NULL; hg_handle_t handle; char proto[12] = {0}; @@ -55,42 +52,24 @@ int main(int argc, char **argv) return(-1); } - /* boilerplate HG initialization steps */ - /***************************************/ - /* initialize Mercury using the transport portion of the destination * address (i.e., the part before the first : character if present) */ for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++) proto[i] = argv[1][i]; - hg_class = HG_Init(proto, HG_FALSE); - if(!hg_class) - { - fprintf(stderr, "Error: HG_Init()\n"); - return(-1); - } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); - return(-1); - } - /* set up argobots */ + /* actually start margo -- margo_init() encapsulates the Mercury & + * Argobots initialization, so this step must precede their use. */ + /* Use main process to drive progress (it will relinquish control to + * Mercury during blocking communication calls). The rpc handler pool + * is null in this example program because this is a pure client that + * will not be servicing rpc requests. + */ /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) + mid = margo_init(proto, MARGO_CLIENT_MODE, 0, 0); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: ABT_init()\n"); - return(-1); - } - - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } @@ -108,31 +87,20 @@ int main(int argc, char **argv) return(-1); } - /* actually start margo */ - /* Use main process to drive progress (it will relinquish control to - * Mercury during blocking communication calls). The rpc handler pool - * is null in this example program because this is a pure client that - * will not be servicing rpc requests. - */ - /***************************************/ - mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context); - - /* register RPC */ - my_rpc_hang_id = MERCURY_REGISTER(hg_class, "my_rpc_hang", my_rpc_hang_in_t, my_rpc_hang_out_t, + /* register RPCs */ + my_rpc_hang_id = MARGO_REGISTER(mid, "my_rpc_hang", my_rpc_hang_in_t, my_rpc_hang_out_t, NULL); - my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void, + my_rpc_shutdown_id = MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, NULL); /* find addr for server */ - ret = margo_addr_lookup(mid, argv[1], &svr_addr); - assert(ret == 0); + hret = margo_addr_lookup(mid, argv[1], &svr_addr); + assert(hret == HG_SUCCESS); for(i=0; i<4; i++) { args[i].val = i; args[i].mid = mid; - args[i].hg_class = hg_class; - args[i].hg_context = hg_context; args[i].svr_addr = svr_addr; /* Each ult gets a pointer to an element of the array to use @@ -167,22 +135,21 @@ int main(int argc, char **argv) } } + /* send one rpc to server to shut it down */ + /* create handle */ - ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, my_rpc_shutdown_id, &handle); + assert(hret == HG_SUCCESS); - margo_forward_timed(mid, handle, NULL, 2000.0); - - HG_Addr_free(hg_class, svr_addr); + hret = margo_forward_timed(mid, handle, NULL, 2000.0); + assert(hret == HG_SUCCESS); + + margo_destroy(handle); + margo_addr_free(mid, svr_addr); /* shut down everything */ margo_finalize(mid); - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - return(0); } @@ -192,10 +159,9 @@ static void run_my_rpc(void *_arg) hg_handle_t handle; my_rpc_hang_in_t in; my_rpc_hang_out_t out; - int ret; + hg_return_t hret; hg_size_t size; void* buffer; - const struct hg_info *hgi; printf("ULT [%d] running.\n", arg->val); @@ -206,42 +172,39 @@ static void run_my_rpc(void *_arg) sprintf((char*)buffer, "Hello world!\n"); /* create handle */ - ret = HG_Create(arg->hg_context, arg->svr_addr, my_rpc_hang_id, &handle); - assert(ret == 0); + hret = margo_create(arg->mid, arg->svr_addr, my_rpc_hang_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size, + hret = margo_bulk_create(arg->mid, 1, &buffer, &size, HG_BULK_READ_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ in.input_val = arg->val; /* call with 2 second timeout */ - ret = margo_forward_timed(arg->mid, handle, &in, 2000.0); + hret = margo_forward_timed(arg->mid, handle, &in, 2000.0); - if(ret == 0) + if(hret == HG_SUCCESS) { /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); printf("Got response ret: %d\n", out.ret); - HG_Free_output(handle, &out); + margo_free_output(handle, &out); } else { - printf("margo_forward returned %d\n", ret); + printf("margo_forward returned %d\n", hret); } /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Destroy(handle); + margo_bulk_free(in.bulk_handle); + margo_destroy(handle); free(buffer); printf("ULT [%d] done.\n", arg->val); return; } - diff --git a/tests/margo-test-client.c b/tests/margo-test-client.c index 9435727d92c4b6125ff6b3cc20d948558f5c01d8..cae4a95c5ffc1ab7d1c3b16dcd4a0ecd4e6e325b 100644 --- a/tests/margo-test-client.c +++ b/tests/margo-test-client.c @@ -7,8 +7,8 @@ #include #include #include +#include #include -#include #include #include "my-rpc.h" @@ -24,8 +24,6 @@ struct run_my_rpc_args { int val; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; hg_addr_t svr_addr; }; @@ -40,11 +38,10 @@ int main(int argc, char **argv) ABT_thread threads[4]; int i; int ret; + hg_return_t hret; ABT_xstream xstream; ABT_pool pool; margo_instance_id mid; - hg_context_t *hg_context; - hg_class_t *hg_class; hg_addr_t svr_addr = HG_ADDR_NULL; hg_handle_t handle; char proto[12] = {0}; @@ -55,42 +52,24 @@ int main(int argc, char **argv) return(-1); } - /* boilerplate HG initialization steps */ - /***************************************/ - /* initialize Mercury using the transport portion of the destination * address (i.e., the part before the first : character if present) */ for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++) proto[i] = argv[1][i]; - hg_class = HG_Init(proto, HG_FALSE); - if(!hg_class) - { - fprintf(stderr, "Error: HG_Init()\n"); - return(-1); - } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); - return(-1); - } - /* set up argobots */ + /* actually start margo -- margo_init() encapsulates the Mercury & + * Argobots initialization, so this step must precede their use. */ + /* Use main process to drive progress (it will relinquish control to + * Mercury during blocking communication calls). No RPC threads are + * used because this is a pure client that will not be servicing + * rpc requests. + */ /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) + mid = margo_init(proto, MARGO_CLIENT_MODE, 0, 0); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: ABT_init()\n"); - return(-1); - } - - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } @@ -100,38 +79,26 @@ int main(int argc, char **argv) { fprintf(stderr, "Error: ABT_xstream_self()\n"); return(-1); - } ret = ABT_xstream_get_main_pools(xstream, 1, &pool); + } + ret = ABT_xstream_get_main_pools(xstream, 1, &pool); if(ret != 0) { fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n"); return(-1); } - /* actually start margo */ - /* Use main process to drive progress (it will relinquish control to - * Mercury during blocking communication calls). The rpc handler pool - * is null in this example program because this is a pure client that - * will not be servicing rpc requests. - */ - /***************************************/ - mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context); - - /* register RPC */ - my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t, - NULL); - my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void, - NULL); + /* register RPCs */ + my_rpc_id = MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, NULL); + my_rpc_shutdown_id = MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, NULL); /* find addr for server */ - ret = margo_addr_lookup(mid, argv[1], &svr_addr); - assert(ret == 0); + hret = margo_addr_lookup(mid, argv[1], &svr_addr); + assert(hret == HG_SUCCESS); for(i=0; i<4; i++) { args[i].val = i; args[i].mid = mid; - args[i].hg_class = hg_class; - args[i].hg_context = hg_context; args[i].svr_addr = svr_addr; /* Each ult gets a pointer to an element of the array to use @@ -169,21 +136,18 @@ int main(int argc, char **argv) /* send one rpc to server to shut it down */ /* create handle */ - ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle); - assert(ret == 0); + hret = margo_create(mid, svr_addr, my_rpc_shutdown_id, &handle); + assert(hret == HG_SUCCESS); - margo_forward(mid, handle, NULL); + hret = margo_forward(mid, handle, NULL); + assert(hret == HG_SUCCESS); - HG_Addr_free(hg_class, svr_addr); + margo_destroy(handle); + margo_addr_free(mid, svr_addr); /* shut down everything */ margo_finalize(mid); - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - return(0); } @@ -193,10 +157,9 @@ static void run_my_rpc(void *_arg) hg_handle_t handle; my_rpc_in_t in; my_rpc_out_t out; - int ret; + hg_return_t hret; hg_size_t size; void* buffer; - const struct hg_info *hgi; printf("ULT [%d] running.\n", arg->val); @@ -207,35 +170,33 @@ static void run_my_rpc(void *_arg) sprintf((char*)buffer, "Hello world!\n"); /* create handle */ - ret = HG_Create(arg->hg_context, arg->svr_addr, my_rpc_id, &handle); - assert(ret == 0); + hret = margo_create(arg->mid, arg->svr_addr, my_rpc_id, &handle); + assert(hret == HG_SUCCESS); /* register buffer for rdma/bulk access by server */ - hgi = HG_Get_info(handle); - assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size, + hret = margo_bulk_create(arg->mid, 1, &buffer, &size, HG_BULK_READ_ONLY, &in.bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); /* Send rpc. Note that we are also transmitting the bulk handle in the * input struct. It was set above. */ in.input_val = arg->val; - margo_forward(arg->mid, handle, &in); + hret = margo_forward(arg->mid, handle, &in); + assert(hret == HG_SUCCESS); /* decode response */ - ret = HG_Get_output(handle, &out); - assert(ret == 0); + hret = margo_get_output(handle, &out); + assert(hret == HG_SUCCESS); printf("Got response ret: %d\n", out.ret); /* clean up resources consumed by this rpc */ - HG_Bulk_free(in.bulk_handle); - HG_Free_output(handle, &out); - HG_Destroy(handle); + margo_bulk_free(in.bulk_handle); + margo_free_output(handle, &out); + margo_destroy(handle); free(buffer); printf("ULT [%d] done.\n", arg->val); return; } - diff --git a/tests/margo-test-server.c b/tests/margo-test-server.c index 9a75a6619e1474fa2f2e98578cff6009c15802f5..af2a9e0fd40846e8f2848c2ea8372bb8f247f411 100644 --- a/tests/margo-test-server.c +++ b/tests/margo-test-server.c @@ -7,13 +7,13 @@ #include #include #include +#include #include -#include #include #include "my-rpc.h" -/* example server program. Starts HG engine, registers the example RPC type, +/* example server program. Starts margo, registers the example RPC type, * and then executes indefinitely. */ @@ -28,31 +28,26 @@ static void parse_args(int argc, char **argv, struct options *opts); int main(int argc, char **argv) { - int ret; + hg_return_t hret; margo_instance_id mid; - ABT_xstream handler_xstream; - ABT_pool handler_pool; - ABT_xstream progress_xstream; - ABT_pool progress_pool; - hg_context_t *hg_context; - hg_class_t *hg_class; struct options opts; parse_args(argc, argv, &opts); - /* boilerplate HG initialization steps */ + /* actually start margo -- this step encapsulates the Mercury and + * Argobots initialization and must precede their use */ + /* If single pool mode, use the calling xstream to drive progress and + * execute handlers. If not, use a dedicated progress xstream and + * run handlers directly on the calling xstream + */ /***************************************/ - hg_class = HG_Init(opts.listen_addr, HG_TRUE); - if(!hg_class) - { - fprintf(stderr, "Error: HG_Init()\n"); - return(-1); - } - hg_context = HG_Context_create(hg_class); - if(!hg_context) + if(opts.single_pool_mode) + mid = margo_init(opts.listen_addr, MARGO_SERVER_MODE, 0, -1); + else + mid = margo_init(opts.listen_addr, MARGO_SERVER_MODE, 1, 0); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } @@ -64,32 +59,28 @@ int main(int argc, char **argv) hg_size_t addr_self_string_sz = 128; /* figure out what address this server is listening on */ - ret = HG_Addr_self(hg_class, &addr_self); - if(ret != HG_SUCCESS) + hret = margo_addr_self(mid, &addr_self); + if(hret != HG_SUCCESS) { - fprintf(stderr, "Error: HG_Addr_self()\n"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); + fprintf(stderr, "Error: margo_addr_self()\n"); + margo_finalize(mid); return(-1); } - ret = HG_Addr_to_string(hg_class, addr_self_string, &addr_self_string_sz, addr_self); - if(ret != HG_SUCCESS) + hret = margo_addr_to_string(mid, addr_self_string, &addr_self_string_sz, addr_self); + if(hret != HG_SUCCESS) { - fprintf(stderr, "Error: HG_Addr_self()\n"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - HG_Addr_free(hg_class, addr_self); + fprintf(stderr, "Error: margo_addr_to_string()\n"); + margo_addr_free(mid, addr_self); + margo_finalize(mid); return(-1); } - HG_Addr_free(hg_class, addr_self); + margo_addr_free(mid, addr_self); fp = fopen(opts.hostfile, "w"); if(!fp) { perror("fopen"); - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - HG_Addr_free(hg_class, addr_self); + margo_finalize(mid); return(-1); } @@ -97,67 +88,13 @@ int main(int argc, char **argv) fclose(fp); } - /* set up argobots */ - /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_init()\n"); - return(-1); - } - - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); - return(-1); - } - - /* Find primary pool to use for running rpc handlers */ - ret = ABT_xstream_self(&handler_xstream); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_xstream_self()\n"); - return(-1); - } - ret = ABT_xstream_get_main_pools(handler_xstream, 1, &handler_pool); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n"); - return(-1); - } - - if(!opts.single_pool_mode) - { - /* create a dedicated ES drive Mercury progress */ - ret = ABT_snoozer_xstream_create(1, &progress_pool, &progress_xstream); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n"); - return(-1); - } - } - - /* actually start margo */ - /* provide argobots pools for driving communication progress and - * executing rpc handlers as well as class and context for Mercury - * communication. - */ - /***************************************/ - if(opts.single_pool_mode) - mid = margo_init_pool(handler_pool, handler_pool, hg_context); - else - mid = margo_init_pool(progress_pool, handler_pool, hg_context); - assert(mid); - /* register RPC */ - MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t, - my_rpc_ult_handler); - MERCURY_REGISTER(hg_class, "my_rpc_hang", my_rpc_hang_in_t, my_rpc_hang_out_t, - my_rpc_hang_ult_handler); - MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void, - my_rpc_shutdown_ult_handler); + MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, + my_rpc_ult); + MARGO_REGISTER(mid, "my_rpc_hang", my_rpc_hang_in_t, my_rpc_hang_out_t, + my_rpc_hang_ult); + MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, + my_rpc_shutdown_ult); /* NOTE: at this point this server ULT has two options. It can wait on * whatever mechanism it wants to (however long the daemon should run and @@ -175,17 +112,6 @@ int main(int argc, char **argv) */ margo_wait_for_finalize(mid); - if(!opts.single_pool_mode) - { - ABT_xstream_join(progress_xstream); - ABT_xstream_free(&progress_xstream); - } - - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - return(0); } diff --git a/tests/margo-test-sleep.c b/tests/margo-test-sleep.c index 0368afc58053ac3ea26b232697a6b829b5479e60..110226eecaede4e06757cf4e6ad344ad7dbaaab5 100644 --- a/tests/margo-test-sleep.c +++ b/tests/margo-test-sleep.c @@ -10,7 +10,6 @@ #include #include #include -#include #include static int use_abt_sleep = 0; @@ -27,8 +26,6 @@ int main(int argc, char **argv) int ret; ABT_xstream xstream; ABT_pool pool; - hg_context_t *hg_context; - hg_class_t *hg_class; int arg_ndx = 1; if(argc > 1) @@ -52,43 +49,22 @@ int main(int argc, char **argv) } } - /* boilerplate HG initialization steps */ + /* actually start margo -- this step encapsulates the Mercury and + * Argobots initialization and must precede their use */ + /* use a single pool for progress and sleeper threads */ + /* NOTE: we don't use RPC handlers, so no need for an RPC pool */ /***************************************/ - hg_class = HG_Init("tcp", HG_FALSE); - if(!hg_class) + mid = margo_init("tcp", MARGO_CLIENT_MODE, 0, 0); + if(mid == MARGO_INSTANCE_NULL) { /* if tcp didn't work, try sm */ - hg_class = HG_Init("sm", HG_FALSE); - if(!hg_class) + mid = margo_init("sm", MARGO_CLIENT_MODE, 0, 0); + if(mid == MARGO_INSTANCE_NULL) { - fprintf(stderr, "Error: HG_Init()\n"); + fprintf(stderr, "Error: margo_init()\n"); return(-1); } } - hg_context = HG_Context_create(hg_class); - if(!hg_context) - { - fprintf(stderr, "Error: HG_Context_create()\n"); - HG_Finalize(hg_class); - return(-1); - } - - /* set up argobots */ - /***************************************/ - ret = ABT_init(argc, argv); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_init()\n"); - return(-1); - } - - /* set primary ES to idle without polling */ - ret = ABT_snoozer_xstream_self_set(); - if(ret != 0) - { - fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n"); - return(-1); - } /* retrieve current pool to use for ULT creation */ ret = ABT_xstream_self(&xstream); @@ -104,11 +80,6 @@ int main(int argc, char **argv) return(-1); } - /* actually start margo */ - /* use a single pool for progress and sleeper threads */ - /* NOTE: we don't use RPC handlers, so no need for an RPC pool */ - /***************************************/ - mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context); for(i=0; i<4; i++) { t_ids[i] = i; @@ -146,11 +117,6 @@ int main(int argc, char **argv) /* shut down everything */ margo_finalize(mid); - ABT_finalize(); - - HG_Context_destroy(hg_context); - HG_Finalize(hg_class); - return(0); } diff --git a/tests/my-rpc.c b/tests/my-rpc.c index a11f28f78575bf07fda05095d56efe1155b45836..63b0a1c4d14524e56061f52c02a933a3dec0c14f 100644 --- a/tests/my-rpc.c +++ b/tests/my-rpc.c @@ -23,19 +23,19 @@ static void my_rpc_ult(hg_handle_t handle) hg_return_t hret; my_rpc_out_t out; my_rpc_in_t in; - int ret; hg_size_t size; void *buffer; hg_bulk_t bulk_handle; const struct hg_info *hgi; #if 0 + int ret; int fd; char filename[256]; #endif margo_instance_id mid; - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); printf("Got RPC request with input_val: %d\n", in.input_val); out.ret = 0; @@ -45,22 +45,22 @@ static void my_rpc_ult(hg_handle_t handle) buffer = calloc(1, 512); assert(buffer); - /* register local target buffer for bulk access */ - hgi = HG_Get_info(handle); + /* get handle info and margo instance */ + hgi = margo_get_info(handle); assert(hgi); - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, - &size, HG_BULK_WRITE_ONLY, &bulk_handle); - assert(ret == 0); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); - mid = margo_hg_class_to_instance(hgi->hg_class); + /* register local target buffer for bulk access */ + hret = margo_bulk_create(mid, 1, &buffer, + &size, HG_BULK_WRITE_ONLY, &bulk_handle); + assert(hret == HG_SUCCESS); /* do bulk transfer from client to server */ - ret = margo_bulk_transfer(mid, HG_BULK_PULL, + hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, size); - assert(ret == 0); - - HG_Free_input(handle, &in); + assert(hret == HG_SUCCESS); /* write to a file; would be done with abt-io if we enabled it */ #if 0 @@ -74,11 +74,13 @@ static void my_rpc_ult(hg_handle_t handle) abt_io_close(aid, fd); #endif + margo_free_input(handle, &in); + hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Bulk_free(bulk_handle); - HG_Destroy(handle); + margo_bulk_free(bulk_handle); + margo_destroy(handle); free(buffer); return; @@ -93,14 +95,16 @@ static void my_rpc_shutdown_ult(hg_handle_t handle) printf("Got RPC request to shutdown\n"); - hgi = HG_Get_info(handle); + /* get handle info and margo instance */ + hgi = margo_get_info(handle); assert(hgi); - mid = margo_hg_class_to_instance(hgi->hg_class); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); hret = margo_respond(mid, handle, NULL); assert(hret == HG_SUCCESS); - HG_Destroy(handle); + margo_destroy(handle); /* NOTE: we assume that the server daemon is using * margo_wait_for_finalize() to suspend until this RPC executes, so there @@ -117,22 +121,23 @@ static void my_rpc_hang_ult(hg_handle_t handle) hg_return_t hret; my_rpc_out_t out; my_rpc_in_t in; - int ret; hg_size_t size; void *buffer; hg_bulk_t bulk_handle; const struct hg_info *hgi; margo_instance_id mid; - ret = HG_Get_input(handle, &in); - assert(ret == HG_SUCCESS); + hret = margo_get_input(handle, &in); + assert(hret == HG_SUCCESS); printf("Got RPC request with input_val: %d, deliberately hanging.\n", in.input_val); out.ret = 0; - hgi = HG_Get_info(handle); + /* get handle info and margo instance */ + hgi = margo_get_info(handle); assert(hgi); - mid = margo_hg_class_to_instance(hgi->hg_class); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); /* sleep for an hour (to allow client to test timeout capability) */ margo_thread_sleep(mid, 1000*60*60); @@ -143,27 +148,25 @@ static void my_rpc_hang_ult(hg_handle_t handle) assert(buffer); /* register local target buffer for bulk access */ - ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, + hret = margo_bulk_create(mid, 1, &buffer, &size, HG_BULK_WRITE_ONLY, &bulk_handle); - assert(ret == 0); + assert(hret == HG_SUCCESS); /* do bulk transfer from client to server */ - ret = margo_bulk_transfer(mid, HG_BULK_PULL, + hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, size); - assert(ret == 0); + assert(hret == HG_SUCCESS); - HG_Free_input(handle, &in); + margo_free_input(handle, &in); hret = margo_respond(mid, handle, &out); assert(hret == HG_SUCCESS); - HG_Bulk_free(bulk_handle); - HG_Destroy(handle); + margo_bulk_free(bulk_handle); + margo_destroy(handle); free(buffer); return; } DEFINE_MARGO_RPC_HANDLER(my_rpc_hang_ult) - -