Commit 1aaf73ae authored by David Rich's avatar David Rich

Fix BerkeleyDB interface implementation to be multithread tolerant. Added...

Fix BerkeleyDB interface implementation to be multithread tolerant. Added simple ABT_mutex in kv-server to protect access to the 2 datastore related global variables in open_handler. Successfully tested with ParSplice.
parent c8d9ea77
......@@ -276,14 +276,15 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
// initialize the environment
uint32_t flags = 0;
if (_in_memory) {
// not sure if we want all of these for in_memory
flags =
DB_CREATE | // Create the environment if it does not exist
DB_PRIVATE |
//DB_RECOVER | // Run normal recovery.
//DB_INIT_LOCK | // Initialize the locking subsystem
DB_RECOVER | // Run normal recovery.
DB_INIT_LOCK | // Initialize the locking subsystem
DB_INIT_LOG | // Initialize the logging subsystem
DB_INIT_TXN | // Initialize the transactional subsystem. This
//DB_THREAD | // Cause the environment to be free-threaded
DB_THREAD | // Cause the environment to be free-threaded
DB_AUTO_COMMIT |
DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache)
}
......@@ -292,10 +293,10 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
DB_CREATE | // Create the environment if it does not exist
DB_PRIVATE |
DB_RECOVER | // Run normal recovery.
//DB_INIT_LOCK | // Initialize the locking subsystem
DB_INIT_LOCK | // Initialize the locking subsystem
DB_INIT_LOG | // Initialize the logging subsystem
DB_INIT_TXN | // Initialize the transactional subsystem. This
//DB_THREAD | // Cause the environment to be free-threaded
DB_THREAD | // Cause the environment to be free-threaded
DB_AUTO_COMMIT |
DB_INIT_MPOOL; // Initialize the memory pool (in-memory cache)
}
......@@ -330,7 +331,7 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
_dbm->set_flags(DB_DUP); // Allow duplicate keys
}
uint32_t flags = DB_CREATE | DB_AUTO_COMMIT; // Allow database creation
uint32_t flags = DB_CREATE | DB_AUTO_COMMIT | DB_THREAD; // Allow database creation
if (_in_memory) {
status = _dbm->open(NULL, // txn pointer
NULL, // NULL for in-memory DB
......@@ -370,9 +371,11 @@ bool BerkeleyDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
// This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case).
if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) {
Dbt db_key(&(key[0]), uint32_t(key.size()));
Dbt put_data(&(data[0]), uint32_t(data.size()));
Dbt db_data(&(data[0]), uint32_t(data.size()));
db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_USERMEM);
uint32_t flags = DB_NOOVERWRITE; // to simply overwrite value, don't use this flag
status = _dbm->put(NULL, &db_key, &put_data, flags);
status = _dbm->put(NULL, &db_key, &db_data, flags);
if (status == 0 ||
(_duplicates == Duplicates::IGNORE && status == DB_KEYEXIST)) {
success = true;
......@@ -398,11 +401,14 @@ bool BerkeleyDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) {
Dbt db_key(&(key[0]), uint32_t(key.size()));
Dbt db_data;
db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_REALLOC);
status = _dbm->get(NULL, &db_key, &db_data, 0);
if (status != DB_NOTFOUND && status != DB_KEYEMPTY) {
if (status != DB_NOTFOUND && status != DB_KEYEMPTY && db_data.get_size() > 0) {
data.resize(db_data.get_size(), 0);
memcpy(&(data[0]), db_data.get_data(), db_data.get_size());
free(db_data.get_data());
success = true;
}
else {
......
......@@ -19,6 +19,7 @@
// since this is global, we're assuming this server instance will manage a single DB
AbstractDataStore *datastore = NULL;
std::string db_name;
ABT_mutex mutex; // lock for protecting above global vars in open_handler
static hg_return_t open_handler(hg_handle_t handle)
{
......@@ -32,6 +33,7 @@ static hg_return_t open_handler(hg_handle_t handle)
std::cout << "SERVER: OPEN " << in_name << std::endl;
#endif
ABT_mutex_lock(mutex);
if (!datastore) {
#if USE_BWTREE
datastore = new BwTreeDataStore(); // testing BwTree
......@@ -54,7 +56,7 @@ static hg_return_t open_handler(hg_handle_t handle)
else {
if (db_name == in_name) {
#ifdef KV_DEBUG
std::cout << "SERVER OPEN: DataStore initialized and ready for " << db_name << std::endl;
std::cout << "SERVER OPEN: DataStore ready for " << db_name << std::endl;
#endif
out.ret = HG_SUCCESS;
}
......@@ -66,6 +68,7 @@ static hg_return_t open_handler(hg_handle_t handle)
out.ret = HG_OTHER_ERROR;
}
}
ABT_mutex_unlock(mutex);
ret = margo_respond(handle, &out);
assert(ret == HG_SUCCESS);
......@@ -502,6 +505,8 @@ kv_context_t *kv_server_register(const margo_instance_id mid)
hg_size_t addr_self_string_sz = 128;
kv_context_t *context = NULL;
ABT_mutex_create(&mutex); // initialize mutex
/* sds keyval server init */
context = (kv_context_t*)malloc(sizeof(kv_context_t));
memset(context, 0, sizeof(kv_context_t));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment