Commit 818f03ba authored by Matthieu Dorier's avatar Matthieu Dorier

done with refactoring with new BAKE API

parent b7c26b73
......@@ -14,6 +14,7 @@ extern "C" {
#include <inttypes.h>
#include <time.h>
#include <mobject-client.h>
// derived from: http://docs.ceph.com/docs/master/mobject_store/api/libmobject_store/
......@@ -110,8 +111,9 @@ typedef struct mobject_store_ioctx *mobject_store_ioctx_t;
* Used with mobject_store_read_op_omap_get_keys(), mobject_store_read_op_omap_get_vals(),
* mobject_store_read_op_omap_get_vals_by_keys(), mobject_store_omap_get_next(), and
* mobject_store_omap_get_end().
*
* NOW DECLARED in mobject-client.h
*/
typedef struct mobject_store_omap_iter *mobject_store_omap_iter_t;
/**
* @typedef mobject_store_write_op_t
......@@ -133,10 +135,9 @@ typedef struct mobject_store_omap_iter *mobject_store_omap_iter_t;
* mobject_store_write_op_truncate(), mobject_store_write_op_zero(), mobject_store_write_op_cmpext()
* - Hints: mobject_store_write_op_set_alloc_hint()
* - Performing the operation: mobject_store_write_op_operate(), mobject_store_aio_write_op_operate()
*
* NOW DECLARED in mobject-client.h
*/
typedef struct mobject_store_write_op *mobject_store_write_op_t;
#define MOBJECT_WRITE_OP_NULL ((mobject_store_write_op_t)0)
/**
* @typedef mobject_store_read_op_t
......@@ -157,10 +158,8 @@ typedef struct mobject_store_write_op *mobject_store_write_op_t;
* - Request properties: mobject_store_read_op_set_flags()
* - Performing the operation: mobject_store_read_op_operate(),
* mobject_store_aio_read_op_operate()
* NOW DECLARED in mobject-client.h
*/
typedef struct mobject_store_read_op *mobject_store_read_op_t;
#define MOBJECT_READ_OP_NULL ((mobject_store_read_op_t)0)
/**
* @typedef mobject_store_completion_t
......@@ -479,10 +478,10 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op,
* @param prval where to store the return value from this action
*/
void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op,
const char *start_after,
uint64_t max_return,
mobject_store_omap_iter_t *iter,
int *prval);
const char *start_after,
uint64_t max_return,
mobject_store_omap_iter_t *iter,
int *prval);
/**
* Start iterating over key/value pairs on an object.
......
This diff is collapsed.
......@@ -8,32 +8,33 @@
#define MOBJECT_SERVER_H
#include <margo.h>
#include <bake-client.h>
/* server-side utilities and routines. Clients are looking for either
* libmobject-store.h or librados-mobject-store.h */
#define MOBJECT_SERVER_GROUP_NAME "mobject-store-servers"
#define MOBJECT_ABT_POOL_DEFAULT ABT_POOL_NULL
typedef struct mobject_server_context mobject_server_context_t;
typedef struct mobject_server_context* mobject_provider_t;
/**
* Start a mobject server instance
*
* @param[in] mid margo instance id
* @param[in] mplex_id multiplex id of the provider
* @param[in] pool Argobots pool for the provider
* @param[in] bake_ph Bake provider handle to use to write/read data
* @param[in] cluster_file file name to write cluster connect info to
* @returns 0 on success, negative error code on failure
*/
mobject_server_context_t* mobject_server_init(margo_instance_id mid, const char *cluster_file);
/**
* Shutdown a mobject server instance
* @param[out] provider resulting provider
*
* @param[in] mid margo instance id
*/
void mobject_server_shutdown(mobject_server_context_t* svr_ctx);
/**
* Wait for a mobject server instance to get a shutdown request.
* @returns 0 on success, negative error code on failure
*/
void mobject_server_wait_for_shutdown(mobject_server_context_t* srv_ctx);
int mobject_provider_register(
margo_instance_id mid,
uint8_t mplex_id,
ABT_pool pool,
bake_provider_handle_t bake_ph,
const char *cluster_file,
mobject_provider_t* provider);
#endif
noinst_HEADERS += \
src/client/cluster.h \
src/client/mobject-client-impl.h \
src/client/aio/completion.h \
src/io-chain/args-read-actions.h \
src/io-chain/args-write-actions.h \
......@@ -45,11 +46,13 @@ src_io_chain_libio_chain_la_SOURCES = src/io-chain/prepare-read-op.c \
src_client_libmobject_store_la_SOURCES = \
src/client/mobject-client.c \
src/client/cluster.c \
src/client/read-op.c \
src/client/write-op.c \
src/client/omap-iter.c \
src/client/aio/completion.c \
src/client/aio/aio-cluster-operate.c \
src/client/aio/aio-operate.c
src_client_libmobject_store_la_CPPFLAGS = ${AM_CPPFLAGS} ${CLIENT_CPPFLAGS}
src_client_libmobject_store_la_LIBADD = src/omap-iter/libomap-iter.la \
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdlib.h>
#include <ssg.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/client/aio/completion.h"
#include "src/util/log.h"
int mobject_store_aio_write_op_operate(
mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
time_t *mtime,
int flags)
{
hg_return_t ret;
// XXX pick other servers using ch-placement
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0);
mobject_provider_handle_t mph;
mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
mobject_request_t req;
mobject_aio_write_op_operate(mph, write_op, io->pool_name, oid, mtime, flags, &req);
mobject_provider_handle_release(mph);
completion->request = req;
completion->type = AIO_WRITE_COMPLETION;
return 0;
}
int mobject_store_aio_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
int flags)
{
hg_return_t ret;
// XXX pick other servers using ch-placement
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0);
mobject_provider_handle_t mph;
mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
mobject_request_t req;
mobject_aio_read_op_operate(mph, read_op, io->pool_name, oid, flags, &req);
mobject_provider_handle_release(mph);
completion->request = req;
completion->type = AIO_READ_COMPLETION;
return 0;
}
......@@ -7,84 +7,172 @@
#include <stdlib.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/client/mobject-client-impl.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/client/aio/completion.h"
#include "src/util/log.h"
// global variables for RPC ids, defined in client/cluster.c
extern hg_id_t mobject_write_op_rpc_id;
extern hg_id_t mobject_read_op_rpc_id;
int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
int mobject_aio_write_op_operate(
mobject_provider_handle_t mph,
mobject_store_write_op_t write_op,
const char *pool_name,
const char *oid,
time_t *mtime,
int flags)
int flags,
mobject_request_t* req)
{
hg_return_t ret;
write_op_in_t in;
in.object_name = oid;
in.pool_name = io->pool_name;
in.pool_name = pool_name;
in.write_op = write_op;
// TODO take mtime into account
prepare_write_op(io->cluster->mid, write_op);
prepare_write_op(mph->client->mid, write_op);
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0); // XXX pick other servers using ch-placement
MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address");
hg_addr_t svr_addr = mph->addr;
if(svr_addr == HG_ADDR_NULL) {
fprintf(stderr, "[MOBJECT] NULL provider address passed to mobject_aio_write_op_operate\n");
return -1;
}
hg_handle_t h;
ret = margo_create(io->cluster->mid, svr_addr, mobject_write_op_rpc_id, &h);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle");
margo_request req;
ret = margo_iforward(h, &in, &req);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");
completion->request = req;
completion->handle = h;
completion->type = AIO_WRITE_COMPLETION;
completion->op.write_op = write_op;
ret = margo_create(mph->client->mid, svr_addr, mph->client->mobject_write_op_rpc_id, &h);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_create() failed in mobject_aio_write_op_operate()\n");
return -1;
}
margo_set_target_id(h, mph->mplex_id);
margo_request mreq;
ret = margo_iforward(h, &in, &mreq);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n");
margo_destroy(h);
return -1;
}
mobject_request_t tmp_req = calloc(1, sizeof(*tmp_req));
tmp_req->type = MOBJECT_AIO_WRITE;
tmp_req->op.write_op = write_op;
tmp_req->request = mreq;
tmp_req->handle = h;
*req = tmp_req;
return 0;
}
int mobject_store_aio_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
int mobject_aio_read_op_operate(
mobject_provider_handle_t mph,
mobject_store_read_op_t read_op,
const char *pool_name,
const char *oid,
int flags)
int flags,
mobject_request_t* req)
{
hg_return_t ret;
read_op_in_t in;
in.object_name = oid;
in.pool_name = io->pool_name;
in.pool_name = pool_name;
in.read_op = read_op;
prepare_read_op(io->cluster->mid, read_op);
prepare_read_op(mph->client->mid, read_op);
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0); // XXX pick other servers using ch-placement
MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address");
hg_addr_t svr_addr = mph->addr;
if(svr_addr == HG_ADDR_NULL) {
fprintf(stderr, "[MOBJECT] NULL provider address passed to mobject_aio_read_op_operate\n");
return -1;
}
hg_handle_t h;
ret = margo_create(io->cluster->mid, svr_addr, mobject_read_op_rpc_id, &h);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle");
margo_request req;
ret = margo_iforward(h, &in, &req);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");
completion->request = req;
completion->handle = h;
completion->type = AIO_READ_COMPLETION;
completion->op.read_op = read_op;
ret = margo_create(mph->client->mid, svr_addr, mph->client->mobject_read_op_rpc_id, &h);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_write_op_operate()\n");
return -1;
}
margo_set_target_id(h, mph->mplex_id);
margo_request mreq;
ret = margo_iforward(h, &in, &mreq);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n");
margo_destroy(h);
return -1;
}
mobject_request_t tmp_req = calloc(1, sizeof(*tmp_req));
tmp_req->type = MOBJECT_AIO_READ;
tmp_req->op.read_op = read_op;
tmp_req->request = mreq;
tmp_req->handle = h;
*req = tmp_req;
return 0;
}
int mobject_aio_wait(mobject_request_t req, int* ret)
{
if(req == MOBJECT_REQUEST_NULL)
return -1;
int r = margo_wait(req->request);
if(r != HG_SUCCESS) {
return r;
}
req->request = MARGO_REQUEST_NULL;
switch(req->type) {
case MOBJECT_AIO_WRITE: {
write_op_out_t resp;
r = margo_get_output(req->handle, &resp);
if(r != HG_SUCCESS) {
*ret = r;
margo_destroy(req->handle);
free(req);
return r;
}
*ret = resp.ret;
r = margo_free_output(req->handle,&resp);
if(r != HG_SUCCESS) {
*ret = r;
}
margo_destroy(req->handle);
free(req);
return r;
} break;
case MOBJECT_AIO_READ: {
read_op_out_t resp;
r = margo_get_output(req->handle, &resp);
if(r != HG_SUCCESS) {
*ret = r;
margo_destroy(req->handle);
free(req);
return r;
}
feed_read_op_pointers_from_response(req->op.read_op, resp.responses);
r = margo_free_output(req->handle, &resp);
if(r != HG_SUCCESS) {
*ret = r;
}
margo_destroy(req->handle);
free(req);
return r;
} break;
}
}
int mobject_aio_test(mobject_request_t req, int* flag)
{
if(req == MOBJECT_REQUEST_NULL) return -1;
return margo_test(req->request, flag);
}
......@@ -19,15 +19,12 @@ int mobject_store_aio_create_completion(void *cb_arg,
{
int r;
mobject_store_completion_t completion =
(mobject_store_completion_t)calloc(1, sizeof(struct mobject_store_completion));
(mobject_store_completion_t)calloc(1, sizeof(*completion));
MOBJECT_ASSERT(completion != 0, "Could not allocate mobject_store_completion_t object");
completion->type = AIO_NULL_COMPLETION;
completion->op.read_op = NULL;
completion->request = MOBJECT_REQUEST_NULL;
completion->cb_complete = cb_complete;
completion->cb_safe = cb_safe;
completion->cb_arg = cb_arg;
completion->handle = HG_HANDLE_NULL;
completion->request = MARGO_REQUEST_NULL;
*pc = completion;
return 0;
}
......@@ -35,38 +32,15 @@ int mobject_store_aio_create_completion(void *cb_arg,
int mobject_store_aio_wait_for_complete(mobject_store_completion_t c)
{
if(c == MOBJECT_COMPLETION_NULL) {
MOBJECT_LOG("Warning: passing NULL to mobject_store_aio_wait_for_complete");
return -1;
}
MOBJECT_ASSERT(c->request != MARGO_REQUEST_NULL, "Invalid completion handle");
int ret = margo_wait(c->request);
// TODO check the return value of margo_wait
if(ret != HG_SUCCESS) {
MOBJECT_LOG("Warning: margo_wait returned something different from HG_SUCCESS");
}
int ret;
int r = mobject_aio_wait(c->request, &ret);
c->ret_value = ret;
c->request = MARGO_REQUEST_NULL;
switch(c->type) {
case AIO_WRITE_COMPLETION: {
write_op_out_t resp;
ret = margo_get_output(c->handle, &resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output");
c->ret_value = resp.ret;
ret = margo_free_output(c->handle,&resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output");
} break;
case AIO_READ_COMPLETION: {
read_op_out_t resp;
ret = margo_get_output(c->handle, &resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output");
feed_read_op_pointers_from_response(c->op.read_op, resp.responses);
ret = margo_free_output(c->handle,&resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output");
}
}
if(c->cb_safe)
(c->cb_safe)(c, c->cb_arg);
......@@ -79,12 +53,15 @@ int mobject_store_aio_wait_for_complete(mobject_store_completion_t c)
int mobject_store_aio_is_complete(mobject_store_completion_t c)
{
if(c == MOBJECT_COMPLETION_NULL) {
MOBJECT_LOG("Warning: passing NULL to mobject_store_aio_wait_for_complete");
return 1;
}
if(c->request == MOBJECT_REQUEST_NULL) {
return 1;
}
int flag;
margo_test(c->request, &flag);
mobject_aio_test(c->request, &flag);
return flag;
}
......@@ -103,7 +80,6 @@ void mobject_store_aio_release(mobject_store_completion_t c)
{
if(c == MOBJECT_COMPLETION_NULL) return;
MOBJECT_ASSERT(c->request == MARGO_REQUEST_NULL,
"Trying to release a completion handle before operation completed");
margo_destroy(c->handle);
"Trying to release a completion handle before operation completed (will lead to memory leaks)");
free(c);
}
......@@ -10,12 +10,6 @@
#include "mobject-store-config.h"
#include "libmobject-store.h"
typedef enum completion_op_type {
AIO_NULL_COMPLETION,
AIO_WRITE_COMPLETION,
AIO_READ_COMPLETION
} completion_op_type;
/**
* The mobject_store_completion object is used for asynchronous
* functions. It contains the callbacks to call when the data is
......@@ -26,16 +20,10 @@ typedef enum completion_op_type {
* in libmobject-store.h.
*/
struct mobject_store_completion {
completion_op_type type; // completion for write or for reads
union {
mobject_store_read_op_t read_op;
mobject_store_write_op_t write_op;
} op; // operation that initiated the completion
mobject_store_callback_t cb_complete; // completion callback
mobject_store_callback_t cb_safe; // safe callback
void* cb_arg; // arguments for callbacks
margo_request request; // margo request to wait on
hg_handle_t handle; // handle of the RPC sent for this operation
mobject_request_t request; // margo request to wait on
int ret_value; // return value of the operation
};
......
This diff is collapsed.
......@@ -9,6 +9,7 @@
#include <margo.h>
#include <ssg.h>
#include "libmobject-store.h"
#include "mobject-client.h"
#define MOBJECT_CLUSTER_FILE_ENV "MOBJECT_CLUSTER_FILE"
#define MOBJECT_CLUSTER_SHUTDOWN_KILL_ENV "MOBJECT_SHUTDOWN_KILL_SERVERS"
......@@ -16,8 +17,8 @@
struct mobject_store_handle
{
margo_instance_id mid;
mobject_client_t mobject_clt;
ssg_group_id_t gid;
const char* my_address;
int connected;
};
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_CLIENT_IMPL_H
#define __MOBJECT_CLIENT_IMPL_H
#include "mobject-store-config.h"
#include <stdlib.h>
#include <margo.h>
#include <ssg.h>
#include "mobject-client.h"
struct mobject_client {
margo_instance_id mid;
char* client_addr;
hg_id_t mobject_write_op_rpc_id;
hg_id_t mobject_read_op_rpc_id;
hg_id_t mobject_shutdown_rpc_id;
uint64_t num_provider_handles;
};
struct mobject_provider_handle {
mobject_client_t client;
hg_addr_t addr;
uint8_t mplex_id;
uint64_t refcount;
};
typedef enum mobject_op_req_type {
MOBJECT_AIO_WRITE,
MOBJECT_AIO_READ
} mobject_op_req_type;
struct mobject_request {
mobject_op_req_type type; // type of operation that initiated the request
union {
mobject_store_read_op_t read_op;
mobject_store_write_op_t write_op;
} op; // operation that initiated the request
margo_request request; // margo request to wait on
hg_handle_t handle; // handle of the RPC sent for this operation
};
#endif
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "mobject-store-config.h"
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <margo.h>
#include <ssg.h>
#include "mobject-client.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/util/log.h"
struct mobject_client {
margo_instance_id mid;
char* client_addr;
hg_id_t mobject_write_op_rpc_id;
hg_id_t mobject_read_op_rpc_id;
hg_id_t mobject_shutdown_rpc_id;
uint64_t num_provider_handles;
};
struct mobject_provider_handle {
mobject_client_t client;
hg_addr_t addr;
uint8_t mplex_id;
uint64_t refcount;
};
static int mobject_client_register(mobject_client_t client, margo_instance_id mid)
{
int ret;
hg_addr_t client_addr = HG_ADDR_NULL;
char client_addr_str[128];
hg_size_t client_addr_len = 128;
ret = margo_addr_self(mid, &client_addr);
if(ret != HG_SUCCESS) return -1;
ret = margo_addr_to_string(mid, client_addr_str, &client_addr_len, client_addr);
if(ret != HG_SUCCESS) {
margo_addr_free(mid, client_addr);
return -1;
}
margo_addr_free(mid, client_addr);
client->mid = mid;
client->client_addr = strdup(client_addr_str);
/* check if RPCs have already been registered */
hg_bool_t flag;
hg_id_t id;
margo_registered_name(mid, "mobject_write_op", &id, &flag);
if(flag == HG_TRUE) { /* RPCs already registered */
margo_registered_name(mid, "mobject_write_op", &client->mobject_write_op_rpc_id, &flag);
margo_registered_name(mid, "mobject_read_op", &client->mobject_read_op_rpc_id, &flag);
margo_registered_name(mid, "mobject_shutdown", &client->mobject_shutdown_rpc_id, &flag);
} else {
client->mobject_write_op_rpc_id =
MARGO_REGISTER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, NULL);
client->mobject_read_op_rpc_id =
MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, NULL);
client->mobject_shutdown_rpc_id =
MARGO_REGISTER(mid, "mobject_shutdown", void, void, NULL);
}
return 0;
}
int mobject_client_init(margo_instance_id mid, mobject_client_t* client)
{
mobject_client_t c = (mobject_client_t)calloc(1, sizeof(*c));
if(!c) return -1;
c->num_provider_handles = 0;
int ret = mobject_client_register(c, mid);
if(ret != 0) return ret;
*client = c;
return 0;
}
int mobject_client_finalize(mobject_client_t client)
{
if(client->num_provider_handles != 0) {
fprintf(stderr,
"[MOBJECT] Warning: %d provider handles not released before mobject_client_finalize was called\n",
client->num_provider_handles);
}
free(client->client_addr);
free(client);
return 0;
}
int mobject_provider_handle_create(
mobject_client_t client,
hg_addr_t addr,
uint8_t mplex_id,
mobject_provider_handle_t* handle)
{
if(client == MOBJECT_CLIENT_NULL) return -1;
mobject_provider_handle_t provider =
(mobject_provider_handle_t)calloc(1, sizeof(*provider));
if(!provider) return -1;
hg_return_t ret = margo_addr_dup(client->mid, addr, &(provider->addr));
if(ret != HG_SUCCESS) {
free(provider);
return -1;
}
provider->client = client;
provider->mplex_id = mplex_id;
provider->refcount = 1;
client->num_provider_handles += 1;
*handle = provider;