Commit 252a42b6 authored by Philip Carns's avatar Philip Carns

add option to control memory buffer size

- size of buffer to iterate over during transfer
- if this size matches transfer size, then a single hot buffer will be used
  repeatedly for all operations
parent 04bf9aed
......@@ -41,6 +41,7 @@ struct options
unsigned int mercury_timeout_server;
char* diag_file_name;
char* na_transport;
unsigned long g_buffer_size;
};
#define BW_TOTAL_MEM_SIZE 2147483648UL
......@@ -80,7 +81,6 @@ static ABT_pool g_transfer_pool;
static ABT_eventual g_bw_done_eventual;
static struct options g_opts;
static char *g_buffer = NULL;
static hg_size_t g_buffer_size = BW_TOTAL_MEM_SIZE;
static hg_bulk_t g_bulk_handle = HG_BULK_NULL;
int main(int argc, char **argv)
......@@ -124,13 +124,13 @@ int main(int argc, char **argv)
/* On server side, optionally use an mmap'd buffer. Always calloc on
* client. */
if(rank == 1 && g_opts.mmap_filename)
g_buffer = custom_mmap_alloc(g_opts.mmap_filename, g_buffer_size, rank);
g_buffer = custom_mmap_alloc(g_opts.mmap_filename, g_opts.g_buffer_size, rank);
else
g_buffer = calloc(g_buffer_size, 1);
g_buffer = calloc(g_opts.g_buffer_size, 1);
if(!g_buffer)
{
fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_buffer_size);
fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_opts.g_buffer_size);
return(-1);
}
......@@ -186,7 +186,7 @@ int main(int argc, char **argv)
void* buffer = g_buffer;
/* register memory for xfer */
ret = margo_bulk_create(mid, 1, &buffer, &g_buffer_size, HG_BULK_READWRITE, &g_bulk_handle);
ret = margo_bulk_create(mid, 1, &buffer, &g_opts.g_buffer_size, HG_BULK_READWRITE, &g_bulk_handle);
assert(ret == 0);
/* set up abt pool */
......@@ -269,7 +269,7 @@ int main(int argc, char **argv)
if(g_opts.mmap_filename == NULL) {
free(g_buffer);
} else {
custom_mmap_free(g_opts.mmap_filename, g_buffer, g_buffer_size, rank);
custom_mmap_free(g_opts.mmap_filename, g_buffer, g_opts.g_buffer_size, rank);
}
margo_finalize(mid);
......@@ -290,8 +290,9 @@ static int parse_args(int argc, char **argv, struct options *opts)
/* default to using whatever the standard timeout is in margo */
opts->mercury_timeout_client = UINT_MAX;
opts->mercury_timeout_server = UINT_MAX;
opts->g_buffer_size = BW_TOTAL_MEM_SIZE;
while((opt = getopt(argc, argv, "n:x:c:T:d:t:D:m:")) != -1)
while((opt = getopt(argc, argv, "n:x:c:T:d:t:D:m:X:")) != -1)
{
switch(opt)
{
......@@ -308,6 +309,11 @@ static int parse_args(int argc, char **argv, struct options *opts)
if(ret != 1)
return(-1);
break;
case 'X':
ret = sscanf(optarg, "%lu", &opts->g_buffer_size);
if(ret != 1)
return(-1);
break;
case 'c':
ret = sscanf(optarg, "%d", &opts->concurrency);
if(ret != 1)
......@@ -364,11 +370,12 @@ static void usage(void)
"\t-x <xfer_size> - size of each bulk tranfer in bytes\n"
"\t-D <duration> - duration of test in seconds\n"
"\t-n <na> - na transport\n"
"\t[-c concurrency] - number of concurrent operations to issue with ULTs\n"
"\t[-T <os threads] - number of dedicated operating system threads to run ULTs on\n"
"\t[-d filename] - enable diagnostics output\n"
"\t[-t client_progress_timeout,server_progress_timeout] # use \"-t 0,0\" to busy spin\n"
"\t[-m filename] - use memory-mapped file as buffers instead of malloc\n"
"\t[-c <concurrency>] - number of concurrent operations to issue with ULTs\n"
"\t[-T <os threads>] - number of dedicated operating system threads to run ULTs on\n"
"\t[-d <filename>] - enable diagnostics output\n"
"\t[-t <client_progress_timeout,server_progress_timeout>] # use \"-t 0,0\" to busy spin\n"
"\t[-m <filename>] - use memory-mapped file as buffers instead of malloc\n"
"\t[-X <xfer_memory>] - size of total memory buffer to allocate (and iterate over during transfer) in each process\n"
"\t\texample: mpiexec -n 2 ./margo-p2p-bw -x 4096 -D 30 -n verbs://\n"
"\t\t(must be run with exactly 2 processes\n");
......@@ -438,7 +445,7 @@ static void bw_ult(hg_handle_t handle)
if(in.op == HG_BULK_PULL)
{
/* calculate how many bytes of the buffer have been transferred */
bytes_to_check = (g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
bytes_to_check = (g_opts.g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
if(out.bytes_moved < bytes_to_check)
bytes_to_check = out.bytes_moved;
......@@ -453,7 +460,7 @@ static void bw_ult(hg_handle_t handle)
}
/* fill pattern for return trip, increment each value by 1 */
for(x=0; x<(g_buffer_size/sizeof(x)); x++)
for(x=0; x<(g_opts.g_buffer_size/sizeof(x)); x++)
((hg_size_t*)g_buffer)[x] = x+1;
}
......@@ -487,7 +494,7 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
double start_ts, end_ts;
/* fill pattern in origin buffer */
for(i=0; i<(g_buffer_size/sizeof(i)); i++)
for(i=0; i<(g_opts.g_buffer_size/sizeof(i)); i++)
((hg_size_t*)buffer)[i] = i;
target_addr = ssg_get_addr(gid, target);
......@@ -496,7 +503,7 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ret = margo_create(mid, target_addr, id, &handle);
assert(ret == 0);
ret = margo_bulk_create(mid, 1, &buffer, &g_buffer_size, HG_BULK_READWRITE, &in.bulk_handle);
ret = margo_bulk_create(mid, 1, &buffer, &g_opts.g_buffer_size, HG_BULK_READWRITE, &in.bulk_handle);
assert(ret == 0);
in.op = HG_BULK_PULL;
......@@ -508,14 +515,16 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ret = margo_get_output(handle, &out);
assert(ret == HG_SUCCESS);
printf("<op>\t<concurrency>\t<threads>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\n");
printf("PULL\t%d\t%d\t%d\t%lu\t%f\t%f\n",
printf("<op>\t<concurrency>\t<threads>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\t<xfer_memory>\n");
printf("PULL\t%d\t%d\t%d\t%lu\t%f\t%f\t%lu\n",
g_opts.concurrency,
g_opts.threads,
g_opts.xfer_size,
out.bytes_moved,
(end_ts-start_ts),
((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0));
((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0),
g_opts.g_buffer_size);
margo_free_output(handle, &out);
......@@ -532,16 +541,17 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ret = margo_get_output(handle, &out);
assert(ret == HG_SUCCESS);
printf("PUSH\t%d\t%d\t%d\t%lu\t%f\t%f\n",
printf("PUSH\t%d\t%d\t%d\t%lu\t%f\t%f\t%lu\n",
g_opts.concurrency,
g_opts.threads,
g_opts.xfer_size,
out.bytes_moved,
(end_ts-start_ts),
((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0));
((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0),
g_opts.g_buffer_size);
/* calculate how many bytes of the buffer have been transferred */
bytes_to_check = (g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
bytes_to_check = (g_opts.g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
if(out.bytes_moved < bytes_to_check)
bytes_to_check = out.bytes_moved;
/* check fill pattern we got back; should be what we set plus one */
......@@ -577,7 +587,7 @@ static void bw_worker(void *_arg)
ABT_mutex_spinlock(*arg->cur_off_mutex);
my_off = *arg->cur_off;
(*arg->cur_off) += g_opts.xfer_size;
if(((*arg->cur_off)+g_opts.xfer_size) > g_buffer_size)
if(((*arg->cur_off)+g_opts.xfer_size) > g_opts.g_buffer_size)
*arg->cur_off = 0;
ABT_mutex_unlock(*arg->cur_off_mutex);
......@@ -587,6 +597,7 @@ static void bw_worker(void *_arg)
arg->bytes_moved += g_opts.xfer_size;
now = ABT_get_wtime();
//printf("now: %f\n", now);
}
// printf("# DBG: worker stopped.\n");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment