Commit 5dc4308c authored by Philip Carns's avatar Philip Carns
Browse files

implement reverse direction transfer

parent 655b1b6e
......@@ -58,6 +58,7 @@ struct bw_worker_arg
const hg_addr_t *target_addr;
hg_size_t bytes_moved;
double end_ts;
hg_bulk_op_t op;
};
static void bw_worker(void *_arg);
......@@ -458,6 +459,7 @@ static void bw_ult(hg_handle_t handle)
arg_array[i].cur_off_mutex = &cur_off_mutex;
arg_array[i].client_bulk_handle = &in.bulk_handle;
arg_array[i].target_addr = &hgi->addr;
arg_array[i].op = HG_BULK_PULL;
ret = ABT_thread_create(g_transfer_pool, bw_worker, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
assert(ret == 0);
......@@ -484,7 +486,13 @@ static void bw_ult(hg_handle_t handle)
* then get it right later and miss the problem.
*/
for(x=0; x<(bytes_to_check/sizeof(x)); x++)
{
assert(((hg_size_t*)g_buffer)[x] == x);
/* while we are at it, modify pattern so that we can check integrity
* on client side for reverse direction transfer
*/
((hg_size_t*)g_buffer)[x] = x+1;
}
printf("<op>\t<concurrency>\t<threads>\t<bytes>\t<seconds>\t<MiB/s>\n");
printf("PULL\t%d\t%d\t%lu\t%f\t%f\n",
......@@ -494,6 +502,45 @@ static void bw_ult(hg_handle_t handle)
(end_ts-start_time),
((double)bytes_moved/(end_ts-start_time))/(1024.0*1024.0));
/* pause a moment */
margo_thread_sleep(mid, 100);
cur_off = 0;
memset(arg_array, 0, sizeof(*arg_array) * g_opts.concurrency);
start_time = ABT_get_wtime();
/* create requested number of workers to run transfer */
for(i=0; i<g_opts.concurrency; i++)
{
arg_array[i].start_tm = start_time;
arg_array[i].mid = mid;
arg_array[i].cur_off = &cur_off;
arg_array[i].cur_off_mutex = &cur_off_mutex;
arg_array[i].client_bulk_handle = &in.bulk_handle;
arg_array[i].target_addr = &hgi->addr;
arg_array[i].op = HG_BULK_PUSH;
ret = ABT_thread_create(g_transfer_pool, bw_worker, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
assert(ret == 0);
}
for(i=0; i<g_opts.concurrency; i++)
{
ABT_thread_join(tid_array[i]);
ABT_thread_free(&tid_array[i]);
bytes_moved += arg_array[i].bytes_moved;
if(arg_array[i].end_ts > end_ts)
end_ts = arg_array[i].end_ts;
}
printf("PUSH\t%d\t%d\t%lu\t%f\t%f\n",
g_opts.concurrency,
g_opts.threads,
bytes_moved,
(end_ts-start_time),
((double)bytes_moved/(end_ts-start_time))/(1024.0*1024.0));
margo_respond(handle, NULL);
margo_free_input(handle, &in);
margo_destroy(handle);
......@@ -553,7 +600,6 @@ static void bw_worker(void *_arg)
now = ABT_get_wtime();
/* TODO: need two runs, one in each direction? */
while((now - arg->start_tm) < g_opts.duration_seconds)
{
/* find the offset for this transfer and then increment for next
......@@ -566,7 +612,7 @@ static void bw_worker(void *_arg)
*arg->cur_off = 0;
ABT_mutex_unlock(*arg->cur_off_mutex);
ret = margo_bulk_transfer(arg->mid, HG_BULK_PULL,
ret = margo_bulk_transfer(arg->mid, arg->op,
*arg->target_addr, *arg->client_bulk_handle, my_off, g_bulk_handle, my_off, g_opts.xfer_size);
assert(ret == 0);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment