Commit 867f543d authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

refactoring complete

parent b057e75e
......@@ -17,7 +17,11 @@ test_bench_client_LDADD = ${LIBS} -lkvclient
lib_LTLIBRARIES = lib/libkvclient.la \
lib/libkvserver.la \
lib/libkvgroupclient.la \
lib/libkvgroupserver.la
lib/libkvgroupserver.la \
lib/libsdskv-client.la \
lib/libsdskv-server.la
lib_libsdskv_client_la_SOURCES = src/sdskv-client.c
lib_libkvclient_la_SOURCES = src/kv-client.c
......@@ -26,17 +30,27 @@ lib_libkvgroupclient_la_SOURCES = src/kvgroup-client.cc
lib_libkvserver_la_SOURCES = src/kv-server.cc \
src/datastore/datastore.cc
lib_libsdskv_server_la_SOURCES = src/sdskv-server.cc \
src/datastore/datastore.cc
if BUILD_BWTREE
lib_libkvserver_la_SOURCES += src/BwTree/src/bwtree.cpp \
src/datastore/bwtree_datastore.cc
lib_libsdskv_server_la_SOURCES += src/BwTree/src/bwtree.cpp \
src/datastore/bwtree_datastore.cc
endif
if BUILD_BDB
lib_libkvserver_la_SOURCES += src/datastore/berkeleydb_datastore.cc
lib_libsdskv_server_la_SOURCES += src/datastore/berkeleydb_datastore.cc
endif
if BUILD_LEVELDB
lib_libkvserver_la_SOURCES += src/datastore/leveldb_datastore.cc
lib_libsdskv_server_la_SOURCES += src/datastore/leveldb_datastore.cc
endif
......
......@@ -5,7 +5,6 @@
#include "kv-config.h"
#include "bulk.h"
#include "sds-keyval.h"
#include <boost/functional/hash.hpp>
#include <vector>
......
#ifndef datastore_factory_h
#define datastore_factory_h
#include "datastore.h"
#ifdef SDSKV
#include "sdskv-common.h"
#else
#include "sds-keyval.h"
#endif
#include "datastore.h"
#ifdef USE_BWTREE
#include "bwtree_datastore.h"
......@@ -44,7 +48,11 @@ class datastore_factory {
public:
#ifdef SDSKV
static AbstractDataStore* create_datastore(sdskv_db_type_t type)
#else
static AbstractDataStore* create_datastore(kv_db_type_t type)
#endif
{
switch(type) {
case KVDB_BWTREE:
......
......@@ -536,7 +536,7 @@ static hg_return_t bench_handler(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bench_handler)
kv_context_t *kv_server_register(const margo_instance_id mid)
extern "C" kv_context_t *kv_server_register(const margo_instance_id mid)
{
hg_return_t ret;
hg_addr_t addr_self;
......@@ -601,13 +601,13 @@ kv_context_t *kv_server_register(const margo_instance_id mid)
return context;
}
hg_return_t kv_server_wait_for_shutdown(kv_context_t *context) {
extern "C" hg_return_t kv_server_wait_for_shutdown(kv_context_t *context) {
margo_wait_for_finalize(context->mid);
return HG_SUCCESS;
}
/* this is the same as client. should be moved to common utility library */
hg_return_t kv_server_deregister(kv_context_t *context) {
extern "C" hg_return_t kv_server_deregister(kv_context_t *context) {
free(context);
delete datastore;
std::cout << "SERVER: cleanup complete, deregistered" << std::endl;
......
......@@ -11,7 +11,7 @@
unsigned long server_indexes[CH_MAX_REPLICATION];
kv_group_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid)
extern "C" kv_group_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid)
{
kv_group_t *group = (kv_group_t*)calloc(1, sizeof(group));
......@@ -28,7 +28,7 @@ kv_group_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid)
return group;
}
hg_return_t kvgroup_open(kv_group_t *group, const char *db_name, kv_db_type_t db_type)
extern "C" hg_return_t kvgroup_open(kv_group_t *group, const char *db_name, kv_db_type_t db_type)
{
hg_size_t addr_str_sz = 128;
char addr_str[addr_str_sz];
......@@ -74,7 +74,7 @@ hg_return_t kvgroup_open(kv_group_t *group, const char *db_name, kv_db_type_t db
// oid is unique associated with key
// in ParSplice key is already a uint64_t hashed value
hg_return_t kvgroup_put(kv_group_t * group, uint64_t oid,
extern "C" hg_return_t kvgroup_put(kv_group_t * group, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize)
{
......@@ -89,7 +89,7 @@ hg_return_t kvgroup_put(kv_group_t * group, uint64_t oid,
// oid is unique associated with key
// in ParSplice key is already a uint64_t hashed value
// vsize is in/out
hg_return_t kvgroup_get(kv_group_t *group, uint64_t oid,
extern "C" hg_return_t kvgroup_get(kv_group_t *group, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t *vsize)
{
......@@ -100,7 +100,7 @@ hg_return_t kvgroup_get(kv_group_t *group, uint64_t oid,
return kv_get(group->db[server_indexes[0]], key, ksize, value, vsize);
}
hg_return_t kvgroup_close(kv_group_t * group)
extern "C" hg_return_t kvgroup_close(kv_group_t * group)
{
hg_return_t ret;
for (hg_size_t i=0; i<group->gsize; i++) {
......@@ -110,7 +110,7 @@ hg_return_t kvgroup_close(kv_group_t * group)
return HG_SUCCESS;
}
hg_return_t kvgroup_client_deregister(kv_group_t * group)
extern "C" hg_return_t kvgroup_client_deregister(kv_group_t * group)
{
hg_return_t ret;
ret = kv_client_deregister(group->kv_context);
......@@ -127,7 +127,7 @@ hg_return_t kvgroup_client_deregister(kv_group_t * group)
}
// only one client calls shutdown
hg_return_t kvgroup_client_signal_shutdown(kv_group_t *group)
extern "C" hg_return_t kvgroup_client_signal_shutdown(kv_group_t *group)
{
hg_return_t ret;
for (hg_size_t i=0; i<group->gsize; i++) {
......@@ -140,7 +140,7 @@ hg_return_t kvgroup_client_signal_shutdown(kv_group_t *group)
// collective along with kvgroup_server_send_gid
// single server rank calls send, all client ranks call recv
// gid is an output argument
void kvgroup_client_recv_gid(ssg_group_id_t *gid, MPI_Comm comm)
extern "C" void kvgroup_client_recv_gid(ssg_group_id_t *gid, MPI_Comm comm)
{
char *serialized_gid = NULL;
size_t gid_size = 0;
......
......@@ -19,7 +19,7 @@ static void group_update_cb(ssg_membership_update_t update, void *cb_dat)
}
/* this is a collective operation */
kv_group_t *kvgroup_server_register(margo_instance_id mid,
extern "C" kv_group_t *kvgroup_server_register(margo_instance_id mid,
const char *ssg_name, MPI_Comm ssg_comm)
{
kv_group_t * group = (kv_group_t *)calloc(1, sizeof(kv_group_t));
......@@ -45,7 +45,7 @@ kv_group_t *kvgroup_server_register(margo_instance_id mid,
return group;
}
hg_return_t kvgroup_server_deregister(kv_group_t *group)
extern "C" hg_return_t kvgroup_server_deregister(kv_group_t *group)
{
hg_return_t ret = kv_server_deregister(group->kv_context);
ssg_group_destroy(group->gid);
......@@ -55,7 +55,7 @@ hg_return_t kvgroup_server_deregister(kv_group_t *group)
return ret;
}
hg_return_t kvgroup_server_wait_for_shutdown(kv_group_t *group)
extern "C" hg_return_t kvgroup_server_wait_for_shutdown(kv_group_t *group)
{
hg_return_t ret = kv_server_wait_for_shutdown(group->kv_context);
return ret;
......@@ -64,7 +64,7 @@ hg_return_t kvgroup_server_wait_for_shutdown(kv_group_t *group)
// collective along with kvgroup_client_recv_gid
// single server rank calls send, all client ranks call recv
// gid is an input argument
void kvgroup_server_send_gid(ssg_group_id_t gid, MPI_Comm comm)
extern "C" void kvgroup_server_send_gid(ssg_group_id_t gid, MPI_Comm comm)
{
char *serialized_gid = NULL;
size_t gid_size = 0;
......
......@@ -54,7 +54,7 @@ hg_return_t kvgroup_server_deregister(kv_group_t *group);
hg_return_t kvgroup_server_wait_for_shutdown(kv_group_t *group);
kv_group_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid);
hg_return_t kvgroup_open(kv_group_t *group, const char *db_name);
hg_return_t kvgroup_open(kv_group_t *group, const char *db_name, kv_db_type_t db_type);
hg_return_t kvgroup_put(kv_group_t *group, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize);
......
#include "sdskv-client.h"
#include "sdskv-rpc-types.h"
#define MAX_RPC_MESSAGE_SIZE 4000 // in bytes
struct sdskv_client {
margo_instance_id mid;
hg_id_t sdskv_put_id;
hg_id_t sdskv_bulk_put_id;
hg_id_t sdskv_get_id;
hg_id_t sdskv_length_id;
hg_id_t sdskv_bulk_get_id;
hg_id_t sdskv_open_id;
hg_id_t sdskv_list_id;
uint64_t num_provider_handles;
};
struct sdskv_provider_handle {
sdskv_client_t client;
hg_addr_t addr;
uint8_t mplex_id;
uint64_t refcount;
};
static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
{
client->mid = mid;
/* check if RPCs have already been registered */
hg_bool_t flag;
hg_id_t id;
margo_registered_name(mid, "sdskv_put_rpc", &id, &flag);
if(flag == HG_TRUE) { /* RPCs already registered */
margo_registered_name(mid, "sdskv_put_rpc", &client->sdskv_put_id, &flag);
margo_registered_name(mid, "sdskv_bulk_put_rpc", &client->sdskv_bulk_put_id, &flag);
margo_registered_name(mid, "sdskv_get_rpc", &client->sdskv_get_id, &flag);
margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag);
margo_registered_name(mid, "sdskv_bulk_get_rpc", &client->sdskv_bulk_get_id, &flag);
margo_registered_name(mid, "sdskv_open_rpc", &client->sdskv_open_id, &flag);
margo_registered_name(mid, "sdskv_list_rpc", &client->sdskv_list_id, &flag);
} else {
client->sdskv_put_id =
MARGO_REGISTER(mid, "sdskv_put_rpc", put_in_t, put_out_t, NULL);
client->sdskv_bulk_put_id =
MARGO_REGISTER(mid, "sdskv_bulk_put_rpc", bulk_put_in_t, bulk_put_out_t, NULL);
client->sdskv_get_id =
MARGO_REGISTER(mid, "sdskv_get_rpc", get_in_t, get_out_t, NULL);
client->sdskv_length_id =
MARGO_REGISTER(mid, "sdskv_length_rpc", length_in_t, length_out_t, NULL);
client->sdskv_bulk_get_id =
MARGO_REGISTER(mid, "sdskv_bulk_get_rpc", bulk_get_in_t, bulk_get_out_t, NULL);
client->sdskv_open_id =
MARGO_REGISTER(mid, "sdskv_open_rpc", open_in_t, open_out_t, NULL);
client->sdskv_list_id =
MARGO_REGISTER(mid, "sdskv_list_rpc", list_in_t, list_out_t, NULL);
}
return 0;
}
int sdskv_client_init(margo_instance_id mid, sdskv_client_t* client)
{
sdskv_client_t c = (sdskv_client_t)calloc(1, sizeof(*c));
if(!c) return -1;
c->num_provider_handles = 0;
int ret = sdskv_client_register(c, mid);
if(ret != 0) return ret;
*client = c;
return 0;
}
int sdskv_client_finalize(sdskv_client_t client)
{
if(client->num_provider_handles != 0) {
fprintf(stderr,
"[SDSKV] Warning: %d provider handles not released before sdskv_client_finalize was called\n",
client->num_provider_handles);
}
free(client);
return 0;
}
int sdskv_provider_handle_create(
sdskv_client_t client,
hg_addr_t addr,
uint8_t mplex_id,
sdskv_provider_handle_t* handle)
{
if(client == SDSKV_CLIENT_NULL) return -1;
sdskv_provider_handle_t provider =
(sdskv_provider_handle_t)calloc(1, sizeof(*provider));
if(!provider) return -1;
hg_return_t ret = margo_addr_dup(client->mid, addr, &(provider->addr));
if(ret != HG_SUCCESS) {
free(provider);
return -1;
}
provider->client = client;
provider->mplex_id = mplex_id;
provider->refcount = 1;
client->num_provider_handles += 1;
*handle = provider;
return 0;
}
int sdskv_provider_handle_ref_incr(
sdskv_provider_handle_t handle)
{
if(handle == SDSKV_PROVIDER_HANDLE_NULL) return -1;
handle->refcount += 1;
return 0;
}
int sdskv_provider_release(sdskv_provider_handle_t handle)
{
if(handle == SDSKV_PROVIDER_HANDLE_NULL) return -1;
handle->refcount -= 1;
if(handle->refcount == 0) {
margo_addr_free(handle->client->mid, handle->addr);
handle->client->num_provider_handles -= 1;
free(handle);
}
return 0;
}
int sdskv_open(
sdskv_provider_handle_t provider,
const char* db_name,
sdskv_database_id_t* db_id)
{
hg_return_t hret;
int ret;
open_in_t in;
open_out_t out;
hg_handle_t handle;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_open_id,
&handle);
if(hret != HG_SUCCESS) return -1;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
ret = out.ret;
*db_id = out.db_id;
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_put(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
const void *key, hg_size_t ksize,
const void *value, hg_size_t vsize)
{
hg_return_t hret;
int ret;
hg_handle_t handle;
hg_size_t msize = ksize + vsize + 2*sizeof(hg_size_t);
if(msize <= MAX_RPC_MESSAGE_SIZE) {
put_in_t in;
put_out_t out;
in.db_id = db_id;
in.key = (kv_data_t)key;
in.ksize = ksize;
in.value = (kv_data_t)value;
in.vsize = vsize;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_put_id,
&handle);
if(hret != HG_SUCCESS) return -1;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
ret = out.ret;
margo_free_output(handle, &out);
} else {
bulk_put_in_t in;
bulk_put_out_t out;
in.bulk.db_id = db_id;
in.bulk.key = (kv_data_t)key;
in.bulk.ksize = ksize;
in.bulk.vsize = vsize;
hret = margo_bulk_create(provider->client->mid, 1, (void**)(&value), &in.bulk.vsize,
HG_BULK_READ_ONLY, &in.bulk.handle);
if(ret != HG_SUCCESS) return -1;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_bulk_put_id,
&handle);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
return -1;
}
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
margo_destroy(handle);
return -1;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
margo_destroy(handle);
return -1;
}
ret = out.ret;
margo_free_output(handle, &out);
margo_bulk_free(in.bulk.handle);
}
margo_destroy(handle);
return ret;
}
int sdskv_get(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
const void *key, hg_size_t ksize,
void *value, hg_size_t* vsize)
{
hg_return_t hret;
hg_size_t size;
hg_size_t msize;
int ret;
hg_handle_t handle;
size = *(hg_size_t*)vsize;
msize = size + sizeof(hg_size_t) + sizeof(hg_return_t);
if (msize <= MAX_RPC_MESSAGE_SIZE) {
get_in_t in;
get_out_t out;
in.db_id = db_id;
in.key = (kv_data_t)key;
in.ksize = ksize;
in.vsize = size;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_get_id,
&handle);
if(hret != HG_SUCCESS) return -1;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
ret = out.ret;
*vsize = (hg_size_t)out.vsize;
if (out.vsize > 0) {
memcpy(value, out.value, out.vsize);
}
margo_free_output(handle, &out);
} else {
bulk_get_in_t in;
bulk_get_out_t out;
in.bulk.db_id = db_id;
in.bulk.key = (kv_data_t)key;
in.bulk.ksize = ksize;
in.bulk.vsize = size;
hret = margo_bulk_create(provider->client->mid, 1, &value, &in.bulk.vsize,
HG_BULK_WRITE_ONLY, &in.bulk.handle);
if(hret != HG_SUCCESS) return -1;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_bulk_get_id,
&handle);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
return -1;
}
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
margo_destroy(handle);
return -1;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.bulk.handle);
margo_destroy(handle);
return -1;
}
ret = out.ret;
*vsize = (hg_size_t)out.size;
margo_free_output(handle, &out);
margo_bulk_free(in.bulk.handle);
}