Commit 4bc8ccdc authored by Philip Carns

prototype drain coalescing in benchmark

parent 5f18bc66
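
This commit splits the per-operation pmemobj_persist() into a pmemobj_flush() per operation plus one shared pmemobj_drain() per batch. A minimal sketch of that idea, separate from the benchmark code below (the batch_persist() helper and its arguments are illustrative only, not part of this commit):

#include <libpmemobj.h>

static void batch_persist(PMEMobjpool *pool, void **bufs, size_t *lens, int count)
{
    int i;

    /* flush each range; pmemobj_persist() is pmemobj_flush() followed by
     * pmemobj_drain(), so deferring the drain lets one fence cover them all
     */
    for(i = 0; i < count; i++)
        pmemobj_flush(pool, bufs[i], lens[i]);

    /* a single drain makes every range flushed above durable */
    pmemobj_drain(pool);
}

In the benchmark the batch is assembled dynamically: up to the -h highwater number of concurrent ULTs join a shared struct drainer, each flushes its own buffer, and the last one to finish flushing issues the drain and wakes the rest through an ABT_eventual.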
@@ -20,6 +20,16 @@
#include <abt.h>
#include <libpmemobj.h>
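/* tracks one batch of operations whose drains are coalesced into a
 * single pmemobj_drain() call:
 *   started - ULTs that have joined this batch
 *   waiting - ULTs that have flushed and are waiting for the drain
 *   done    - ULTs that have observed completion (the last one frees the batch)
 *   ev      - eventual signaled once the drain has been issued
 */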
struct drainer
{
int started;
int waiting;
int done;
ABT_eventual ev;
};
struct drainer *g_drainer;
ABT_mutex g_drainer_mutex = ABT_MUTEX_NULL;
struct options
{
@@ -28,6 +38,7 @@ struct options
int concurrency;
char* pmdk_pool;
int xstreams;
int highwater; /* highwater mark if drain coalescing */
};
struct bench_worker_arg
@@ -62,6 +73,7 @@ int main(int argc, char **argv)
PMEMobjpool **pmem_pools;
int pmem_pools_count = 0;
char *tmp_pool_name;
char max_xstreams[256];
ret = parse_args(argc, argv, &g_opts);
if(ret < 0)
@@ -70,6 +82,21 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE);
}
/* set up argobots tunables that would normally be handled by margo */
if(!getenv("ABT_THREAD_STACKSIZE"))
putenv("ABT_THREAD_STACKSIZE=2097152");
if(!getenv("ABT_MEM_MAX_NUM_STACKS"))
putenv("ABT_MEM_MAX_NUM_STACKS=8");
/* If we over-subscribe the cores on a machine we have to bump a limit
* in argobots too, see:
* https://github.com/pmodels/argobots/issues/34
*/
if(g_opts.xstreams > 1)
{
sprintf(max_xstreams, "ABT_MAX_NUM_XSTREAMS=%d", g_opts.xstreams+1);
putenv(max_xstreams);
}
ret = ABT_init(argc, argv);
assert(ret == 0);
@@ -82,6 +109,17 @@ int main(int argc, char **argv)
ret = ABT_xstream_set_main_sched(self_xstream, self_sched);
assert(ret == 0);
if(g_opts.highwater)
{
/* initialize tracking data structure for drain coalescing */
g_drainer = calloc(1, sizeof(*g_drainer));
assert(g_drainer);
ret = ABT_eventual_create(0, &g_drainer->ev);
assert(ret == 0);
ret = ABT_mutex_create(&g_drainer_mutex);
assert(ret == 0);
}
/* count number of commas in pmdk_pool argument to see how many pools we
* have
*/
@@ -159,6 +197,11 @@ int main(int argc, char **argv)
ABT_xstream_free(&bw_worker_xstreams[i]);
}
if(g_opts.highwater)
{
ABT_mutex_free(&g_drainer_mutex);
}
for(i=0; i<pmem_pools_count; i++)
pmemobj_close(pmem_pools[i]);
@@ -179,7 +222,7 @@ static int parse_args(int argc, char **argv, struct options *opts)
opts->xfer_size = DEF_BW_XFER_SIZE;
opts->xstreams = -1;
- while((opt = getopt(argc, argv, "x:c:p:m:T:")) != -1)
+ while((opt = getopt(argc, argv, "x:c:p:m:T:h:")) != -1)
{
switch(opt)
{
@@ -211,6 +254,11 @@ static int parse_args(int argc, char **argv, struct options *opts)
if(ret != 1)
return(-1);
break;
case 'h':
ret = sscanf(optarg, "%d", &opts->highwater);
if(ret != 1)
return(-1);
break;
default:
return(-1);
}
@@ -236,6 +284,7 @@ static void usage(void)
"\t\t pools will be round-robin distributed across ULTs.\n"
"\t[-c concurrency] - number of concurrent operations to issue with ULTs\n"
"\t[-T <os threads] - number of dedicated operating system threads to run ULTs on\n"
"\t[-h <highwater>] - enable drain coalescing with this highwater mark\n"
"\t\texample: ./pmdk-bw -x 4096 -p /dev/shm/test.dat\n");
return;
@@ -320,6 +369,7 @@ static void bench_worker(void *_arg)
uint64_t *buffer;
uint64_t val;
int ret;
struct drainer *my_drainer;
ABT_mutex_spinlock(*arg->cur_off_mutex);
while(*arg->cur_off < g_opts.total_mem_size)
@@ -327,6 +377,22 @@
(*arg->cur_off) += g_opts.xfer_size;
ABT_mutex_unlock(*arg->cur_off_mutex);
if(g_opts.highwater)
{
ABT_mutex_lock(g_drainer_mutex);
/* join current drainer */
g_drainer->started++;
my_drainer = g_drainer;
if(g_drainer->started == g_opts.highwater)
{
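/* this batch is full; install a fresh drainer so that subsequent
 * arrivals start a new batch
 */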
g_drainer = calloc(1, sizeof(*g_drainer));
assert(g_drainer);
ret = ABT_eventual_create(0, &g_drainer->ev);
assert(ret == 0);
}
ABT_mutex_unlock(g_drainer_mutex);
}
/* create an object */
/* NOTE: for now we don't try to keep up with oid */
ret = pmemobj_alloc(arg->pmem_pool, &oid, g_opts.xfer_size, 0, NULL, NULL);
@@ -338,15 +404,63 @@
/* TODO: the offset tracking stuff is superfluous if we are just
* setting values. Need to think about what we want to measure
- * here.
+ * here. A memcpy from a buffer local to this ULT might be a better
+ * measure?
*/
/* fill in values */
buffer = pmemobj_direct(oid);
for(val = 0; val < g_opts.xfer_size/sizeof(val); val++)
buffer[val] = val;
/* persist */
- pmemobj_persist(arg->pmem_pool, buffer, g_opts.xfer_size);
if(g_opts.highwater)
{
/* first half of persist; the drain will be done by coalescer */
pmemobj_flush(arg->pmem_pool, buffer, g_opts.xfer_size);
/* check if I am the drainer or not */
ABT_mutex_lock(g_drainer_mutex);
my_drainer->waiting++;
if(my_drainer->waiting == my_drainer->started)
{
/* I'm the drainer */
if(my_drainer == g_drainer)
{
/* put a new drainer struct in place for the next
 * batch while we hold the lock. ULTs that joined the
 * old drainer still hold a reference to it in their
 * local my_drainer variable.
 */
g_drainer = calloc(1, sizeof(*g_drainer));
assert(g_drainer);
ret = ABT_eventual_create(0, &g_drainer->ev);
assert(ret == 0);
}
ABT_mutex_unlock(g_drainer_mutex);
pmemobj_drain(arg->pmem_pool);
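/* wake the other ULTs in this batch; the drain has made their
 * flushed data durable
 */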
ABT_eventual_set(my_drainer->ev, NULL, 0);
}
else
{
/* I'm waiting for someone else to drain */
ABT_mutex_unlock(g_drainer_mutex);
ABT_eventual_wait(my_drainer->ev, NULL);
}
/* everyone: check whether you are the last ULT done with this batch;
 * if so, you must clean it up. The check and cleanup are done while
 * holding the mutex.
 */
ABT_mutex_lock(g_drainer_mutex);
my_drainer->done++;
if(my_drainer->done == my_drainer->waiting)
{
ABT_eventual_free(&my_drainer->ev);
free(my_drainer);
}
ABT_mutex_unlock(g_drainer_mutex);
}
else
{
pmemobj_persist(arg->pmem_pool, buffer, g_opts.xfer_size);
}
ABT_mutex_spinlock(*arg->cur_off_mutex);
}