Commit de757d3b authored by Matthieu Dorier's avatar Matthieu Dorier

done with refactoring with new BAKE API

parent b7c26b73
This diff is collapsed.
noinst_HEADERS += \
src/client/cluster.h \
src/client/mobject-client-impl.h \
src/client/aio/completion.h \
src/io-chain/args-read-actions.h \
src/io-chain/args-write-actions.h \
......@@ -45,11 +46,13 @@ src_io_chain_libio_chain_la_SOURCES = src/io-chain/prepare-read-op.c \
src_client_libmobject_store_la_SOURCES = \
src/client/mobject-client.c \
src/client/cluster.c \
src/client/read-op.c \
src/client/write-op.c \
src/client/omap-iter.c \
src/client/aio/completion.c \
src/client/aio/aio-cluster-operate.c \
src/client/aio/aio-operate.c
src_client_libmobject_store_la_CPPFLAGS = ${AM_CPPFLAGS} ${CLIENT_CPPFLAGS}
src_client_libmobject_store_la_LIBADD = src/omap-iter/libomap-iter.la \
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdlib.h>
#include <ssg.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/client/aio/completion.h"
#include "src/util/log.h"
int mobject_store_aio_write_op_operate(
mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
time_t *mtime,
int flags)
{
hg_return_t ret;
// XXX pick other servers using ch-placement
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0);
mobject_provider_handle_t mph;
mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
mobject_request_t req;
mobject_aio_write_op_operate(mph, write_op, io->pool_name, oid, mtime, flags, &req);
mobject_provider_handle_release(mph);
completion->request = req;
completion->type = AIO_WRITE_COMPLETION;
return 0;
}
int mobject_store_aio_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
const char *oid,
int flags)
{
hg_return_t ret;
// XXX pick other servers using ch-placement
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0);
mobject_provider_handle_t mph;
mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
mobject_request_t req;
mobject_aio_read_op_operate(mph, read_op, io->pool_name, oid, flags, &req);
mobject_provider_handle_release(mph);
completion->request = req;
completion->type = AIO_READ_COMPLETION;
return 0;
}
......@@ -7,84 +7,172 @@
#include <stdlib.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/client/mobject-client-impl.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/client/aio/completion.h"
#include "src/util/log.h"
// global variables for RPC ids, defined in client/cluster.c
extern hg_id_t mobject_write_op_rpc_id;
extern hg_id_t mobject_read_op_rpc_id;
int mobject_store_aio_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
int mobject_aio_write_op_operate(
mobject_provider_handle_t mph,
mobject_store_write_op_t write_op,
const char *pool_name,
const char *oid,
time_t *mtime,
int flags)
int flags,
mobject_request_t* req)
{
hg_return_t ret;
write_op_in_t in;
in.object_name = oid;
in.pool_name = io->pool_name;
in.pool_name = pool_name;
in.write_op = write_op;
// TODO take mtime into account
prepare_write_op(io->cluster->mid, write_op);
prepare_write_op(mph->client->mid, write_op);
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0); // XXX pick other servers using ch-placement
MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address");
hg_addr_t svr_addr = mph->addr;
if(svr_addr == HG_ADDR_NULL) {
fprintf(stderr, "[MOBJECT] NULL provider address passed to mobject_aio_write_op_operate\n");
return -1;
}
hg_handle_t h;
ret = margo_create(io->cluster->mid, svr_addr, mobject_write_op_rpc_id, &h);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle");
margo_request req;
ret = margo_iforward(h, &in, &req);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");
completion->request = req;
completion->handle = h;
completion->type = AIO_WRITE_COMPLETION;
completion->op.write_op = write_op;
ret = margo_create(mph->client->mid, svr_addr, mph->client->mobject_write_op_rpc_id, &h);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_create() failed in mobject_aio_write_op_operate()\n");
return -1;
}
margo_set_target_id(h, mph->mplex_id);
margo_request mreq;
ret = margo_iforward(h, &in, &mreq);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n");
margo_destroy(h);
return -1;
}
mobject_request_t tmp_req = calloc(1, sizeof(*tmp_req));
tmp_req->type = MOBJECT_AIO_WRITE;
tmp_req->op.write_op = write_op;
tmp_req->request = mreq;
tmp_req->handle = h;
*req = tmp_req;
return 0;
}
int mobject_store_aio_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t io,
mobject_store_completion_t completion,
int mobject_aio_read_op_operate(
mobject_provider_handle_t mph,
mobject_store_read_op_t read_op,
const char *pool_name,
const char *oid,
int flags)
int flags,
mobject_request_t* req)
{
hg_return_t ret;
read_op_in_t in;
in.object_name = oid;
in.pool_name = io->pool_name;
in.pool_name = pool_name;
in.read_op = read_op;
prepare_read_op(io->cluster->mid, read_op);
prepare_read_op(mph->client->mid, read_op);
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0); // XXX pick other servers using ch-placement
MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address");
hg_addr_t svr_addr = mph->addr;
if(svr_addr == HG_ADDR_NULL) {
fprintf(stderr, "[MOBJECT] NULL provider address passed to mobject_aio_read_op_operate\n");
return -1;
}
hg_handle_t h;
ret = margo_create(io->cluster->mid, svr_addr, mobject_read_op_rpc_id, &h);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle");
margo_request req;
ret = margo_iforward(h, &in, &req);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");
completion->request = req;
completion->handle = h;
completion->type = AIO_READ_COMPLETION;
completion->op.read_op = read_op;
ret = margo_create(mph->client->mid, svr_addr, mph->client->mobject_read_op_rpc_id, &h);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_write_op_operate()\n");
return -1;
}
margo_set_target_id(h, mph->mplex_id);
margo_request mreq;
ret = margo_iforward(h, &in, &mreq);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n");
margo_destroy(h);
return -1;
}
mobject_request_t tmp_req = calloc(1, sizeof(*tmp_req));
tmp_req->type = MOBJECT_AIO_READ;
tmp_req->op.read_op = read_op;
tmp_req->request = mreq;
tmp_req->handle = h;
*req = tmp_req;
return 0;
}
int mobject_aio_wait(mobject_request_t req, int* ret)
{
if(req == MOBJECT_REQUEST_NULL)
return -1;
int r = margo_wait(req->request);
if(r != HG_SUCCESS) {
return r;
}
req->request = MARGO_REQUEST_NULL;
switch(req->type) {
case MOBJECT_AIO_WRITE: {
write_op_out_t resp;
r = margo_get_output(req->handle, &resp);
if(r != HG_SUCCESS) {
*ret = r;
margo_destroy(req->handle);
free(req);
return r;
}
*ret = resp.ret;
r = margo_free_output(req->handle,&resp);
if(r != HG_SUCCESS) {
*ret = r;
}
margo_destroy(req->handle);
free(req);
return r;
} break;
case MOBJECT_AIO_READ: {
read_op_out_t resp;
r = margo_get_output(req->handle, &resp);
if(r != HG_SUCCESS) {
*ret = r;
margo_destroy(req->handle);
free(req);
return r;
}
feed_read_op_pointers_from_response(req->op.read_op, resp.responses);
r = margo_free_output(req->handle, &resp);
if(r != HG_SUCCESS) {
*ret = r;
}
margo_destroy(req->handle);
free(req);
return r;
} break;
}
}
int mobject_aio_test(mobject_request_t req, int* flag)
{
if(req == MOBJECT_REQUEST_NULL) return -1;
return margo_test(req->request, flag);
}
......@@ -19,15 +19,12 @@ int mobject_store_aio_create_completion(void *cb_arg,
{
int r;
mobject_store_completion_t completion =
(mobject_store_completion_t)calloc(1, sizeof(struct mobject_store_completion));
(mobject_store_completion_t)calloc(1, sizeof(*completion));
MOBJECT_ASSERT(completion != 0, "Could not allocate mobject_store_completion_t object");
completion->type = AIO_NULL_COMPLETION;
completion->op.read_op = NULL;
completion->request = MOBJECT_REQUEST_NULL;
completion->cb_complete = cb_complete;
completion->cb_safe = cb_safe;
completion->cb_arg = cb_arg;
completion->handle = HG_HANDLE_NULL;
completion->request = MARGO_REQUEST_NULL;
*pc = completion;
return 0;
}
......@@ -35,38 +32,15 @@ int mobject_store_aio_create_completion(void *cb_arg,
int mobject_store_aio_wait_for_complete(mobject_store_completion_t c)
{
if(c == MOBJECT_COMPLETION_NULL) {
MOBJECT_LOG("Warning: passing NULL to mobject_store_aio_wait_for_complete");
return -1;
}
MOBJECT_ASSERT(c->request != MARGO_REQUEST_NULL, "Invalid completion handle");
int ret = margo_wait(c->request);
// TODO check the return value of margo_wait
if(ret != HG_SUCCESS) {
MOBJECT_LOG("Warning: margo_wait returned something different from HG_SUCCESS");
}
int ret;
int r = mobject_aio_wait(c->request, &ret);
c->ret_value = ret;
c->request = MARGO_REQUEST_NULL;
switch(c->type) {
case AIO_WRITE_COMPLETION: {
write_op_out_t resp;
ret = margo_get_output(c->handle, &resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output");
c->ret_value = resp.ret;
ret = margo_free_output(c->handle,&resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output");
} break;
case AIO_READ_COMPLETION: {
read_op_out_t resp;
ret = margo_get_output(c->handle, &resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output");
feed_read_op_pointers_from_response(c->op.read_op, resp.responses);
ret = margo_free_output(c->handle,&resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output");
}
}
if(c->cb_safe)
(c->cb_safe)(c, c->cb_arg);
......@@ -79,12 +53,15 @@ int mobject_store_aio_wait_for_complete(mobject_store_completion_t c)
int mobject_store_aio_is_complete(mobject_store_completion_t c)
{
if(c == MOBJECT_COMPLETION_NULL) {
MOBJECT_LOG("Warning: passing NULL to mobject_store_aio_wait_for_complete");
return 1;
}
if(c->request == MOBJECT_REQUEST_NULL) {
return 1;
}
int flag;
margo_test(c->request, &flag);
mobject_aio_test(c->request, &flag);
return flag;
}
......@@ -103,7 +80,6 @@ void mobject_store_aio_release(mobject_store_completion_t c)
{
if(c == MOBJECT_COMPLETION_NULL) return;
MOBJECT_ASSERT(c->request == MARGO_REQUEST_NULL,
"Trying to release a completion handle before operation completed");
margo_destroy(c->handle);
"Trying to release a completion handle before operation completed (will lead to memory leaks)");
free(c);
}
......@@ -10,12 +10,6 @@
#include "mobject-store-config.h"
#include "libmobject-store.h"
typedef enum completion_op_type {
AIO_NULL_COMPLETION,
AIO_WRITE_COMPLETION,
AIO_READ_COMPLETION
} completion_op_type;
/**
* The mobject_store_completion object is used for asynchronous
* functions. It contains the callbacks to call when the data is
......@@ -26,16 +20,10 @@ typedef enum completion_op_type {
* in libmobject-store.h.
*/
struct mobject_store_completion {
completion_op_type type; // completion for write or for reads
union {
mobject_store_read_op_t read_op;
mobject_store_write_op_t write_op;
} op; // operation that initiated the completion
mobject_store_callback_t cb_complete; // completion callback
mobject_store_callback_t cb_safe; // safe callback
void* cb_arg; // arguments for callbacks
margo_request request; // margo request to wait on
hg_handle_t handle; // handle of the RPC sent for this operation
mobject_request_t request; // margo request to wait on
int ret_value; // return value of the operation
};
......
This diff is collapsed.
......@@ -9,6 +9,7 @@
#include <margo.h>
#include <ssg.h>
#include "libmobject-store.h"
#include "mobject-client.h"
#define MOBJECT_CLUSTER_FILE_ENV "MOBJECT_CLUSTER_FILE"
#define MOBJECT_CLUSTER_SHUTDOWN_KILL_ENV "MOBJECT_SHUTDOWN_KILL_SERVERS"
......@@ -16,8 +17,8 @@
struct mobject_store_handle
{
margo_instance_id mid;
mobject_client_t mobject_clt;
ssg_group_id_t gid;
const char* my_address;
int connected;
};
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_CLIENT_IMPL_H
#define __MOBJECT_CLIENT_IMPL_H
#include "mobject-store-config.h"
#include <stdlib.h>
#include <margo.h>
#include <ssg.h>
#include "mobject-client.h"
struct mobject_client {
margo_instance_id mid;
char* client_addr;
hg_id_t mobject_write_op_rpc_id;
hg_id_t mobject_read_op_rpc_id;
hg_id_t mobject_shutdown_rpc_id;
uint64_t num_provider_handles;
};
struct mobject_provider_handle {
mobject_client_t client;
hg_addr_t addr;
uint8_t mplex_id;
uint64_t refcount;
};
typedef enum mobject_op_req_type {
MOBJECT_AIO_WRITE,
MOBJECT_AIO_READ
} mobject_op_req_type;
struct mobject_request {
mobject_op_req_type type; // type of operation that initiated the request
union {
mobject_store_read_op_t read_op;
mobject_store_write_op_t write_op;
} op; // operation that initiated the request
margo_request request; // margo request to wait on
hg_handle_t handle; // handle of the RPC sent for this operation
};
#endif
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "mobject-store-config.h"
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <margo.h>
#include <ssg.h>
#include "mobject-client.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/util/log.h"
struct mobject_client {
margo_instance_id mid;
char* client_addr;
hg_id_t mobject_write_op_rpc_id;
hg_id_t mobject_read_op_rpc_id;
hg_id_t mobject_shutdown_rpc_id;
uint64_t num_provider_handles;
};
struct mobject_provider_handle {
mobject_client_t client;
hg_addr_t addr;
uint8_t mplex_id;
uint64_t refcount;
};
static int mobject_client_register(mobject_client_t client, margo_instance_id mid)
{
int ret;
hg_addr_t client_addr = HG_ADDR_NULL;
char client_addr_str[128];
hg_size_t client_addr_len = 128;
ret = margo_addr_self(mid, &client_addr);
if(ret != HG_SUCCESS) return -1;
ret = margo_addr_to_string(mid, client_addr_str, &client_addr_len, client_addr);
if(ret != HG_SUCCESS) {
margo_addr_free(mid, client_addr);
return -1;
}
margo_addr_free(mid, client_addr);
client->mid = mid;
client->client_addr = strdup(client_addr_str);
/* check if RPCs have already been registered */
hg_bool_t flag;
hg_id_t id;
margo_registered_name(mid, "mobject_write_op", &id, &flag);
if(flag == HG_TRUE) { /* RPCs already registered */
margo_registered_name(mid, "mobject_write_op", &client->mobject_write_op_rpc_id, &flag);
margo_registered_name(mid, "mobject_read_op", &client->mobject_read_op_rpc_id, &flag);
margo_registered_name(mid, "mobject_shutdown", &client->mobject_shutdown_rpc_id, &flag);
} else {
client->mobject_write_op_rpc_id =
MARGO_REGISTER(mid, "mobject_write_op", write_op_in_t, write_op_out_t, NULL);
client->mobject_read_op_rpc_id =
MARGO_REGISTER(mid, "mobject_read_op", read_op_in_t, read_op_out_t, NULL);
client->mobject_shutdown_rpc_id =
MARGO_REGISTER(mid, "mobject_shutdown", void, void, NULL);
}
return 0;
}
int mobject_client_init(margo_instance_id mid, mobject_client_t* client)
{
mobject_client_t c = (mobject_client_t)calloc(1, sizeof(*c));
if(!c) return -1;
c->num_provider_handles = 0;
int ret = mobject_client_register(c, mid);
if(ret != 0) return ret;
*client = c;
return 0;
}
int mobject_client_finalize(mobject_client_t client)
{
if(client->num_provider_handles != 0) {
fprintf(stderr,
"[MOBJECT] Warning: %d provider handles not released before mobject_client_finalize was called\n",
client->num_provider_handles);
}
free(client->client_addr);
free(client);
return 0;
}
int mobject_provider_handle_create(
mobject_client_t client,
hg_addr_t addr,
uint8_t mplex_id,
mobject_provider_handle_t* handle)
{
if(client == MOBJECT_CLIENT_NULL) return -1;
mobject_provider_handle_t provider =
(mobject_provider_handle_t)calloc(1, sizeof(*provider));
if(!provider) return -1;
hg_return_t ret = margo_addr_dup(client->mid, addr, &(provider->addr));
if(ret != HG_SUCCESS) {
free(provider);
return -1;
}
provider->client = client;
provider->mplex_id = mplex_id;
provider->refcount = 1;
client->num_provider_handles += 1;
*handle = provider;
return 0;
}
int mobject_provider_handle_ref_incr(mobject_provider_handle_t handle)
{
if(handle == MOBJECT_PROVIDER_HANDLE_NULL) return -1;
handle->refcount += 1;
return 0;
}
int mobject_provider_handle_release(mobject_provider_handle_t handle)
{
if(handle == MOBJECT_PROVIDER_HANDLE_NULL) return -1;
handle->refcount -= 1;
if(handle->refcount == 0) {
margo_addr_free(handle->client->mid, handle->addr);
handle->client->num_provider_handles -= 1;
free(handle);
}
return 0;
}
int mobject_shutdown(mobject_client_t client, hg_addr_t addr)
{
hg_return_t hret;
hg_handle_t handle;
hret = margo_create(client->mid, addr,
client->mobject_shutdown_rpc_id, &handle);
if(hret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_create() failed in mobject_shutdown()\n");
return -1;
}
hret = margo_forward(handle, NULL);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_shutdown()\n");
margo_destroy(handle);
return -1;
}
margo_destroy(handle);
return 0;
}
int mobject_write_op_operate(
mobject_provider_handle_t mph,
mobject_store_write_op_t write_op,
const char* pool_name,
const char *oid,
time_t *mtime,
int flags)
{
hg_return_t ret;
write_op_in_t in;
in.object_name = oid;
in.pool_name = pool_name;
in.write_op = write_op;
in.client_addr = mph->client->client_addr;
// TODO take mtime into account
prepare_write_op(mph->client->mid, write_op);
hg_addr_t svr_addr = mph->addr;
if(svr_addr == HG_ADDR_NULL) {
fprintf(stderr, "[MOBJECT] NULL provider address passed to mobject_write_op_operate\n");
return -1;
}
hg_handle_t h;
ret = margo_create(mph->client->mid, svr_addr, mph->client->mobject_write_op_rpc_id, &h);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_create() failed in mobject_write_op_operate()\n");
return -1;
}
margo_set_target_id(h, mph->mplex_id);
ret = margo_forward(h, &in);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_write_op_operate()\n");
margo_destroy(h);
return -1;
}
write_op_out_t resp;
ret = margo_get_output(h, &resp);
if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_get_output() failed in mobject_write_op_operate()\n");
margo_destroy(h);
return -1;
}
margo_free_output(h,&resp);
margo_destroy(h);
return 0;
}