Commit 5854b186 authored by Philip Carns's avatar Philip Carns

add option to write to multiple pmdk pools

- also remove buffer allocation since we aren't using it
parent 6416eb8b
......@@ -46,10 +46,9 @@ static int parse_args(int argc, char **argv, struct options *opts);
static void usage(void);
static struct options g_opts;
static char *g_buffer = NULL;
static ABT_pool g_transfer_pool;
static int run_benchmark(struct options *opts, PMEMobjpool *pmem_pool);
static int run_benchmark(struct options *opts, PMEMobjpool **pmem_pools, int pmem_pools_count);
static void bench_worker(void *_arg);
int main(int argc, char **argv)
......@@ -60,7 +59,9 @@ int main(int argc, char **argv)
ABT_sched self_sched;
ABT_xstream self_xstream;
int i;
PMEMobjpool *pmem_pool;
PMEMobjpool **pmem_pools;
int pmem_pools_count = 0;
char *tmp_pool_name;
ret = parse_args(argc, argv, &g_opts);
if(ret < 0)
......@@ -81,20 +82,37 @@ int main(int argc, char **argv)
ret = ABT_xstream_set_main_sched(self_xstream, self_sched);
assert(ret == 0);
pmem_pool = pmemobj_open(g_opts.pmdk_pool, NULL);
if(!pmem_pool)
/* count number of commas in pmdk_pool argument to see how many pools we
* have
*/
pmem_pools_count = 1;
for(i=0; i<strlen(g_opts.pmdk_pool); i++)
if(g_opts.pmdk_pool[i] == ',')
pmem_pools_count++;
pmem_pools = malloc(pmem_pools_count * sizeof(*pmem_pools));
assert(pmem_pools);
tmp_pool_name = strtok(g_opts.pmdk_pool, ",");
pmem_pools[0] = pmemobj_open(tmp_pool_name, NULL);
if(!pmem_pools[0])
{
fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
return(-1);
}
/* allocate one big buffer for writes */
g_buffer = calloc(g_opts.xfer_size, 1);
if(!g_buffer)
i=1;
while((tmp_pool_name = strtok(NULL, ",")))
{
fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_opts.xfer_size);
return(-1);
pmem_pools[i] = pmemobj_open(tmp_pool_name, NULL);
if(!pmem_pools[i])
{
fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
return(-1);
}
i++;
}
assert(i==pmem_pools_count);
/* set up abt pool */
if(g_opts.xstreams <= 0)
......@@ -133,7 +151,7 @@ int main(int argc, char **argv)
}
}
run_benchmark(&g_opts, pmem_pool);
run_benchmark(&g_opts, pmem_pools, pmem_pools_count);
for(i=0; i<g_opts.xstreams; i++)
{
......@@ -141,9 +159,8 @@ int main(int argc, char **argv)
ABT_xstream_free(&bw_worker_xstreams[i]);
}
pmemobj_close(pmem_pool);
free(g_buffer);
for(i=0; i<pmem_pools_count; i++)
pmemobj_close(pmem_pools[i]);
ABT_finalize();
......@@ -212,9 +229,11 @@ static void usage(void)
fprintf(stderr,
"Usage: "
"bake-p2p-bw -x <xfer_size> -m <total_mem_size> -p <pmdk pool>\n"
"\t-x <xfer_size> - size of each bulk tranfer in bytes\n"
"\t-m <total_mem_size> - total amount of data to write from each client process\n"
"\t-x <xfer_size> - size of each write operation in bytes\n"
"\t-m <total_mem_size> - total amount of data to write to each pool\n"
"\t-p <pmdk pool> - existing pool created with pmempool create obj\n"
"\t\tnote: can be comma-separated list to use multiple pools;\n"
"\t\t pools will be round-robin distributed across ULTs.\n"
"\t[-c concurrency] - number of concurrent operations to issue with ULTs\n"
"\t[-T <os threads] - number of dedicated operating system threads to run ULTs on\n"
"\t\texample: ./pmdk-bw -x 4096 -p /dev/shm/test.dat\n");
......@@ -222,29 +241,48 @@ static void usage(void)
return;
}
static int run_benchmark(struct options *opts, PMEMobjpool *pmem_pool)
struct pool_offset
{
ABT_mutex mutex;
unsigned long off;
};
static int run_benchmark(struct options *opts, PMEMobjpool **pmem_pools, int pmem_pools_count)
{
int ret;
int i;
ABT_thread *tid_array;
struct bench_worker_arg *arg_array;
ABT_mutex cur_off_mutex;
unsigned long cur_off = 0;
double start_tm, end_tm;
int j;
struct pool_offset* cur_off_array;
tid_array = malloc(g_opts.concurrency * sizeof(*tid_array));
assert(tid_array);
arg_array = malloc(g_opts.concurrency * sizeof(*arg_array));
assert(arg_array);
cur_off_array = malloc(pmem_pools_count * sizeof(*cur_off_array));
assert(cur_off_array);
ABT_mutex_create(&cur_off_mutex);
/* we keep a current offset *per pool* so that any threads sharing that
* pool will likewise share an offset
*/
for(i=0; i<pmem_pools_count; i++)
{
ABT_mutex_create(&cur_off_array[i].mutex);
cur_off_array[i].off = 0;
}
j=0;
start_tm = ABT_get_wtime();
for(i=0; i<g_opts.concurrency; i++)
{
arg_array[i].cur_off_mutex = &cur_off_mutex;
arg_array[i].cur_off = &cur_off;
arg_array[i].pmem_pool = pmem_pool;
arg_array[i].cur_off_mutex = &cur_off_array[j].mutex;
arg_array[i].cur_off = &cur_off_array[j].off;
arg_array[i].pmem_pool = pmem_pools[j];
j++;
if(j>=pmem_pools_count)
j=0;
ret = ABT_thread_create(g_transfer_pool, bench_worker,
&arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
assert(ret == 0);
......@@ -266,7 +304,11 @@ static int run_benchmark(struct options *opts, PMEMobjpool *pmem_pool)
((double)g_opts.total_mem_size/(end_tm-start_tm))/(1024.0*1024.0));
free(tid_array);
ABT_mutex_free(&cur_off_mutex);
for(i=0; i<pmem_pools_count; i++)
{
ABT_mutex_free(&cur_off_array[i].mutex);
}
free(cur_off_array);
return(0);
}
......@@ -294,6 +336,10 @@ static void bench_worker(void *_arg)
assert(0);
}
/* TODO: the offset tracking stuff is superfluous if we are just
* setting values. Need to think about what we want to measure
* here.
*/
/* fill in values */
buffer = pmemobj_direct(oid);
for(val = 0; val < g_opts.xfer_size/sizeof(val); val++)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment