Commit c85a5cdc authored by Philip Carns's avatar Philip Carns
Browse files

setup rdma buffer, reorder test a little

parent a08ab367
......@@ -36,11 +36,14 @@ struct options
char* na_transport;
};
#define BW_TOTAL_MEM_SIZE 2097152UL
static void parse_args(int argc, char **argv, struct options *opts);
static void usage(void);
MERCURY_GEN_PROC(bw_rpc_in_t,
((hg_bulk_t)(bulk_handle)))
DECLARE_MARGO_RPC_HANDLER(bw_ult);
static hg_id_t bw_id;
static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ssg_group_id_t gid, margo_instance_id mid);
......@@ -58,11 +61,13 @@ struct bw_worker_arg
static void bw_worker(void *_arg);
static ABT_eventual bw_done_eventual;
static hg_id_t g_bw_id;
static ABT_pool g_transfer_pool;
static ABT_eventual g_bw_done_eventual;
static struct options g_opts;
static ABT_pool transfer_pool;
static char *g_buffer = NULL;
static hg_size_t g_buffer_size = BW_TOTAL_MEM_SIZE;
static hg_bulk_t g_bulk_handle = HG_BULK_NULL;
int main(int argc, char **argv)
{
......@@ -100,6 +105,15 @@ int main(int argc, char **argv)
parse_args(argc, argv, &g_opts);
/* allocate one big buffer for rdma transfers */
g_buffer = calloc(g_buffer_size, 1);
if(!g_buffer)
{
perror("calloc");
fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_buffer_size);
return(-1);
}
/* boilerplate HG initialization steps */
/***************************************/
......@@ -179,10 +193,10 @@ int main(int argc, char **argv)
if(rank == 1 && g_opts.mercury_timeout_server != UINT_MAX)
margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);
bw_id = MARGO_REGISTER_MPLEX(
g_bw_id = MARGO_REGISTER_MPLEX(
mid,
"bw_rpc",
void,
bw_rpc_in_t,
void,
bw_ult,
MARGO_DEFAULT_MPLEX_ID,
......@@ -201,25 +215,22 @@ int main(int argc, char **argv)
printf("MPI rank %d has SSG ID %lu\n", rank, self);
#endif
if(self == 0)
if(self == 1)
{
/* TODO: something smarter than this; don't contact server before
* it is ready.
/* server side: prep everything before letting the client initiate
* benchmark
*/
sleep(5);
/* ssg id 0 initiates benchmark */
ret = run_benchmark(bw_id, 1, gid, mid);
void* buffer = g_buffer;
/* register memory for xfer */
ret = margo_bulk_create(mid, 1, &buffer, &g_buffer_size, HG_BULK_READWRITE, &g_bulk_handle);
assert(ret == 0);
}
else
{
/* ssg id 1 acts as server to run transfers */
int i;
/* set up abt pool */
if(g_opts.threads == 0)
{
/* run bulk transfers from primary pool on server */
transfer_pool = pool;
g_transfer_pool = pool;
}
else
{
......@@ -227,17 +238,30 @@ int main(int argc, char **argv)
bw_worker_xstreams = malloc(
g_opts.threads * sizeof(*bw_worker_xstreams));
assert(bw_worker_xstreams);
ret = ABT_snoozer_xstream_create(g_opts.threads, &transfer_pool,
ret = ABT_snoozer_xstream_create(g_opts.threads, &g_transfer_pool,
bw_worker_xstreams);
assert(ret == ABT_SUCCESS);
}
ret = ABT_eventual_create(0, &bw_done_eventual);
/* signaling mechanism for server to exit at conclusion of test */
ret = ABT_eventual_create(0, &g_bw_done_eventual);
assert(ret == 0);
}
MPI_Barrier(MPI_COMM_WORLD);
if(self == 0)
{
/* ssg id 0 (client) initiates benchmark */
ret = run_benchmark(g_bw_id, 1, gid, mid);
assert(ret == 0);
}
else
{
/* ssg id 1 (server) waits for test RPC to complete */
int i;
/* wait for test RPC to complete */
ABT_eventual_wait(bw_done_eventual, NULL);
sleep(1);
ABT_eventual_wait(g_bw_done_eventual, NULL);
/* cleanup dedicated pool if needed */
for (i = 0; i < g_opts.threads; i++) {
......@@ -246,6 +270,8 @@ int main(int argc, char **argv)
}
if(bw_worker_xstreams)
free(bw_worker_xstreams);
margo_bulk_free(g_bulk_handle);
}
ssg_group_destroy(gid);
......@@ -254,6 +280,8 @@ int main(int argc, char **argv)
if(g_opts.diag_file_name)
margo_diag_dump(mid, g_opts.diag_file_name, 1);
free(g_buffer);
margo_finalize(mid);
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
......@@ -419,7 +447,7 @@ static void bw_ult(hg_handle_t handle)
{
arg_array[i].start_tm = start_time;
arg_array[i].mid = mid;
ret = ABT_thread_create(transfer_pool, bw_worker, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
ret = ABT_thread_create(g_transfer_pool, bw_worker, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
assert(ret == 0);
}
......@@ -435,7 +463,7 @@ static void bw_ult(hg_handle_t handle)
free(tid_array);
free(arg_array);
ABT_eventual_set(bw_done_eventual, NULL, 0);
ABT_eventual_set(g_bw_done_eventual, NULL, 0);
return;
}
......@@ -447,6 +475,8 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
hg_handle_t handle;
hg_addr_t target_addr;
int ret;
bw_rpc_in_t in;
void* buffer = g_buffer;
target_addr = ssg_get_addr(gid, target);
assert(target_addr != HG_ADDR_NULL);
......@@ -454,9 +484,13 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ret = margo_create(mid, target_addr, id, &handle);
assert(ret == 0);
ret = margo_bulk_create(mid, 1, &buffer, &g_buffer_size, HG_BULK_READWRITE, &in.bulk_handle);
assert(ret == 0);
ret = margo_forward(handle, NULL);
assert(ret == 0);
margo_bulk_free(in.bulk_handle);
margo_destroy(handle);
return(0);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment