Commit 6a345e9e authored by Shane Snyder's avatar Shane Snyder

update margo tests according to new iface changes

parent 972cfbb4
...@@ -7,8 +7,8 @@ ...@@ -7,8 +7,8 @@
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>
#include <unistd.h> #include <unistd.h>
#include <mercury.h>
#include <abt.h> #include <abt.h>
#include <abt-snoozer.h>
#include <margo.h> #include <margo.h>
#include "my-rpc.h" #include "my-rpc.h"
...@@ -24,8 +24,6 @@ struct run_my_rpc_args ...@@ -24,8 +24,6 @@ struct run_my_rpc_args
{ {
int val; int val;
margo_instance_id mid; margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
hg_addr_t svr_addr; hg_addr_t svr_addr;
}; };
...@@ -40,11 +38,10 @@ int main(int argc, char **argv) ...@@ -40,11 +38,10 @@ int main(int argc, char **argv)
ABT_thread threads[4]; ABT_thread threads[4];
int i; int i;
int ret; int ret;
hg_return_t hret;
ABT_xstream xstream; ABT_xstream xstream;
ABT_pool pool; ABT_pool pool;
margo_instance_id mid; margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
hg_addr_t svr_addr = HG_ADDR_NULL; hg_addr_t svr_addr = HG_ADDR_NULL;
hg_handle_t handle; hg_handle_t handle;
char proto[12] = {0}; char proto[12] = {0};
...@@ -55,42 +52,24 @@ int main(int argc, char **argv) ...@@ -55,42 +52,24 @@ int main(int argc, char **argv)
return(-1); return(-1);
} }
/* boilerplate HG initialization steps */
/***************************************/
/* initialize Mercury using the transport portion of the destination /* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present) * address (i.e., the part before the first : character if present)
*/ */
for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++) for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++)
proto[i] = argv[1][i]; proto[i] = argv[1][i];
hg_class = HG_Init(proto, HG_FALSE);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
return(-1);
}
/* set up argobots */ /* actually start margo -- margo_init() encapsulates the Mercury &
* Argobots initialization, so this step must precede their use. */
/* Use main process to drive progress (it will relinquish control to
* Mercury during blocking communication calls). The rpc handler pool
* is null in this example program because this is a pure client that
* will not be servicing rpc requests.
*/
/***************************************/ /***************************************/
ret = ABT_init(argc, argv); mid = margo_init(proto, MARGO_CLIENT_MODE, 0, 0);
if(ret != 0) if(mid == MARGO_INSTANCE_NULL)
{ {
fprintf(stderr, "Error: ABT_init()\n"); fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
return(-1); return(-1);
} }
...@@ -108,31 +87,20 @@ int main(int argc, char **argv) ...@@ -108,31 +87,20 @@ int main(int argc, char **argv)
return(-1); return(-1);
} }
/* actually start margo */ /* register RPCs */
/* Use main process to drive progress (it will relinquish control to my_rpc_hang_id = MARGO_REGISTER(mid, "my_rpc_hang", my_rpc_hang_in_t, my_rpc_hang_out_t,
* Mercury during blocking communication calls). The rpc handler pool
* is null in this example program because this is a pure client that
* will not be servicing rpc requests.
*/
/***************************************/
mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context);
/* register RPC */
my_rpc_hang_id = MERCURY_REGISTER(hg_class, "my_rpc_hang", my_rpc_hang_in_t, my_rpc_hang_out_t,
NULL); NULL);
my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void, my_rpc_shutdown_id = MARGO_REGISTER(mid, "my_shutdown_rpc", void, void,
NULL); NULL);
/* find addr for server */ /* find addr for server */
ret = margo_addr_lookup(mid, argv[1], &svr_addr); hret = margo_addr_lookup(mid, argv[1], &svr_addr);
assert(ret == 0); assert(hret == HG_SUCCESS);
for(i=0; i<4; i++) for(i=0; i<4; i++)
{ {
args[i].val = i; args[i].val = i;
args[i].mid = mid; args[i].mid = mid;
args[i].hg_class = hg_class;
args[i].hg_context = hg_context;
args[i].svr_addr = svr_addr; args[i].svr_addr = svr_addr;
/* Each ult gets a pointer to an element of the array to use /* Each ult gets a pointer to an element of the array to use
...@@ -167,22 +135,21 @@ int main(int argc, char **argv) ...@@ -167,22 +135,21 @@ int main(int argc, char **argv)
} }
} }
/* send one rpc to server to shut it down */
/* create handle */ /* create handle */
ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle); hret = margo_create(mid, svr_addr, my_rpc_shutdown_id, &handle);
assert(ret == 0); assert(hret == HG_SUCCESS);
margo_forward_timed(mid, handle, NULL, 2000.0); hret = margo_forward_timed(mid, handle, NULL, 2000.0);
assert(hret == HG_SUCCESS);
HG_Addr_free(hg_class, svr_addr);
margo_destroy(handle);
margo_addr_free(mid, svr_addr);
/* shut down everything */ /* shut down everything */
margo_finalize(mid); margo_finalize(mid);
ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return(0); return(0);
} }
...@@ -192,10 +159,9 @@ static void run_my_rpc(void *_arg) ...@@ -192,10 +159,9 @@ static void run_my_rpc(void *_arg)
hg_handle_t handle; hg_handle_t handle;
my_rpc_hang_in_t in; my_rpc_hang_in_t in;
my_rpc_hang_out_t out; my_rpc_hang_out_t out;
int ret; hg_return_t hret;
hg_size_t size; hg_size_t size;
void* buffer; void* buffer;
const struct hg_info *hgi;
printf("ULT [%d] running.\n", arg->val); printf("ULT [%d] running.\n", arg->val);
...@@ -206,42 +172,39 @@ static void run_my_rpc(void *_arg) ...@@ -206,42 +172,39 @@ static void run_my_rpc(void *_arg)
sprintf((char*)buffer, "Hello world!\n"); sprintf((char*)buffer, "Hello world!\n");
/* create handle */ /* create handle */
ret = HG_Create(arg->hg_context, arg->svr_addr, my_rpc_hang_id, &handle); hret = margo_create(arg->mid, arg->svr_addr, my_rpc_hang_id, &handle);
assert(ret == 0); assert(hret == HG_SUCCESS);
/* register buffer for rdma/bulk access by server */ /* register buffer for rdma/bulk access by server */
hgi = HG_Get_info(handle); hret = margo_bulk_create(arg->mid, 1, &buffer, &size,
assert(hgi);
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size,
HG_BULK_READ_ONLY, &in.bulk_handle); HG_BULK_READ_ONLY, &in.bulk_handle);
assert(ret == 0); assert(hret == HG_SUCCESS);
/* Send rpc. Note that we are also transmitting the bulk handle in the /* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above. * input struct. It was set above.
*/ */
in.input_val = arg->val; in.input_val = arg->val;
/* call with 2 second timeout */ /* call with 2 second timeout */
ret = margo_forward_timed(arg->mid, handle, &in, 2000.0); hret = margo_forward_timed(arg->mid, handle, &in, 2000.0);
if(ret == 0) if(hret == HG_SUCCESS)
{ {
/* decode response */ /* decode response */
ret = HG_Get_output(handle, &out); hret = margo_get_output(handle, &out);
assert(ret == 0); assert(hret == HG_SUCCESS);
printf("Got response ret: %d\n", out.ret); printf("Got response ret: %d\n", out.ret);
HG_Free_output(handle, &out); margo_free_output(handle, &out);
} }
else else
{ {
printf("margo_forward returned %d\n", ret); printf("margo_forward returned %d\n", hret);
} }
/* clean up resources consumed by this rpc */ /* clean up resources consumed by this rpc */
HG_Bulk_free(in.bulk_handle); margo_bulk_free(in.bulk_handle);
HG_Destroy(handle); margo_destroy(handle);
free(buffer); free(buffer);
printf("ULT [%d] done.\n", arg->val); printf("ULT [%d] done.\n", arg->val);
return; return;
} }
...@@ -7,8 +7,8 @@ ...@@ -7,8 +7,8 @@
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>
#include <unistd.h> #include <unistd.h>
#include <mercury.h>
#include <abt.h> #include <abt.h>
#include <abt-snoozer.h>
#include <margo.h> #include <margo.h>
#include "my-rpc.h" #include "my-rpc.h"
...@@ -24,8 +24,6 @@ struct run_my_rpc_args ...@@ -24,8 +24,6 @@ struct run_my_rpc_args
{ {
int val; int val;
margo_instance_id mid; margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
hg_addr_t svr_addr; hg_addr_t svr_addr;
}; };
...@@ -40,11 +38,10 @@ int main(int argc, char **argv) ...@@ -40,11 +38,10 @@ int main(int argc, char **argv)
ABT_thread threads[4]; ABT_thread threads[4];
int i; int i;
int ret; int ret;
hg_return_t hret;
ABT_xstream xstream; ABT_xstream xstream;
ABT_pool pool; ABT_pool pool;
margo_instance_id mid; margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
hg_addr_t svr_addr = HG_ADDR_NULL; hg_addr_t svr_addr = HG_ADDR_NULL;
hg_handle_t handle; hg_handle_t handle;
char proto[12] = {0}; char proto[12] = {0};
...@@ -55,42 +52,24 @@ int main(int argc, char **argv) ...@@ -55,42 +52,24 @@ int main(int argc, char **argv)
return(-1); return(-1);
} }
/* boilerplate HG initialization steps */
/***************************************/
/* initialize Mercury using the transport portion of the destination /* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present) * address (i.e., the part before the first : character if present)
*/ */
for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++) for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++)
proto[i] = argv[1][i]; proto[i] = argv[1][i];
hg_class = HG_Init(proto, HG_FALSE);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
return(-1);
}
/* set up argobots */ /* actually start margo -- margo_init() encapsulates the Mercury &
* Argobots initialization, so this step must precede their use. */
/* Use main process to drive progress (it will relinquish control to
* Mercury during blocking communication calls). The rpc handler pool
* is null in this example program because this is a pure client that
* will not be servicing rpc requests.
*/
/***************************************/ /***************************************/
ret = ABT_init(argc, argv); mid = margo_init(proto, MARGO_CLIENT_MODE, 0, 0);
if(ret != 0) if(mid == MARGO_INSTANCE_NULL)
{ {
fprintf(stderr, "Error: ABT_init()\n"); fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
return(-1); return(-1);
} }
...@@ -100,38 +79,26 @@ int main(int argc, char **argv) ...@@ -100,38 +79,26 @@ int main(int argc, char **argv)
{ {
fprintf(stderr, "Error: ABT_xstream_self()\n"); fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1); return(-1);
} ret = ABT_xstream_get_main_pools(xstream, 1, &pool); }
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if(ret != 0) if(ret != 0)
{ {
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n"); fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1); return(-1);
} }
/* actually start margo */ /* register RPCs */
/* Use main process to drive progress (it will relinquish control to my_rpc_id = MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, NULL);
* Mercury during blocking communication calls). The rpc handler pool my_rpc_shutdown_id = MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, NULL);
* is null in this example program because this is a pure client that
* will not be servicing rpc requests.
*/
/***************************************/
mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
NULL);
my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
NULL);
/* find addr for server */ /* find addr for server */
ret = margo_addr_lookup(mid, argv[1], &svr_addr); hret = margo_addr_lookup(mid, argv[1], &svr_addr);
assert(ret == 0); assert(hret == HG_SUCCESS);
for(i=0; i<4; i++) for(i=0; i<4; i++)
{ {
args[i].val = i; args[i].val = i;
args[i].mid = mid; args[i].mid = mid;
args[i].hg_class = hg_class;
args[i].hg_context = hg_context;
args[i].svr_addr = svr_addr; args[i].svr_addr = svr_addr;
/* Each ult gets a pointer to an element of the array to use /* Each ult gets a pointer to an element of the array to use
...@@ -169,21 +136,18 @@ int main(int argc, char **argv) ...@@ -169,21 +136,18 @@ int main(int argc, char **argv)
/* send one rpc to server to shut it down */ /* send one rpc to server to shut it down */
/* create handle */ /* create handle */
ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle); hret = margo_create(mid, svr_addr, my_rpc_shutdown_id, &handle);
assert(ret == 0); assert(hret == HG_SUCCESS);
margo_forward(mid, handle, NULL); hret = margo_forward(mid, handle, NULL);
assert(hret == HG_SUCCESS);
HG_Addr_free(hg_class, svr_addr); margo_destroy(handle);
margo_addr_free(mid, svr_addr);
/* shut down everything */ /* shut down everything */
margo_finalize(mid); margo_finalize(mid);
ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return(0); return(0);
} }
...@@ -193,10 +157,9 @@ static void run_my_rpc(void *_arg) ...@@ -193,10 +157,9 @@ static void run_my_rpc(void *_arg)
hg_handle_t handle; hg_handle_t handle;
my_rpc_in_t in; my_rpc_in_t in;
my_rpc_out_t out; my_rpc_out_t out;
int ret; hg_return_t hret;
hg_size_t size; hg_size_t size;
void* buffer; void* buffer;
const struct hg_info *hgi;
printf("ULT [%d] running.\n", arg->val); printf("ULT [%d] running.\n", arg->val);
...@@ -207,35 +170,33 @@ static void run_my_rpc(void *_arg) ...@@ -207,35 +170,33 @@ static void run_my_rpc(void *_arg)
sprintf((char*)buffer, "Hello world!\n"); sprintf((char*)buffer, "Hello world!\n");
/* create handle */ /* create handle */
ret = HG_Create(arg->hg_context, arg->svr_addr, my_rpc_id, &handle); hret = margo_create(arg->mid, arg->svr_addr, my_rpc_id, &handle);
assert(ret == 0); assert(hret == HG_SUCCESS);
/* register buffer for rdma/bulk access by server */ /* register buffer for rdma/bulk access by server */
hgi = HG_Get_info(handle); hret = margo_bulk_create(arg->mid, 1, &buffer, &size,
assert(hgi);
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size,
HG_BULK_READ_ONLY, &in.bulk_handle); HG_BULK_READ_ONLY, &in.bulk_handle);
assert(ret == 0); assert(hret == HG_SUCCESS);
/* Send rpc. Note that we are also transmitting the bulk handle in the /* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above. * input struct. It was set above.
*/ */
in.input_val = arg->val; in.input_val = arg->val;
margo_forward(arg->mid, handle, &in); hret = margo_forward(arg->mid, handle, &in);
assert(hret == HG_SUCCESS);
/* decode response */ /* decode response */
ret = HG_Get_output(handle, &out); hret = margo_get_output(handle, &out);
assert(ret == 0); assert(hret == HG_SUCCESS);
printf("Got response ret: %d\n", out.ret); printf("Got response ret: %d\n", out.ret);
/* clean up resources consumed by this rpc */ /* clean up resources consumed by this rpc */
HG_Bulk_free(in.bulk_handle); margo_bulk_free(in.bulk_handle);
HG_Free_output(handle, &out); margo_free_output(handle, &out);
HG_Destroy(handle); margo_destroy(handle);
free(buffer); free(buffer);
printf("ULT [%d] done.\n", arg->val); printf("ULT [%d] done.\n", arg->val);
return; return;
} }
...@@ -7,13 +7,13 @@ ...@@ -7,13 +7,13 @@
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>
#include <unistd.h> #include <unistd.h>
#include <mercury.h>
#include <abt.h> #include <abt.h>
#include <abt-snoozer.h>
#include <margo.h> #include <margo.h>
#include "my-rpc.h" #include "my-rpc.h"
/* example server program. Starts HG engine, registers the example RPC type, /* example server program. Starts margo, registers the example RPC type,
* and then executes indefinitely. * and then executes indefinitely.
*/ */
...@@ -28,31 +28,26 @@ static void parse_args(int argc, char **argv, struct options *opts); ...@@ -28,31 +28,26 @@ static void parse_args(int argc, char **argv, struct options *opts);
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
int ret; hg_return_t hret;
margo_instance_id mid; margo_instance_id mid;
ABT_xstream handler_xstream;
ABT_pool handler_pool;
ABT_xstream progress_xstream;
ABT_pool progress_pool;
hg_context_t *hg_context;
hg_class_t *hg_class;
struct options opts; struct options opts;
parse_args(argc, argv, &opts); parse_args(argc, argv, &opts);
/* boilerplate HG initialization steps */ /* actually start margo -- this step encapsulates the Mercury and
* Argobots initialization and must precede their use */
/* If single pool mode, use the calling xstream to drive progress and
* execute handlers. If not, use a dedicated progress xstream and
* run handlers directly on the calling xstream
*/
/***************************************/ /***************************************/
hg_class = HG_Init(opts.listen_addr, HG_TRUE); if(opts.single_pool_mode)
if(!hg_class) mid = margo_init(opts.listen_addr, MARGO_SERVER_MODE, 0, -1);
{ else
fprintf(stderr, "Error: HG_Init()\n"); mid = margo_init(opts.listen_addr, MARGO_SERVER_MODE, 1, 0);
return(-1); if(mid == MARGO_INSTANCE_NULL)
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{ {
fprintf(stderr, "Error: HG_Context_create()\n"); fprintf(stderr, "Error: margo_init()\n");
HG_Finalize(hg_class);
return(-1); return(-1);
} }
...@@ -64,32 +59,28 @@ int main(int argc, char **argv) ...@@ -64,32 +59,28 @@ int main(int argc, char **argv)
hg_size_t addr_self_string_sz = 128; hg_size_t addr_self_string_sz = 128;
/* figure out what address this server is listening on */ /* figure out what address this server is listening on */
ret = HG_Addr_self(hg_class, &addr_self); hret = margo_addr_self(mid, &addr_self);
if(ret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
fprintf(stderr, "Error: HG_Addr_self()\n"); fprintf(stderr, "Error: margo_addr_self()\n");
HG_Context_destroy(hg_context); margo_finalize(mid);
HG_Finalize(hg_class);
return(-1); return(-1);
} }
ret = HG_Addr_to_string(hg_class, addr_self_string, &addr_self_string_sz, addr_self); hret = margo_addr_to_string(mid, addr_self_string, &addr_self_string_sz, addr_self);
if(ret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
fprintf(stderr, "Error: HG_Addr_self()\n"); fprintf(stderr, "Error: margo_addr_self()\n");
HG_Context_destroy(hg_context); margo_addr_free(mid, addr_self);
HG_Finalize(hg_class); margo_finalize(mid);
HG_Addr_free(hg_class, addr_self);
return(-1); return(-1);
} }
HG_Addr_free(hg_class, addr_self); margo_addr_free(mid, addr_self);
fp = fopen(opts.hostfile, "w"); fp = fopen(opts.hostfile, "w");
if(!fp) if(!fp)
{ {
perror("fopen"); perror("fopen");
HG_Context_destroy(hg_context); margo_finalize(mid);
HG_Finalize(hg_class);
HG_Addr_free(hg_class, addr_self);
return(-1); return(-1);
} }
...@@ -97,67 +88,13 @@ int main(int argc, char **argv) ...@@ -97,67 +88,13 @@ int main(int argc, char **argv)
fclose(fp); fclose(fp);
} }
/* set up argobots */
/***************************************/
ret = ABT_init(argc, argv);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_init()\n");
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)