Commit 18e2fb95 authored by Philip Carns's avatar Philip Carns
Browse files

modify benchmark to calculate effective rate

- timing now performed by client to get effective rate including rpc
  req to start each stream and rpc ack to complete it
- intent is to more accurately measure transfer rate instead of just
  injection rate
parent fa747a0f
......@@ -27,6 +27,9 @@ margo-p2p-bw is a point to point bandwidth benchmark. It measures Margo
(Mercury) bulk transfer operations in both PULL and PUSH mode and includes
command line arguments to control the concurrency level.
The timing and bandwidth calculation is performed by the client and includes
a round trip RPC to initiate and complete the streaming transfer.
Example compile (must build with MPI support):
```
......
......@@ -4,6 +4,15 @@
* See COPYRIGHT in top-level directory.
*/
/* Effective streaming bandwidth test, as measured by client including RPC
* used to start and complete the streaming operation.
*
* NOTE: This test is not as clean as it could be. Because it is set up as
* an MPI program, the server is able to make assumptions about the pattern;
* it assumes that it should set a fill pattern after the first RPC and shut
* down after the second RPC. It assumes it can read all params from argv.
*/
#include "ssg-config.h"
#include <unistd.h>
......@@ -42,9 +51,10 @@ static void parse_args(int argc, char **argv, struct options *opts);
static void usage(void);
MERCURY_GEN_PROC(bw_rpc_in_t,
((hg_bulk_t)(bulk_handle)))
((hg_bulk_t)(bulk_handle))\
((int32_t)(op)))
MERCURY_GEN_PROC(bw_rpc_out_t,
((int64_t)(bytes_to_check)))
((hg_size_t)(bytes_moved)))
DECLARE_MARGO_RPC_HANDLER(bw_ult);
static int run_benchmark(hg_id_t id, ssg_member_id_t target,
......@@ -59,7 +69,6 @@ struct bw_worker_arg
hg_bulk_t *client_bulk_handle;
const hg_addr_t *target_addr;
hg_size_t bytes_moved;
double end_ts;
hg_bulk_op_t op;
};
......@@ -431,10 +440,7 @@ static void bw_ult(hg_handle_t handle)
const struct hg_info *hgi;
size_t cur_off = 0;
ABT_mutex cur_off_mutex;
unsigned long bytes_moved = 0;
double end_ts = 0.0;
unsigned long bytes_to_check = 0;
unsigned long bytes_to_check2 = 0;
hg_size_t x;
ABT_mutex_create(&cur_off_mutex);
......@@ -463,112 +469,55 @@ static void bw_ult(hg_handle_t handle)
arg_array[i].cur_off_mutex = &cur_off_mutex;
arg_array[i].client_bulk_handle = &in.bulk_handle;
arg_array[i].target_addr = &hgi->addr;
arg_array[i].op = HG_BULK_PULL;
arg_array[i].op = in.op;
ret = ABT_thread_create(g_transfer_pool, bw_worker, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
assert(ret == 0);
}
out.bytes_moved = 0;
for(i=0; i<g_opts.concurrency; i++)
{
ABT_thread_join(tid_array[i]);
ABT_thread_free(&tid_array[i]);
bytes_moved += arg_array[i].bytes_moved;
if(arg_array[i].end_ts > end_ts)
end_ts = arg_array[i].end_ts;
out.bytes_moved += arg_array[i].bytes_moved;
}
margo_respond(handle, &out);
/* calculate how many bytes of the buffer have been transferred */
bytes_to_check = (g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
if(bytes_moved < bytes_to_check)
bytes_to_check = bytes_moved;
/* check integrity of fill pattern. Note that this isn't as strong as
* checking every RDMA transfer separately since we are looping around
* and overwriting in a ring-buffer style. We could corrupt early and
* then get it right later and miss the problem.
*/
for(x=0; x<(bytes_to_check/sizeof(x)); x++)
if(in.op == HG_BULK_PULL)
{
assert(((hg_size_t*)g_buffer)[x] == x);
/* while we are at it, modify pattern so that we can check integrity
* on client side for reverse direction transfer
/* calculate how many bytes of the buffer have been transferred */
bytes_to_check = (g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
if(out.bytes_moved < bytes_to_check)
bytes_to_check = out.bytes_moved;
/* check integrity of fill pattern. Note that this isn't as strong as
* checking every RDMA transfer separately since we are looping around
* and overwriting in a ring-buffer style. We could corrupt early and
* but then overwrite it with correct results on a later pass.
*/
((hg_size_t*)g_buffer)[x] = x+1;
}
printf("<op>\t<concurrency>\t<threads>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\n");
printf("PULL\t%d\t%d\t%d\t%lu\t%f\t%f\n",
g_opts.concurrency,
g_opts.threads,
g_opts.xfer_size,
bytes_moved,
(end_ts-start_time),
((double)bytes_moved/(end_ts-start_time))/(1024.0*1024.0));
/* pause a moment */
margo_thread_sleep(mid, 100);
cur_off = 0;
memset(arg_array, 0, sizeof(*arg_array) * g_opts.concurrency);
start_time = ABT_get_wtime();
/* create requested number of workers to run transfer */
for(i=0; i<g_opts.concurrency; i++)
{
arg_array[i].start_tm = start_time;
arg_array[i].mid = mid;
arg_array[i].cur_off = &cur_off;
arg_array[i].cur_off_mutex = &cur_off_mutex;
arg_array[i].client_bulk_handle = &in.bulk_handle;
arg_array[i].target_addr = &hgi->addr;
arg_array[i].op = HG_BULK_PUSH;
for(x=0; x<(bytes_to_check/sizeof(x)); x++)
{
assert(((hg_size_t*)g_buffer)[x] == x);
}
ret = ABT_thread_create(g_transfer_pool, bw_worker, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
assert(ret == 0);
/* fill pattern for return trip, increment each value by 1 */
for(x=0; x<(g_buffer_size/sizeof(x)); x++)
((hg_size_t*)g_buffer)[x] = x+1;
}
for(i=0; i<g_opts.concurrency; i++)
{
ABT_thread_join(tid_array[i]);
ABT_thread_free(&tid_array[i]);
bytes_moved += arg_array[i].bytes_moved;
if(arg_array[i].end_ts > end_ts)
end_ts = arg_array[i].end_ts;
}
printf("PUSH\t%d\t%d\t%d\t%lu\t%f\t%f\n",
g_opts.concurrency,
g_opts.threads,
g_opts.xfer_size,
bytes_moved,
(end_ts-start_time),
((double)bytes_moved/(end_ts-start_time))/(1024.0*1024.0));
/* calculate how many bytes of the buffer have been transferred */
bytes_to_check2 = (g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
if(bytes_moved < bytes_to_check2)
bytes_to_check2 = bytes_moved;
/* tell client how many bytes to validate on that end. Will be minimum
* of pull and push buffer used
*/
out.bytes_to_check = bytes_to_check;
if(bytes_to_check > bytes_to_check2)
out.bytes_to_check = bytes_to_check2;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
free(tid_array);
free(arg_array);
ABT_eventual_set(g_bw_done_eventual, NULL, 0);
ABT_mutex_free(&cur_off_mutex);
if(arg_array[0].op == HG_BULK_PUSH)
ABT_eventual_set(g_bw_done_eventual, NULL, 0);
return;
}
......@@ -584,6 +533,8 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
bw_rpc_out_t out;
void* buffer = g_buffer;
hg_size_t i;
hg_size_t bytes_to_check;
double start_ts, end_ts;
/* fill pattern in origin buffer */
for(i=0; i<(g_buffer_size/sizeof(i)); i++)
......@@ -597,15 +548,54 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ret = margo_bulk_create(mid, 1, &buffer, &g_buffer_size, HG_BULK_READWRITE, &in.bulk_handle);
assert(ret == 0);
in.op = HG_BULK_PULL;
start_ts = ABT_get_wtime();
ret = margo_forward(handle, &in);
end_ts = ABT_get_wtime();
assert(ret == 0);
ret = margo_get_output(handle, &out);
assert(ret == HG_SUCCESS);
printf("<op>\t<concurrency>\t<threads>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\n");
printf("PULL\t%d\t%d\t%d\t%lu\t%f\t%f\n",
g_opts.concurrency,
g_opts.threads,
g_opts.xfer_size,
out.bytes_moved,
(end_ts-start_ts),
((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0));
margo_free_output(handle, &out);
/* pause a moment */
margo_thread_sleep(mid, 100);
in.op = HG_BULK_PUSH;
start_ts = ABT_get_wtime();
ret = margo_forward(handle, &in);
end_ts = ABT_get_wtime();
assert(ret == 0);
ret = margo_get_output(handle, &out);
assert(ret == HG_SUCCESS);
printf("PULL\t%d\t%d\t%d\t%lu\t%f\t%f\n",
g_opts.concurrency,
g_opts.threads,
g_opts.xfer_size,
out.bytes_moved,
(end_ts-start_ts),
((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0));
/* calculate how many bytes of the buffer have been transferred */
bytes_to_check = (g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
if(out.bytes_moved < bytes_to_check)
bytes_to_check = out.bytes_moved;
/* check fill pattern we got back; should be what we set plus one */
for(i=0; i<(out.bytes_to_check/sizeof(i)); i++)
for(i=0; i<(bytes_to_check/sizeof(i)); i++)
{
assert(((hg_size_t*)g_buffer)[i] == i+1);
}
......@@ -648,7 +638,6 @@ static void bw_worker(void *_arg)
arg->bytes_moved += g_opts.xfer_size;
now = ABT_get_wtime();
}
arg->end_ts = now;
// printf("# DBG: worker stopped.\n");
return;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment