Commit a2de1206 authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Reworked/simplified datastore. Added support for BwTree and LevelDB. Both tested with test-mpi app.

parent d1fb9292
ACLOCAL_AMFLAGS="-Im4"
bin_PROGRAMS = test/bench-client
test_bench_client_SOURCES = test/bench-client.cc
test_bench_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_bench_client_LDADD = -lkvclient
lib_LTLIBRARIES = libkvclient.la \
libkvserver.la
libkvclient_la_SOURCES = src/kv-client.c
libkvserver_la_SOURCES = src/kv-server.cc \
src/datastore.cc \
src/BwTree/src/bwtree.cpp
libkvserver_la_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
libkvserver_la_CPPFLAGS = -I${srcdir}/src/BwTree/src
libkvserver_la_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src/BwTree/src
include_HEADERS = src/sds-keyval.h
noinst_HEADERS = src/BwTree/src/bwtree.h \
......@@ -32,16 +38,21 @@ TESTS = test/test-client \
test/test-mpi
test_test_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_client_LDADD = ${LIBS} -lkvclient
test_test_client_LDADD = -lkvclient
test_test_server_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_server_LDADD = ${LIBS} -lkvserver -lstdc++
test_test_server_SOURCES = test/test-server.cc
test_test_server_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_server_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src -I${srcdir}/src/BwTree/src
test_test_server_LDADD = -lkvserver -lleveldb -lsnappy
test_test_mpi_SOURCES = test/test-mpi.cc
test_test_mpi_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_mpi_LDADD = ${LIBS} -lkvclient -lkvserver
test_test_mpi_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_mpi_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src -I${srcdir}/src/BwTree/src
test_test_mpi_LDADD = -lkvclient -lkvserver -lleveldb -lsnappy
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/kv-server.pc \
maint/kv-client.pc
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "bwtree.h"
#include <vector>
using namespace wangziqi2013::bwtree;
#ifndef datastore_cc
#define datastore_cc
template <typename KeyType, typename ValueType> class AbstractDataStore {
public:
virtual void createDatabase(std::string homeDir, std::string baseName)=0;
virtual bool put(const KeyType &key, ValueType &data)=0;
virtual bool get(const KeyType &key, ValueType &data)=0;
virtual bool get(const KeyType &key, std::vector<ValueType> &data)=0;
};
template <typename KeyType,
typename ValueType,
typename KeyComparator = std::less<KeyType>,
typename KeyEqualityChecker = std::equal_to<KeyType>,
typename KeyHashFunc = std::hash<KeyType>,
typename ValueEqualityChecker = std::equal_to<ValueType>,
typename ValueHashFunc = std::hash<ValueType>>
class BwTreeDataStore : public AbstractDataStore<KeyType, ValueType> {
public:
BwTreeDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) {
_duplicates = duplicates;
_eraseOnGet = eraseOnGet;
_debug = debug;
_tree = NULL;
};
#include "datastore.h"
AbstractDataStore::AbstractDataStore() {
_duplicates = Duplicates::IGNORE;
_eraseOnGet = false;
_debug = false;
};
AbstractDataStore::AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) {
_duplicates = duplicates;
_eraseOnGet = eraseOnGet;
_debug = debug;
};
AbstractDataStore::~AbstractDataStore()
{};
BwTreeDataStore::BwTreeDataStore() :
AbstractDataStore() {
_tree = NULL;
};
BwTreeDataStore::BwTreeDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) :
AbstractDataStore(duplicates, eraseOnGet, debug) {
_tree = NULL;
};
virtual ~BwTreeDataStore() {
delete _tree;
};
virtual void createDatabase(std::string homeDir, std::string baseName) {
_tree = new BwTree<KeyType, ValueType,
KeyComparator, KeyEqualityChecker, KeyHashFunc,
ValueEqualityChecker, ValueHashFunc>();
if (_debug) {
_tree->SetDebugLogging(1);
}
else {
_tree->SetDebugLogging(0);
}
_tree->UpdateThreadLocal(1);
_tree->AssignGCID(0);
};
BwTreeDataStore::~BwTreeDataStore() {
// deleting BwTree can cause core dump
delete _tree;
};
void BwTreeDataStore::createDatabase(std::string db_name) {
_tree = new BwTree<kv_key_t, ds_bulk_t, std::less<kv_key_t>,
std::equal_to<kv_key_t>, std::hash<kv_key_t>,
my_equal_to/*ds_bulk_t*/, my_hash/*ds_bulk_t*/>();
if (_debug) {
_tree->SetDebugLogging(1);
}
else {
_tree->SetDebugLogging(0);
}
_tree->UpdateThreadLocal(1);
_tree->AssignGCID(0);
};
virtual bool put(const KeyType &key, ValueType &data) {
std::vector<ValueType> values;
bool success = false;
bool BwTreeDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
std::vector<ds_bulk_t> values;
bool success = false;
_tree->GetValue(key, values);
bool duplicate_key = (values.size() != 0);
if (duplicate_key) {
if (_duplicates == Duplicates::IGNORE) {
success = true;
}
else { // Duplicates::ALLOW
success = _tree->Insert(key, data);
}
_tree->GetValue(key, values);
bool duplicate_key = (values.size() != 0);
if (duplicate_key) {
if (_duplicates == Duplicates::IGNORE) {
// silently ignore
success = true;
}
else {
else { // Duplicates::ALLOW (default)
success = _tree->Insert(key, data);
}
}
else {
success = _tree->Insert(key, data);
}
return success;
};
return success;
};
virtual bool get(const KeyType &key, ValueType &data) {
std::vector<ValueType> values;
bool success = false;
bool BwTreeDataStore::get(const kv_key_t &key, ds_bulk_t &data) {
std::vector<ds_bulk_t> values;
bool success = false;
_tree->GetValue(key, values);
if (values.size() == 1) {
data = values.front();
_tree->GetValue(key, values);
if (values.size() == 1) {
data = values.front();
success = true;
}
else if (values.size() > 1) {
// this should only happen if duplicates are allowed
if (_duplicates == Duplicates::ALLOW) {
data = values.front(); // caller is asking for just 1
success = true;
}
else if (values.size() > 1) {
// this should only happen if duplicates are allowed
if (_duplicates == Duplicates::ALLOW) {
data = values.front(); // caller is asking for just 1
success = true;
}
}
return success;
};
bool BwTreeDataStore::get(const kv_key_t &key, std::vector<ds_bulk_t> &data) {
std::vector<ds_bulk_t> values;
bool success = false;
_tree->GetValue(key, values);
if (values.size() > 1) {
// this should only happen if duplicates are allowed
if (_duplicates == Duplicates::ALLOW) {
data = values;
success = true;
}
}
else {
data = values;
success = true;
}
return success;
};
return success;
};
virtual bool get(const KeyType &key, std::vector<ValueType> &data) {
std::vector<ValueType> values;
bool success = false;
LevelDBDataStore::LevelDBDataStore() :
AbstractDataStore() {
_dbm = NULL;
};
_tree->GetValue(key, values);
if (values.size() > 1) {
// this should only happen if duplicates are allowed
if (_duplicates == Duplicates::ALLOW) {
data = values;
success = true;
}
LevelDBDataStore::LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) :
AbstractDataStore(duplicates, eraseOnGet, debug) {
_dbm = NULL;
};
std::string LevelDBDataStore::key2string(const kv_key_t &key) {
kv_key_t k = key; // grab a copy to work with
char *c = reinterpret_cast<char *>(&k);
std::string keystr(c, sizeof(k));
return keystr;
};
kv_key_t LevelDBDataStore::string2key(std::string &keystr) {
kv_key_t *key = reinterpret_cast<kv_key_t*>(&(keystr[0]));
return *key;
};
LevelDBDataStore::~LevelDBDataStore() {
// deleting LevelDB can cause core dump
delete _dbm;
};
void LevelDBDataStore::createDatabase(std::string db_name) {
leveldb::Options options;
leveldb::Status status;
// db_name assumed to include the full path (e.g. /var/data/db.dat)
options.create_if_missing = true;
status = leveldb::DB::Open(options, db_name, &_dbm);
if (!status.ok()) {
// error
std::cerr << "LevelDBDataStore::createDatabase: LevelDB error on Open = " << status.ToString() << std::endl;
}
assert(status.ok()); // fall over
// debugging support?
};
bool LevelDBDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
std::string value;
status = _dbm->Get(leveldb::ReadOptions(), key2string(key), &value);
bool duplicate_key = false;
if (status.ok()) {
duplicate_key = true;
}
else if (!status.IsNotFound()) {
std::cerr << "LevelDBDataStore::put: LevelDB error on Get = " << status.ToString() << std::endl;
return false; // give up and return
}
bool insert_key = true;
if (duplicate_key) {
insert_key = false;
if (_duplicates == Duplicates::IGNORE) {
// silently ignore
success = true;
}
else {
data = values;
success = true;
std::cerr << "LevelDBDataStore::put: duplicate key " << key
<< ", duplicates not supported" << std::endl;
}
}
return success;
};
if (insert_key) {
std::string datastr(data.begin(), data.end());
status = _dbm->Put(leveldb::WriteOptions(), key2string(key), datastr);
if (status.ok()) {
success = true;
}
else {
std::cerr << "LevelDBDataStore::put: LevelDB error on Put = " << status.ToString() << std::endl;
}
}
protected:
BwTree<KeyType, ValueType,
KeyComparator, KeyEqualityChecker, KeyHashFunc,
ValueEqualityChecker, ValueHashFunc> *_tree = NULL;
Duplicates _duplicates;
bool _eraseOnGet;
bool _debug;
return success;
};
#endif // datastore_cc
bool LevelDBDataStore::get(const kv_key_t &key, ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
data.clear();
std::string value;
status = _dbm->Get(leveldb::ReadOptions(), key2string(key), &value);
if (status.ok()) {
data = ds_bulk_t(value.begin(), value.end());
success = true;
}
else if (!status.IsNotFound()) {
std::cerr << "LevelDBDataStore::get: LevelDB error on Get = " << status.ToString() << std::endl;
}
return success;
};
bool LevelDBDataStore::get(const kv_key_t &key, std::vector<ds_bulk_t> &data) {
bool success = false;
data.clear();
ds_bulk_t value;
if (get(key, value)) {
data.push_back(value);
success = true;
}
return success;
};
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "bwtree.h"
#include "sds-keyval.h"
#include <boost/functional/hash.hpp>
#include <vector>
#include <leveldb/db.h>
#include <db_cxx.h>
#include <dbstl_map.h>
using namespace wangziqi2013::bwtree;
#ifndef datastore_h
#define datastore_h
enum class Duplicates : int {ALLOW, IGNORE};
#endif // datastore_h
// implementation is std::vector<char> specific
// typedef is for convenience
typedef std::vector<char> ds_bulk_t;
struct my_hash {
size_t operator()(const ds_bulk_t &v) const {
size_t hash = 0;
boost::hash_range(hash, v.begin(), v.end());
return hash;
}
};
struct my_equal_to {
bool operator()(const ds_bulk_t &v1, const ds_bulk_t &v2) const {
return (v1 == v2);
}
};
#include "datastore.cc"
class AbstractDataStore {
public:
AbstractDataStore();
AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~AbstractDataStore();
virtual void createDatabase(std::string db_name)=0;
virtual bool put(const kv_key_t &key, ds_bulk_t &data)=0;
virtual bool get(const kv_key_t &key, ds_bulk_t &data)=0;
virtual bool get(const kv_key_t &key, std::vector<ds_bulk_t> &data)=0;
protected:
Duplicates _duplicates;
bool _eraseOnGet;
bool _debug;
};
class BwTreeDataStore : public AbstractDataStore {
public:
BwTreeDataStore();
BwTreeDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BwTreeDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, std::vector<ds_bulk_t> &data);
protected:
BwTree<kv_key_t, ds_bulk_t, std::less<kv_key_t>,
std::equal_to<kv_key_t>, std::hash<kv_key_t>,
my_equal_to/*ds_bulk_t*/, my_hash/*ds_bulk_t*/> *_tree = NULL;
};
// may want to implement some caching for persistent stores like LevelDB
class LevelDBDataStore : public AbstractDataStore {
public:
LevelDBDataStore();
LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~LevelDBDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, std::vector<ds_bulk_t> &data);
protected:
leveldb::DB *_dbm = NULL;
Duplicates _duplicates;
bool _eraseOnGet;
bool _debug;
private:
std::string key2string(const kv_key_t &key);
kv_key_t string2key(std::string &keystr);
};
#endif // datastore_h
......@@ -7,7 +7,6 @@
#include <assert.h>
// pass in NULL pointer to get default behavior
kv_context *kv_client_register(const char *addr_str) {
hg_return_t ret;
......@@ -41,7 +40,7 @@ kv_context *kv_client_register(const char *addr_str) {
open_in_t, open_out_t, NULL);
context->close_id = MARGO_REGISTER(context->mid, "close",
close_in_t, close_out_t, NULL);
void, close_out_t, NULL);
context->bench_id = MARGO_REGISTER(context->mid, "bench",
bench_in_t, bench_out_t, NULL);
......@@ -51,135 +50,149 @@ kv_context *kv_client_register(const char *addr_str) {
return context;
}
hg_return_t kv_open(kv_context *context, const char *server_addr, const char *db_name,
kv_type keytype, kv_type valtype) {
hg_return_t ret = HG_SUCCESS;
hg_handle_t handle;
open_in_t open_in;
open_out_t open_out;
hg_return_t kv_open(kv_context *context, const char *server_addr, const char *db_name) {
hg_return_t ret = HG_SUCCESS;
hg_handle_t handle;
open_in_t open_in;
open_out_t open_out;
printf("kv-client: kv_open, server_addr %s\n", server_addr);
ret = margo_addr_lookup(context->mid, server_addr, &(context->svr_addr));
assert(ret == HG_SUCCESS);
printf("kv-client: kv_open, server_addr %s\n", server_addr);
ret = margo_addr_lookup(context->mid, server_addr, &(context->svr_addr));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->open_id, &handle);
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->open_id, &handle);
assert(ret == HG_SUCCESS);
open_in.name = (hg_string_t)db_name;
open_in.keytype = keytype;
open_in.valtype = valtype;
open_in.name = (hg_string_t)db_name;
ret = margo_forward(handle, &open_in);
assert(ret == HG_SUCCESS);
ret = margo_get_output(handle, &open_out);
assert(ret == HG_SUCCESS);
assert(open_out.ret == HG_SUCCESS);
/* set up the other calls here: idea is we'll pay the registration cost
* once here but can reuse the handles and identifiers multiple times*/
/* put/get handles: can have as many in flight as we have registered.
* BAKE has a handle-caching mechanism we should consult.
* should margo be caching handles? */
ret = margo_create(context->mid, context->svr_addr,
context->put_id, &(context->put_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bulk_put_id, &(context->bulk_put_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->get_id, &(context->get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bulk_get_id, &(context->bulk_get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &(context->bench_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->shutdown_id, &(context->shutdown_handle));
assert(ret == HG_SUCCESS);
ret = margo_forward(handle, &open_in);
assert(ret == HG_SUCCESS);
ret = margo_get_output(handle, &open_out);
assert(ret == HG_SUCCESS);
assert(open_out.ret == HG_SUCCESS);
/* set up the other calls here: idea is we'll pay the registration cost
* once here but can reuse the handles and identifiers multiple times */
/* put/get handles: can have as many in flight as we have registered.
* BAKE has a handle-caching mechanism we should consult.
* should margo be caching handles? */
ret = margo_create(context->mid, context->svr_addr,
context->put_id, &(context->put_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bulk_put_id, &(context->bulk_put_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->get_id, &(context->get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bulk_get_id, &(context->bulk_get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &(context->bench_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->shutdown_id, &(context->shutdown_handle));
assert(ret == HG_SUCCESS);
margo_free_output(handle, &open_out);
margo_destroy(handle);
margo_free_output(handle, &open_out);
margo_destroy(handle);
return HG_SUCCESS;
return HG_SUCCESS;
}
/* we gave types in the open call. Will need to maintain in 'context' the
* size. */
hg_return_t kv_put(kv_context *context, void *key, void *value) {
hg_return_t ret;
put_in_t put_in;
put_out_t put_out;
hg_return_t ret;
put_in_t put_in;
put_in.key = *(int*)key;
put_in.value = *(int*)value;
ret = margo_forward(context->put_handle, &put_in);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->put_handle, &put_out);
assert(ret == HG_SUCCESS);
assert(put_out.ret == HG_SUCCESS);
margo_free_output(context->put_handle, &put_out);
return ret;
put_in.key = *(kv_key_t*)key;
put_in.value = *(kv_value_t*)value;
ret = margo_forward(context->put_handle, &put_in);
assert(ret == HG_SUCCESS);