Commit 80e21fc6 authored by Matthieu Dorier

separated datastore implementations

parent 1cd29dd4
......@@ -27,7 +27,16 @@ lib_libkvserver_la_SOURCES = src/kv-server.cc \
src/datastore.cc
if BUILD_BWTREE
lib_libkvserver_la_SOURCES += src/BwTree/src/bwtree.cpp
lib_libkvserver_la_SOURCES += src/BwTree/src/bwtree.cpp \
src/bwtree_datastore.cc
endif
if BUILD_BDB
lib_libkvserver_la_SOURCES += src/berkeleydb_datastore.cc
endif
if BUILD_LEVELDB
lib_libkvserver_la_SOURCES += src/leveldb_datastore.cc
endif
......
......@@ -92,8 +92,6 @@ if test "x${berkelydb_backend}" == xyes ; then
AC_CHECK_HEADERS([dbstl_map.h],,
AC_ERROR("Could not find Berkely DB STL headers"))
AC_DEFINE([USE_BDB], 1, [Use Berkely DB backend])
bwtree_backend=no
leveldb_backend=no
SERVER_LIBS="${SERVER_LIBS} -ldb_cxx -ldb_stl"
AC_LANG_POP
fi
......@@ -105,20 +103,17 @@ if test "x${leveldb_backend}" == xyes ; then
AC_ERROR("Could not find leveldb headers"))
AC_DEFINE([USE_LEVELDB], 1, [use leveldb backend])
SERVER_LIBS="${SERVER_LIBS} -lleveldb"
berkelydb_backend=no
bwtree_backend=no
fi
if test "x${bwtree_backend}" == xyes ; then
AC_DEFINE([USE_BWTREE], 1, [use BwTree backend])
CPPFLAGS="-I${srcdir}/src/BwTree/src ${CPPFLAGS}"
CXXFLAGS="-pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof ${CXXFLAGS}"
berkeleydb_backend=no
leveldb_backend=no
fi
AM_CONDITIONAL([BUILD_BWTREE], [test "x${bwtree_backend}" == xyes])
AM_CONDITIONAL([BUILD_BDB], [test "x${berkelydb_backend}" == xyes])
AM_CONDITIONAL([BUILD_LEVELDB], [test "x${leveldb_backend}" == xyes])
AM_CONDITIONAL([BUILD_BWTREE], [test "x${bwtree_backend}" == xyes])
AC_SUBST(SERVER_LIBS)
AC_SUBST(GROUP_LIBS)
......
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "berkeleydb_datastore.h"
#include "kv-config.h"
#include <chrono>
#include <iostream>
#include <boost/filesystem.hpp>
using namespace std::chrono;
BerkeleyDBDataStore::BerkeleyDBDataStore() :
AbstractDataStore(Duplicates::IGNORE, false, false) {
_dbm = NULL;
_dbenv = NULL;
};
BerkeleyDBDataStore::BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) :
AbstractDataStore(duplicates, eraseOnGet, debug) {
_dbm = NULL;
_dbenv = NULL;
};
BerkeleyDBDataStore::~BerkeleyDBDataStore() {
delete _dbm;
delete _dbenv;
};
void BerkeleyDBDataStore::createDatabase(std::string db_name) {
int status = 0;
// db_name assumed to include the full path (e.g. /var/data/db.dat)
boost::filesystem::path path(db_name);
std::string basepath = path.parent_path().string();
std::string dbname = path.filename().string();
if (!basepath.empty()) {
boost::filesystem::create_directories(basepath);
}
// initialize the environment
uint32_t flags = 0;
if (_in_memory) {
// not sure if we want all of these for in_memory
flags =
DB_CREATE | // Create the environment if it does not exist
DB_PRIVATE |
DB_RECOVER | // Run normal recovery.
DB_INIT_LOCK | // Initialize the locking subsystem
DB_INIT_LOG | // Initialize the logging subsystem
DB_INIT_TXN | // Initialize the transactional subsystem
DB_THREAD | // Cause the environment to be free-threaded
DB_AUTO_COMMIT |
DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache)
}
else {
flags =
DB_CREATE | // Create the environment if it does not exist
DB_PRIVATE |
DB_RECOVER | // Run normal recovery.
DB_INIT_LOCK | // Initialize the locking subsystem
DB_INIT_LOG | // Initialize the logging subsystem
DB_INIT_TXN | // Initialize the transactional subsystem
DB_THREAD | // Cause the environment to be free-threaded
DB_AUTO_COMMIT |
DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache)
}
try {
// create and open the environment
uint32_t flag = DB_CXX_NO_EXCEPTIONS;
int scratch_size = 1; // 1GB cache
_dbenv = new DbEnv(flag);
_dbenv->set_error_stream(&std::cerr);
_dbenv->set_cachesize(scratch_size, 0, 0);
if (_in_memory) {
_dbenv->log_set_config(DB_LOG_IN_MEMORY, 1);
_dbenv->set_lg_bsize(scratch_size * 1024 * 1024 * 1024); // in GB
_dbenv->open(NULL, flags, 0);
}
else {
_dbenv->set_lk_detect(DB_LOCK_MINWRITE);
_dbenv->open(basepath.c_str(), flags, 0644);
}
}
catch (DbException &e) {
std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on environment open = "
<< e.what() << std::endl;
status = 1; // failure
}
if (status == 0) {
_dbm = new Db(_dbenv, DB_CXX_NO_EXCEPTIONS);
if (_duplicates == Duplicates::ALLOW) {
_dbm->set_flags(DB_DUP); // Allow duplicate keys
}
uint32_t flags = DB_CREATE | DB_AUTO_COMMIT | DB_THREAD; // Allow database creation
if (_in_memory) {
status = _dbm->open(NULL, // txn pointer
NULL, // NULL for in-memory DB
NULL, // logical DB name
DB_HASH, // DB type (e.g. BTREE, HASH)
flags,
0);
if (status == 0) {
DbMpoolFile *mpf = _dbm->get_mpf();
mpf->set_flags(DB_MPOOL_NOFILE, 1);
}
}
else {
status = _dbm->open(NULL, // txn pointer
dbname.c_str(), // file name
NULL, // logical DB name
DB_HASH, // DB type (e.g. BTREE, HASH)
flags,
0);
}
if (status != 0) { // is this the right test for error?
std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on DB open" << std::endl;
}
}
assert(status == 0); // fall over
// debugging support?
};
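// Illustrative usage sketch (not part of this commit): the in-memory branch above is
// selected by calling set_in_memory(true) before createDatabase(); the database is then
// opened with a NULL file name and backed only by the mpool cache, e.g.
//   BerkeleyDBDataStore store(Duplicates::IGNORE, false, false);  // hypothetical caller
//   store.set_in_memory(true);               // take the DB_LOG_IN_MEMORY / DB_MPOOL_NOFILE path
//   store.createDatabase("/tmp/kv/db.dat");  // name is parsed, but no on-disk database file is opened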
bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
int status = 0;
bool success = false;
// IGNORE case deals with redundant puts (where key/value is the same). In BerkeleyDB a
// redundant put may overwrite the previous value, which is fine when the key/value is the same.
// ALLOW case deals with actual duplicates (where key is the same but value is different).
// This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case).
if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) {
Dbt db_key((void *)&(key[0]), uint32_t(key.size()));
Dbt db_data((void *)&(data[0]), uint32_t(data.size()));
db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_USERMEM);
uint32_t flags = DB_NOOVERWRITE; // to simply overwrite value, don't use this flag
status = _dbm->put(NULL, &db_key, &db_data, flags);
if (status == 0 ||
(_duplicates == Duplicates::IGNORE && status == DB_KEYEXIST)) {
success = true;
}
else {
std::cerr << "BerkeleyDBDataStore::put: BerkeleyDB error on put = " << status << std::endl;
}
}
else {
std::cerr << "BerkeleyDBDataStore::put: Unexpected Duplicates option = " << int32_t(_duplicates) << std::endl;
}
return success;
};
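// Illustrative sketch (not in the commit): under Duplicates::IGNORE a repeated put of the
// same key is reported as success even though BerkeleyDB returns DB_KEYEXIST for it.
//   ds_bulk_t k = {'k'}, v = {'v'};   // hypothetical key/value
//   store.put(k, v);                  // stored
//   store.put(k, v);                  // DB_KEYEXIST from BerkeleyDB, put() still returns true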
// When Duplicates::ALLOW is set, this returns only the first
// value found for the key.
bool BerkeleyDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
int status = 0;
bool success = false;
data.clear();
Dbt db_key((void *)&(key[0]), uint32_t(key.size()));
Dbt db_data;
db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_REALLOC);
status = _dbm->get(NULL, &db_key, &db_data, 0);
if (status != DB_NOTFOUND && status != DB_KEYEMPTY && db_data.get_size() > 0) {
data.resize(db_data.get_size(), 0);
memcpy(&(data[0]), db_data.get_data(), db_data.get_size());
free(db_data.get_data());
success = true;
}
else {
std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on Get = " << status << std::endl;
}
if (success && _eraseOnGet) {
status = _dbm->del(NULL, &db_key, 0);
if (status != 0) {
success = false;
std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on delete (eraseOnGet) = " << status << std::endl;
}
}
return success;
};
// TODO: To return more than 1 value (when Duplicates::ALLOW), this code should
// use the c_get interface.
bool BerkeleyDBDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
bool success = false;
data.clear();
ds_bulk_t value;
if (get(key, value)) {
data.push_back(value);
success = true;
}
return success;
};
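// Illustrative sketch (not in the commit) of the cursor-based approach the TODO above
// refers to: position a Dbc cursor on the key with DB_SET, then walk its duplicates
// with DB_NEXT_DUP, copying each value into the output vector.
//   Dbc *cursorp = NULL;
//   _dbm->cursor(NULL, &cursorp, 0);
//   Dbt db_key((void *)&(key[0]), uint32_t(key.size()));
//   Dbt db_data;
//   int ret = cursorp->get(&db_key, &db_data, DB_SET);
//   while (ret == 0) {
//     ds_bulk_t v(db_data.get_size());
//     memcpy(v.data(), db_data.get_data(), db_data.get_size());
//     data.push_back(std::move(v));
//     ret = cursorp->get(&db_key, &db_data, DB_NEXT_DUP);
//   }
//   cursorp->close();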
void BerkeleyDBDataStore::set_in_memory(bool enable) {
_in_memory = enable;
};
std::vector<ds_bulk_t> BerkeleyDBDataStore::list(const ds_bulk_t &start, size_t count)
{
std::vector<ds_bulk_t> keys;
Dbc * cursorp;
Dbt key, data;
int ret;
_dbm->cursor(NULL, &cursorp, 0);
while ((ret = cursorp->get(&key, &data, DB_NEXT)) == 0) {
ds_bulk_t k(key.get_size() );
memcpy(k.data(), key.get_data(), key.get_size() );
/* memcpy above already made the copy; std::move just hands the buffer to the vector */
keys.push_back(std::move(k));
}
cursorp->close();
return keys;
}
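// Note / sketch (not in the commit): the loop above ignores 'start' and 'count'. A cursor
// positioned with DB_SET_RANGE would honor them, e.g.
//   Dbt k((void *)start.data(), uint32_t(start.size())), d;
//   int rc = cursorp->get(&k, &d, start.empty() ? DB_FIRST : DB_SET_RANGE);
//   while (rc == 0 && keys.size() < count) { /* copy key as above */ rc = cursorp->get(&k, &d, DB_NEXT); }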
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#ifndef bdb_datastore_h
#define bdb_datastore_h
#include "kv-config.h"
#include "sds-keyval.h"
#include <db_cxx.h>
#include <dbstl_map.h>
// may want to implement some caching for persistent stores like BerkeleyDB
class BerkeleyDBDataStore : public AbstractDataStore {
public:
BerkeleyDBDataStore();
BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BerkeleyDBDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
protected:
DbEnv *_dbenv = NULL;
Db *_dbm = NULL;
};
#endif // bdb_datastore_h
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#ifndef bulk_h
#define bulk_h
#include "kv-config.h"
#include <boost/functional/hash.hpp>
#include <vector>
// implementation is std::vector<char> specific
// typedef is for convenience
typedef std::vector<char> ds_bulk_t;
struct ds_bulk_hash {
size_t operator()(const ds_bulk_t &v) const {
size_t hash = 0;
boost::hash_range(hash, v.begin(), v.end());
return hash;
}
};
struct ds_bulk_equal {
bool operator()(const ds_bulk_t &v1, const ds_bulk_t &v2) const {
return (v1 == v2);
}
};
struct ds_bulk_less {
bool operator()(const ds_bulk_t &v1, const ds_bulk_t &v2) const {
return (v1 < v2);
}
};
#endif // bulk_h
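// Illustrative sketch (not in the commit): these functors make ds_bulk_t usable as a key
// in standard containers, which is also how the BwTree template parameters below use them, e.g.
//   #include <unordered_map>
//   #include <map>
//   std::unordered_map<ds_bulk_t, ds_bulk_t, ds_bulk_hash, ds_bulk_equal> kv_cache;
//   std::map<ds_bulk_t, ds_bulk_t, ds_bulk_less> ordered_kv;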
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "bwtree_datastore.h"
#include "kv-config.h"
#include <chrono>
#include <iostream>
#include <boost/filesystem.hpp>
using namespace std::chrono;
BwTreeDataStore::BwTreeDataStore() :
AbstractDataStore(Duplicates::IGNORE, false, false) {
_tree = NULL;
};
BwTreeDataStore::BwTreeDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) :
AbstractDataStore(duplicates, eraseOnGet, debug) {
_tree = NULL;
};
BwTreeDataStore::~BwTreeDataStore() {
// deleting BwTree can cause core dump
delete _tree;
};
void BwTreeDataStore::createDatabase(std::string db_name) {
_tree = new BwTree<ds_bulk_t, ds_bulk_t,
ds_bulk_less, ds_bulk_equal, ds_bulk_hash,
ds_bulk_equal, ds_bulk_hash>();
if (_debug) {
_tree->SetDebugLogging(1);
}
else {
_tree->SetDebugLogging(0);
}
_tree->UpdateThreadLocal(1);
_tree->AssignGCID(0);
};
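// Assumption / sketch (not in the commit): UpdateThreadLocal() and AssignGCID() register
// threads with BwTree's garbage-collection machinery. The single-thread registration above
// covers one server thread; a multi-threaded server would look roughly like:
//   _tree->UpdateThreadLocal(num_threads);   // hypothetical thread count
//   // in each worker i, before touching the tree: _tree->AssignGCID(i);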
bool BwTreeDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
std::vector<ds_bulk_t> values;
bool success = false;
if(!_tree) return false;
if (_duplicates == Duplicates::ALLOW) {
success = _tree->Insert(key, data);
}
else if (_duplicates == Duplicates::IGNORE) {
_tree->GetValue(key, values);
bool duplicate_key = (values.size() != 0);
if (duplicate_key) {
// silently ignore
success = true;
}
else {
success = _tree->Insert(key, data);
}
}
else {
std::cerr << "BwTreeDataStore::put: Unexpected Duplicates option = " << int32_t(_duplicates) << std::endl;
}
return success;
};
bool BwTreeDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
std::vector<ds_bulk_t> values;
bool success = false;
_tree->GetValue(key, values);
if (values.size() == 1) {
data = std::move(values.front());
success = true;
}
else if (values.size() > 1) {
// this should only happen if duplicates are allowed
if (_duplicates == Duplicates::ALLOW) {
data = std::move(values.front()); // caller is asking for just 1
success = true;
}
}
if (success && _eraseOnGet) {
bool status = _tree->Delete(key, data);
if (!status) {
success = false;
std::cerr << "BwTreeDataStore::get: BwTree error on delete (eraseOnGet) = " << status << std::endl;
}
}
return success;
};
bool BwTreeDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
std::vector<ds_bulk_t> values;
bool success = false;
_tree->GetValue(key, values);
if (values.size() > 1) {
// this should only happen if duplicates are allowed
if (_duplicates == Duplicates::ALLOW) {
data = std::move(values);
success = true;
}
}
else {
data = std::move(values);
success = true;
}
return success;
};
void BwTreeDataStore::set_in_memory(bool enable)
{};
std::vector<ds_bulk_t> BwTreeDataStore::list(const ds_bulk_t &start, size_t count)
{
std::vector<ds_bulk_t> keys;
auto it = _tree->Begin(start);
while (it.IsEnd() == false) {
/* BUG: bwtree doesn't support "list keys" or "get a key" */
//keys.push_back(it.GetLeafNode());
it++; // advance the iterator so the loop terminates even though no keys are collected
}
return keys;
}
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#ifndef bwtree_datastore_h
#define bwtree_datastore_h
#include "kv-config.h"
#include "bwtree.h"
#include "datastore.h"
using namespace wangziqi2013::bwtree;
class BwTreeDataStore : public AbstractDataStore {
public:
BwTreeDataStore();
BwTreeDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BwTreeDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // a no-op
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
protected:
BwTree<ds_bulk_t, ds_bulk_t,
ds_bulk_less, ds_bulk_equal, ds_bulk_hash,
ds_bulk_equal, ds_bulk_hash> *_tree = NULL;
};
#endif // bwtree_datastore_h
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#ifndef datastore_h
#define datastore_h
#include "kv-config.h"
#if USE_BWTREE
#include "bwtree.h"
#endif
#include "bulk.h"
#include "sds-keyval.h"
#include <boost/functional/hash.hpp>
#include <vector>
#if USE_LEVELDB
#include <leveldb/db.h>
#include <leveldb/env.h>
#elif USE_BDB
#include <db_cxx.h>
#include <dbstl_map.h>
#elif USE_BWTREE
using namespace wangziqi2013::bwtree;
#else
#error "No backend selected at configure time"
#endif
#ifndef datastore_h
#define datastore_h
enum class Duplicates : int {ALLOW, IGNORE};
// implementation is std::vector<char> specific
// typedef is for convenience
typedef std::vector<char> ds_bulk_t;
struct my_hash {
size_t operator()(const ds_bulk_t &v) const {
size_t hash = 0;
boost::hash_range(hash, v.begin(), v.end());
return hash;
}
};
struct my_equal {
bool operator()(const ds_bulk_t &v1, const ds_bulk_t &v2) const {
return (v1 == v2);
}
};
struct my_less {
bool operator()(const ds_bulk_t &v1, const ds_bulk_t &v2) const {
return (v1 < v2);
}
};
class AbstractDataStore {
public:
AbstractDataStore();
......@@ -67,61 +30,4 @@ protected:
bool _in_memory;
};
#if USE_BWTREE
class BwTreeDataStore : public AbstractDataStore {
public:
BwTreeDataStore();
BwTreeDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BwTreeDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // a no-op
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
protected:
BwTree<ds_bulk_t, ds_bulk_t,
my_less, my_equal, my_hash,
my_equal, my_hash> *_tree = NULL;
};
#elif USE_LEVELDB
// may want to implement some caching for persistent stores like LevelDB
class LevelDBDataStore : public AbstractDataStore {
public:
LevelDBDataStore();
LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~LevelDBDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // not supported, a no-op
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
protected:
leveldb::DB *_dbm = NULL;
private:
std::string toString(ds_bulk_t &key);
ds_bulk_t fromString(std::string &keystr);
};
#elif USE_BDB
// may want to implement some caching for persistent stores like BerkeleyDB
class BerkeleyDBDataStore : public AbstractDataStore {
public:
BerkeleyDBDataStore();
BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BerkeleyDBDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
protected:
DbEnv *_dbenv = NULL;
Db *_dbm = NULL;
};
#else
#error "No datastore backend selected"
#endif
#endif // datastore_h
......@@ -4,6 +4,18 @@
#include "datastore.h"
#include "kv-config.h"
#ifdef USE_BWTREE
#include "bwtree_datastore.h"
#endif
#ifdef USE_BDB
#include "berkeleydb_datastore.h"
#endif
#ifdef USE_LEVELDB
#include "leveldb_datastore.h"
#endif
#include <mercury.h>
#include <margo.h>
#include <abt-snoozer.h>
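// Illustrative sketch (assumption, not shown in this hunk): with the per-backend headers
// above, the server can pick the datastore implementation at compile time, e.g.
//   AbstractDataStore *datastore = NULL;
//   #ifdef USE_BWTREE
//   datastore = new BwTreeDataStore(Duplicates::IGNORE, false, false);
//   #elif defined(USE_LEVELDB)
//   datastore = new LevelDBDataStore(Duplicates::IGNORE, false, false);
//   #elif defined(USE_BDB)
//   datastore = new BerkeleyDBDataStore(Duplicates::IGNORE, false, false);
//   #endif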
......
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "leveldb_datastore.h"
#include "kv-config.h"
#include <chrono>
#include <iostream>
#include <boost/filesystem.hpp>
using namespace std::chrono;
LevelDBDataStore::LevelDBDataStore() :
AbstractDataStore(Duplicates::IGNORE, false, false) {
_dbm = NULL;
};
LevelDBDataStore::LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) :
AbstractDataStore(duplicates, eraseOnGet, debug) {
_dbm = NULL;
};
std::string LevelDBDataStore::toString(const ds_bulk_t &bulk_val) {
std::string str_val(bulk_val.begin(), bulk_val.end());
return str_val;
};
ds_bulk_t LevelDBDataStore::fromString(const std::string &str_val) {
ds_bulk_t bulk_val(str_val.begin(), str_val.end());
return bulk_val;
};
LevelDBDataStore::~LevelDBDataStore() {
delete _dbm;
//leveldb::Env::Shutdown(); // Riak version only
};
void LevelDBDataStore::createDatabase(std::string db_name) {
leveldb::Options options;
leveldb::Status status;
// db_name assumed to include the full path (e.g. /var/data/db.dat)
boost::filesystem::path p(db_name);
std::string basepath = p.parent_path().string();
if (!basepath.empty()) {
boost::filesystem::create_directories(basepath);
}
options.create_if_missing = true;
status = leveldb::DB::Open(options, db_name, &_dbm);
if (!status.ok()) {
// error
std::cerr << "LevelDBDataStore::createDatabase: LevelDB error on Open = " << status.ToString() << std::endl;
}
assert(status.ok()); // fall over
// debugging support?
};
bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
high_resolution_clock::time_point start = high_resolution_clock::now();
// IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
// redundant put simply overwrites the previous value, which is fine when the key/value is the same.
if (_duplicates == Duplicates::IGNORE) {
status = _dbm->Put(leveldb::WriteOptions(), toString(key), toString(data));
if (status.ok()) {
success = true;
}
else {
std::cerr << "LevelDBDataStore::put: LevelDB error on Put = " << status.ToString() << std::endl;
}
}
else if (_duplicates == Duplicates::ALLOW) {
std::cerr << "LevelDBDataStore::put: Duplicates::ALLOW set, LevelDB does not support duplicates" << std::endl;
}
else {