Commit 85114d42 authored by Matthieu Dorier's avatar Matthieu Dorier

started implementing aio

parent d10996a5
......@@ -19,16 +19,10 @@ int mobject_store_aio_create_completion(void *cb_arg,
mobject_store_completion_t completion =
(mobject_store_completion_t)calloc(1, sizeof(struct mobject_store_completion));
MOBJECT_ASSERT(completion != 0, "Could not allocate mobject_store_completion_t object");
completion->state = COMPLETION_CREATED;
completion->cb_complete = cb_complete;
completion->cb_safe = cb_safe;
completion->cb_arg = cb_arg;
// r = ABT_eventual_create(sizeof(int), (void**)(&(completion->eventual)));
// MOBJECT_ASSERT(r == ABT_SUCCESS, "Could not create ABT_eventual");
// completion->ret_value_ptr = (int*)0;
r = ABT_rwlock_create(&(completion->lock));
MOBJECT_ASSERT(r == ABT_SUCCESS, "Could not create ABT_rwlock");
completion->ult = ABT_THREAD_NULL;
completion->request = MARGO_REQUEST_NULL;
*pc = completion;
return 0;
}
......@@ -39,42 +33,27 @@ int mobject_store_aio_wait_for_complete(mobject_store_completion_t c)
MOBJECT_LOG("Warning: passing NULL to mobject_store_aio_wait_for_complete");
return -1;
}
if(c->state == COMPLETION_IN_PROGRESS || c->state == COMPLETION_TERMINATED) {
ABT_thread_join(c->ult);
ABT_thread_free(c->ult);
c->ult = ABT_THREAD_NULL;
c->state = COMPLETION_JOINED;
MOBJECT_ASSERT(c->request != MARGO_REQUEST_NULL, "Invalid completion handle");
c->ret_value = margo_wait(c->request);
if(c->ret_value != HG_SUCCESS) {
MOBJECT_LOG("Warning: margo_wait returned something different from HG_SUCCESS");
}
c->request = MARGO_REQUEST_NULL;
if(c->cb_safe)
(c->cb_safe)(c, c->cb_arg);
// int r;
// int* val_ptr = (int*)0;
// r = ABT_eventual_wait(c->eventual, (void**)(&val_ptr));
// MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_eventual_wait failed");
// r = ABT_rwlock_wrlock(c->lock);
// MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_rwlock_wrlock failed");
// c->ret_value_ptr = val_ptr;
// r = ABT_rwlock_unlock(c->lock);
// MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_rwlock_unlock failed");
if(c->cb_complete)
(c->cb_complete)(c, c->cb_arg);
return 0;
}
int mobject_store_aio_is_complete(mobject_store_completion_t c)
{
int r;
if(c == MOBJECT_COMPLETION_NULL) {
MOBJECT_LOG("Warning: passing NULL to mobject_store_aio_is_complete");
return 0;
}
return (c->state == COMPLETION_TERMINATED) || (c->state == COMPLETION_JOINED);
int result;
r = ABT_rwlock_rdlock(c->lock);
MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_rwlock_rdlock failed");
result = (c->state == COMPLETION_TERMINATED) || (c->state == COMPLETION_JOINED);
r = ABT_rwlock_unlock(c->lock);
MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_rwlock_unlock failed");
return result;
MOBJECT_ASSERT(0,"mobject_store_aio_is_complete is not yet implemented");
return 0;
}
int mobject_store_aio_get_return_value(mobject_store_completion_t c)
......@@ -84,28 +63,15 @@ int mobject_store_aio_get_return_value(mobject_store_completion_t c)
MOBJECT_LOG("Warning: passing NULL to mobject_store_aio_get_return_value");
return 0;
}
if(c->state == COMPLETION_TERMINATED) {
mobject_store_aio_wait_for_complete(c);
}
MOBJECT_ASSERT((c->state != COMPLETION_JOINED),
MOBJECT_ASSERT((c->request == MARGO_REQUEST_NULL),
"calling mobject_store_aio_get_return_value on a non-terminated completion");
int result = 0;
r = ABT_rwlock_rdlock(c->lock);
MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_rwlock_rdlock failed");
result = c->ret_value;
//if(c->ret_value_ptr != (int*)0) result = *(c->ret_value_ptr);
r = ABT_rwlock_unlock(c->lock);
MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_rwlock_unlock failed");
return result;
return c->ret_value;
}
void mobject_store_aio_release(mobject_store_completion_t c)
{
int r;
if(c == MOBJECT_COMPLETION_NULL) return;
//r = ABT_eventual_free(c->eventual);
//MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_eventual_free failed");
r = ABT_rwlock_free(c->lock);
MOBJECT_ASSERT(r == ABT_SUCCESS, "ABT_rwlock_free failed");
if(c == MOBJECT_COMPLETION_NULL) return;
MOBJECT_ASSERT(c->request != MARGO_REQUEST_NULL,
"Trying to release a completion handle before operation completed");
free(c);
}
......@@ -6,16 +6,9 @@
#ifndef __MOBJECT_COMPLETION_H
#define __MOBJECT_COMPLETION_H
#include <abt.h>
#include <margo.h>
#include "mobject-store-config.h"
typedef enum {
COMPLETION_CREATED = 1,
COMPLETION_IN_PROGRESS,
COMPLETION_TERMINATED,
COMPLETION_JOINED
} completion_state_t;
/**
* The mobject_store_completion object is used for asynchronous
* functions. It contains the callbacks to call when the data is
......@@ -26,15 +19,11 @@ typedef enum {
* in libmobject-store.h.
*/
struct mobject_store_completion {
completion_state_t state; // state of the completion
mobject_store_callback_t cb_complete; // completion callback
mobject_store_callback_t cb_safe; // safe callback
void* cb_arg; // arguments for callbacks
// ABT_eventual eventual; // eventual used to notify completion
// int* ret_value_ptr; // pointer to eventual's internal value
margo_request request; // margo request to wait on
int ret_value; // return value of the operation
ABT_rwlock lock; // lock protecting access to this structure
ABT_thread ult; // thread running the operation
};
#endif
......
......@@ -191,6 +191,7 @@ int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
in.object_name = oid;
in.pool_name = io->pool_name;
in.write_op = write_op;
// TODO take mtime into account
prepare_write_op(io->cluster->mid, write_op);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment