Commit 79afcb1c authored by David Rich's avatar David Rich
Browse files

Merge branch 'master' of xgitlab.cels.anl.gov:sds/sds-keyval

parents 2983e6b5 2b4322d6
ACLOCAL_AMFLAGS="-Im4"
SERVER_LIBS=@SERVER_LIBS@
CLIENT_LiBS=@CLIENT_LIBS@
AM_CPPFLAGS = -I${srcdir}/src
bin_PROGRAMS = test/bench-client \
test/test-client \
test/test-server
test_bench_client_SOURCES = test/bench-client.cc
test_bench_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_bench_client_DEPENDENCIES = lib/libkvclient.la
test_bench_client_LDFLAGS = -Llib -lkvclient
test_bench_client_LDADD = ${LIBS} -lkvclient
......@@ -17,11 +22,14 @@ lib_LTLIBRARIES = lib/libkvclient.la \
lib_libkvclient_la_SOURCES = src/kv-client.c
lib_libkvgroupclient_la_SOURCES = src/kvgroup-client.cc
lib_libkvgroupclient_la_CPPFLAGS = ${CPPFLAGS}
lib_libkvserver_la_SOURCES = src/kv-server.cc \
src/datastore.cc \
src/BwTree/src/bwtree.cpp
src/datastore.cc
if BUILD_BWTREE
lib_libkvserver_la_SOURCES += src/BwTree/src/bwtree.cpp
endif
include_HEADERS = src/sds-keyval.h \
src/sds-keyval-group.h
......@@ -34,7 +42,6 @@ noinst_HEADERS = src/datastore.h \
lib_libkvgroupserver_la_SOURCES = src/kvgroup-server.cc
lib_libkvgroupserver_la_CPPFLAGS = ${CPPFLAGS}
check_PROGRAMS = test/test-client \
......@@ -51,27 +58,22 @@ TESTS = test/test-client \
test/test-mpi-group
test_test_client_SOURCES = test/test-client.cc
test_test_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_client_DEPENDENCIES = lib/libkvclient.la
test_test_client_LDFLAGS = -Llib -lkvclient
test_test_server_SOURCES = test/test-server.cc
test_test_server_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_server_DEPENDENCIES = lib/libkvserver.la
test_test_server_LDFLAGS = -Llib -lkvserver -lssg -lboost_filesystem -lboost_system
test_test_server_LDFLAGS = -Llib -lkvserver ${SERVER_LIBS}
test_test_mpi_SOURCES = test/test-mpi.cc
test_test_mpi_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_mpi_DEPENDENCIES = lib/libkvserver.la lib/libkvclient.la
test_test_mpi_LDFLAGS = -Llib -lkvclient -lkvserver -lssg -lboost_filesystem -lboost_system
test_test_mpi_LDFLAGS = -Llib -lkvclient -lkvserver ${SERVER_LIBS}
test_test_mpi_group_SOURCES = test/test-mpi-group.cc
test_test_mpi_group_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_mpi_group_DEPENDENCIES = lib/libkvgroupserver.la lib/libkvgroupclient.la lib/libkvserver.la lib/libkvclient.la
test_test_mpi_group_LDFLAGS = -Llib -lkvgroupserver -lkvgroupclient -lkvclient -lkvserver -lssg -lch-placement -lboost_filesystem -lboost_system
test_test_mpi_group_LDFLAGS = -Llib -lkvgroupserver -lkvgroupclient -lkvclient -lkvserver ${SERVER_LIBS} ${GROUP_LIBS}
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/kv-server.pc \
......
......@@ -3,7 +3,7 @@
AC_PREREQ([2.69])
AC_INIT([sds-keyval], [0.1], [robl@mcs.anl.gov])
AM_INIT_AUTOMAKE([1.14.1 -Wall -Werror foreign subdir-objects silent-rules])
AM_INIT_AUTOMAKE([1.13.4 -Wall -Werror foreign subdir-objects silent-rules])
AM_SILENT_RULES([yes])
AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_SRCDIR([src/sds-keyval.h])
......@@ -45,6 +45,20 @@ LIBS="$MARGO_LIBS $LIBS"
CPPFLAGS="$MARGO_CFLAGS $CPPFLAGS"
CFLAGS="$MARGO_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([SSG], [ssg], [],
AC_MSG_ERROR([Could not find ssg]) )
CFLAGS="$SSG_CFLAGS $CFLAGS"
SERVER_LIBS="$SSG_LIBS $SERVER_LIBS"
PKG_CHECK_MODULES([CHPLACEMENT], [ch-placement], [],
AC_MSG_ERROR([Could not find ch-placement]) )
CFLAGS="$CHPLACEMENT_CFLAGS $CFLAGS"
SERVER_LIBS="$CHPLAEMENT_LIBS $SERVER_LIBS"
GROUP_LIBS="$CHPLACEMENT_LIBS $CLIENT_LIBS"
# todo: proper configure checks for boost
SERVER_LIBS="$SERVER_LIBS -lboost_filesystem -lboost_system"
# we have three possible backends for our datastore. If none are selected,
# then nothing will initialize the class and well that doesn't make any sense
# anyway. Set BwTree as a default option as it's self-contained to the tree.
......@@ -80,7 +94,7 @@ if test "x${berkelydb_backend}" == xyes ; then
AC_DEFINE([USE_BDB], 1, [Use Berkely DB backend])
bwtree_backend=no
leveldb_backend=no
LIBS="${LIBS} -ldb_cxx -ldb_stl"
SERVER_LIBS="${SERVER_LIBS} -ldb_cxx -ldb_stl"
AC_LANG_POP
fi
......@@ -90,7 +104,7 @@ if test "x${leveldb_backend}" == xyes ; then
AC_CHECK_HEADERS([leveldb/c.h], ,
AC_ERROR("Could not find leveldb headers"))
AC_DEFINE([USE_LEVELDB], 1, [use leveldb backend])
LIBS="${LIBS} -lleveldb"
SERVER_LIBS="${SERVER_LIBS} -lleveldb"
berkelydb_backend=no
bwtree_backend=no
fi
......@@ -103,7 +117,11 @@ if test "x${bwtree_backend}" == xyes ; then
berkeleydb_backend=no
leveldb_backend=no
fi
AM_CONDITIONAL([BUILD_BWTREE], [test "x${bwtree_backend}" == xyes])
AC_SUBST(SERVER_LIBS)
AC_SUBST(GROUP_LIBS)
AC_CONFIG_FILES([Makefile maint/kv-client.pc maint/kv-server.pc])
AC_OUTPUT
prefix=@prefix@
exec_prefix=@exec_prefix@
libdir=@libdir@
includedir=@includedir@
Name: kv-group
Description: services-based keyval, group logic
Version: 0.1
URL: https://xgitlab.cels.anl.gov/sds/sds-keyval
Requires: margo
Libs: -L${libdir} @GROUP_LIBS@
Cflags: -I${includedir}
......@@ -8,5 +8,5 @@ Description: services-based keyval server
Version: 0.1
URL: https://xgitlab.cels.anl.gov/sds/sds-keyval
Requires: margo
Libs: -L${libdir} -lkvserver
Libs: -L${libdir} -lkvserver @SERVER_LIBS@
Cflags: -I${includedir}
......@@ -23,10 +23,10 @@ extern "C" {
typedef int kv_id;
/* 'Context' describes operations available to keyval clients */
/* do we need one for server, one for client? */
typedef struct kv_context_s {
margo_instance_id mid;
hg_addr_t svr_addr;
hg_id_t put_id;
hg_id_t bulk_put_id;
hg_id_t get_id;
......@@ -35,13 +35,23 @@ typedef struct kv_context_s {
hg_id_t close_id;
hg_id_t bench_id;
hg_id_t shutdown_id;
hg_handle_t put_handle;
kv_id kv;
} kv_context_t;
/* 'Database' contains server-specific information: the instantiation of a
* particular keyval service; the handles used to send information back and
* forth */
typedef struct kv_database_s {
margo_instance_id mid; /* bulk xfer needs to create bulk handles */
hg_addr_t svr_addr;
hg_handle_t close_handle;
hg_handle_t put_handle;
hg_handle_t bulk_put_handle;
hg_handle_t get_handle;
hg_handle_t bulk_get_handle;
hg_handle_t shutdown_handle;
kv_id kv;
} kv_context_t;
hg_handle_t bench_handle;
} kv_database_t;
#define MAX_RPC_MESSAGE_SIZE 4000 // in bytes
......
......@@ -41,17 +41,22 @@ kv_context_t *kv_client_register(const margo_instance_id mid) {
return context;
}
hg_return_t kv_open(kv_context_t *context, const char *server_addr, const char *db_name) {
kv_database_t * kv_open(kv_context_t *context,
const char *server_addr, const char *db_name)
{
hg_return_t ret = HG_SUCCESS;
hg_handle_t handle;
open_in_t open_in;
open_out_t open_out;
kv_database_t *db = calloc(1, sizeof(*db));
db->mid = context->mid;
printf("kv-client: kv_open, server_addr %s\n", server_addr);
ret = margo_addr_lookup(context->mid, server_addr, &(context->svr_addr));
ret = margo_addr_lookup(context->mid, server_addr, &(db->svr_addr));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
ret = margo_create(context->mid, db->svr_addr,
context->open_id, &handle);
assert(ret == HG_SUCCESS);
......@@ -70,31 +75,39 @@ hg_return_t kv_open(kv_context_t *context, const char *server_addr, const char *
* BAKE has a handle-caching mechanism we should consult.
* should margo be caching handles? */
ret = margo_create(context->mid, context->svr_addr,
context->put_id, &(context->put_handle));
ret = margo_create(context->mid, db->svr_addr,
context->put_id, &(db->put_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->bulk_put_id, &(db->bulk_put_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bulk_put_id, &(context->bulk_put_handle));
ret = margo_create(context->mid, db->svr_addr,
context->get_id, &(db->get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->get_id, &(context->get_handle));
ret = margo_create(context->mid, db->svr_addr,
context->bulk_get_id, &(db->bulk_get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bulk_get_id, &(context->bulk_get_handle));
ret = margo_create(context->mid, db->svr_addr,
context->shutdown_id, &(db->shutdown_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->shutdown_id, &(context->shutdown_handle));
ret = margo_create(context->mid, db->svr_addr,
context->close_id, &(db->close_handle) );
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->bench_id, &(db->bench_handle));
assert(ret == HG_SUCCESS);
margo_free_output(handle, &open_out);
margo_destroy(handle);
return HG_SUCCESS;
return db;
}
/* we gave types in the open call. Will need to maintain in 'context' the
* size. */
hg_return_t kv_put(kv_context_t *context,
hg_return_t kv_put(kv_database_t *db,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize) {
hg_return_t ret;
......@@ -119,16 +132,16 @@ hg_return_t kv_put(kv_context_t *context,
pin.pi.vsize = vsize;
st2 = ABT_get_wtime();
ret = margo_forward(context->put_handle, &pin);
ret = margo_forward(db->put_handle, &pin);
et2 = ABT_get_wtime();
printf("kv_put forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, vsize);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->put_handle, &pout);
ret = margo_get_output(db->put_handle, &pout);
assert(ret == HG_SUCCESS);
ret = pout.ret;
margo_free_output(context->put_handle, &pout);
margo_free_output(db->put_handle, &pout);
}
else {
// use bulk transfer method to move value
......@@ -146,23 +159,23 @@ hg_return_t kv_put(kv_context_t *context,
bpin.bulk.vsize = vsize;
st2 = ABT_get_wtime();
ret = margo_bulk_create(context->mid, 1, &value, &bpin.bulk.vsize,
ret = margo_bulk_create(db->mid, 1, &value, &bpin.bulk.vsize,
HG_BULK_READ_ONLY, &bpin.bulk.handle);
et2 = ABT_get_wtime();
printf("kv_put bulk create time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);
st2 = ABT_get_wtime();
ret = margo_forward(context->bulk_put_handle, &bpin);
ret = margo_forward(db->bulk_put_handle, &bpin);
et2 = ABT_get_wtime();
printf("kv_put bulk forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, vsize);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bulk_put_handle, &bpout);
ret = margo_get_output(db->bulk_put_handle, &bpout);
assert(ret == HG_SUCCESS);
ret = bpout.ret; // make sure the server side says all is OK
margo_free_output(context->bulk_put_handle, &bpout);
margo_free_output(db->bulk_put_handle, &bpout);
}
et1 = ABT_get_wtime();
printf("kv_put time: %f microseconds\n", (et1-st1)*1000000);
......@@ -171,7 +184,7 @@ hg_return_t kv_put(kv_context_t *context,
}
// vsize is in/out
hg_return_t kv_get(kv_context_t *context,
hg_return_t kv_get(kv_database_t *db,
void *key, hg_size_t ksize,
void *value, hg_size_t *vsize)
{
......@@ -198,12 +211,12 @@ hg_return_t kv_get(kv_context_t *context,
gin.gi.vsize = size;
st2 = ABT_get_wtime();
ret = margo_forward(context->get_handle, &gin);
ret = margo_forward(db->get_handle, &gin);
et2 = ABT_get_wtime();
printf("kv_get forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, size);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->get_handle, &gout);
ret = margo_get_output(db->get_handle, &gout);
assert(ret == HG_SUCCESS);
ret = gout.go.ret;
......@@ -217,7 +230,7 @@ hg_return_t kv_get(kv_context_t *context,
memcpy(value, gout.go.value, gout.go.vsize);
}
margo_free_output(context->get_handle, &gout);
margo_free_output(db->get_handle, &gout);
}
else {
bulk_get_in_t bgin;
......@@ -228,19 +241,19 @@ hg_return_t kv_get(kv_context_t *context,
bgin.bulk.vsize = size;
st2 = ABT_get_wtime();
ret = margo_bulk_create(context->mid, 1, &value, &bgin.bulk.vsize,
ret = margo_bulk_create(db->mid, 1, &value, &bgin.bulk.vsize,
HG_BULK_WRITE_ONLY, &bgin.bulk.handle);
et2 = ABT_get_wtime();
printf("kv_get bulk create time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);
st2 = ABT_get_wtime();
ret = margo_forward(context->bulk_get_handle, &bgin);
ret = margo_forward(db->bulk_get_handle, &bgin);
et2 = ABT_get_wtime();
printf("kv_get bulk forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, size);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bulk_get_handle, &bgout);
ret = margo_get_output(db->bulk_get_handle, &bgout);
assert(ret == HG_SUCCESS);
ret = bgout.ret; // make sure the server side says all is OK
......@@ -251,7 +264,7 @@ hg_return_t kv_get(kv_context_t *context,
*/
*vsize = (hg_size_t)bgout.size;
margo_free_output(context->bulk_get_handle, &bgout);
margo_free_output(db->bulk_get_handle, &bgout);
}
et1 = ABT_get_wtime();
printf("kv_get time: %f microseconds\n", (et1-st1)*1000000);
......@@ -259,45 +272,43 @@ hg_return_t kv_get(kv_context_t *context,
return ret;
}
hg_return_t kv_close(kv_context_t *context)
hg_return_t kv_close(kv_database_t *db)
{
hg_return_t ret;
hg_handle_t handle;
close_out_t close_out;
ret = margo_create(context->mid, context->svr_addr,
context->close_id, &handle);
assert(ret == HG_SUCCESS);
ret = margo_forward(handle, NULL);
ret = margo_forward(db->close_handle, NULL);
assert(ret == HG_SUCCESS);
ret = margo_get_output(handle, &close_out);
ret = margo_get_output(db->close_handle, &close_out);
assert(ret == HG_SUCCESS);
assert(close_out.ret == HG_SUCCESS);
margo_free_output(handle, &close_out);
margo_destroy(handle);
margo_free_output(db->close_handle, &close_out);
margo_destroy(db->put_handle);
margo_destroy(db->get_handle);
margo_destroy(db->bulk_put_handle);
margo_destroy(db->bulk_get_handle);
margo_destroy(db->shutdown_handle);
ret = margo_addr_free(db->mid, db->svr_addr);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
}
bench_result_t *kv_benchmark(kv_context_t *context, int32_t count) {
bench_result_t *kv_benchmark(kv_database_t *db, int32_t count) {
hg_return_t ret;
hg_handle_t handle;
bench_in_t bench_in;
bench_out_t bench_out;
bench_result_t *result = NULL;
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &handle);
assert(ret == HG_SUCCESS);
bench_in.count = count;
ret = margo_forward(handle, &bench_in);
ret = margo_forward(db->bench_handle, &bench_in);
assert(ret == HG_SUCCESS);
ret = margo_get_output(handle, &bench_out);
ret = margo_get_output(db->bench_handle, &bench_out);
assert(ret == HG_SUCCESS);
result = malloc(sizeof(bench_result_t));
......@@ -306,23 +317,12 @@ bench_result_t *kv_benchmark(kv_context_t *context, int32_t count) {
result->read_time = bench_out.result.read_time;
result->overhead = bench_out.result.overhead;
margo_free_output(handle, &bench_out);
margo_destroy(handle);
margo_free_output(db->bench_handle, &bench_out);
return result;
}
hg_return_t kv_client_deregister(kv_context_t *context) {
hg_return_t ret;
margo_destroy(context->put_handle);
margo_destroy(context->get_handle);
margo_destroy(context->bulk_put_handle);
margo_destroy(context->bulk_get_handle);
margo_destroy(context->shutdown_handle);
ret = margo_addr_free(context->mid, context->svr_addr);
assert(ret == HG_SUCCESS);
free(context);
......@@ -330,10 +330,10 @@ hg_return_t kv_client_deregister(kv_context_t *context) {
}
// only one client calls shutdown
hg_return_t kv_client_signal_shutdown(kv_context_t *context) {
hg_return_t kv_client_signal_shutdown(kv_database_t *db) {
hg_return_t ret;
ret = margo_forward(context->shutdown_handle, NULL);
ret = margo_forward(db->shutdown_handle, NULL);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
......
#include "keyval-internal.h"
#include "sds-keyval-group.h"
#include "kv-config.h"
......@@ -10,10 +11,9 @@
unsigned long server_indexes[CH_MAX_REPLICATION];
kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid)
kv_group_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid)
{
kvgroup_context_t *context = (kvgroup_context_t*)malloc(sizeof(kvgroup_context_t));
memset(context, 0, sizeof(kvgroup_context_t));
kv_group_t *group = (kv_group_t*)calloc(1, sizeof(group));
int sret = ssg_init(mid);
assert(sret == SSG_SUCCESS);
......@@ -22,13 +22,13 @@ kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t
assert(sret == SSG_SUCCESS);
/* update kvgroup_context_t with MID, GID */
context->mid = mid;
context->gid = gid;
group->mid = mid;
group->gid = gid;
return context;
return group;
}
hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name)
hg_return_t kvgroup_open(kv_group_t *group, const char *db_name)
{
hg_size_t addr_str_sz = 128;
char addr_str[addr_str_sz];
......@@ -41,97 +41,97 @@ hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name)
std::string separator("/");
// register and open a connection with each kv-server in the group
hg_size_t gsize = ssg_get_group_size(context->gid);
context->gsize = gsize;
context->kv_context = (kv_context_t**)malloc(gsize*sizeof(kv_context_t*));
hg_size_t gsize = ssg_get_group_size(group->gid);
group->gsize = gsize;
/* one 'context' for sds-keyval itself; one 'database' per server */
group->kv_context = (kv_context_t *)calloc(1,sizeof(kv_context_t));
group->db = (kv_database_t **) calloc(gsize, sizeof(kv_database_t *));
// register this client context with Margo
group->kv_context = kv_client_register(group->mid);
assert(group->kv_context != NULL);
for (hg_size_t i=0; i<gsize; i++) {
// register this client context with Margo
context->kv_context[i] = kv_client_register(context->mid);
assert(context->kv_context[i] != NULL);
hg_addr_t server_addr = ssg_get_addr(context->gid, i);
hg_addr_t server_addr = ssg_get_addr(group->gid, i);
hg_size_t str_size = addr_str_sz;
// turn server_addr into string
ret = margo_addr_to_string(context->mid, addr_str, &str_size, server_addr);
ret = margo_addr_to_string(group->mid, addr_str, &str_size, server_addr);
assert(ret == HG_SUCCESS);
margo_addr_free(context->mid, server_addr);
margo_addr_free(group->mid, server_addr);
std::string server_dbname = basepath + separator + std::string("kvdb.") + std::to_string(i)
+ separator + name; // each session uses unique db name
// open client connection with this server
std::cout << "request open of " << server_dbname << " from server " << addr_str << std::endl;
ret = kv_open(context->kv_context[i], addr_str, server_dbname.c_str());
group->db[i] = kv_open(group->kv_context, addr_str, server_dbname.c_str());
assert(ret == HG_SUCCESS);
}
// initialize consistent hash using "hash_lookup3" with gsize servers each with 1 virtual node for now
context->ch_instance = ch_placement_initialize("hash_lookup3", gsize, 4, 0);
group->ch_instance = ch_placement_initialize("hash_lookup3", gsize, 4, 0);
return HG_SUCCESS;
}
// oid is unique associated with key
// in ParSplice key is already a uint64_t hashed value
hg_return_t kvgroup_put(kvgroup_context_t *context, uint64_t oid,
hg_return_t kvgroup_put(kv_group_t * group, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize)
{
// not using any replication for now (is this right?)
ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes);
kv_context_t *kv_context = context->kv_context[server_indexes[0]];
ch_placement_find_closest(group->ch_instance, oid, 1, server_indexes);
std::cout << "kvgroup_put: key=" << oid << ", server_index=" << server_indexes[0] << std::endl;
return kv_put(kv_context, key, ksize, value, vsize);
return kv_put(group->db[server_indexes[0]], key, ksize, value, vsize);
}
// oid is unique associated with key
// in ParSplice key is already a uint64_t hashed value
// vsize is in/out
hg_return_t kvgroup_get(kvgroup_context_t *context, uint64_t oid,
hg_return_t kvgroup_get(kv_group_t *group, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t *vsize)
{
// not using any replication for now (is this right?)
ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes);
kv_context_t *kv_context = context->kv_context[server_indexes[0]];
ch_placement_find_closest(group->ch_instance, oid, 1, server_indexes);
std::cout << "kvgroup_get: key=" << oid << ", server_index=" << server_indexes[0] << std::endl;
return kv_get(kv_context, key, ksize, value, vsize);
return kv_get(group->db[server_indexes[0]], key, ksize, value, vsize);
}
hg_return_t kvgroup_close(kvgroup_context_t *context)
hg_return_t kvgroup_close(kv_group_t * group)
{
hg_return_t ret;
for (hg_size_t i=0; i<context->gsize; i++) {
ret = kv_close(context->kv_context[i]);
for (hg_size_t i=0; i<group->gsize; i++) {
ret = kv_close(group->db[i]);
assert(ret == HG_SUCCESS);
}
return HG_SUCCESS;
}
hg_return_t kvgroup_client_deregister(kvgroup_context_t *context)
hg_return_t kvgroup_client_deregister(kv_group_t * group)
{
hg_return_t ret;
for (hg_size_t i=0; i<context->gsize; i++) {
ret = kv_client_deregister(context->kv_context[i]);
assert(ret == HG_SUCCESS);
}
ch_placement_finalize(context->ch_instance);
ssg_group_detach(context->gid);
ret = kv_client_deregister(group->kv_context);
ch_placement_finalize(group->ch_instance);