Commit 8ebd8e6b authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Fix BerkeleyDB datastore bug.

parent 811aa9a9
...@@ -287,6 +287,7 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) { ...@@ -287,6 +287,7 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
DB_INIT_LOG | // Initialize the logging subsystem DB_INIT_LOG | // Initialize the logging subsystem
DB_INIT_TXN | // Initialize the transactional subsystem. This DB_INIT_TXN | // Initialize the transactional subsystem. This
//DB_THREAD | // Cause the environment to be free-threaded //DB_THREAD | // Cause the environment to be free-threaded
DB_AUTO_COMMIT |
DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache) DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache)
} }
else { else {
...@@ -298,6 +299,7 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) { ...@@ -298,6 +299,7 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
DB_INIT_LOG | // Initialize the logging subsystem DB_INIT_LOG | // Initialize the logging subsystem
DB_INIT_TXN | // Initialize the transactional subsystem. This DB_INIT_TXN | // Initialize the transactional subsystem. This
//DB_THREAD | // Cause the environment to be free-threaded //DB_THREAD | // Cause the environment to be free-threaded
DB_AUTO_COMMIT |
DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache) DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache)
} }
...@@ -327,25 +329,28 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) { ...@@ -327,25 +329,28 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
if (status == 0) { if (status == 0) {
_dbm = new Db(_dbenv, DB_CXX_NO_EXCEPTIONS); _dbm = new Db(_dbenv, DB_CXX_NO_EXCEPTIONS);
if (_duplicates == Duplicates::ALLOW) {
_dbm->set_flags(DB_DUP); // Allow duplicate keys
}
uint32_t flags = DB_CREATE | DB_AUTO_COMMIT; // Allow database creation uint32_t flags = DB_CREATE | DB_AUTO_COMMIT; // Allow database creation
if (_in_memory) { if (_in_memory) {
DbMpoolFile *mpf = NULL;
status = _dbm->open(NULL, // txn pointer status = _dbm->open(NULL, // txn pointer
NULL, // NULL for in-memory DB NULL, // NULL for in-memory DB
dbname.c_str(), // logical DB name NULL, // logical DB name
DB_BTREE, // DB type (e.g. BTREE, HASH) DB_HASH, // DB type (e.g. BTREE, HASH)
flags, flags,
0); 0);
if (status == 0) { if (status == 0) {
mpf = _dbm->get_mpf(); DbMpoolFile *mpf = _dbm->get_mpf();
mpf->set_flags(DB_MPOOL_NOFILE, 1); mpf->set_flags(DB_MPOOL_NOFILE, 1);
} }
} }
else { else {
status = _dbm->open(NULL, // txn pointer status = _dbm->open(NULL, // txn pointer
dbname.c_str(), // file name dbname.c_str(), // file name
dbname.c_str(), // logical DB name NULL, // logical DB name
DB_BTREE, // DB type (e.g. BTREE, HASH) DB_HASH, // DB type (e.g. BTREE, HASH)
flags, flags,
0); 0);
} }
...@@ -355,11 +360,6 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) { ...@@ -355,11 +360,6 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
} }
assert(status == 0); // fall over assert(status == 0); // fall over
if (_duplicates == Duplicates::ALLOW) {
uint32_t flags = DB_DUP; // Allow duplicate keys
_dbm->set_flags(flags);
}
// debugging support? // debugging support?
}; };
...@@ -372,8 +372,7 @@ bool BerkeleyDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) { ...@@ -372,8 +372,7 @@ bool BerkeleyDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
// ALLOW case deals with actual duplicates (where key is the same but value is different). // ALLOW case deals with actual duplicates (where key is the same but value is different).
// This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case). // This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case).
if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) { if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) {
ds_bulk_t keydata; Dbt db_key(&(key[0]), uint32_t(key.size()));
Dbt db_key(&(keydata[0]), uint32_t(keydata.size()));
Dbt put_data(&(data[0]), uint32_t(data.size())); Dbt put_data(&(data[0]), uint32_t(data.size()));
uint32_t flags = DB_NOOVERWRITE; // to simply overwrite value, don't use this flag uint32_t flags = DB_NOOVERWRITE; // to simply overwrite value, don't use this flag
status = _dbm->put(NULL, &db_key, &put_data, flags); status = _dbm->put(NULL, &db_key, &put_data, flags);
...@@ -400,8 +399,7 @@ bool BerkeleyDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) { ...@@ -400,8 +399,7 @@ bool BerkeleyDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) {
data.clear(); data.clear();
ds_bulk_t keydata; Dbt db_key(&(key[0]), uint32_t(key.size()));
Dbt db_key(&(keydata[0]), uint32_t(keydata.size()));
Dbt db_data; Dbt db_data;
status = _dbm->get(NULL, &db_key, &db_data, 0); status = _dbm->get(NULL, &db_key, &db_data, 0);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment