Commit 501aa9b1 authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'master' of xgitlab.cels.anl.gov:sds/sds-keyval

parents cc38602b 984db4c0
......@@ -89,6 +89,8 @@ bool BerkeleyDBDataStore::openDatabase(const std::string& db_name, const std::st
_dbenv->set_lk_detect(DB_LOCK_MINWRITE);
_dbenv->open(fullpath.c_str(), flags, 0644);
}
_dbenv->set_flags(DB_TXN_WRITE_NOSYNC,1);
_dbenv->set_flags(DB_TXN_NOSYNC,1);
}
catch (DbException &e) {
std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on environment open = "
......@@ -141,12 +143,12 @@ void BerkeleyDBDataStore::set_comparison_function(const std::string& name, compa
_wrapper->_less = less;
}
bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
bool BerkeleyDBDataStore::put(const void* key, size_t ksize, const void* val, size_t vsize) {
int status = 0;
bool success = false;
if(_no_overwrite) {
if(exists(key)) return false;
if(exists(key, ksize)) return false;
}
// IGNORE case deals with redundant puts (where key/value is the same). In BerkeleyDB a
......@@ -154,13 +156,13 @@ bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
// ALLOW case deals with actual duplicates (where key is the same but value is different).
// This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case).
if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) {
Dbt db_key((void*)&(key[0]), uint32_t(key.size()));
Dbt db_data((void*)&(data[0]), uint32_t(data.size()));
Dbt db_key((void*)key, ksize);
Dbt db_data((void*)val, vsize);
db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_USERMEM);
status = _dbm->put(NULL, &db_key, &db_data, 0);
if (status == 0 ||
(_duplicates == Duplicates::IGNORE && status == DB_KEYEXIST)) {
(_duplicates == Duplicates::IGNORE && status == DB_KEYEXIST)) {
success = true;
}
else {
......@@ -174,8 +176,50 @@ bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
return success;
};
bool BerkeleyDBDataStore::exists(const ds_bulk_t &key) {
Dbt db_key((void*)key.data(), key.size());
bool BerkeleyDBDataStore::put_multi(size_t num_items,
const void* const* keys,
const size_t* ksizes,
const void* const* values,
const size_t* vsizes)
{
size_t sk = 0;
size_t sv = 0;
for(unsigned i = 0; i < num_items; i++) {
sk += ksizes[i];
sv += vsizes[i];
}
sk *= 2;
sv *= 2;
if(sk % 4 != 0) sk += (4 - (sk % 4));
if(sv % 4 != 0) sv += (4 - (sv % 4));
std::vector<char> kbuffer(sk);
std::vector<char> vbuffer(sv);
Dbt mkey, mdata;
mkey.set_ulen(kbuffer.size());
mkey.set_data(kbuffer.data());
mdata.set_ulen(vbuffer.size());
mdata.set_data(vbuffer.data());
DbMultipleDataBuilder keybuilder(mkey);
DbMultipleDataBuilder databuilder(mdata);
for(size_t i = 0; i < num_items; i++) {
keybuilder.append((void*)keys[i], ksizes[i]);
databuilder.append((void*)values[i], vsizes[i]);
}
int status = _dbm->put(NULL, &mkey, &mdata, DB_MULTIPLE);
return status == 0;
}
bool BerkeleyDBDataStore::exists(const void* key, size_t size) const {
Dbt db_key((void*)key, size);
db_key.set_flags(DB_DBT_USERMEM);
int status = _dbm->exists(NULL, &db_key, 0);
return status == 0;
}
......
......@@ -28,30 +28,37 @@ class BerkeleyDBDataStore : public AbstractDataStore {
BerkeleyDBDataStore();
BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BerkeleyDBDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool exists(const ds_bulk_t &key);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual void set_comparison_function(const std::string& name, comparator_fn less);
virtual void set_no_overwrite() {
virtual bool openDatabase(const std::string& db_name, const std::string& path) override;
virtual bool put(const void* key, size_t ksize, const void* value, size_t vsize) override;
virtual bool put_multi(size_t num_items,
const void* const* keys,
const size_t* ksizes,
const void* const* values,
const size_t* vsizes) override;
/* virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) override;
virtual bool put(ds_bulk_t &&key, ds_bulk_t &&data) override; */
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) override;
virtual bool exists(const void* key, size_t ksize) const override;
virtual bool erase(const ds_bulk_t &key) override;
virtual void set_in_memory(bool enable) override; // enable/disable in-memory mode
virtual void set_comparison_function(const std::string& name, comparator_fn less) override;
virtual void set_no_overwrite() override {
_no_overwrite = true;
}
virtual void sync();
virtual void sync() override;
#ifdef USE_REMI
virtual remi_fileset_t create_and_populate_fileset() const;
virtual remi_fileset_t create_and_populate_fileset() const override;
#endif
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const;
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &) const;
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &) const override;
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const;
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const;
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const override;
DbEnv *_dbenv = nullptr;
Db *_dbm = nullptr;
DbWrapper* _wrapper = nullptr;
......
......@@ -23,10 +23,32 @@ class AbstractDataStore {
AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~AbstractDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path)=0;
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data)=0;
virtual bool put(const void* kdata, size_t ksize, const void* vdata, size_t vsize)=0;
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) {
return put(key.data(), key.size(), data.data(), data.size());
}
virtual bool put(ds_bulk_t&& key, ds_bulk_t&& data) {
return put(key.data(), key.size(), data.data(), data.size());
}
virtual bool put_multi(size_t num_items,
const void* const* keys,
const size_t* ksizes,
const void* const* values,
const size_t* vsizes)
{
bool b = true;
for(size_t i=0; i < num_items; i++) {
bool b2 = put(keys[i], ksizes[i], values[i], vsizes[i]);
b = b && b2;
}
return b;
}
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool exists(const ds_bulk_t &key) = 0;
virtual bool exists(const void* key, size_t ksize) const = 0;
virtual bool exists(const ds_bulk_t &key) const {
return exists(key.data(), key.size());
}
virtual bool erase(const ds_bulk_t &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual void set_comparison_function(const std::string& name, comparator_fn less)=0;
......
......@@ -25,6 +25,11 @@ std::string LevelDBDataStore::toString(const ds_bulk_t &bulk_val) {
return str_val;
};
std::string LevelDBDataStore::toString(const char* buf, size_t buf_size) {
std::string str_val(buf, buf_size);
return str_val;
};
ds_bulk_t LevelDBDataStore::fromString(const std::string &str_val) {
ds_bulk_t bulk_val(str_val.begin(), str_val.end());
return bulk_val;
......@@ -69,19 +74,21 @@ void LevelDBDataStore::set_comparison_function(const std::string& name, comparat
_less = less;
}
bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
bool LevelDBDataStore::put(const void* key, size_t ksize, const void* value, size_t vsize) {
leveldb::Status status;
bool success = false;
if(_no_overwrite) {
if(exists(key)) return false;
if(exists(key, ksize)) return false;
}
//high_resolution_clock::time_point start = high_resolution_clock::now();
// IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
// redundant put simply overwrites previous value which is fine when key/value is the same.
if (_duplicates == Duplicates::IGNORE) {
status = _dbm->Put(leveldb::WriteOptions(), toString(key), toString(data));
status = _dbm->Put(leveldb::WriteOptions(),
leveldb::Slice((const char*)key, ksize),
leveldb::Slice((const char*)value, vsize));
if (status.ok()) {
success = true;
}
......@@ -107,10 +114,10 @@ bool LevelDBDataStore::erase(const ds_bulk_t &key) {
return status.ok();
}
bool LevelDBDataStore::exists(const ds_bulk_t &key) {
bool LevelDBDataStore::exists(const void* key, size_t ksize) const {
leveldb::Status status;
std::string value;
status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
status = _dbm->Get(leveldb::ReadOptions(), leveldb::Slice((const char*)key, ksize), &value);
return status.ok();
}
......
......@@ -41,34 +41,39 @@ class LevelDBDataStore : public AbstractDataStore {
LevelDBDataStore();
LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~LevelDBDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool exists(const ds_bulk_t &key);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // not supported, a no-op
virtual void set_comparison_function(const std::string& name, comparator_fn less);
virtual void set_no_overwrite() {
virtual bool openDatabase(const std::string& db_name, const std::string& path) override;
/*
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) override;
virtual bool put(ds_bulk_t &&key, ds_bulk_t &&data) override;
*/
virtual bool put(const void* key, size_t ksize, const void* kdata, size_t dsize) override;
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) override;
virtual bool exists(const void* key, size_t ksize) const override;
virtual bool erase(const ds_bulk_t &key) override;
virtual void set_in_memory(bool enable) override; // not supported, a no-op
virtual void set_comparison_function(const std::string& name, comparator_fn less) override;
virtual void set_no_overwrite() override {
_no_overwrite = true;
}
virtual void sync();
virtual void sync() override;
#ifdef USE_REMI
virtual remi_fileset_t create_and_populate_fileset() const;
virtual remi_fileset_t create_and_populate_fileset() const override;
#endif
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const;
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const;
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const override;
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const;
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const;
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const override;
leveldb::DB *_dbm = NULL;
private:
std::string toString(const ds_bulk_t &key);
ds_bulk_t fromString(const std::string &keystr);
static std::string toString(const ds_bulk_t &key);
static std::string toString(const char* bug, size_t buf_size);
static ds_bulk_t fromString(const std::string &keystr);
AbstractDataStore::comparator_fn _less;
LevelDBDataStoreComparator _keycmp;
};
......
......@@ -42,7 +42,7 @@ class MapDataStore : public AbstractDataStore {
ABT_rwlock_free(&_map_lock);
}
virtual bool openDatabase(const std::string& db_name, const std::string& path) {
virtual bool openDatabase(const std::string& db_name, const std::string& path) override {
_name = db_name;
_path = path;
ABT_rwlock_wrlock(_map_lock);
......@@ -51,9 +51,9 @@ class MapDataStore : public AbstractDataStore {
return true;
}
virtual void sync() {}
virtual void sync() override {}
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) {
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) override {
ABT_rwlock_wrlock(_map_lock);
auto x = _map.count(key);
if(_no_overwrite && (x != 0)) {
......@@ -69,7 +69,29 @@ class MapDataStore : public AbstractDataStore {
return true;
}
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) {
virtual bool put(ds_bulk_t &&key, ds_bulk_t &&data) override {
ABT_rwlock_wrlock(_map_lock);
auto x = _map.count(key);
if(_no_overwrite && (x != 0)) {
ABT_rwlock_unlock(_map_lock);
return false;
}
if(_duplicates == Duplicates::IGNORE && (x != 0)) {
ABT_rwlock_unlock(_map_lock);
return false;
}
_map.insert(std::make_pair(std::move(key),std::move(data)));
ABT_rwlock_unlock(_map_lock);
return true;
}
virtual bool put(const void* key, size_t ksize, const void* value, size_t vsize) override {
ds_bulk_t k((const char*)key, ((const char*)key)+ksize);
ds_bulk_t v((const char*)value, ((const char*)value)+vsize);
return put(std::move(k), std::move(v));
}
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override {
ABT_rwlock_rdlock(_map_lock);
auto it = _map.find(key);
if(it == _map.end()) {
......@@ -81,20 +103,24 @@ class MapDataStore : public AbstractDataStore {
return true;
}
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t>& values) {
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t>& values) override {
values.clear();
values.resize(1);
return get(key, values[0]);
}
virtual bool exists(const ds_bulk_t& key) {
virtual bool exists(const ds_bulk_t& key) const override {
ABT_rwlock_rdlock(_map_lock);
bool e = _map.count(key) > 0;
ABT_rwlock_unlock(_map_lock);
return e;
}
virtual bool erase(const ds_bulk_t &key) {
virtual bool exists(const void* key, size_t ksize) const override {
return exists(ds_bulk_t((const char*)key, ((const char*)key)+ksize));
}
virtual bool erase(const ds_bulk_t &key) override {
ABT_rwlock_wrlock(_map_lock);
bool b = _map.find(key) != _map.end();
_map.erase(key);
......@@ -102,21 +128,21 @@ class MapDataStore : public AbstractDataStore {
return b;
}
virtual void set_in_memory(bool enable) {
virtual void set_in_memory(bool enable) override {
_in_memory = enable;
}
virtual void set_comparison_function(const std::string& name, comparator_fn less) {
virtual void set_comparison_function(const std::string& name, comparator_fn less) override {
_comp_fun_name = name;
_less = less;
}
virtual void set_no_overwrite() {
virtual void set_no_overwrite() override {
_no_overwrite = true;
}
#ifdef USE_REMI
virtual remi_fileset_t create_and_populate_fileset() const {
virtual remi_fileset_t create_and_populate_fileset() const override {
return REMI_FILESET_NULL;
}
#endif
......@@ -124,7 +150,7 @@ class MapDataStore : public AbstractDataStore {
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const {
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const override {
ABT_rwlock_rdlock(_map_lock);
std::vector<ds_bulk_t> result;
decltype(_map.begin()) it;
......@@ -149,7 +175,7 @@ class MapDataStore : public AbstractDataStore {
}
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const {
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const override {
ABT_rwlock_rdlock(_map_lock);
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
decltype(_map.begin()) it;
......@@ -174,7 +200,7 @@ class MapDataStore : public AbstractDataStore {
}
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const {
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const override {
ABT_rwlock_rdlock(_map_lock);
std::vector<ds_bulk_t> result;
decltype(_map.begin()) it, ub;
......@@ -198,7 +224,7 @@ class MapDataStore : public AbstractDataStore {
}
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const {
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const override {
ABT_rwlock_rdlock(_map_lock);
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
decltype(_map.begin()) it, ub;
......
......@@ -179,10 +179,18 @@ int main(int argc, char **argv)
return(-1);
}
char* path = opts.db_names[i];
char* x = strrchr(path, '/');
char* db_name = path;
if(x != NULL) {
db_name = x+1;
*x = '\0';
}
sdskv_database_id_t db_id;
sdskv_config_t db_config = {
.db_name = opts.db_names[i],
.db_path = "",
.db_name = db_name,
.db_path = (x == NULL ? "" : path),
.db_type = opts.db_types[i],
.db_comp_fn_name = SDSKV_COMPARE_DEFAULT,
.db_no_overwrite = 0
......@@ -216,9 +224,16 @@ int main(int argc, char **argv)
for(i=0; i < opts.num_db; i++) {
sdskv_database_id_t db_id;
char* path = opts.db_names[i];
char* x = strrchr(path, '/');
char* db_name = path;
if(x != NULL) {
db_name = x+1;
*x = '\0';
}
sdskv_config_t db_config = {
.db_name = opts.db_names[i],
.db_path = "",
.db_name = db_name,
.db_path = (x == NULL ? "" : path),
.db_type = opts.db_types[i],
.db_comp_fn_name = SDSKV_COMPARE_DEFAULT,
.db_no_overwrite = 0
......
......@@ -779,13 +779,17 @@ static void sdskv_put_multi_ult(hg_handle_t handle)
/* go through the key/value pairs and insert them */
uint64_t keys_offset = sizeof(hg_size_t)*in.num_keys;
uint64_t vals_offset = sizeof(hg_size_t)*in.num_keys;
std::vector<const void*> kptrs(in.num_keys);
std::vector<const void*> vptrs(in.num_keys);
for(unsigned i=0; i < in.num_keys; i++) {
ds_bulk_t kdata(local_keys_buffer.data()+keys_offset, local_keys_buffer.data()+keys_offset+key_sizes[i]);
ds_bulk_t vdata(local_vals_buffer.data()+vals_offset, local_vals_buffer.data()+vals_offset+val_sizes[i]);
db->put(kdata, vdata);
kptrs[i] = local_keys_buffer.data()+keys_offset;
vptrs[i] = local_vals_buffer.data()+vals_offset;
keys_offset += key_sizes[i];
vals_offset += val_sizes[i];
}
bool result = db->put_multi(in.num_keys, kptrs.data(), key_sizes, vptrs.data(), val_sizes);
if(not result)
out.ret = SDSKV_ERR_PUT;
return;
}
......
......@@ -195,10 +195,18 @@ int main(int argc, char **argv)
return(-1);
}
char* path = opts.db_names[i];
char* x = strrchr(path, '/');
char* db_name = path;
if(x != NULL) {
db_name = x+1;
*x = '\0';
}
sdskv_database_id_t db_id;
sdskv_config_t db_config = {
.db_name = opts.db_names[i],
.db_path = "",
.db_name = db_name,
.db_path = (x == NULL ? "" : path),
.db_type = opts.db_types[i],
.db_comp_fn_name = "my_custom_comp_function",
.db_no_overwrite = 0
......@@ -241,9 +249,16 @@ int main(int argc, char **argv)
for(i=0; i < opts.num_db; i++) {
sdskv_database_id_t db_id;
char* path = opts.db_names[i];
char* x = strrchr(path, '/');
char* db_name = path;
if(x != NULL) {
db_name = x+1;
*x = '\0';
}
sdskv_config_t db_config = {
.db_name = opts.db_names[i],
.db_path = "",
.db_name = db_name,
.db_path = (x == NULL ? "" : path),
.db_type = opts.db_types[i],
.db_comp_fn_name = "my_custom_comp_function",
.db_no_overwrite = 0
......
......@@ -56,7 +56,7 @@ function test_start_custom_server ()
function find_db_name ()
{
test_db_name=$TMPBASE/${SDSKV_TEST_DB_NAME:-"sdskv-test-db"}
test_db_name=${SDSKV_TEST_DB_NAME:-"sdskv-test-db"}
test_db_type=${SDSKV_TEST_DB_TYPE:-"map"}
test_db_full="${test_db_name}:${test_db_type}"
test_db_full="${TMPBASE}/${test_db_name}:${test_db_type}"
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment