Commit 9ce272ff authored by Philip Carns's avatar Philip Carns
Browse files

put getevents in same ES as submission

parent a701728f
......@@ -200,12 +200,7 @@ static void abt_bench(int buffer_per_thread, unsigned int concurrency, size_t si
assert(ret == 0);
/* initialize abt_io */
/* NOTE: for now we are going to use the same number of execution streams
* in the io pool as the desired level of issue concurrency, but this
* doesn't need to be the case in general.
*/
/* for libaio we only need one thread (the waiter) */
aid = abt_io_init(1);
aid = abt_io_init(0);
assert(aid != NULL);
ABT_mutex_create(&mutex);
......@@ -314,11 +309,7 @@ static void abt_bench_nb(int buffer_per_thread, unsigned int concurrency, size_t
assert(ret == 0);
/* initialize abt_io */
/* NOTE: for now we are going to use the same number of execution streams
* in the io pool as the desired level of issue concurrency, but this
* doesn't need to be the case in general.
*/
aid = abt_io_init(concurrency);
aid = abt_io_init(0);
assert(aid != NULL);
/* set up buffers */
......
......@@ -59,58 +59,13 @@ abt_io_instance_id abt_io_init(int backing_thread_count)
ret = io_setup(1024, &aid->ioc);
assert(ret == 0);
if (backing_thread_count == 0) {
aid->num_xstreams = 0;
ret = ABT_xstream_self(&self_xstream);
if (ret != ABT_SUCCESS) { free(aid); return ABT_IO_INSTANCE_NULL; }
ret = ABT_xstream_get_main_pools(self_xstream, 1, &pool);
if (ret != ABT_SUCCESS) { free(aid); return ABT_IO_INSTANCE_NULL; }
}
else {
aid->num_xstreams = backing_thread_count;
progress_xstreams = malloc(
backing_thread_count * sizeof(*progress_xstreams));
if (progress_xstreams == NULL) {
free(aid);
return ABT_IO_INSTANCE_NULL;
}
progress_scheds = malloc(
backing_thread_count * sizeof(*progress_scheds));
if (progress_scheds == NULL) {
free(progress_xstreams);
free(aid);
return ABT_IO_INSTANCE_NULL;
}
ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC,
ABT_TRUE, &pool);
if(ret != ABT_SUCCESS)
{
free(progress_xstreams);
free(progress_scheds);
free(aid);
return ABT_IO_INSTANCE_NULL;
}
for(i=0; i<backing_thread_count; i++)
{
ret = ABT_sched_create_basic(ABT_SCHED_BASIC_WAIT, 1, &pool,
ABT_SCHED_CONFIG_NULL, &progress_scheds[i]);
if (ret != ABT_SUCCESS) {
free(progress_xstreams);
free(progress_scheds);
free(aid);
return ABT_IO_INSTANCE_NULL;
}
ret = ABT_xstream_create(progress_scheds[i], &progress_xstreams[i]);
if (ret != ABT_SUCCESS) {
free(progress_xstreams);
free(progress_scheds);
free(aid);
return ABT_IO_INSTANCE_NULL;
}
}
}
/* this hacked variation doesn't use extra OS threads at all */
assert(backing_thread_count == 0);
aid->num_xstreams = 0;
ret = ABT_xstream_self(&self_xstream);
if (ret != ABT_SUCCESS) { free(aid); return ABT_IO_INSTANCE_NULL; }
ret = ABT_xstream_get_main_pools(self_xstream, 1, &pool);
if (ret != ABT_SUCCESS) { free(aid); return ABT_IO_INSTANCE_NULL; }
aid->progress_pool = pool;
aid->progress_xstreams = progress_xstreams;
......@@ -487,6 +442,7 @@ static int issue_pwrite(struct abt_io_instance *aid, ABT_pool pool, abt_io_op_t
#endif
if (op == NULL) {
ABT_thread_yield();
rc = ABT_eventual_wait(pstate->eventual, NULL);
// what error should we use here?
if (rc != ABT_SUCCESS) { *ret = -EINVAL; goto err; }
......@@ -1070,15 +1026,12 @@ static void libaio_waiter(void *foo)
int ret;
int count;
int i;
struct io_event events[8];
struct timespec to;
struct io_event events[1];
struct abt_io_pwrite_state *pstate = NULL;
while(!aid->ioc_stop)
{
to.tv_sec = 0;
to.tv_nsec = 1000*1000*10;
count = io_getevents(aid->ioc, 1, 8, events, &to);
count = io_getevents(aid->ioc, 1, 1, events, NULL);
for(i=0; i<count; i++)
{
assert(events[i].res >= 0);
......@@ -1086,6 +1039,7 @@ static void libaio_waiter(void *foo)
*pstate->ret = events[i].res;
ABT_eventual_set(pstate->eventual, NULL, 0);
}
ABT_thread_yield();
}
return;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment