Commit dc34e6e6 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

removed deprecated files

parent ccbf9bb6
......@@ -96,8 +96,6 @@ include_HEADERS = include/sdskv-client.h \
include/sdskv-common.hpp
noinst_HEADERS = src/bulk.h \
src/sds-keyval.h \
src/sds-keyval-group.h \
src/sdskv-rpc-types.h \
src/datastore/datastore.h \
src/datastore/map_datastore.h \
......@@ -105,11 +103,13 @@ noinst_HEADERS = src/bulk.h \
src/datastore/leveldb_datastore.h \
src/datastore/berkeleydb_datastore.h \
src/datastore/datastore_factory.h \
src/keyval-internal.h \
src/BwTree/src/bwtree.h \
src/BwTree/src/atomic_stack.h\
src/BwTree/src/bloom_filter.h \
src/BwTree/src/sorted_small_set.h
# src/sds-keyval.h \
# src/sds-keyval-group.h \
# src/keyval-internal.h
#lib_libkvgroupserver_la_SOURCES = src/kvgroup-server.cc
......
......@@ -6,7 +6,7 @@ AC_INIT([sds-keyval], [0.1.4], [robl@mcs.anl.gov])
AM_INIT_AUTOMAKE([1.13.4 -Wall -Werror foreign subdir-objects silent-rules])
AM_SILENT_RULES([yes])
AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_SRCDIR([src/sds-keyval.h])
AC_CONFIG_SRCDIR([src/sdskv-rpc-types.h])
AC_CONFIG_HEADERS([src/kv-config.h])
LT_PREREQ([2.2])
# LT_INIT needs to know about AR
......
#include <stdint.h>
#include <assert.h>
#include <mercury.h>
#include <mercury_macros.h>
#include <mercury_proc_string.h>
#include <margo.h>
#include <abt.h>
#include "sds-keyval.h"
#ifndef keyval_internal_h
#define keyval_internal_h
// uncomment to re-enable print statements
//#define KV_DEBUG
#if defined(__cplusplus)
extern "C" {
#endif
typedef int kv_id;
/* 'Context' describes operations available to keyval clients */
/* do we need one for server, one for client? */
typedef struct kv_context_s {
margo_instance_id mid;
hg_id_t put_id;
hg_id_t bulk_put_id;
hg_id_t get_id;
hg_id_t bulk_get_id;
hg_id_t open_id;
hg_id_t close_id;
hg_id_t bench_id;
hg_id_t shutdown_id;
hg_id_t list_id;
kv_id kv;
} kv_context_t;
/* 'Database' contains server-specific information: the instantiation of a
* particular keyval service; the handles used to send information back and
* forth */
typedef struct kv_database_s {
margo_instance_id mid; /* bulk xfer needs to create bulk handles */
hg_addr_t svr_addr;
hg_handle_t close_handle;
hg_handle_t put_handle;
hg_handle_t bulk_put_handle;
hg_handle_t get_handle;
hg_handle_t bulk_get_handle;
hg_handle_t shutdown_handle;
hg_handle_t bench_handle;
hg_handle_t list_handle;
} kv_database_t;
#define MAX_RPC_MESSAGE_SIZE 4000 // in bytes
// setup to support opaque type handling
typedef char* kv_data_t;
typedef struct {
kv_data_t key;
hg_size_t ksize;
kv_data_t value;
hg_size_t vsize;
} kv_put_in_t;
typedef struct {
kv_data_t key;
hg_size_t ksize;
hg_size_t vsize;
} kv_get_in_t;
typedef struct {
kv_data_t value;
hg_size_t vsize;
hg_return_t ret;
} kv_get_out_t;
static inline hg_return_t hg_proc_hg_return_t(hg_proc_t proc, void *data)
{
return hg_proc_hg_int32_t(proc, data);
}
/* the put_in, get_in, put_out, get_out, list_in, list_out code is repetitive. candidate for a template? */
static inline hg_return_t hg_proc_kv_put_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
kv_put_in_t *in = (kv_put_in_t*)data;
ret = hg_proc_hg_size_t(proc, &in->ksize);
assert(ret == HG_SUCCESS);
if (in->ksize) {
switch (hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->key, in->ksize);
assert(ret == HG_SUCCESS);
break;
case HG_DECODE:
in->key = (kv_data_t)malloc(in->ksize);
ret = hg_proc_raw(proc, in->key, in->ksize);
assert(ret == HG_SUCCESS);
break;
case HG_FREE:
free(in->key);
break;
default:
break;
}
}
ret = hg_proc_hg_size_t(proc, &in->vsize);
assert(ret == HG_SUCCESS);
if (in->vsize) {
switch (hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->value, in->vsize);
assert(ret == HG_SUCCESS);
break;
case HG_DECODE:
in->value = (kv_data_t)malloc(in->vsize);
ret = hg_proc_raw(proc, in->value, in->vsize);
assert(ret == HG_SUCCESS);
break;
case HG_FREE:
free(in->value);
break;
default:
break;
}
}
return HG_SUCCESS;
}
static inline hg_return_t hg_proc_kv_get_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
kv_get_in_t *in = (kv_get_in_t*)data;
ret = hg_proc_hg_size_t(proc, &in->ksize);
assert(ret == HG_SUCCESS);
if (in->ksize) {
switch (hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->key, in->ksize);
assert(ret == HG_SUCCESS);
break;
case HG_DECODE:
in->key = (kv_data_t)malloc(in->ksize);
ret = hg_proc_raw(proc, in->key, in->ksize);
assert(ret == HG_SUCCESS);
break;
case HG_FREE:
free(in->key);
break;
default:
break;
}
}
ret = hg_proc_hg_size_t(proc, &in->vsize);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
}
static inline hg_return_t hg_proc_kv_get_out_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
kv_get_out_t *out = (kv_get_out_t*)data;
ret = hg_proc_hg_size_t(proc, &out->vsize);
assert(ret == HG_SUCCESS);
if (out->vsize) {
switch (hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, out->value, out->vsize);
assert(ret == HG_SUCCESS);
break;
case HG_DECODE:
out->value = (kv_data_t)malloc(out->vsize);
ret = hg_proc_raw(proc, out->value, out->vsize);
assert(ret == HG_SUCCESS);
break;
case HG_FREE:
free(out->value);
break;
default:
break;
}
}
ret = hg_proc_hg_return_t(proc, &out->ret);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
}
typedef struct {
kv_data_t start_key;
hg_size_t start_ksize;
hg_size_t max_keys;
} kv_list_in_t;
typedef struct {
hg_size_t nkeys;
kv_data_t *keys;
hg_size_t *ksizes;
hg_return_t ret;
} kv_list_out_t;
static inline hg_return_t hg_proc_kv_list_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
kv_list_in_t *in = (kv_list_in_t*)data;
ret = hg_proc_hg_size_t(proc, &in->start_ksize);
assert(ret == HG_SUCCESS);
if (in->start_ksize) {
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
assert(ret == HG_SUCCESS);
break;
case HG_DECODE:
in->start_key = (kv_data_t)malloc(in->start_ksize);
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
assert(ret == HG_SUCCESS);
break;
case HG_FREE:
free(in->start_key);
default:
break;
}
}
ret = hg_proc_hg_size_t(proc, &in->max_keys);
assert(ret == HG_SUCCESS);
return ret;
}
static inline hg_return_t hg_proc_kv_list_out_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
unsigned int i;
kv_list_out_t *out = (kv_list_out_t*)data;
/* typedef struct {
hg_size_t nkeys;
kv_data_t *keys;
hg_size_t *ksizes;
} kv_list_out_t; */
ret = hg_proc_hg_size_t(proc, &out->nkeys);
assert (ret == HG_SUCCESS);
if (out->nkeys) {
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_raw(proc, &(out->ksizes[i]),
sizeof(*(out->ksizes)) );
}
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
assert(ret == HG_SUCCESS);
}
break;
case HG_DECODE:
out->ksizes =
(hg_size_t*)malloc(out->nkeys*sizeof(*out->ksizes));
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_raw(proc, &(out->ksizes[i]),
sizeof(*out->ksizes));
}
out->keys = (kv_data_t *)malloc(out->nkeys*sizeof(kv_data_t));
for (i=0; i<out->nkeys; i++) {
out->keys[i] = (kv_data_t)malloc(out->ksizes[i]);
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
assert(ret == HG_SUCCESS);
}
break;
case HG_FREE:
for (i=0; i<out->nkeys; i++) {
free(out->keys[i]);
}
free(out->keys);
free(out->ksizes);
break;
default:
break;
}
}
ret = hg_proc_hg_return_t(proc, &out->ret);
assert (ret == HG_SUCCESS);
return ret;
}
MERCURY_GEN_PROC(put_in_t, ((kv_put_in_t)(pi)))
MERCURY_GEN_PROC(put_out_t, ((hg_return_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(put_handler)
MERCURY_GEN_PROC(get_in_t, ((kv_get_in_t)(gi)))
MERCURY_GEN_PROC(get_out_t, ((kv_get_out_t)(go)))
DECLARE_MARGO_RPC_HANDLER(get_handler)
MERCURY_GEN_PROC(open_in_t,
((hg_string_t)(name))\
((int32_t)(db_type)))
MERCURY_GEN_PROC(open_out_t, ((hg_return_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(open_handler)
MERCURY_GEN_PROC(close_out_t, ((hg_return_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(close_handler)
// for handling bulk puts/gets (e.g. for ParSplice use case)
typedef struct {
kv_data_t key;
hg_size_t ksize;
hg_size_t vsize;
hg_bulk_t handle;
} kv_bulk_t;
static inline hg_return_t hg_proc_kv_bulk_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
kv_bulk_t *bulk = (kv_bulk_t*)data;
ret = hg_proc_hg_size_t(proc, &bulk->ksize);
assert(ret == HG_SUCCESS);
if (bulk->ksize) {
switch (hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, bulk->key, bulk->ksize);
assert(ret == HG_SUCCESS);
break;
case HG_DECODE:
bulk->key = (kv_data_t)malloc(bulk->ksize);
ret = hg_proc_raw(proc, bulk->key, bulk->ksize);
assert(ret == HG_SUCCESS);
break;
case HG_FREE:
free(bulk->key);
break;
default:
break;
}
}
ret = hg_proc_hg_size_t(proc, &bulk->vsize);
assert(ret == HG_SUCCESS);
ret = hg_proc_hg_bulk_t(proc, &bulk->handle);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
}
MERCURY_GEN_PROC(list_in_t, ((kv_list_in_t)(list_in)))
MERCURY_GEN_PROC(list_out_t, ((kv_list_out_t)(list_out)))
MERCURY_GEN_PROC(bulk_put_in_t, ((kv_bulk_t)(bulk)))
MERCURY_GEN_PROC(bulk_put_out_t, ((hg_return_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bulk_put_handler)
MERCURY_GEN_PROC(bulk_get_in_t, ((kv_bulk_t)(bulk)))
MERCURY_GEN_PROC(bulk_get_out_t, ((hg_size_t)(size)) ((hg_return_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bulk_get_handler)
DECLARE_MARGO_RPC_HANDLER(shutdown_handler)
DECLARE_MARGO_RPC_HANDLER(list_keys_handler)
// some setup to support simple benchmarking
static inline hg_return_t hg_proc_double(hg_proc_t proc, void *data)
{
hg_return_t ret;
hg_size_t size = sizeof(double);
ret = hg_proc_raw(proc, data, size);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
}
static inline hg_return_t hg_proc_bench_result_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
bench_result_t *in = (bench_result_t*)data;
ret = hg_proc_hg_size_t(proc, &in->nkeys);
assert(ret == HG_SUCCESS);
ret = hg_proc_double(proc, &in->insert_time);
assert(ret == HG_SUCCESS);
ret = hg_proc_double(proc, &in->read_time);
assert(ret == HG_SUCCESS);
ret = hg_proc_double(proc, &in->overhead);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
}
MERCURY_GEN_PROC(bench_in_t, ((int32_t)(count)))
MERCURY_GEN_PROC(bench_out_t, ((bench_result_t)(result)))
DECLARE_MARGO_RPC_HANDLER(bench_handler)
#if defined(__cplusplus)
}
#endif
#endif
#include "sds-keyval.h"
#include "keyval-internal.h"
#include <mercury.h>
#include <margo.h>
#include <abt.h>
#include <assert.h>
// pass in Margo instance ID
kv_context_t *kv_client_register(const margo_instance_id mid) {
kv_context_t *context;
context = calloc(1, sizeof(kv_context_t));
context->mid = mid;
context->put_id = MARGO_REGISTER(context->mid, "put",
put_in_t, put_out_t, NULL);
context->bulk_put_id = MARGO_REGISTER(context->mid, "bulk_put",
bulk_put_in_t, bulk_put_out_t, NULL);
context->get_id = MARGO_REGISTER(context->mid, "get",
get_in_t, get_out_t, NULL);
context->bulk_get_id = MARGO_REGISTER(context->mid, "bulk_get",
bulk_get_in_t, bulk_get_out_t, NULL);
context->open_id = MARGO_REGISTER(context->mid, "open",
open_in_t, open_out_t, NULL);
context->close_id = MARGO_REGISTER(context->mid, "close",
void, close_out_t, NULL);
context->bench_id = MARGO_REGISTER(context->mid, "bench",
bench_in_t, bench_out_t, NULL);
context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
void, void, NULL);
context->list_id = MARGO_REGISTER(context->mid, "list",
list_in_t, list_out_t, NULL);
return context;
}
kv_database_t * kv_open(kv_context_t *context,
const char *server_addr, const char *db_name, kv_db_type_t db_type)
{
hg_return_t ret = HG_SUCCESS;
hg_handle_t handle;
open_in_t open_in;
open_out_t open_out;
kv_database_t *db = calloc(1, sizeof(*db));
db->mid = context->mid;
printf("kv-client: kv_open, server_addr %s\n", server_addr);
ret = margo_addr_lookup(context->mid, server_addr, &(db->svr_addr));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->open_id, &handle);
assert(ret == HG_SUCCESS);
open_in.name = (hg_string_t)db_name;
open_in.db_type = (int32_t)db_type;
ret = margo_forward(handle, &open_in);
assert(ret == HG_SUCCESS);
ret = margo_get_output(handle, &open_out);
assert(ret == HG_SUCCESS);
assert(open_out.ret == HG_SUCCESS);
/* set up the other calls here: idea is we'll pay the registration cost
* once here but can reuse the handles and identifiers multiple times */
/* put/get handles: can have as many in flight as we have registered.
* BAKE has a handle-caching mechanism we should consult.
* should margo be caching handles? */
ret = margo_create(context->mid, db->svr_addr,
context->put_id, &(db->put_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->bulk_put_id, &(db->bulk_put_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->get_id, &(db->get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->bulk_get_id, &(db->bulk_get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->shutdown_id, &(db->shutdown_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->close_id, &(db->close_handle) );
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->bench_id, &(db->bench_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->list_id, &(db->list_handle));
assert(ret == HG_SUCCESS);
margo_free_output(handle, &open_out);
margo_destroy(handle);
return db;
}
/* we gave types in the open call. Will need to maintain in 'context' the
* size. */
hg_return_t kv_put(kv_database_t *db,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize) {
hg_return_t ret;
hg_size_t msize;
msize = ksize + vsize + 2*sizeof(hg_size_t);
printf("kv_put ksize %lu, vsize %lu, msize %lu\n", ksize, vsize, msize);
/*
* If total payload is large, we'll do our own
* explicit transfer of the value data.
*/
double st1, et1, st2, et2;
st1 = ABT_get_wtime();
if (msize <= MAX_RPC_MESSAGE_SIZE) {
put_in_t pin;
put_out_t pout;
pin.pi.key = (kv_data_t)key;
pin.pi.ksize = ksize;
pin.pi.value = (kv_data_t)value;
pin.pi.vsize = vsize;
st2 = ABT_get_wtime();
ret = margo_forward(db->put_handle, &pin);
et2 = ABT_get_wtime();
printf("kv_put forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, vsize);
assert(ret == HG_SUCCESS);
ret = margo_get_output(db->put_handle, &pout);
assert(ret == HG_SUCCESS);
ret = pout.ret;
margo_free_output(db->put_handle, &pout);
}
else {
// use bulk transfer method to move value
bulk_put_in_t bpin;
bulk_put_out_t bpout;
/*
* If (ksize + sizeof(hg_size_t) is too large
* we'll simply rely on HG to handle it rather
* than do multiple bulk transfers. Most likely
* key payload size is << value payload size
*/
bpin.bulk.key = (kv_data_t)key;
bpin.bulk.ksize = ksize;
bpin.bulk.vsize = vsize;
st2 = ABT_get_wtime();
ret = margo_bulk_create(db->mid, 1, &value, &bpin.bulk.vsize,
HG_BULK_READ_ONLY, &bpin.bulk.handle);
et2 = ABT_get_wtime();
printf("kv_put bulk create time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);