Commit 336b01ae authored by Shane Snyder's avatar Shane Snyder

reorg client cluster & io-context code

parent d1632836
...@@ -102,7 +102,7 @@ enum { ...@@ -102,7 +102,7 @@ enum {
* libmobject_store users must synchronize any of these changes on their own, * libmobject_store users must synchronize any of these changes on their own,
* or use separate io contexts for each thread * or use separate io contexts for each thread
*/ */
typedef struct mobject_ioctx *mobject_store_ioctx_t; typedef struct mobject_store_ioctx *mobject_store_ioctx_t;
/** /**
* @typedef mobject_store_omap_iter_t * @typedef mobject_store_omap_iter_t
......
noinst_HEADERS += \ noinst_HEADERS += \
src/client/cluster.h \
src/client/aio/completion.h \ src/client/aio/completion.h \
src/client/cluster-handle.h \
src/client/io-context.h \
src/io-chain/args-read-actions.h \ src/io-chain/args-read-actions.h \
src/io-chain/args-write-actions.h \ src/io-chain/args-write-actions.h \
src/io-chain/prepare-read-op.h \ src/io-chain/prepare-read-op.h \
...@@ -46,12 +45,11 @@ src_io_chain_libio_chain_la_SOURCES = src/io-chain/prepare-read-op.c \ ...@@ -46,12 +45,11 @@ src_io_chain_libio_chain_la_SOURCES = src/io-chain/prepare-read-op.c \
src_client_libmobject_store_la_SOURCES = \ src_client_libmobject_store_la_SOURCES = \
src/client/aio/completion.c \ src/client/cluster.c \
src/client/io-context.c \
src/client/omap-iter.c \
src/client/read-op.c \ src/client/read-op.c \
src/client/write-op.c \ src/client/write-op.c \
src/client/libmobject-store.c src/client/omap-iter.c \
src/client/aio/completion.c
src_client_libmobject_store_la_CPPFLAGS = ${AM_CPPFLAGS} ${CLIENT_CPPFLAGS} src_client_libmobject_store_la_CPPFLAGS = ${AM_CPPFLAGS} ${CLIENT_CPPFLAGS}
src_client_libmobject_store_la_LIBADD = src/omap-iter/libomap-iter.la \ src_client_libmobject_store_la_LIBADD = src/omap-iter/libomap-iter.la \
src/io-chain/libio-chain.la ${CLIENT_LIBS} src/io-chain/libio-chain.la ${CLIENT_LIBS}
......
...@@ -14,16 +14,12 @@ ...@@ -14,16 +14,12 @@
#include <ssg.h> #include <ssg.h>
#include "libmobject-store.h" #include "libmobject-store.h"
#include "src/util/log.h" #include "src/client/cluster.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h" #include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h" #include "src/rpc-types/read-op.h"
#include "src/rpc-types/read-op.h" #include "src/util/log.h"
#include "src/client/io-context.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/client/cluster-handle.h"
#define MOBJECT_CLUSTER_FILE_ENV "MOBJECT_CLUSTER_FILE"
// global variables for RPC ids // global variables for RPC ids
...@@ -32,18 +28,18 @@ hg_id_t mobject_read_op_rpc_id; ...@@ -32,18 +28,18 @@ hg_id_t mobject_read_op_rpc_id;
hg_id_t mobject_shutdown_rpc_id; hg_id_t mobject_shutdown_rpc_id;
static void mobject_store_register(margo_instance_id mid); static void mobject_store_register(margo_instance_id mid);
static int mobject_store_shutdown_servers(mobject_store_handle_t *cluster_handle); static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle);
int mobject_store_create(mobject_store_t *cluster, const char * const id) int mobject_store_create(mobject_store_t *cluster, const char * const id)
{ {
mobject_store_handle_t *cluster_handle; struct mobject_store_handle *cluster_handle;
char *cluster_file; char *cluster_file;
int ret; int ret;
(void)id; /* XXX: id unused in mobject */ (void)id; /* XXX: id unused in mobject */
/* allocate a new cluster handle and set some fields */ /* allocate a new cluster handle and set some fields */
cluster_handle = (mobject_store_handle_t*)calloc(1,sizeof(*cluster_handle)); cluster_handle = (struct mobject_store_handle*)calloc(1,sizeof(*cluster_handle));
if (!cluster_handle) if (!cluster_handle)
return -1; return -1;
...@@ -75,12 +71,15 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id) ...@@ -75,12 +71,15 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
int mobject_store_connect(mobject_store_t cluster) int mobject_store_connect(mobject_store_t cluster)
{ {
mobject_store_handle_t *cluster_handle = (mobject_store_handle_t *)cluster; struct mobject_store_handle *cluster_handle;
char *svr_addr_str; char *svr_addr_str;
char proto[24] = {0}; char proto[24] = {0};
int i; int i;
int ret; int ret;
cluster_handle = (struct mobject_store_handle *)cluster;
assert(cluster_handle);
if (cluster_handle->connected) if (cluster_handle->connected)
return 0; return 0;
...@@ -95,7 +94,7 @@ int mobject_store_connect(mobject_store_t cluster) ...@@ -95,7 +94,7 @@ int mobject_store_connect(mobject_store_t cluster)
free(cluster_handle); free(cluster_handle);
return -1; return -1;
} }
/* we only need to the proto portion of the address to initialize */ /* we only need to get the proto portion of the address to initialize */
for(i=0; i<24 && svr_addr_str[i] != '\0' && svr_addr_str[i] != ':'; i++) for(i=0; i<24 && svr_addr_str[i] != '\0' && svr_addr_str[i] != ':'; i++)
proto[i] = svr_addr_str[i]; proto[i] = svr_addr_str[i];
...@@ -148,11 +147,11 @@ int mobject_store_connect(mobject_store_t cluster) ...@@ -148,11 +147,11 @@ int mobject_store_connect(mobject_store_t cluster)
void mobject_store_shutdown(mobject_store_t cluster) void mobject_store_shutdown(mobject_store_t cluster)
{ {
mobject_store_handle_t *cluster_handle = struct mobject_store_handle *cluster_handle;
(mobject_store_handle_t *)cluster;
char *svr_kill_env_str; char *svr_kill_env_str;
int ret; int ret;
cluster_handle = (struct mobject_store_handle *)cluster;
assert(cluster_handle != NULL); assert(cluster_handle != NULL);
if (!cluster_handle->connected) if (!cluster_handle->connected)
...@@ -179,11 +178,28 @@ void mobject_store_shutdown(mobject_store_t cluster) ...@@ -179,11 +178,28 @@ void mobject_store_shutdown(mobject_store_t cluster)
return; return;
} }
int mobject_store_ioctx_create(
mobject_store_t cluster,
const char * pool_name,
mobject_store_ioctx_t *ioctx)
{
*ioctx = (mobject_store_ioctx_t)calloc(1, sizeof(**ioctx));
(*ioctx)->pool_name = strdup(pool_name);
(*ioctx)->cluster = cluster;
return 0;
}
void mobject_store_ioctx_destroy(mobject_store_ioctx_t ioctx)
{
if(ioctx) free(ioctx->pool_name);
free(ioctx);
}
int mobject_store_write_op_operate(mobject_store_write_op_t write_op, int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
mobject_store_ioctx_t io, mobject_store_ioctx_t io,
const char *oid, const char *oid,
time_t *mtime, time_t *mtime,
int flags) int flags)
{ {
hg_return_t ret; hg_return_t ret;
...@@ -206,10 +222,10 @@ int mobject_store_write_op_operate(mobject_store_write_op_t write_op, ...@@ -206,10 +222,10 @@ int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC"); MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");
write_op_out_t resp; write_op_out_t resp;
ret = margo_get_output(h, &resp); ret = margo_get_output(h, &resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output"); MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output");
ret = margo_free_output(h,&resp); ret = margo_free_output(h,&resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output"); MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output");
ret = margo_destroy(h); ret = margo_destroy(h);
...@@ -221,16 +237,16 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op, ...@@ -221,16 +237,16 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
mobject_store_ioctx_t ioctx, mobject_store_ioctx_t ioctx,
const char *oid, const char *oid,
int flags) int flags)
{ {
hg_return_t ret; hg_return_t ret;
read_op_in_t in; read_op_in_t in;
in.object_name = oid; in.object_name = oid;
in.pool_name = ioctx->pool_name; in.pool_name = ioctx->pool_name;
in.read_op = read_op; in.read_op = read_op;
prepare_read_op(ioctx->cluster->mid, read_op); prepare_read_op(ioctx->cluster->mid, read_op);
hg_addr_t svr_addr = ssg_get_addr(ioctx->cluster->gid, 0); // XXX pick other servers using ch-placement hg_addr_t svr_addr = ssg_get_addr(ioctx->cluster->gid, 0); // XXX pick other servers using ch-placement
MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address"); MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address");
...@@ -239,18 +255,18 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op, ...@@ -239,18 +255,18 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle"); MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle");
ret = margo_forward(h, &in); ret = margo_forward(h, &in);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC"); MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");
read_op_out_t resp; read_op_out_t resp;
ret = margo_get_output(h, &resp); ret = margo_get_output(h, &resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output"); MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output");
feed_read_op_pointers_from_response(read_op, resp.responses); feed_read_op_pointers_from_response(read_op, resp.responses);
ret = margo_free_output(h,&resp); ret = margo_free_output(h,&resp);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output"); MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output");
ret = margo_destroy(h); ret = margo_destroy(h);
MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not destroy RPC handle"); MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not destroy RPC handle");
return 0; return 0;
} }
...@@ -269,7 +285,7 @@ static void mobject_store_register(margo_instance_id mid) ...@@ -269,7 +285,7 @@ static void mobject_store_register(margo_instance_id mid)
} }
// send a shutdown signal to a server cluster // send a shutdown signal to a server cluster
static int mobject_store_shutdown_servers(mobject_store_handle_t *cluster_handle) static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle)
{ {
hg_addr_t svr_addr; hg_addr_t svr_addr;
hg_handle_t h; hg_handle_t h;
......
...@@ -3,17 +3,26 @@ ...@@ -3,17 +3,26 @@
* *
* See COPYRIGHT in top-level directory. * See COPYRIGHT in top-level directory.
*/ */
#ifndef __MOBJECT_STORE_HANDLE_H #ifndef __CLUSTER_H
#define __MOBJECT_STORE_HANDLE_H #define __CLUSTER_H
#include <margo.h> #include <margo.h>
#include <ssg.h> #include <ssg.h>
#include "libmobject-store.h"
typedef struct mobject_store_handle #define MOBJECT_CLUSTER_FILE_ENV "MOBJECT_CLUSTER_FILE"
struct mobject_store_handle
{ {
margo_instance_id mid; margo_instance_id mid;
ssg_group_id_t gid; ssg_group_id_t gid;
int connected; int connected;
} mobject_store_handle_t; };
struct mobject_store_ioctx
{
mobject_store_t cluster;
char* pool_name;
};
#endif #endif
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "libmobject-store.h"
#include "src/client/io-context.h"
#include "src/client/cluster-handle.h"
int mobject_store_ioctx_create(
mobject_store_t cluster,
const char * pool_name,
mobject_store_ioctx_t *ioctx)
{
(void)pool_name; /* XXX pool is ignored for now and instead use one global "pool" */
*ioctx = (mobject_store_ioctx_t)calloc(1, sizeof(**ioctx));
(*ioctx)->pool_name = strdup(pool_name);
(*ioctx)->cluster = cluster;
return 0;
}
void mobject_store_ioctx_destroy(mobject_store_ioctx_t ioctx)
{
if(ioctx) free(ioctx->pool_name);
free(ioctx);
}
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MOBJECT_IOCTX_H
#define __MOBJECT_IOCTX_H
#include <margo.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
typedef struct mobject_ioctx {
mobject_store_t cluster;
char* pool_name;
}* mobject_store_ioctx_t;
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment