Commit f16e0a0b authored by Philip Carns's avatar Philip Carns

reproducer for #40

parent 3f9fe3a1
...@@ -72,7 +72,6 @@ int main(int argc, char **argv) ...@@ -72,7 +72,6 @@ int main(int argc, char **argv)
fprintf(stderr, "Error: margo_init()\n"); fprintf(stderr, "Error: margo_init()\n");
return(-1); return(-1);
} }
margo_diag_start(mid);
/* retrieve current pool to use for ULT creation */ /* retrieve current pool to use for ULT creation */
ret = ABT_xstream_self(&xstream); ret = ABT_xstream_self(&xstream);
...@@ -147,7 +146,6 @@ int main(int argc, char **argv) ...@@ -147,7 +146,6 @@ int main(int argc, char **argv)
margo_addr_free(mid, svr_addr); margo_addr_free(mid, svr_addr);
/* shut down everything */ /* shut down everything */
margo_diag_dump(mid, "-", 0);
margo_finalize(mid); margo_finalize(mid);
return(0); return(0);
...@@ -162,6 +160,7 @@ static void run_my_rpc(void *_arg) ...@@ -162,6 +160,7 @@ static void run_my_rpc(void *_arg)
hg_return_t hret; hg_return_t hret;
hg_size_t size; hg_size_t size;
void* buffer; void* buffer;
int i;
printf("ULT [%d] running.\n", arg->val); printf("ULT [%d] running.\n", arg->val);
...@@ -180,6 +179,11 @@ static void run_my_rpc(void *_arg) ...@@ -180,6 +179,11 @@ static void run_my_rpc(void *_arg)
HG_BULK_READ_ONLY, &in.bulk_handle); HG_BULK_READ_ONLY, &in.bulk_handle);
assert(hret == HG_SUCCESS); assert(hret == HG_SUCCESS);
/* send many rpcs */
for(i=0; i<10000; i++)
{
HG_Reset(handle, arg->svr_addr, my_rpc_id);
/* Send rpc. Note that we are also transmitting the bulk handle in the /* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above. * input struct. It was set above.
*/ */
...@@ -191,11 +195,13 @@ static void run_my_rpc(void *_arg) ...@@ -191,11 +195,13 @@ static void run_my_rpc(void *_arg)
hret = margo_get_output(handle, &out); hret = margo_get_output(handle, &out);
assert(hret == HG_SUCCESS); assert(hret == HG_SUCCESS);
printf("Got response ret: %d\n", out.ret); // printf("Got response ret: %d\n", out.ret);
margo_free_output(handle, &out);
}
/* clean up resources consumed by this rpc */ /* clean up resources consumed by this rpc */
margo_bulk_free(in.bulk_handle); margo_bulk_free(in.bulk_handle);
margo_free_output(handle, &out);
margo_destroy(handle); margo_destroy(handle);
free(buffer); free(buffer);
......
...@@ -24,25 +24,45 @@ int main(int argc, char **argv) ...@@ -24,25 +24,45 @@ int main(int argc, char **argv)
hg_addr_t addr_self; hg_addr_t addr_self;
char addr_self_string[128]; char addr_self_string[128];
hg_size_t addr_self_string_sz = 128; hg_size_t addr_self_string_sz = 128;
int ret;
int num_rpc_threads;
int dedicated_progress;
if(argc != 2) if(argc != 4)
{ {
fprintf(stderr, "Usage: ./server <listen_addr>\n"); fprintf(stderr, "Usage: ./server <listen_addr> <dedicated_progress?> <num_rpc_threads>\n");
fprintf(stderr, "Example: ./server na+sm://\n"); fprintf(stderr, "Example: ./server na+sm:// 0 -1\n");
return(-1);
}
ret = sscanf(argv[2], "%d", &dedicated_progress);
if(ret != 1)
{
fprintf(stderr, "Usage: ./server <listen_addr> <dedicated_progress?> <num_rpc_threads>\n");
fprintf(stderr, "Example: ./server na+sm:// 0 0 -1\n");
return(-1);
}
ret = sscanf(argv[3], "%d", &num_rpc_threads);
if(ret != 1)
{
fprintf(stderr, "Usage: ./server <listen_addr> <dedicated_progress?> <num_rpc_threads>\n");
fprintf(stderr, "Example: ./server na+sm:// 0 0 -1\n");
return(-1); return(-1);
} }
/* actually start margo -- this step encapsulates the Mercury and /* actually start margo -- this step encapsulates the Mercury and
* Argobots initialization and must precede their use */ * Argobots initialization and must precede their use */
/* Use the calling xstream to drive progress and execute handlers. */ /* NOTE: for debugging #40, use dedicate progress thread and N rpc
* threads
*/
/***************************************/ /***************************************/
mid = margo_init(argv[1], MARGO_SERVER_MODE, 0, -1); mid = margo_init(argv[1], MARGO_SERVER_MODE, dedicated_progress, num_rpc_threads);
if(mid == MARGO_INSTANCE_NULL) if(mid == MARGO_INSTANCE_NULL)
{ {
fprintf(stderr, "Error: margo_init()\n"); fprintf(stderr, "Error: margo_init()\n");
return(-1); return(-1);
} }
margo_diag_start(mid);
/* figure out what address this server is listening on */ /* figure out what address this server is listening on */
hret = margo_addr_self(mid, &addr_self); hret = margo_addr_self(mid, &addr_self);
......
...@@ -37,7 +37,7 @@ static void my_rpc_ult(hg_handle_t handle) ...@@ -37,7 +37,7 @@ static void my_rpc_ult(hg_handle_t handle)
hret = margo_get_input(handle, &in); hret = margo_get_input(handle, &in);
assert(hret == HG_SUCCESS); assert(hret == HG_SUCCESS);
printf("Got RPC request with input_val: %d\n", in.input_val); //printf("Got RPC request with input_val: %d\n", in.input_val);
out.ret = 0; out.ret = 0;
/* set up target buffer for bulk transfer */ /* set up target buffer for bulk transfer */
...@@ -103,8 +103,6 @@ static void my_rpc_shutdown_ult(hg_handle_t handle) ...@@ -103,8 +103,6 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
margo_destroy(handle); margo_destroy(handle);
margo_diag_dump(mid, "-", 0);
/* NOTE: we assume that the server daemon is using /* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there * margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it. * is no need to send any extra signal to notify it.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment