Commit ef005e4a authored by Philip Carns's avatar Philip Carns
Browse files

quick hack libaio proof of concept

- doesn't work right yet
parent f11a9c21
......@@ -94,6 +94,13 @@ AC_DEFINE([HAVE_MKOSTEMP], [], [Define if mkostemp available])
NONCOMPLIANT_IO=1
AC_MSG_RESULT(no))
AIO_LIBS=
AC_CHECK_HEADER([libaio.h], [
AC_SUBST([AIO_LIBS], ["-laio"])
AC_DEFINE([HAVE_LIBAIO], 1, [Define if you have libaio])
], [ ])
LIBS="$AIO_LIBS $LIBS"
AC_CONFIG_FILES([Makefile maint/abt-io.pc])
AC_OUTPUT
......
......@@ -196,7 +196,8 @@ static void abt_bench(int buffer_per_thread, unsigned int concurrency, size_t si
* in the io pool as the desired level of issue concurrency, but this
* doesn't need to be the case in general.
*/
aid = abt_io_init(concurrency);
/* for libaio we only need one thread (the waiter) */
aid = abt_io_init(1);
assert(aid != NULL);
ABT_mutex_create(&mutex);
......
......@@ -19,6 +19,7 @@
#include <fcntl.h>
#include <abt.h>
#include <libaio.h>
#include "abt-io.h"
struct abt_io_instance
......@@ -26,6 +27,9 @@ struct abt_io_instance
ABT_pool progress_pool;
ABT_xstream *progress_xstreams;
int num_xstreams;
io_context_t ioc;
int ioc_stop;
ABT_thread progress_tid;
};
struct abt_io_op
......@@ -35,6 +39,8 @@ struct abt_io_op
void (*free_fn)(void*);
};
static void libaio_waiter(void *foo);
abt_io_instance_id abt_io_init(int backing_thread_count)
{
struct abt_io_instance *aid;
......@@ -50,6 +56,9 @@ abt_io_instance_id abt_io_init(int backing_thread_count)
aid = malloc(sizeof(*aid));
if (aid == NULL) return ABT_IO_INSTANCE_NULL;
ret = io_setup(1024, &aid->ioc);
assert(ret == 0);
if (backing_thread_count == 0) {
aid->num_xstreams = 0;
ret = ABT_xstream_self(&self_xstream);
......@@ -106,6 +115,10 @@ abt_io_instance_id abt_io_init(int backing_thread_count)
aid->progress_pool = pool;
aid->progress_xstreams = progress_xstreams;
ret = ABT_thread_create(aid->progress_pool, libaio_waiter, aid,
ABT_THREAD_ATTR_NULL, &aid->progress_tid);
assert(ret == 0);
return aid;
}
......@@ -127,6 +140,9 @@ void abt_io_finalize(abt_io_instance_id aid)
{
int i;
aid->ioc_stop = 1;
ABT_thread_join(aid->progress_tid);
if (aid->num_xstreams) {
for (i = 0; i < aid->num_xstreams; i++) {
ABT_xstream_join(aid->progress_xstreams[i]);
......@@ -208,6 +224,10 @@ err:
int abt_io_open(abt_io_instance_id aid, const char* pathname, int flags, mode_t mode)
{
int ret;
/* the libaio stuff doesn't really work on non-directio path */
assert(flags & O_DIRECT);
issue_open(aid->progress_pool, NULL, pathname, flags, mode, &ret);
return ret;
}
......@@ -411,6 +431,7 @@ struct abt_io_pwrite_state
const void *buf;
size_t count;
off_t offset;
struct iocb cb;
ABT_eventual eventual;
};
......@@ -426,12 +447,13 @@ static void abt_io_pwrite_fn(void *foo)
return;
}
static int issue_pwrite(ABT_pool pool, abt_io_op_t *op, int fd, const void *buf,
static int issue_pwrite(struct abt_io_instance *aid, ABT_pool pool, abt_io_op_t *op, int fd, const void *buf,
size_t count, off_t offset, ssize_t *ret)
{
struct abt_io_pwrite_state state;
struct abt_io_pwrite_state *pstate = NULL;
int rc;
struct iocb *iocb;
if (op == NULL) pstate = &state;
else
......@@ -447,13 +469,22 @@ static int issue_pwrite(ABT_pool pool, abt_io_op_t *op, int fd, const void *buf,
pstate->count = count;
pstate->offset = offset;
pstate->eventual = NULL;
io_prep_pwrite(&pstate->cb, fd, (void*)buf, count, offset);
pstate->cb.data = &pstate;
rc = ABT_eventual_create(0, &pstate->eventual);
if (rc != ABT_SUCCESS) { *ret = -ENOMEM; goto err; }
if (op != NULL) op->e = pstate->eventual;
#if 0
rc = ABT_task_create(pool, abt_io_pwrite_fn, pstate, NULL);
if(rc != ABT_SUCCESS) { *ret = -EINVAL; goto err; }
#else
iocb = &pstate->cb;
rc = io_submit(aid->ioc, 1, &iocb);
assert(rc == 1);
#endif
if (op == NULL) {
rc = ABT_eventual_wait(pstate->eventual, NULL);
......@@ -477,7 +508,7 @@ ssize_t abt_io_pwrite(abt_io_instance_id aid, int fd, const void *buf,
size_t count, off_t offset)
{
ssize_t ret = -1;
issue_pwrite(aid->progress_pool, NULL, fd, buf, count, offset, &ret);
issue_pwrite(aid, aid->progress_pool, NULL, fd, buf, count, offset, &ret);
return ret;
}
......@@ -490,7 +521,7 @@ abt_io_op_t* abt_io_pwrite_nb(abt_io_instance_id aid, int fd, const void *buf,
op = malloc(sizeof(*op));
if (op == NULL) return NULL;
iret = issue_pwrite(aid->progress_pool, op, fd, buf, count, offset, ret);
iret = issue_pwrite(aid, aid->progress_pool, op, fd, buf, count, offset, ret);
if (iret != 0) { free(op); return NULL; }
else return op;
}
......@@ -1032,3 +1063,30 @@ size_t abt_io_get_pending_op_count(abt_io_instance_id aid)
else
return -1;
}
static void libaio_waiter(void *foo)
{
struct abt_io_instance *aid = foo;
int ret;
int count;
int i;
struct io_event events[8];
struct timespec to;
struct abt_io_pwrite_state *pstate = NULL;
while(!aid->ioc_stop)
{
to.tv_sec = 0;
to.tv_nsec = 1000*1000*10;
count = io_getevents(aid->ioc, 1, 8, events, &to);
for(i=0; i<count; i++)
{
assert(events[i].res >= 0);
pstate = events[i].data;
ABT_eventual_set(pstate->eventual, NULL, 0);
}
}
return;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment