Commit 948463c4 authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

BerkeleyDB support added to datastore.

parent b3fa964f
......@@ -43,13 +43,13 @@ test_test_client_LDADD = -lkvclient
test_test_server_SOURCES = test/test-server.cc
test_test_server_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_server_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src -I${srcdir}/src/BwTree/src
test_test_server_LDADD = -lkvserver -lleveldb -lsnappy -lboost_filesystem -lboost_system
test_test_server_LDADD = -lkvserver -ldb -ldb_cxx -ldb_stl -lleveldb -lsnappy -lboost_filesystem -lboost_system
test_test_mpi_SOURCES = test/test-mpi.cc
test_test_mpi_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_mpi_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src -I${srcdir}/src/BwTree/src
test_test_mpi_LDADD = -lkvclient -lkvserver -lleveldb -lsnappy -lboost_filesystem -lboost_system
test_test_mpi_LDADD = -lkvclient -lkvserver -ldb -ldb_cxx -ldb_stl -lleveldb -lsnappy -lboost_filesystem -lboost_system
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/kv-server.pc \
......
......@@ -144,7 +144,10 @@ void LevelDBDataStore::createDatabase(std::string db_name) {
// db_name assumed to include the full path (e.g. /var/data/db.dat)
boost::filesystem::path p(db_name);
boost::filesystem::create_directories(p.parent_path().string());
std::string basepath = p.parent_path().string();
if (!basepath.empty()) {
boost::filesystem::create_directories(basepath);
}
options.create_if_missing = true;
status = leveldb::DB::Open(options, db_name, &_dbm);
......@@ -230,3 +233,163 @@ bool LevelDBDataStore::get(const kv_key_t &key, std::vector<ds_bulk_t> &data) {
return success;
};
BerkeleyDBDataStore::BerkeleyDBDataStore() :
AbstractDataStore(Duplicates::IGNORE, false, false) {
_dbm = NULL;
_dbenv = NULL;
};
BerkeleyDBDataStore::BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) :
AbstractDataStore(duplicates, eraseOnGet, debug) {
_dbm = NULL;
_dbenv = NULL;
};
ds_bulk_t BerkeleyDBDataStore::key2ds_bulk(const kv_key_t &key) {
ds_bulk_t keydata(sizeof(kv_key_t), 0);
uint64_t *p = reinterpret_cast<uint64_t*>(&(keydata[0]));
*p = key;
return keydata;
};
kv_key_t BerkeleyDBDataStore::ds_bulk2key(ds_bulk_t &keydata) {
kv_key_t *key = reinterpret_cast<kv_key_t*>(&(keydata[0]));
return *key;
};
BerkeleyDBDataStore::~BerkeleyDBDataStore() {
delete _dbm;
delete _dbenv;
};
void BerkeleyDBDataStore::createDatabase(std::string db_name) {
int status = 0;
// db_name assumed to include the full path (e.g. /var/data/db.dat)
boost::filesystem::path path(db_name);
std::string basepath = path.parent_path().string();
std::string dbname = path.filename().string();
if (!basepath.empty()) {
boost::filesystem::create_directories(basepath);
}
// initialize the environment
uint32_t flags=
DB_CREATE | // Create the environment if it does not exist
DB_PRIVATE |
DB_RECOVER | // Run normal recovery.
//DB_INIT_LOCK | // Initialize the locking subsystem
DB_INIT_LOG | // Initialize the logging subsystem
DB_INIT_TXN | // Initialize the transactional subsystem. This
DB_AUTO_COMMIT |
//DB_THREAD | // Cause the environment to be free-threaded
DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache)
try {
// create and open the environment
uint32_t flag = DB_CXX_NO_EXCEPTIONS;
int scratch_size = 1; // what's this?
_dbenv = new DbEnv(flag);
_dbenv->set_lk_detect(DB_LOCK_MINWRITE);
_dbenv->set_error_stream(&std::cerr);
_dbenv->set_cachesize(scratch_size, 0, 0);
_dbenv->open(basepath.c_str(), flags, 0644);
}
catch (DbException &e) {
std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on environment open = "
<< e.what() << std::endl;
status = 1; // failure
}
if (status == 0) {
_dbm = new Db(_dbenv, DB_CXX_NO_EXCEPTIONS);
uint32_t flags = DB_CREATE; // Allow database creation
status = _dbm->open(NULL, dbname.c_str(), NULL, DB_HASH, flags, 0);
if (status != 0) { // is this the right test for error?
std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on DB open" << std::endl;
}
}
assert(status == 0); // fall over
// debugging support?
};
bool BerkeleyDBDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
int status = 0;
bool success = false;
ds_bulk_t keydata = key2ds_bulk(key);
Dbt db_key(&(keydata[0]), uint32_t(keydata.size()));
Dbt get_data;
status = _dbm->get(NULL, &db_key, &get_data, 0);
bool duplicate_key = false;
if (status != DB_NOTFOUND && status != DB_KEYEMPTY) {
duplicate_key = true;
}
bool insert_key = true;
if (duplicate_key) {
insert_key = false;
if (_duplicates == Duplicates::IGNORE) {
// silently ignore
success = true;
}
else {
std::cerr << "BerkeleyDBDataStore::put: duplicate key " << key
<< ", duplicates not supported" << std::endl;
}
}
if (insert_key) {
Dbt put_data(&(data[0]), uint32_t(data.size()));
uint32_t flags = DB_NOOVERWRITE;
status = _dbm->put(NULL, &db_key, &put_data, flags);
if (status == 0) { // is this the right test for success?
success = true;
}
else {
std::cerr << "BerkeleyDBDataStore::put: BerkeleyDB error on put = " << status << std::endl;
}
}
return success;
};
bool BerkeleyDBDataStore::get(const kv_key_t &key, ds_bulk_t &data) {
int status = 0;
bool success = false;
data.clear();
ds_bulk_t keydata = key2ds_bulk(key);
Dbt db_key(&(keydata[0]), uint32_t(keydata.size()));
Dbt db_data;
status = _dbm->get(NULL, &db_key, &db_data, 0);
if (status != DB_NOTFOUND && status != DB_KEYEMPTY) {
data.resize(db_data.get_size(), 0);
memcpy(&(data[0]), db_data.get_data(), db_data.get_size());
success = true;
}
else {
std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on Get = " << status << std::endl;
}
return success;
};
bool BerkeleyDBDataStore::get(const kv_key_t &key, std::vector<ds_bulk_t> &data) {
bool success = false;
data.clear();
ds_bulk_t value;
if (get(key, value)) {
data.push_back(value);
success = true;
}
return success;
};
......@@ -82,4 +82,22 @@ private:
kv_key_t string2key(std::string &keystr);
};
// may want to implement some caching for persistent stores like BerkeleyDB
class BerkeleyDBDataStore : public AbstractDataStore {
public:
BerkeleyDBDataStore();
BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BerkeleyDBDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, std::vector<ds_bulk_t> &data);
protected:
DbEnv *_dbenv = NULL;
Db *_dbm = NULL;
private:
ds_bulk_t key2ds_bulk(const kv_key_t &key);
kv_key_t ds_bulk2key(ds_bulk_t &keydata);
};
#endif // datastore_h
......@@ -26,7 +26,8 @@ static hg_return_t open_handler(hg_handle_t handle)
if (!datastore) {
//datastore = new BwTreeDataStore(); // testing BwTree
datastore = new LevelDBDataStore(); // testing LevelDB
//datastore = new LevelDBDataStore(); // testing LevelDB
datastore = new BerkeleyDBDataStore(); // testing BerkeleyDB
db_name = in_name;
datastore->createDatabase(db_name);
std::cout << "SERVER OPEN: DataStore initialized and ready for " << db_name << std::endl;
......
......@@ -65,15 +65,17 @@ int main(int argc, char **argv)
size_t items = atoi(argv[1]);
context = kv_client_register(NULL);
ret = kv_open(context, argv[2], "testdb");
ret = kv_open(context, argv[2], "db/testdb");
assert(ret == HG_SUCCESS);
RandomInsertSpeedTest(context, items, &rpc);
print_results(&rpc);
#if 0 // ifdef out when testing with LevelDB or BerkeleyDB
server = kv_benchmark(context, items);
print_results(server);
free(server);
#endif
/* close */
ret = kv_close(context);
......
......@@ -7,7 +7,7 @@ int main(int argc, char **argv) {
kv_context * context = kv_client_register(NULL);
/* open */
ret = kv_open(context, argv[1], "booger");
ret = kv_open(context, argv[1], "db/booger");
assert(ret == HG_SUCCESS);
/* put */
......@@ -22,12 +22,14 @@ int main(int argc, char **argv) {
assert(ret == HG_SUCCESS);
printf("key: %d in: %d out: %d\n", key, val, remote_val);
#if 0 // ifdef out when testing with LevelDB or BerkeleyDB
bench_result *output;
output = kv_benchmark(context, 1000);
printf("insert: %zd keys in %f seconds: %f Million-insert per sec\n",
output->nkeys, output->insert_time,
output->nkeys/(output->insert_time*1024*1024) );
free(output);
#endif
/* close */
ret = kv_close(context);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment