Commit 6c226199 authored by David Rich, committed by Rob Latham
Browse files

Fixup duplicate handling, add in-memory support for BerkeleyDB.

parent 94539640
...@@ -7,12 +7,14 @@ AbstractDataStore::AbstractDataStore() { ...@@ -7,12 +7,14 @@ AbstractDataStore::AbstractDataStore() {
_duplicates = Duplicates::IGNORE; _duplicates = Duplicates::IGNORE;
_eraseOnGet = false; _eraseOnGet = false;
_debug = false; _debug = false;
_in_memory = false;
}; };
AbstractDataStore::AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) { AbstractDataStore::AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) {
_duplicates = duplicates; _duplicates = duplicates;
_eraseOnGet = eraseOnGet; _eraseOnGet = eraseOnGet;
_debug = debug; _debug = debug;
_in_memory = false;
}; };
AbstractDataStore::~AbstractDataStore() AbstractDataStore::~AbstractDataStore()
...@@ -51,20 +53,22 @@ bool BwTreeDataStore::put(ds_bulk_t &key, ds_bulk_t &data) { ...@@ -51,20 +53,22 @@ bool BwTreeDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
std::vector<ds_bulk_t> values; std::vector<ds_bulk_t> values;
bool success = false; bool success = false;
_tree->GetValue(key, values); if (_duplicates == Duplicates::ALLOW) {
bool duplicate_key = (values.size() != 0); success = _tree->Insert(key, data);
}
if (duplicate_key) { else if (_duplicates == Duplicates::IGNORE) {
if (_duplicates == Duplicates::IGNORE) { _tree->GetValue(key, values);
bool duplicate_key = (values.size() != 0);
if (duplicate_key) {
// silently ignore // silently ignore
success = true; success = true;
} }
else { // Duplicates::ALLOW (default) else {
success = _tree->Insert(key, data); success = _tree->Insert(key, data);
} }
} }
else { else {
success = _tree->Insert(key, data); std::cerr << "BwTreeDataStore::put: Unexpected Duplicates option = " << _duplicates << std::endl;
} }
return success; return success;
...@@ -87,6 +91,14 @@ bool BwTreeDataStore::get(ds_bulk_t &key, ds_bulk_t &data) { ...@@ -87,6 +91,14 @@ bool BwTreeDataStore::get(ds_bulk_t &key, ds_bulk_t &data) {
} }
} }
if (success && _eraseOnGet) {
status = _tree->Delete(key, data);
if (status != 0) {
success = false;
std::cerr << "BwTreeDataStore::get: BwTree error on delete (eraseOnGet) = " << status << std::endl;
}
}
return success; return success;
}; };
...@@ -110,6 +122,9 @@ bool BwTreeDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) { ...@@ -110,6 +122,9 @@ bool BwTreeDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
return success; return success;
}; };
BwTreeDataStore::BwTreeDataStore::set_in_memory(bool enable)
{};
LevelDBDataStore::LevelDBDataStore() : LevelDBDataStore::LevelDBDataStore() :
AbstractDataStore(Duplicates::IGNORE, false, false) { AbstractDataStore(Duplicates::IGNORE, false, false) {
...@@ -163,31 +178,9 @@ bool LevelDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) { ...@@ -163,31 +178,9 @@ bool LevelDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
leveldb::Status status; leveldb::Status status;
bool success = false; bool success = false;
std::string value; // IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value); // redundant put simply overwrites previous value which is fine when key/value is the same.
bool duplicate_key = false; if (_duplicates == Duplicates::IGNORE) {
if (status.ok()) {
duplicate_key = true;
}
else if (!status.IsNotFound()) {
std::cerr << "LevelDBDataStore::put: LevelDB error on Get = " << status.ToString() << std::endl;
return false; // give up and return
}
bool insert_key = true;
if (duplicate_key) {
insert_key = false;
if (_duplicates == Duplicates::IGNORE) {
// silently ignore
success = true;
}
else {
std::cerr << "LevelDBDataStore::put: duplicate key detected "
<< ", duplicates not supported" << std::endl;
}
}
if (insert_key) {
status = _dbm->Put(leveldb::WriteOptions(), toString(key), toString(data)); status = _dbm->Put(leveldb::WriteOptions(), toString(key), toString(data));
if (status.ok()) { if (status.ok()) {
success = true; success = true;
...@@ -196,6 +189,12 @@ bool LevelDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) { ...@@ -196,6 +189,12 @@ bool LevelDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
std::cerr << "LevelDBDataStore::put: LevelDB error on Put = " << status.ToString() << std::endl; std::cerr << "LevelDBDataStore::put: LevelDB error on Put = " << status.ToString() << std::endl;
} }
} }
else if (_duplicates == Duplicates::ALLOW) {
std::cerr << "LevelDBDataStore::put: Duplicates::ALLOW set, LevelDB does not support duplicates" << std::endl;
}
else {
std::cerr << "LevelDBDataStore::put: Unexpected Duplicates option = " << _duplicates << std::endl;
}
return success; return success;
}; };
...@@ -231,6 +230,10 @@ bool LevelDBDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) { ...@@ -231,6 +230,10 @@ bool LevelDBDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
return success; return success;
}; };
LevelDBDataStore::LevelDBDataStore::set_in_memory(bool enable)
{};
BerkeleyDBDataStore::BerkeleyDBDataStore() : BerkeleyDBDataStore::BerkeleyDBDataStore() :
AbstractDataStore(Duplicates::IGNORE, false, false) { AbstractDataStore(Duplicates::IGNORE, false, false) {
_dbm = NULL; _dbm = NULL;
...@@ -260,27 +263,46 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) { ...@@ -260,27 +263,46 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
} }
// initialize the environment // initialize the environment
uint32_t flags= uint32_t flags = 0;
DB_CREATE | // Create the environment if it does not exist if (_in_memory) {
DB_PRIVATE | flags =
DB_RECOVER | // Run normal recovery. DB_CREATE | // Create the environment if it does not exist
//DB_INIT_LOCK | // Initialize the locking subsystem DB_PRIVATE |
DB_INIT_LOG | // Initialize the logging subsystem //DB_RECOVER | // Run normal recovery.
DB_INIT_TXN | // Initialize the transactional subsystem. This //DB_INIT_LOCK | // Initialize the locking subsystem
DB_AUTO_COMMIT | DB_INIT_LOG | // Initialize the logging subsystem
//DB_THREAD | // Cause the environment to be free-threaded DB_INIT_TXN | // Initialize the transactional subsystem. This
DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache) //DB_THREAD | // Cause the environment to be free-threaded
DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache)
}
else {
flags =
DB_CREATE | // Create the environment if it does not exist
DB_PRIVATE |
DB_RECOVER | // Run normal recovery.
//DB_INIT_LOCK | // Initialize the locking subsystem
DB_INIT_LOG | // Initialize the logging subsystem
DB_INIT_TXN | // Initialize the transactional subsystem. This
//DB_THREAD | // Cause the environment to be free-threaded
DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache)
}
try { try {
// create and open the environment // create and open the environment
uint32_t flag = DB_CXX_NO_EXCEPTIONS; uint32_t flag = DB_CXX_NO_EXCEPTIONS;
int scratch_size = 1; // what's this? int scratch_size = 1; // 1GB cache
_dbenv = new DbEnv(flag); _dbenv = new DbEnv(flag);
_dbenv->set_lk_detect(DB_LOCK_MINWRITE);
_dbenv->set_error_stream(&std::cerr); _dbenv->set_error_stream(&std::cerr);
_dbenv->set_cachesize(scratch_size, 0, 0); _dbenv->set_cachesize(scratch_size, 0, 0);
_dbenv->open(basepath.c_str(), flags, 0644); if (_in_memory) {
_dbenv->log_set_config(DB_LOG_IN_MEMORY, 1);
_dbenv->set_lg_bsize(scratch_size * 1024 * 1024 * 1024); // in GB
_dbenv->open(NULL, flags, 0);
}
else {
_dbenv->set_lk_detect(DB_LOCK_MINWRITE);
_dbenv->open(basepath.c_str(), flags, 0644);
}
} }
catch (DbException &e) { catch (DbException &e) {
std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on environment open = " std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on environment open = "
...@@ -291,13 +313,36 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) { ...@@ -291,13 +313,36 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
if (status == 0) { if (status == 0) {
_dbm = new Db(_dbenv, DB_CXX_NO_EXCEPTIONS); _dbm = new Db(_dbenv, DB_CXX_NO_EXCEPTIONS);
uint32_t flags = DB_CREATE; // Allow database creation uint32_t flags = DB_CREATE | DB_AUTO_COMMIT; // Allow database creation
status = _dbm->open(NULL, dbname.c_str(), NULL, DB_HASH, flags, 0); if (_in_memory) {
DB_MPOOLFILE *mpf = NULL;
status = _dbm->open(NULL, // txn pointer
NULL, // NULL for in-memory DB
dbname.c_str(), // logical DB name
DB_BTREE, // DB type (e.g. BTREE, HASH)
flags,
0);
mpf = _dbm->get_mpf();
mpf->set_flags(DB_MPOOL_NOFILE, 1);
}
else {
status = _dbm->open(NULL, // txn pointer
dbname.c_str(), // file name
dbname.c_str(), // logical DB name
DB_BTREE, // DB type (e.g. BTREE, HASH)
flags,
0);
}
if (status != 0) { // is this the right test for error? if (status != 0) { // is this the right test for error?
std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on DB open" << std::endl; std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on DB open" << std::endl;
} }
} }
assert(status == 0); // fall over assert(status == 0); // fall over
// Request duplicate-key support only when the store was configured with
// Duplicates::ALLOW. This must be a comparison: an assignment here would
// overwrite the configured policy and make the branch depend on the value
// of the enumerator rather than on the configuration.
// NOTE(review): the Berkeley DB documentation states that DB->set_flags()
// may not be called after DB->open(); this statement runs after the open
// call above, so DB_DUP probably needs to be set before _dbm->open() and
// the return value checked -- confirm against the BerkeleyDB API reference.
if (_duplicates == Duplicates::ALLOW) {
    uint32_t flags = DB_DUP; // Allow duplicate keys
    _dbm->set_flags(flags);
}
// debugging support? // debugging support?
}; };
...@@ -306,29 +351,11 @@ bool BerkeleyDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) { ...@@ -306,29 +351,11 @@ bool BerkeleyDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
int status = 0; int status = 0;
bool success = false; bool success = false;
ds_bulk_t keydata; // IGNORE case deals with redundant puts (where key/value is the same). In BerkeleyDB a
Dbt db_key(&(keydata[0]), uint32_t(keydata.size())); // redundant put simply overwrites previous value which is fine when key/value is the same.
Dbt get_data; // ALLOW case deals with actual duplicates (where key is the same but value is different).
status = _dbm->get(NULL, &db_key, &get_data, 0); // This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case).
if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) {
bool duplicate_key = false;
if (status != DB_NOTFOUND && status != DB_KEYEMPTY) {
duplicate_key = true;
}
bool insert_key = true;
if (duplicate_key) {
insert_key = false;
if (_duplicates == Duplicates::IGNORE) {
// silently ignore
success = true;
}
else {
std::cerr << "BerkeleyDBDataStore::put: duplicate key detected "
<< ", duplicates not supported" << std::endl;
}
}
if (insert_key) {
Dbt put_data(&(data[0]), uint32_t(data.size())); Dbt put_data(&(data[0]), uint32_t(data.size()));
uint32_t flags = DB_NOOVERWRITE; uint32_t flags = DB_NOOVERWRITE;
status = _dbm->put(NULL, &db_key, &put_data, flags); status = _dbm->put(NULL, &db_key, &put_data, flags);
...@@ -339,10 +366,15 @@ bool BerkeleyDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) { ...@@ -339,10 +366,15 @@ bool BerkeleyDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
std::cerr << "BerkeleyDBDataStore::put: BerkeleyDB error on put = " << status << std::endl; std::cerr << "BerkeleyDBDataStore::put: BerkeleyDB error on put = " << status << std::endl;
} }
} }
else {
std::cerr << "BerkeleyDBDataStore::put: Unexpected Duplicates option = " << _duplicates << std::endl;
}
return success; return success;
}; };
// In the case where Duplicates::ALLOW, this will return the first
// value found using key.
bool BerkeleyDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) { bool BerkeleyDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) {
int status = 0; int status = 0;
bool success = false; bool success = false;
...@@ -362,10 +394,20 @@ bool BerkeleyDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) { ...@@ -362,10 +394,20 @@ bool BerkeleyDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) {
else { else {
std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on Get = " << status << std::endl; std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on Get = " << status << std::endl;
} }
if (success && _eraseOnGet) {
status = _dbm->del(NULL, &db_key, 0);
if (status != 0) {
success = false;
std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on delete (eraseOnGet) = " << status << std::endl;
}
}
return success; return success;
}; };
// TODO: To return more than 1 value (when Duplicates::ALLOW), this code should
// use the c_get interface.
bool BerkeleyDBDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) { bool BerkeleyDBDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
bool success = false; bool success = false;
...@@ -378,3 +420,7 @@ bool BerkeleyDBDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) { ...@@ -378,3 +420,7 @@ bool BerkeleyDBDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
return success; return success;
}; };
BerkeleyDBDataStore::BerkeleyDBDataStore::set_in_memory(bool enable) {
_in_memory = enable;
};
...@@ -50,10 +50,12 @@ public: ...@@ -50,10 +50,12 @@ public:
virtual bool put(ds_bulk_t &key, ds_bulk_t &data)=0; virtual bool put(ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(ds_bulk_t &key, ds_bulk_t &data)=0; virtual bool get(ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0; virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
protected: protected:
Duplicates _duplicates; Duplicates _duplicates;
bool _eraseOnGet; bool _eraseOnGet;
bool _debug; bool _debug;
bool _in_memory;
}; };
class BwTreeDataStore : public AbstractDataStore { class BwTreeDataStore : public AbstractDataStore {
...@@ -65,6 +67,7 @@ public: ...@@ -65,6 +67,7 @@ public:
virtual bool put(ds_bulk_t &key, ds_bulk_t &data); virtual bool put(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, ds_bulk_t &data); virtual bool get(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data); virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // a no-op
protected: protected:
BwTree<ds_bulk_t, ds_bulk_t, BwTree<ds_bulk_t, ds_bulk_t,
my_less, my_equal, my_hash, my_less, my_equal, my_hash,
...@@ -81,6 +84,7 @@ public: ...@@ -81,6 +84,7 @@ public:
virtual bool put(ds_bulk_t &key, ds_bulk_t &data); virtual bool put(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, ds_bulk_t &data); virtual bool get(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data); virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // not supported, a no-op
protected: protected:
leveldb::DB *_dbm = NULL; leveldb::DB *_dbm = NULL;
private: private:
...@@ -98,6 +102,7 @@ public: ...@@ -98,6 +102,7 @@ public:
virtual bool put(ds_bulk_t &key, ds_bulk_t &data); virtual bool put(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, ds_bulk_t &data); virtual bool get(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data); virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
protected: protected:
DbEnv *_dbenv = NULL; DbEnv *_dbenv = NULL;
Db *_dbm = NULL; Db *_dbm = NULL;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment