Commit 79490301 authored by Philip Carns's avatar Philip Carns

add ability to do a "warmup" run

- default 1 second, each direction, before measurements
parent 0e86c184
......@@ -43,6 +43,7 @@ struct options
char* na_transport;
unsigned long g_buffer_size;
int align_buffer;
int warmup_seconds;
};
#define BW_TOTAL_MEM_SIZE 2147483648UL
......@@ -56,13 +57,14 @@ static void usage(void);
MERCURY_GEN_PROC(bw_rpc_in_t,
((hg_bulk_t)(bulk_handle))\
((int32_t)(op))\
((int32_t)(shutdown)))
((int32_t)(shutdown))\
((int32_t)(duration)))
MERCURY_GEN_PROC(bw_rpc_out_t,
((hg_size_t)(bytes_moved)))
DECLARE_MARGO_RPC_HANDLER(bw_ult);
static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ssg_group_id_t gid, margo_instance_id mid);
static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ssg_group_id_t gid, margo_instance_id mid, int shutdown_flag, int duration, int print_flag);
struct bw_worker_arg
{
......@@ -74,6 +76,7 @@ struct bw_worker_arg
const hg_addr_t *target_addr;
hg_size_t bytes_moved;
hg_bulk_op_t op;
int duration;
};
static void bw_worker(void *_arg);
......@@ -257,7 +260,13 @@ int main(int argc, char **argv)
if(self == 0)
{
/* ssg id 0 (client) initiates benchmark */
ret = run_benchmark(g_bw_id, 1, gid, mid);
/* warmup */
if(g_opts.warmup_seconds)
ret = run_benchmark(g_bw_id, 1, gid, mid, 0, g_opts.warmup_seconds, 0);
assert(ret == 0);
ret = run_benchmark(g_bw_id, 1, gid, mid, 1, g_opts.duration_seconds, 1);
assert(ret == 0);
}
else
......@@ -309,10 +318,12 @@ static int parse_args(int argc, char **argv, struct options *opts)
/* default to using whatever the standard timeout is in margo */
opts->mercury_timeout_client = UINT_MAX;
opts->mercury_timeout_server = UINT_MAX;
opts->mercury_timeout_server = UINT_MAX;
opts->g_buffer_size = BW_TOTAL_MEM_SIZE;
/* warm up for 1 second by default */
opts->warmup_seconds = 1;
while((opt = getopt(argc, argv, "n:x:c:T:d:t:D:m:X:a")) != -1)
while((opt = getopt(argc, argv, "n:x:c:T:d:t:D:m:X:a:w:")) != -1)
{
switch(opt)
{
......@@ -329,6 +340,11 @@ static int parse_args(int argc, char **argv, struct options *opts)
if(ret != 1)
return(-1);
break;
case 'w':
ret = sscanf(optarg, "%d", &opts->warmup_seconds);
if(ret != 1)
return(-1);
break;
case 'X':
ret = sscanf(optarg, "%lu", &opts->g_buffer_size);
if(ret != 1)
......@@ -377,7 +393,7 @@ static int parse_args(int argc, char **argv, struct options *opts)
}
}
if(opts->xfer_size < 1 || opts->concurrency < 1 || opts->duration_seconds < 1 || !opts->na_transport)
if(opts->xfer_size < 1 || opts->concurrency < 1 || opts->duration_seconds < 1 || !opts->na_transport || opts->warmup_seconds < 0)
{
return(-1);
}
......@@ -400,6 +416,7 @@ static void usage(void)
"\t[-m <filename>] - use memory-mapped file as buffers instead of malloc\n"
"\t[-X <xfer_memory>] - size of total memory buffer to allocate (and iterate over during transfer) in each process\n"
"\t[-a] - explicitly align memory buffer to page size\n"
"\t[-w] - number of seconds to warm up before benchmark measurement\n"
"\t\texample: mpiexec -n 2 ./margo-p2p-bw -x 4096 -D 30 -n verbs://\n"
"\t\t(must be run with exactly 2 processes\n");
......@@ -450,6 +467,7 @@ static void bw_ult(hg_handle_t handle)
arg_array[i].client_bulk_handle = &in.bulk_handle;
arg_array[i].target_addr = &hgi->addr;
arg_array[i].op = in.op;
arg_array[i].duration = in.duration;
ret = ABT_thread_create(g_transfer_pool, bw_worker, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
assert(ret == 0);
......@@ -504,8 +522,8 @@ static void bw_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bw_ult)
static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ssg_group_id_t gid, margo_instance_id mid)
static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ssg_group_id_t gid, margo_instance_id mid, int shutdown, int duration, int print_flag)
{
hg_handle_t handle;
hg_addr_t target_addr;
......@@ -531,6 +549,7 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
assert(ret == 0);
in.op = HG_BULK_PULL;
in.shutdown = 0;
in.duration = duration;
start_ts = ABT_get_wtime();
ret = margo_forward(handle, &in);
......@@ -540,17 +559,21 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ret = margo_get_output(handle, &out);
assert(ret == HG_SUCCESS);
printf("<op>\t<concurrency>\t<threads>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\t<xfer_memory>\t<align_buffer>\n");
printf("PULL\t%d\t%d\t%d\t%lu\t%f\t%f\t%lu\t%d\n",
g_opts.concurrency,
g_opts.threads,
g_opts.xfer_size,
out.bytes_moved,
(end_ts-start_ts),
((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0),
g_opts.g_buffer_size,
g_opts.align_buffer);
if(print_flag)
{
printf("<op>\t<warmup_seconds>\t<concurrency>\t<threads>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\t<xfer_memory>\t<align_buffer>\n");
printf("PULL\t%d\t%d\t%d\t%d\t%lu\t%f\t%f\t%lu\t%d\n",
g_opts.concurrency,
g_opts.warmup_seconds,
g_opts.threads,
g_opts.xfer_size,
out.bytes_moved,
(end_ts-start_ts),
((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0),
g_opts.g_buffer_size,
g_opts.align_buffer);
}
margo_free_output(handle, &out);
......@@ -558,7 +581,8 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
margo_thread_sleep(mid, 100);
in.op = HG_BULK_PUSH;
in.shutdown = 1;
in.shutdown = shutdown;
in.duration = duration;
start_ts = ABT_get_wtime();
ret = margo_forward(handle, &in);
......@@ -568,15 +592,19 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ret = margo_get_output(handle, &out);
assert(ret == HG_SUCCESS);
printf("PUSH\t%d\t%d\t%d\t%lu\t%f\t%f\t%lu\t%d\n",
g_opts.concurrency,
g_opts.threads,
g_opts.xfer_size,
out.bytes_moved,
(end_ts-start_ts),
((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0),
g_opts.g_buffer_size,
g_opts.align_buffer);
if(print_flag)
{
printf("PUSH\t%d\t%d\t%d\t%d\t%lu\t%f\t%f\t%lu\t%d\n",
g_opts.concurrency,
g_opts.warmup_seconds,
g_opts.threads,
g_opts.xfer_size,
out.bytes_moved,
(end_ts-start_ts),
((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0),
g_opts.g_buffer_size,
g_opts.align_buffer);
}
/* calculate how many bytes of the buffer have been transferred */
bytes_to_check = (g_opts.g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
......@@ -607,7 +635,7 @@ static void bw_worker(void *_arg)
now = ABT_get_wtime();
while((now - arg->start_tm) < g_opts.duration_seconds)
while((now - arg->start_tm) < arg->duration)
{
/* find the offset for this transfer and then increment for next
* one
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment