Commit 232599a4 authored by Philip Carns's avatar Philip Carns
Browse files

move memory allocation out of ults

parent ca89a6f0
......@@ -19,7 +19,7 @@
#define INFLIGHT_LIMIT 64
struct worker_ult_arg
struct worker_ult_common
{
int opt_io;
int opt_compute;
......@@ -33,6 +33,12 @@ struct worker_ult_arg
int completed;
};
struct worker_ult_arg
{
struct worker_ult_common *common;
void* buffer;
};
static void worker_ult(void *_arg);
static double wtime(void);
static int ABT_nosnoozer_xstream_create(int num_xstreams, ABT_pool *newpool, ABT_xstream *newxstreams);
......@@ -49,7 +55,8 @@ int main(int argc, char **argv)
ABT_pool compute_pool;
int io_es_count = -1;
int compute_es_count = -1;
struct worker_ult_arg arg;
struct worker_ult_arg *arg_array;
struct worker_ult_common common;
int *done;
if(argc != 9)
......@@ -58,18 +65,18 @@ int main(int argc, char **argv)
return(-1);
}
ret = sscanf(argv[1], "%d", &arg.opt_compute);
ret = sscanf(argv[1], "%d", &common.opt_compute);
assert(ret == 1);
ret = sscanf(argv[2], "%d", &arg.opt_io);
ret = sscanf(argv[2], "%d", &common.opt_io);
assert(ret == 1);
ret = sscanf(argv[3], "%d", &arg.opt_abt_io);
ret = sscanf(argv[3], "%d", &common.opt_abt_io);
assert(ret == 1);
ret = sscanf(argv[4], "%d", &arg.opt_abt_snoozer);
ret = sscanf(argv[4], "%d", &common.opt_abt_snoozer);
assert(ret == 1);
ret = sscanf(argv[5], "%d", &arg.opt_unit_size);
ret = sscanf(argv[5], "%d", &common.opt_unit_size);
assert(ret == 1);
assert(arg.opt_unit_size % 4096 == 0);
ret = sscanf(argv[6], "%d", &arg.opt_num_units);
assert(common.opt_unit_size % 4096 == 0);
ret = sscanf(argv[6], "%d", &common.opt_num_units);
assert(ret == 1);
ret = sscanf(argv[7], "%d", &compute_es_count);
assert(ret == 1);
......@@ -86,7 +93,7 @@ int main(int argc, char **argv)
ret = ABT_init(argc, argv);
assert(ret == 0);
if(arg.opt_abt_snoozer)
if(common.opt_abt_snoozer)
{
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
......@@ -102,9 +109,9 @@ int main(int argc, char **argv)
assert(ret == 0);
}
if(arg.opt_abt_io)
if(common.opt_abt_io)
{
if(arg.opt_abt_snoozer)
if(common.opt_abt_snoozer)
{
/* create dedicated pool drive IO */
ret = ABT_snoozer_xstream_create(io_es_count, &io_pool, io_xstreams);
......@@ -117,32 +124,42 @@ int main(int argc, char **argv)
}
/* initialize abt_io */
arg.aid = abt_io_init(io_pool);
assert(arg.aid != NULL);
common.aid = abt_io_init(io_pool);
assert(common.aid != NULL);
}
ABT_cond_create(&arg.cond);
ABT_mutex_create(&arg.mutex);
arg.completed = 0;
ABT_cond_create(&common.cond);
ABT_mutex_create(&common.mutex);
common.completed = 0;
arg_array = malloc(sizeof(*arg_array)*common.opt_num_units);
assert(arg_array);
for(i=0; i<common.opt_num_units; i++)
{
arg_array[i].common = &common;
ret = posix_memalign(&arg_array[i].buffer, 4096, common.opt_unit_size);
assert(ret == 0);
memset(arg_array[i].buffer, 0, common.opt_unit_size);
}
start = wtime();
for(i=0; i<arg.opt_num_units; i++)
for(i=0; i<common.opt_num_units; i++)
{
ABT_mutex_lock(arg.mutex);
while((i + 1 - arg.completed) >= INFLIGHT_LIMIT)
ABT_cond_wait(arg.cond, arg.mutex);
ABT_mutex_unlock(arg.mutex);
ABT_mutex_lock(common.mutex);
while((i + 1 - common.completed) >= INFLIGHT_LIMIT)
ABT_cond_wait(common.cond, common.mutex);
ABT_mutex_unlock(common.mutex);
/* create ULTs */
ret = ABT_thread_create(compute_pool, worker_ult, &arg, ABT_THREAD_ATTR_NULL, NULL);
ret = ABT_thread_create(compute_pool, worker_ult, &arg_array[i], ABT_THREAD_ATTR_NULL, NULL);
assert(ret == 0);
}
ABT_mutex_lock(arg.mutex);
while(arg.completed < arg.opt_num_units)
ABT_cond_wait(arg.cond, arg.mutex);
ABT_mutex_unlock(arg.mutex);
ABT_mutex_lock(common.mutex);
while(common.completed < common.opt_num_units)
ABT_cond_wait(common.cond, common.mutex);
ABT_mutex_unlock(common.mutex);
end = wtime();
......@@ -155,9 +172,9 @@ int main(int argc, char **argv)
ABT_xstream_free(&compute_xstreams[i]);
}
if(arg.opt_abt_io)
if(common.opt_abt_io)
{
abt_io_finalize(arg.aid);
abt_io_finalize(common.aid);
/* wait on IO ESs to complete */
for(i=0; i<io_es_count; i++)
......@@ -168,18 +185,22 @@ int main(int argc, char **argv)
}
ABT_cond_free(&arg.cond);
ABT_mutex_free(&arg.mutex);
ABT_cond_free(&common.cond);
ABT_mutex_free(&common.mutex);
for(i=0; i<common.opt_num_units; i++)
free(arg_array[i].buffer);
free(arg_array);
ABT_finalize();
free(io_xstreams);
free(compute_xstreams);
assert(arg.opt_num_units == arg.completed);
assert(common.opt_num_units == common.completed);
printf("#<opt_compute>\t<opt_io>\t<opt_abt_io>\t<opt_abt_snoozer>\t<opt_unit_size>\t<opt_num_units>\t<opt_compute_es_count>\t<opt_io_es_count>\t<time (s)>\t<bytes/s>\t<ops/s>\n");
printf("%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%f\t%f\t%f\n", arg.opt_compute, arg.opt_io, arg.opt_abt_io, arg.opt_abt_snoozer,
arg.opt_unit_size, arg.opt_num_units, compute_es_count, io_es_count, seconds, ((double)arg.opt_unit_size* (double)arg.opt_num_units)/seconds, (double)arg.opt_num_units/seconds);
printf("%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%f\t%f\t%f\n", common.opt_compute, common.opt_io, common.opt_abt_io, common.opt_abt_snoozer,
common.opt_unit_size, common.opt_num_units, compute_es_count, io_es_count, seconds, ((double)common.opt_unit_size* (double)common.opt_num_units)/seconds, (double)common.opt_num_units/seconds);
return(0);
}
......@@ -187,43 +208,39 @@ int main(int argc, char **argv)
static void worker_ult(void *_arg)
{
struct worker_ult_arg* arg = _arg;
void *buffer;
struct worker_ult_common *common = arg->common;
void *buffer = arg->buffer;
size_t ret;
char template[256];
int fd;
int done = 0;
//fprintf(stderr, "start\n");
ret = posix_memalign(&buffer, 4096, arg->opt_unit_size);
assert(ret == 0);
memset(buffer, 0, arg->opt_unit_size);
if(arg->opt_compute)
if(common->opt_compute)
{
ret = RAND_bytes(buffer, arg->opt_unit_size);
ret = RAND_bytes(buffer, common->opt_unit_size);
assert(ret == 1);
}
sprintf(template, "./data-XXXXXX");
if(arg->opt_io)
if(common->opt_io)
{
if(arg->opt_abt_io)
if(common->opt_abt_io)
{
fd = abt_io_mkostemp(arg->aid, template, O_DIRECT|O_SYNC);
fd = abt_io_mkostemp(common->aid, template, O_DIRECT|O_SYNC);
if(fd < 0)
{
fprintf(stderr, "abt_io_mkostemp: %d\n", fd);
}
assert(fd >= 0);
ret = abt_io_pwrite(arg->aid, fd, buffer, arg->opt_unit_size, 0);
assert(ret == arg->opt_unit_size);
ret = abt_io_pwrite(common->aid, fd, buffer, common->opt_unit_size, 0);
assert(ret == common->opt_unit_size);
ret = abt_io_close(arg->aid, fd);
ret = abt_io_close(common->aid, fd);
assert(ret == 0);
ret = abt_io_unlink(arg->aid, template);
ret = abt_io_unlink(common->aid, template);
assert(ret == 0);
}
else
......@@ -236,8 +253,8 @@ static void worker_ult(void *_arg)
}
assert(fd >= 0);
ret = pwrite(fd, buffer, arg->opt_unit_size, 0);
assert(ret == arg->opt_unit_size);
ret = pwrite(fd, buffer, common->opt_unit_size, 0);
assert(ret == common->opt_unit_size);
ret = close(fd);
assert(ret == 0);
......@@ -247,13 +264,10 @@ static void worker_ult(void *_arg)
}
}
free(buffer);
//fprintf(stderr, "end\n");
ABT_mutex_lock(arg->mutex);
arg->completed++;
ABT_cond_signal(arg->cond);
ABT_mutex_unlock(arg->mutex);
ABT_mutex_lock(common->mutex);
common->completed++;
ABT_cond_signal(common->cond);
ABT_mutex_unlock(common->mutex);
return;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment