Commit 73174fa5 authored by Jonathan Jenkins's avatar Jonathan Jenkins
Browse files

non-blocking abt-io variants

parent b492ae9e
......@@ -37,7 +37,9 @@ struct write_abt_arg
};
static void write_abt_bench(void *_arg);
static void abt_bench(int argc, char **argv, int buffer_per_thread, unsigned int concurrency, size_t size,
static void abt_bench(int buffer_per_thread, unsigned int concurrency, size_t size,
double duration, const char* filename, unsigned int* ops_done, double *seconds);
static void abt_bench_nb(int buffer_per_thread, unsigned int concurrency, size_t size,
double duration, const char* filename, unsigned int* ops_done, double *seconds);
/* pthread data types and fn prototypes */
......@@ -62,13 +64,20 @@ static double wtime(void);
int main(int argc, char **argv)
{
int ret;
unsigned abt_ops_done, pthread_ops_done;
double abt_seconds, pthread_seconds;
unsigned abt_ops_done, abt_nb_ops_done, pthread_ops_done;
double abt_seconds, abt_nb_seconds, pthread_seconds;
size_t size;
unsigned int concurrency;
double duration;
int buffer_per_thread = 0;
ABT_init(argc, argv);
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
assert(ret == 0);
if(argc != 6)
{
fprintf(stderr, "Usage: concurrent-write-bench <write_size> <concurrency> <duration> <file> <buffer_per_thread (0|1)>\n");
......@@ -102,11 +111,18 @@ int main(int argc, char **argv)
fprintf(stderr, "Usage: concurrent-write-bench <write_size> <concurrency> <duration> <file> <buffer_per_thread (0|1)>\n");
return(-1);
}
buffer_per_thread = !!buffer_per_thread;
/* run benchmarks */
printf("# Running ABT benchmark...\n");
abt_bench(argc, argv, buffer_per_thread, concurrency, size, duration, argv[4], &abt_ops_done, &abt_seconds);
printf("# ...abt benchmark done.\n");
abt_bench(buffer_per_thread, concurrency, size, duration, argv[4], &abt_ops_done, &abt_seconds);
printf("# ...ABT benchmark done.\n");
printf("# Running ABT (nonblocking) benchmark...\n");
abt_bench_nb(buffer_per_thread, concurrency, size, duration, argv[4], &abt_nb_ops_done, &abt_nb_seconds);
printf("# ...ABT (nonblocking) benchmark done.\n");
ABT_finalize();
sleep(1);
......@@ -120,6 +136,9 @@ int main(int argc, char **argv)
printf("abt\t%u\t%zu\t%u\t%f\t%f\n",
concurrency, size, abt_ops_done, abt_seconds,
((((double)size*(double)abt_ops_done))/abt_seconds)/(1024.0*1024.0));
printf("abt_nb\t%u\t%zu\t%u\t%f\t%f\n",
concurrency, size, abt_nb_ops_done, abt_nb_seconds,
((((double)size*(double)abt_nb_ops_done))/abt_nb_seconds)/(1024.0*1024.0));
printf("pthread\t%u\t%zu\t%u\t%f\t%f\n",
concurrency, size, pthread_ops_done, pthread_seconds,
((((double)size*(double)pthread_ops_done))/pthread_seconds)/(1024.0*1024.0));
......@@ -127,7 +146,7 @@ int main(int argc, char **argv)
return(0);
}
static void abt_bench(int argc, char **argv, int buffer_per_thread, unsigned int concurrency, size_t size, double duration,
static void abt_bench(int buffer_per_thread, unsigned int concurrency, size_t size, double duration,
const char *filename, unsigned int* ops_done, double *seconds)
{
ABT_thread *tid_array = NULL;
......@@ -136,7 +155,7 @@ static void abt_bench(int argc, char **argv, int buffer_per_thread, unsigned int
off_t next_offset = 0;
int ret;
double end;
int i;
unsigned int i;
ABT_xstream *progress_xstreams;
ABT_pool progress_pool;
ABT_xstream xstream;
......@@ -156,14 +175,6 @@ static void abt_bench(int argc, char **argv, int buffer_per_thread, unsigned int
progress_xstreams = malloc(concurrency * sizeof(*progress_xstreams));
assert(progress_xstreams);
/* set up argobots */
ret = ABT_init(argc, argv);
assert(ret == 0);
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
assert(ret == 0);
/* create a dedicated ES drive Mercury progress */
/* NOTE: for now we are going to use the same number of execution streams
* in the io pool as the desired level of issue concurrency, but this
......@@ -224,14 +235,12 @@ static void abt_bench(int argc, char **argv, int buffer_per_thread, unsigned int
abt_io_finalize(aid);
/* wait on the ESs to complete */
for(i=0; i<4; i++)
for(i=0; i<concurrency; i++)
{
ABT_xstream_join(progress_xstreams[i]);
ABT_xstream_free(&progress_xstreams[i]);
}
ABT_finalize();
ABT_mutex_free(&mutex);
free(tid_array);
free(progress_xstreams);
......@@ -245,6 +254,116 @@ static void abt_bench(int argc, char **argv, int buffer_per_thread, unsigned int
return;
}
static void abt_bench_nb(int buffer_per_thread, unsigned int concurrency, size_t size, double duration,
const char *filename, unsigned int* ops_done, double *seconds)
{
int fd;
off_t next_offset = 0;
int ret;
double end;
unsigned int i;
ABT_xstream *progress_xstreams;
ABT_pool progress_pool;
abt_io_instance_id aid;
void **buffers = NULL;
unsigned int num_buffers = 0;
double start_time;
abt_io_op_t **ops;
ssize_t *wrets;
fd = open(filename, O_WRONLY|O_CREAT|O_DIRECT|O_SYNC, S_IWUSR|S_IRUSR);
if(!fd)
{
perror("open");
assert(0);
}
progress_xstreams = malloc(concurrency * sizeof(*progress_xstreams));
assert(progress_xstreams);
/* create a dedicated ES drive Mercury progress */
/* NOTE: for now we are going to use the same number of execution streams
* in the io pool as the desired level of issue concurrency, but this
* doesn't need to be the case in general.
*/
ret = ABT_snoozer_xstream_create(concurrency, &progress_pool, progress_xstreams);
assert(ret == 0);
/* initialize abt_io */
aid = abt_io_init(progress_pool);
assert(aid != NULL);
/* set up buffers */
num_buffers = buffer_per_thread ? concurrency : 1;
buffers = malloc(num_buffers*sizeof(*buffers));
assert(buffers);
for (i = 0; i < num_buffers; i++) {
ret = posix_memalign(&buffers[i], 4096, size);
assert(ret == 0);
memset(buffers[i], 0, size);
}
/* set up async contexts */
ops = calloc(concurrency, sizeof(*ops));
assert(ops);
wrets = malloc(concurrency * sizeof(*wrets));
assert(wrets);
/* start the benchmark */
start_time = wtime();
/* in the absence of a waitany, just going through one-by-one */
for(i = 0; ; i = (i+1) % concurrency)
{
if (ops[i] != NULL)
{
ret = abt_io_op_wait(ops[i]);
assert(ret == 0 && wrets[i] > 0 && (size_t)wrets[i] == size);
abt_io_op_free(ops[i]);
}
if (wtime() - start_time < duration)
{
ops[i] = abt_io_pwrite_nb(aid, fd, buffers[i*buffer_per_thread],
size, next_offset, wrets+i);
assert(ops[i]);
next_offset += size;
}
else if (ops[i] == NULL)
break;
else
ops[i] = NULL;
}
end = wtime();
*seconds = end-start_time;
*ops_done = next_offset/size;
abt_io_finalize(aid);
/* wait on the ESs to complete (should already be by now, joining just in
* case...) */
for(i = 0; i < concurrency; i++)
{
ABT_xstream_join(progress_xstreams[i]);
ABT_xstream_free(&progress_xstreams[i]);
}
free(progress_xstreams);
for (i = 0; i < num_buffers; i++)
free(buffers[i]);
free(buffers);
free(wrets);
close(fd);
unlink(filename);
return;
}
static void pthread_bench(int buffer_per_thread, unsigned int concurrency, size_t size, double duration,
const char *filename, unsigned int* ops_done, double *seconds)
{
......@@ -254,7 +373,7 @@ static void pthread_bench(int buffer_per_thread, unsigned int concurrency, size_
off_t next_offset = 0;
int ret;
double end;
int i;
unsigned int i;
arg.fd = open(filename, O_WRONLY|O_CREAT|O_DIRECT|O_SYNC, S_IWUSR|S_IRUSR);
if(!arg.fd)
......
......@@ -20,6 +20,9 @@ typedef struct abt_io_instance* abt_io_instance_id;
#define ABT_IO_INSTANCE_NULL ((abt_io_instance_id)NULL)
struct abt_io_op;
typedef struct abt_io_op abt_io_op_t;
/**
* Initializes abt_io library.
* @param [in] progress_pool Argobots pool to drive I/O
......@@ -36,29 +39,92 @@ void abt_io_finalize(abt_io_instance_id aid);
/**
* wrapper for open()
*/
int abt_io_open(abt_io_instance_id aid, const char* pathname, int flags, mode_t mode);
int abt_io_open(
abt_io_instance_id aid,
const char* pathname,
int flags,
mode_t mode);
/**
* non-blocking wrapper for open()
*/
abt_io_op_t* abt_io_open_nb(
abt_io_instance_id aid,
const char* pathname,
int flags,
mode_t mode,
int *ret);
/**
* wrapper for pwrite()
*/
ssize_t abt_io_pwrite(abt_io_instance_id aid, int fd, const void *buf, size_t count,
off_t offset);
ssize_t abt_io_pwrite(
abt_io_instance_id aid,
int fd,
const void *buf,
size_t count,
off_t offset);
/**
* non-blocking wrapper for pwrite()
*/
abt_io_op_t* abt_io_pwrite_nb(
abt_io_instance_id aid,
int fd,
const void *buf,
size_t count,
off_t offset,
ssize_t *ret);
/**
* wrapper for mkostemp()
*/
int abt_io_mkostemp(abt_io_instance_id aid, char *template, int flags);
/**
* non-blocking wrapper for mkostemp()
*/
abt_io_op_t* abt_io_mkostemp_nb(
abt_io_instance_id aid,
char *template,
int flags,
int *ret);
/**
* wrapper for unlink()
*/
int abt_io_unlink(abt_io_instance_id aid, const char *pathname);
/**
* non-blocking wrapper for unlink()
*/
abt_io_op_t* abt_io_unlink_nb(
abt_io_instance_id aid,
const char *pathname,
int *ret);
/**
* wrapper for close()
*/
int abt_io_close(abt_io_instance_id aid, int fd);
/**
* non-blocking wrapper for close()
*/
abt_io_op_t* abt_io_close_nb(abt_io_instance_id aid, int fd, int *ret);
/**
* wait on an abt-io operation
* return: 0 if success, non-zero on failure
*/
int abt_io_op_wait(abt_io_op_t* op);
/**
* release resources comprising the op. DO NOT call until the op has been
* successfully waited on
*/
void abt_io_op_free(abt_io_op_t* op);
#ifdef __cplusplus
}
#endif
......
......@@ -27,13 +27,19 @@ struct abt_io_instance
ABT_pool progress_pool;
};
struct abt_io_op
{
ABT_eventual e;
void *state;
void (*free_fn)(void*);
};
abt_io_instance_id abt_io_init(ABT_pool progress_pool)
{
struct abt_io_instance *aid;
aid = malloc(sizeof(*aid));
if(!aid)
return(ABT_IO_INSTANCE_NULL);
if(!aid) return(ABT_IO_INSTANCE_NULL);
memset(aid, 0, sizeof(*aid));
aid->progress_pool = progress_pool;
......@@ -49,7 +55,7 @@ void abt_io_finalize(abt_io_instance_id aid)
struct abt_io_open_state
{
int ret;
int *ret;
const char *pathname;
int flags;
mode_t mode;
......@@ -60,39 +66,82 @@ static void abt_io_open_fn(void *foo)
{
struct abt_io_open_state *state = foo;
state->ret = open(state->pathname, state->flags, state->mode);
if(state->ret < 0)
state->ret = -errno;
*state->ret = open(state->pathname, state->flags, state->mode);
if(*state->ret < 0)
*state->ret = -errno;
ABT_eventual_set(state->eventual, NULL, 0);
return;
}
int abt_io_open(abt_io_instance_id aid, const char* pathname, int flags, mode_t mode)
static int issue_open(ABT_pool pool, abt_io_op_t *op, const char* pathname, int flags, mode_t mode, int *ret)
{
struct abt_io_open_state state;
int ret;
state.ret = -ENOSYS;
state.pathname = pathname;
state.flags = flags;
state.mode = mode;
ABT_eventual_create(0, &state.eventual);
struct abt_io_open_state *pstate = NULL;
int rc;
ret = ABT_task_create(aid->progress_pool, abt_io_open_fn, &state, NULL);
if(ret != 0)
if (op == NULL) pstate = &state;
else
{
return(-EINVAL);
pstate = malloc(sizeof(*pstate));
if (pstate == NULL) { *ret = -ENOMEM; goto err; }
}
ABT_eventual_wait(state.eventual, NULL);
*ret = -ENOSYS;
pstate->ret = ret;
pstate->pathname = pathname;
pstate->flags = flags;
pstate->mode = mode;
pstate->eventual = NULL;
rc = ABT_eventual_create(0, &pstate->eventual);
if (rc != ABT_SUCCESS) { *ret = -ENOMEM; goto err; }
return(state.ret);
if (op != NULL) op->e = pstate->eventual;
rc = ABT_task_create(pool, abt_io_open_fn, pstate, NULL);
if(rc != ABT_SUCCESS) { *ret = -EINVAL; goto err; }
if (op == NULL) {
rc = ABT_eventual_wait(pstate->eventual, NULL);
// what error should we use here?
if (rc != ABT_SUCCESS) { *ret = -EINVAL; goto err; }
}
else {
op->e = pstate->eventual;
op->state = pstate;
op->free_fn = free;
}
return 0;
err:
if (pstate->eventual != NULL) ABT_eventual_free(&pstate->eventual);
if (pstate != NULL && op != NULL) free(pstate);
return -1;
}
int abt_io_open(abt_io_instance_id aid, const char* pathname, int flags, mode_t mode)
{
int ret;
issue_open(aid->progress_pool, NULL, pathname, flags, mode, &ret);
return ret;
}
abt_io_op_t* abt_io_open_nb(abt_io_instance_id aid, const char* pathname, int flags, mode_t mode, int *ret)
{
abt_io_op_t *op;
int iret;
op = malloc(sizeof(*op));
if (op == NULL) return NULL;
iret = issue_open(aid->progress_pool, op, pathname, flags, mode, ret);
if (iret != 0) { free(op); return NULL; }
else return op;
}
struct abt_io_pwrite_state
{
ssize_t ret;
ssize_t *ret;
int fd;
const void *buf;
size_t count;
......@@ -104,42 +153,86 @@ static void abt_io_pwrite_fn(void *foo)
{
struct abt_io_pwrite_state *state = foo;
state->ret = pwrite(state->fd, state->buf, state->count, state->offset);
if(state->ret < 0)
state->ret = -errno;
*state->ret = pwrite(state->fd, state->buf, state->count, state->offset);
if(*state->ret < 0)
*state->ret = -errno;
ABT_eventual_set(state->eventual, NULL, 0);
return;
}
ssize_t abt_io_pwrite(abt_io_instance_id aid, int fd, const void *buf,
size_t count, off_t offset)
static int issue_pwrite(ABT_pool pool, abt_io_op_t *op, int fd, const void *buf,
size_t count, off_t offset, ssize_t *ret)
{
struct abt_io_pwrite_state state;
int ret;
struct abt_io_pwrite_state *pstate = NULL;
int rc;
state.ret = -ENOSYS;
state.fd = fd;
state.buf = buf;
state.count = count;
state.offset = offset;
ABT_eventual_create(0, &state.eventual);
ret = ABT_task_create(aid->progress_pool, abt_io_pwrite_fn, &state, NULL);
if(ret != 0)
if (op == NULL) pstate = &state;
else
{
return(-EINVAL);
pstate = malloc(sizeof(*pstate));
if (pstate == NULL) { *ret = -ENOMEM; goto err; }
}
*ret = -ENOSYS;
pstate->ret = ret;
pstate->fd = fd;
pstate->buf = buf;
pstate->count = count;
pstate->offset = offset;
pstate->eventual = NULL;
rc = ABT_eventual_create(0, &pstate->eventual);
if (rc != ABT_SUCCESS) { *ret = -ENOMEM; goto err; }
if (op != NULL) op->e = pstate->eventual;
rc = ABT_task_create(pool, abt_io_pwrite_fn, pstate, NULL);
if(rc != ABT_SUCCESS) { *ret = -EINVAL; goto err; }
if (op == NULL) {
rc = ABT_eventual_wait(pstate->eventual, NULL);
// what error should we use here?
if (rc != ABT_SUCCESS) { *ret = -EINVAL; goto err; }
}
else {
op->e = pstate->eventual;
op->state = pstate;
op->free_fn = free;
}
return 0;
err:
if (pstate->eventual != NULL) ABT_eventual_free(&pstate->eventual);
if (pstate != NULL && op != NULL) free(pstate);
return -1;
}
ssize_t abt_io_pwrite(abt_io_instance_id aid, int fd, const void *buf,
size_t count, off_t offset)
{
ssize_t ret = -1;
issue_pwrite(aid->progress_pool, NULL, fd, buf, count, offset, &ret);
return ret;
}
abt_io_op_t* abt_io_pwrite_nb(abt_io_instance_id aid, int fd, const void *buf,
size_t count, off_t offset, ssize_t *ret)
{
abt_io_op_t *op;
int iret;
ABT_eventual_wait(state.eventual, NULL);
op = malloc(sizeof(*op));
if (op == NULL) return NULL;
return(state.ret);
iret = issue_pwrite(aid->progress_pool, op, fd, buf, count, offset, ret);
if (iret != 0) { free(op); return NULL; }
else return op;
}
struct abt_io_mkostemp_state
{
int ret;
int *ret;
char *template;
int flags;
ABT_eventual eventual;
......@@ -149,38 +242,81 @@ static void abt_io_mkostemp_fn(void *foo)
{
struct abt_io_mkostemp_state *state = foo;
state->ret = mkostemp(state->template, state->flags);
if(state->ret < 0)
state->ret = -errno;
*state->ret = mkostemp(state->template, state->flags);
if(*state->ret < 0)
*state->ret = -errno;
ABT_eventual_set(state->eventual, NULL, 0);
return;
}
int abt_io_mkostemp(abt_io_instance_id aid, char *template, int flags)
static int issue_mkostemp(ABT_pool pool, abt_io_op_t *op, char* template, int flags, int *ret)
{
struct abt_io_mkostemp_state state;
int ret;
struct abt_io_mkostemp_state *pstate = NULL;
int rc;
state.ret = -ENOSYS;
state.template = template;
state.flags = flags;
ABT_eventual_create(0, &state.eventual);
ret = ABT_task_create(aid->progress_pool, abt_io_mkostemp_fn, &state, NULL);
if(ret != 0)
if (op == NULL) pstate = &state;
else
{
return(-EINVAL);
pstate = malloc(sizeof(*pstate));
if (pstate == NULL) { *ret = -ENOMEM; goto err; }
}
ABT_eventual_wait(state.eventual, NULL);
*ret = -ENOSYS;
pstate->ret = ret;
pstate->template = template;