Commit a21fb678 authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-async-api' into 'master'

async api implemented and tested

See merge request !2
parents 59fd13a9 28d89e85
......@@ -49,7 +49,8 @@ src_client_libmobject_store_la_SOURCES = \
src/client/read-op.c \
src/client/write-op.c \
src/client/omap-iter.c \
src/client/aio/completion.c
src/client/aio/completion.c \
src/client/aio/aio-operate.c
src_client_libmobject_store_la_CPPFLAGS = ${AM_CPPFLAGS} ${CLIENT_CPPFLAGS}
src_client_libmobject_store_la_LIBADD = src/omap-iter/libomap-iter.la \
src/io-chain/libio-chain.la ${CLIENT_LIBS}
......
......@@ -7,25 +7,84 @@
#include <stdlib.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/client/aio/completion.h"
#include "src/util/log.h"
// global variables for RPC ids, defined in client/cluster.c
extern hg_id_t mobject_write_op_rpc_id;
extern hg_id_t mobject_read_op_rpc_id;
int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
time_t *mtime,
int flags)
{
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
time_t *mtime,
int flags)
{
hg_return_t ret;
}
write_op_in_t in;
in.object_name = oid;
in.pool_name = io->pool_name;
in.write_op = write_op;
// TODO take mtime into account
prepare_write_op(io->cluster->mid, write_op);
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0); // XXX pick other servers using ch-placement
MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address");
hg_handle_t h;
ret = margo_create(io->cluster->mid, svr_addr, mobject_write_op_rpc_id, &h);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle");
margo_request req;
ret = margo_iforward(h, &in, &req);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");
completion->request = req;
completion->handle = h;
completion->type = AIO_WRITE_COMPLETION;
completion->op.write_op = write_op;
return 0;
}
int mobject_store_aio_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
int flags)
{
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
int flags)
{
hg_return_t ret;
read_op_in_t in;
in.object_name = oid;
in.pool_name = io->pool_name;
in.read_op = read_op;
prepare_read_op(io->cluster->mid, read_op);
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0); // XXX pick other servers using ch-placement
MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address");
hg_handle_t h;
ret = margo_create(io->cluster->mid, svr_addr, mobject_read_op_rpc_id, &h);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle");
margo_request req;
ret = margo_iforward(h, &in, &req);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");
completion->request = req;
completion->handle = h;
completion->type = AIO_READ_COMPLETION;
completion->op.read_op = read_op;
return 0;
}
......@@ -8,8 +8,10 @@
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/aio/completion.h"
#include "src/rpc-types/read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/util/log.h"
#if 0
int mobject_store_aio_create_completion(void *cb_arg,
mobject_store_callback_t cb_complete,
mobject_store_callback_t cb_safe,
......@@ -19,9 +21,12 @@ int mobject_store_aio_create_completion(void *cb_arg,
mobject_store_completion_t completion =
(mobject_store_completion_t)calloc(1, sizeof(struct mobject_store_completion));
MOBJECT_ASSERT(completion != 0, "Could not allocate mobject_store_completion_t object");
completion->cb_complete = cb_complete;
completion->type = AIO_NULL_COMPLETION;
completion->op.read_op = NULL;
completion->cb_complete = cb_complete;
completion->cb_safe = cb_safe;
completion->cb_arg = cb_arg;
completion->handle = HG_HANDLE_NULL;
completion->request = MARGO_REQUEST_NULL;
*pc = completion;
return 0;
......@@ -35,12 +40,33 @@ int mobject_store_aio_wait_for_complete(mobject_store_completion_t c)
}
MOBJECT_ASSERT(c->request != MARGO_REQUEST_NULL, "Invalid completion handle");
c->ret_value = margo_wait(c->request);
if(c->ret_value != HG_SUCCESS) {
int ret = margo_wait(c->request);
// TODO check the return value of margo_wait
if(ret != HG_SUCCESS) {
MOBJECT_LOG("Warning: margo_wait returned something different from HG_SUCCESS");
}
c->request = MARGO_REQUEST_NULL;
switch(c->type) {
case AIO_WRITE_COMPLETION: {
write_op_out_t resp;
ret = margo_get_output(c->handle, &resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output");
c->ret_value = resp.ret;
ret = margo_free_output(c->handle,&resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output");
} break;
case AIO_READ_COMPLETION: {
read_op_out_t resp;
ret = margo_get_output(c->handle, &resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output");
feed_read_op_pointers_from_response(c->op.read_op, resp.responses);
ret = margo_free_output(c->handle,&resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output");
}
}
if(c->cb_safe)
(c->cb_safe)(c, c->cb_arg);
......@@ -52,8 +78,15 @@ int mobject_store_aio_wait_for_complete(mobject_store_completion_t c)
int mobject_store_aio_is_complete(mobject_store_completion_t c)
{
MOBJECT_ASSERT(0,"mobject_store_aio_is_complete is not yet implemented");
return 0;
if(c == MOBJECT_COMPLETION_NULL) {
MOBJECT_LOG("Warning: passing NULL to mobject_store_aio_wait_for_complete");
return 1;
}
int flag;
margo_test(c->request, &flag);
return flag;
}
int mobject_store_aio_get_return_value(mobject_store_completion_t c)
......@@ -61,18 +94,16 @@ int mobject_store_aio_get_return_value(mobject_store_completion_t c)
int r;
if(c == MOBJECT_COMPLETION_NULL) {
MOBJECT_LOG("Warning: passing NULL to mobject_store_aio_get_return_value");
return 0;
return -1;
}
MOBJECT_ASSERT((c->request == MARGO_REQUEST_NULL),
"calling mobject_store_aio_get_return_value on a non-terminated completion");
return c->ret_value;
}
void mobject_store_aio_release(mobject_store_completion_t c)
{
if(c == MOBJECT_COMPLETION_NULL) return;
MOBJECT_ASSERT(c->request != MARGO_REQUEST_NULL,
MOBJECT_ASSERT(c->request == MARGO_REQUEST_NULL,
"Trying to release a completion handle before operation completed");
free(c);
margo_destroy(c->handle);
free(c);
}
#endif
......@@ -8,7 +8,14 @@
#include <margo.h>
#include "mobject-store-config.h"
#if 0
#include "libmobject-store.h"
typedef enum completion_op_type {
AIO_NULL_COMPLETION,
AIO_WRITE_COMPLETION,
AIO_READ_COMPLETION
} completion_op_type;
/**
* The mobject_store_completion object is used for asynchronous
* functions. It contains the callbacks to call when the data is
......@@ -19,13 +26,18 @@
* in libmobject-store.h.
*/
struct mobject_store_completion {
completion_op_type type; // completion for write or for reads
union {
mobject_store_read_op_t read_op;
mobject_store_write_op_t write_op;
} op; // operation that initiated the completion
mobject_store_callback_t cb_complete; // completion callback
mobject_store_callback_t cb_safe; // safe callback
void* cb_arg; // arguments for callbacks
margo_request request; // margo request to wait on
hg_handle_t handle; // handle of the RPC sent for this operation
int ret_value; // return value of the operation
};
#endif
#endif
......@@ -159,76 +159,3 @@ void mobject_store_read_op_omap_get_vals_by_keys(mobject_store_read_op_t read_op
read_op->num_actions += 1;
}
/*
typedef struct read_op_ult_args {
mobject_store_read_op_t read_op;
mobject_store_ioctx_t ioctx;
mobject_store_completion_t completion
char* oid;
int flags;
} read_op_ult_args;
static void aio_read_op_operate_ult(read_op_ult_args* args) {
read_op_in_t in;
in.object_name = args->oid;
in.pool_name = args->ioctx->pool_name;
in.read_op = args->read_op;
prepare_read_op(io->mid, read_op);
// TODO: svr_addr should be computed based on the pool name, object name,
// and SSG structures accessible via the io context
hg_handle_t h;
margo_create(io->mid, io->svr_addr, mobject_read_op_rpc_id, &h);
margo_forward(h, &in);
read_op_out_t resp;
margo_get_output(h, &resp);
feed_read_op_pointers_from_response(read_op, resp.responses);
margo_free_output(h,&resp);
margo_destroy(h);
free(args->oid);
ABT_rwlock_wrlock(args->completion->lock);
int ret = 0; // TODO change that depending on results of the read_op
ABT_eventual_set (args->completion->eventual, &ret, sizeof(int));
mobject_store_callback_t cb_complete = args->completion->cb_complete;
void* cb_arg = args->completion->cb_arg;
ABT_rwlock_unlock(args->completion->lock);
if(complete_cb)
complete_cb(args->completion, cb_arg);
free(args);
return 0;
}
*/
int mobject_store_aio_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
int flags)
{
/* MOBJECT_ASSERT(read_op != MOBJECT_READ_OP_NULL, "invalid mobject_store_read_op_t object");
// TODO this is not great, we should use the margo non-blocking API instead
ABT_xstream self_es;
ABT_xstream_self(&self_es);
ABT_pool pool;
ABT_xstream_get_main_pools(self_es, 1, &pool);
ABT_thread ult;
read_op_ult_args* args = (read_op_ult_args*)calloc(1, sizeof(*args);
args->read_op = read_op;
args->ioctx = io;
args->completion = completion;
args->oid = strdup(oid);
args->flags = flags;
ABT_thread_create(pool, aio_read_op_operate_ult, args, ABT_THREAD_ATTR_NULL, &ult);
completion->ult = ult;
*/
return 0;
}
......@@ -274,35 +274,3 @@ void mobject_store_write_op_omap_rm_keys(mobject_store_write_op_t write_op,
write_op->num_actions += 1;
}
/*
int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
const char *oid,
time_t *mtime,
int flags)
{
int r;
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
mobject_store_completion_t completion = MOBJECT_COMPLETION_NULL;
r = mobject_store_aio_create_completion(NULL, NULL, NULL, &completion);
MOBJECT_ASSERT(0 == r, "Could not create completion object");
r = mobject_store_aio_write_op_operate(write_op, io, completion, oid, mtime, flags);
MOBJECT_ASSERT(0 == r, "Call to mobject_store_aio_write_op_operate failed");
r = mobject_store_aio_wait_for_complete(completion);
MOBJECT_ASSERT(0 == r, "Could not wait for completion");
int ret = mobject_store_aio_get_return_value(completion);
mobject_store_aio_release(completion);
return ret;
}
int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
time_t *mtime,
int flags)
{
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
// TODO
}
*/
......@@ -4,7 +4,8 @@ TESTS_ENVIRONMENT += \
check_PROGRAMS += \
tests/mobject-connect-test \
tests/mobject-client-test
tests/mobject-client-test \
tests/mobject-aio-test
# don't include rados programs in make check
if HAVE_RADOS
......@@ -30,3 +31,5 @@ endif
tests_mobject_client_test_LDADD = src/client/libmobject-store.la ${CLIENT_LIBS}
tests_mobject_aio_test_LDADD = src/client/libmobject-store.la ${CLIENT_LIBS}
#include <assert.h>
#include <stdio.h>
#include <margo.h>
#include <libmobject-store.h>
const char* content = "AAAABBBBCCCCDDDDEEEEFFFF";
/* Main function. */
int main(int argc, char** argv)
{
mobject_store_t cluster;
mobject_store_create(&cluster, "admin");
mobject_store_connect(cluster);
mobject_store_ioctx_t ioctx;
mobject_store_ioctx_create(cluster, "my-object-pool", &ioctx);
{ // WRITE OP TEST
mobject_store_write_op_t write_op = mobject_store_create_write_op();
// Add a "create" operation
mobject_store_write_op_create(write_op, LIBMOBJECT_CREATE_EXCLUSIVE, NULL);
// Add a "write_full" operation to write "AAAABBBB"
mobject_store_write_op_write_full(write_op, content, 8);
// Add a "write" operation to write "CCCC"
mobject_store_write_op_write(write_op, content+8, 4, 8);
// Add a "writesame" operation to write "DDDD" in two "DD"
mobject_store_write_op_writesame(write_op, content+12, 2, 4, 12);
// Add a "append" operation to append "EEEEFFFF"
mobject_store_write_op_append(write_op, content+16, 8);
// Add a "remove" operation
// mobject_store_write_op_remove(write_op);
// Add a "truncate" operation to remove the "FFFF" part
mobject_store_write_op_truncate(write_op, 20);
// Add a "zero" operation zero-ing the "BBBBCCCC"
mobject_store_write_op_zero(write_op, 4, 8);
// Add a "omap_set" operation
const char* keys[] = { "matthieu", "rob", "shane", "phil", "robl" };
const char* values[] = { "mdorier@anl.gov", "rross@anl.gov", "ssnyder@anl.gov", "carns@anl.gov", "robl@anl.gov" };
size_t val_sizes[] = { 16, 14, 16, 14, 13 };
mobject_store_write_op_omap_set(write_op, keys, values, val_sizes, 5);
// Add a omap_rm_keys" operation
// mobject_store_write_op_omap_rm_keys(write_op, keys, 5);
mobject_store_completion_t completion = MOBJECT_COMPLETION_NULL;
mobject_store_aio_create_completion(NULL,NULL,NULL, &completion);
mobject_store_aio_write_op_operate(write_op, ioctx, completion, "test-object", NULL, LIBMOBJECT_OPERATION_NOFLAG);
mobject_store_aio_wait_for_complete(completion);
mobject_store_release_write_op(write_op);
mobject_store_aio_release(completion);
}
{ // READ OP TEST
mobject_store_read_op_t read_op = mobject_store_create_read_op();
// Add "stat" operation
uint64_t psize;
time_t pmtime;
int prval1;
mobject_store_read_op_stat(read_op, &psize, &pmtime, &prval1);
// Add "read" operation
char read_buf[512];
size_t bytes_read;
int prval2;
mobject_store_read_op_read(read_op, 0, 512, read_buf, &bytes_read, &prval2);
// Add "omap_get_keys" operation
const char* start_after1 = "shane";
mobject_store_omap_iter_t iter3;
int prval3;
mobject_store_read_op_omap_get_keys(read_op, start_after1, 7, &iter3, &prval3);
// Add "omap_get_vals" operation
const char* start_after2 = "matthieu";
const char* filter_prefix2 = "p";
mobject_store_omap_iter_t iter4;
int prval4;
mobject_store_read_op_omap_get_vals(read_op, start_after2, filter_prefix2, 3, &iter4, &prval4);
// Add "omap_get_vals_by_keys" operation
const char* keys[] = {"matthieu", "robl"};
mobject_store_omap_iter_t iter5;
int prval5;
mobject_store_read_op_omap_get_vals_by_keys(read_op, keys, 2, &iter5, &prval5);
mobject_store_completion_t completion = MOBJECT_COMPLETION_NULL;
mobject_store_aio_create_completion(NULL,NULL,NULL, &completion);
mobject_store_aio_read_op_operate(read_op, ioctx, completion, "test-object",LIBMOBJECT_OPERATION_NOFLAG);
mobject_store_aio_wait_for_complete(completion);
mobject_store_release_read_op(read_op);
mobject_store_aio_release(completion);
// print the results of the read operations
printf("Client received the following results:\n");
printf("stat: psize=%ld pmtime=%lld prval=%d\n", psize, (long long)pmtime, prval1);
{
printf("read: bytes_read = %ld, prval=%d content: ", bytes_read, prval2);
unsigned i;
for(i=0; i<bytes_read; i++) printf("%c", read_buf[i] ? read_buf[i] : '*' );
printf("\n");
}
printf("omap_get_keys: prval=%d\n", prval3);
{
char* key = NULL;
char* val = NULL;
size_t size;
do {
mobject_store_omap_get_next(iter3, &key, &val, &size);
if(key) printf("===> key: \"%s\"\n", key);
} while(key);
}
printf("omap_get_vals: prval=%d\n", prval4);
{
char* key = NULL;
char* val = NULL;
size_t size;
do {
mobject_store_omap_get_next(iter4, &key, &val, &size);
if(key) printf("===> key: \"%s\" , val: %s \n", key, val);
} while(key);
}
printf("omap_get_vals_by_keys: prval=%d\n", prval5);
{
char* key = NULL;
char* val = NULL;
size_t size;
do {
mobject_store_omap_get_next(iter5, &key, &val, &size);
if(key) printf("===> key: \"%s\" , val: %s \n", key, val);
} while(key);
}
}
mobject_store_ioctx_destroy(ioctx);
mobject_store_shutdown(cluster);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment