Commit 6f745819 authored by Matthieu Dorier's avatar Matthieu Dorier

added better support for put-multi

parent 13c31d4f
...@@ -89,6 +89,8 @@ bool BerkeleyDBDataStore::openDatabase(const std::string& db_name, const std::st ...@@ -89,6 +89,8 @@ bool BerkeleyDBDataStore::openDatabase(const std::string& db_name, const std::st
_dbenv->set_lk_detect(DB_LOCK_MINWRITE); _dbenv->set_lk_detect(DB_LOCK_MINWRITE);
_dbenv->open(fullpath.c_str(), flags, 0644); _dbenv->open(fullpath.c_str(), flags, 0644);
} }
_dbenv->set_flags(DB_TXN_WRITE_NOSYNC,1);
_dbenv->set_flags(DB_TXN_NOSYNC,1);
} }
catch (DbException &e) { catch (DbException &e) {
std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on environment open = " std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on environment open = "
...@@ -141,12 +143,12 @@ void BerkeleyDBDataStore::set_comparison_function(const std::string& name, compa ...@@ -141,12 +143,12 @@ void BerkeleyDBDataStore::set_comparison_function(const std::string& name, compa
_wrapper->_less = less; _wrapper->_less = less;
} }
bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) { bool BerkeleyDBDataStore::put(const void* key, size_t ksize, const void* val, size_t vsize) {
int status = 0; int status = 0;
bool success = false; bool success = false;
if(_no_overwrite) { if(_no_overwrite) {
if(exists(key)) return false; if(exists(key, ksize)) return false;
} }
// IGNORE case deals with redundant puts (where key/value is the same). In BerkeleyDB a // IGNORE case deals with redundant puts (where key/value is the same). In BerkeleyDB a
...@@ -154,13 +156,13 @@ bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) { ...@@ -154,13 +156,13 @@ bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
// ALLOW case deals with actual duplicates (where key is the same but value is different). // ALLOW case deals with actual duplicates (where key is the same but value is different).
// This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case). // This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case).
if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) { if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) {
Dbt db_key((void*)&(key[0]), uint32_t(key.size())); Dbt db_key((void*)key, ksize);
Dbt db_data((void*)&(data[0]), uint32_t(data.size())); Dbt db_data((void*)val, vsize);
db_key.set_flags(DB_DBT_USERMEM); db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_USERMEM); db_data.set_flags(DB_DBT_USERMEM);
status = _dbm->put(NULL, &db_key, &db_data, 0); status = _dbm->put(NULL, &db_key, &db_data, 0);
if (status == 0 || if (status == 0 ||
(_duplicates == Duplicates::IGNORE && status == DB_KEYEXIST)) { (_duplicates == Duplicates::IGNORE && status == DB_KEYEXIST)) {
success = true; success = true;
} }
else { else {
...@@ -174,8 +176,50 @@ bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) { ...@@ -174,8 +176,50 @@ bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
return success; return success;
}; };
bool BerkeleyDBDataStore::exists(const ds_bulk_t &key) { bool BerkeleyDBDataStore::put_multi(size_t num_items,
Dbt db_key((void*)key.data(), key.size()); const void* const* keys,
const size_t* ksizes,
const void* const* values,
const size_t* vsizes)
{
size_t sk = 0;
size_t sv = 0;
for(unsigned i = 0; i < num_items; i++) {
sk += ksizes[i];
sv += vsizes[i];
}
sk *= 2;
sv *= 2;
if(sk % 4 != 0) sk += (4 - (sk % 4));
if(sv % 4 != 0) sv += (4 - (sv % 4));
std::vector<char> kbuffer(sk);
std::vector<char> vbuffer(sv);
Dbt mkey, mdata;
mkey.set_ulen(kbuffer.size());
mkey.set_data(kbuffer.data());
mdata.set_ulen(vbuffer.size());
mdata.set_data(vbuffer.data());
DbMultipleDataBuilder keybuilder(mkey);
DbMultipleDataBuilder databuilder(mdata);
for(size_t i = 0; i < num_items; i++) {
keybuilder.append((void*)keys[i], ksizes[i]);
databuilder.append((void*)values[i], vsizes[i]);
}
int status = _dbm->put(NULL, &mkey, &mdata, DB_MULTIPLE);
return status == 0;
}
bool BerkeleyDBDataStore::exists(const void* key, size_t size) const {
Dbt db_key((void*)key, size);
db_key.set_flags(DB_DBT_USERMEM);
int status = _dbm->exists(NULL, &db_key, 0); int status = _dbm->exists(NULL, &db_key, 0);
return status == 0; return status == 0;
} }
......
...@@ -28,30 +28,37 @@ class BerkeleyDBDataStore : public AbstractDataStore { ...@@ -28,30 +28,37 @@ class BerkeleyDBDataStore : public AbstractDataStore {
BerkeleyDBDataStore(); BerkeleyDBDataStore();
BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug); BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BerkeleyDBDataStore(); virtual ~BerkeleyDBDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path); virtual bool openDatabase(const std::string& db_name, const std::string& path) override;
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data); virtual bool put(const void* key, size_t ksize, const void* value, size_t vsize) override;
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data); virtual bool put_multi(size_t num_items,
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data); const void* const* keys,
virtual bool exists(const ds_bulk_t &key); const size_t* ksizes,
virtual bool erase(const ds_bulk_t &key); const void* const* values,
virtual void set_in_memory(bool enable); // enable/disable in-memory mode const size_t* vsizes) override;
virtual void set_comparison_function(const std::string& name, comparator_fn less); /* virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) override;
virtual void set_no_overwrite() { virtual bool put(ds_bulk_t &&key, ds_bulk_t &&data) override; */
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) override;
virtual bool exists(const void* key, size_t ksize) const override;
virtual bool erase(const ds_bulk_t &key) override;
virtual void set_in_memory(bool enable) override; // enable/disable in-memory mode
virtual void set_comparison_function(const std::string& name, comparator_fn less) override;
virtual void set_no_overwrite() override {
_no_overwrite = true; _no_overwrite = true;
} }
virtual void sync(); virtual void sync() override;
#ifdef USE_REMI #ifdef USE_REMI
virtual remi_fileset_t create_and_populate_fileset() const; virtual remi_fileset_t create_and_populate_fileset() const override;
#endif #endif
protected: protected:
virtual std::vector<ds_bulk_t> vlist_keys( virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const; const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals( virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &) const; const ds_bulk_t &start_key, size_t count, const ds_bulk_t &) const override;
virtual std::vector<ds_bulk_t> vlist_key_range( virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const; const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range( virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const; const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const override;
DbEnv *_dbenv = nullptr; DbEnv *_dbenv = nullptr;
Db *_dbm = nullptr; Db *_dbm = nullptr;
DbWrapper* _wrapper = nullptr; DbWrapper* _wrapper = nullptr;
......
...@@ -23,10 +23,32 @@ class AbstractDataStore { ...@@ -23,10 +23,32 @@ class AbstractDataStore {
AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug); AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~AbstractDataStore(); virtual ~AbstractDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path)=0; virtual bool openDatabase(const std::string& db_name, const std::string& path)=0;
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data)=0; virtual bool put(const void* kdata, size_t ksize, const void* vdata, size_t vsize)=0;
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) {
return put(key.data(), key.size(), data.data(), data.size());
}
virtual bool put(ds_bulk_t&& key, ds_bulk_t&& data) {
return put(key.data(), key.size(), data.data(), data.size());
}
virtual bool put_multi(size_t num_items,
const void* const* keys,
const size_t* ksizes,
const void* const* values,
const size_t* vsizes)
{
bool b = true;
for(size_t i=0; i < num_items; i++) {
bool b2 = put(keys[i], ksizes[i], values[i], vsizes[i]);
b = b && b2;
}
return b;
}
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0; virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0; virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool exists(const ds_bulk_t &key) = 0; virtual bool exists(const void* key, size_t ksize) const = 0;
virtual bool exists(const ds_bulk_t &key) const {
return exists(key.data(), key.size());
}
virtual bool erase(const ds_bulk_t &key) = 0; virtual bool erase(const ds_bulk_t &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported) virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual void set_comparison_function(const std::string& name, comparator_fn less)=0; virtual void set_comparison_function(const std::string& name, comparator_fn less)=0;
......
...@@ -20,12 +20,17 @@ LevelDBDataStore::LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool ...@@ -20,12 +20,17 @@ LevelDBDataStore::LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool
_dbm = NULL; _dbm = NULL;
}; };
std::string LevelDBDataStore::toString(const ds_bulk_t &bulk_val) { std::string LevelDBDataStore::toString(const ds_bulk_t &bulk_val) const {
std::string str_val(bulk_val.begin(), bulk_val.end()); std::string str_val(bulk_val.begin(), bulk_val.end());
return str_val; return str_val;
}; };
ds_bulk_t LevelDBDataStore::fromString(const std::string &str_val) { std::string LevelDBDataStore::toString(const char* buf, size_t buf_size) const {
std::string str_val(buf, buf_size);
return str_val;
};
ds_bulk_t LevelDBDataStore::fromString(const std::string &str_val) const {
ds_bulk_t bulk_val(str_val.begin(), str_val.end()); ds_bulk_t bulk_val(str_val.begin(), str_val.end());
return bulk_val; return bulk_val;
}; };
...@@ -69,19 +74,21 @@ void LevelDBDataStore::set_comparison_function(const std::string& name, comparat ...@@ -69,19 +74,21 @@ void LevelDBDataStore::set_comparison_function(const std::string& name, comparat
_less = less; _less = less;
} }
bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) { bool LevelDBDataStore::put(const void* key, size_t ksize, const void* value, size_t vsize) {
leveldb::Status status; leveldb::Status status;
bool success = false; bool success = false;
if(_no_overwrite) { if(_no_overwrite) {
if(exists(key)) return false; if(exists(key, ksize)) return false;
} }
//high_resolution_clock::time_point start = high_resolution_clock::now(); //high_resolution_clock::time_point start = high_resolution_clock::now();
// IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a // IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
// redundant put simply overwrites previous value which is fine when key/value is the same. // redundant put simply overwrites previous value which is fine when key/value is the same.
if (_duplicates == Duplicates::IGNORE) { if (_duplicates == Duplicates::IGNORE) {
status = _dbm->Put(leveldb::WriteOptions(), toString(key), toString(data)); status = _dbm->Put(leveldb::WriteOptions(),
leveldb::Slice(key, ksize),
leveldb::Slice(value, vsize);
if (status.ok()) { if (status.ok()) {
success = true; success = true;
} }
...@@ -101,16 +108,24 @@ bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) { ...@@ -101,16 +108,24 @@ bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
return success; return success;
}; };
bool LevelDBDataStore::put(ds_bulk_t&& key, ds_bulk_t&& value) {
return put(key.data(), key.size(), value.data(), value.size());
}
bool LevelDBDataStore::put(const ds_bulk_t& key, const ds_bulk_t& value) {
return put(key.data(), key.size(), value.data(), value.size());
}
bool LevelDBDataStore::erase(const ds_bulk_t &key) { bool LevelDBDataStore::erase(const ds_bulk_t &key) {
leveldb::Status status; leveldb::Status status;
status = _dbm->Delete(leveldb::WriteOptions(), toString(key)); status = _dbm->Delete(leveldb::WriteOptions(), toString(key));
return status.ok(); return status.ok();
} }
bool LevelDBDataStore::exists(const ds_bulk_t &key) { bool LevelDBDataStore::exists(const void* key, size_t ksize) const {
leveldb::Status status; leveldb::Status status;
std::string value; std::string value;
status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value); status = _dbm->Get(leveldb::ReadOptions(), leveldb::Slice(key, ksize), &value);
return status.ok(); return status.ok();
} }
......
...@@ -41,34 +41,39 @@ class LevelDBDataStore : public AbstractDataStore { ...@@ -41,34 +41,39 @@ class LevelDBDataStore : public AbstractDataStore {
LevelDBDataStore(); LevelDBDataStore();
LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug); LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~LevelDBDataStore(); virtual ~LevelDBDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path); virtual bool openDatabase(const std::string& db_name, const std::string& path) override;
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data); /*
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data); virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) override;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data); virtual bool put(ds_bulk_t &&key, ds_bulk_t &&data) override;
virtual bool exists(const ds_bulk_t &key); */
virtual bool erase(const ds_bulk_t &key); virtual bool put(const void* key, size_t ksize, const void* kdata, size_t kdata) override;
virtual void set_in_memory(bool enable); // not supported, a no-op virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override;
virtual void set_comparison_function(const std::string& name, comparator_fn less); virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) override;
virtual void set_no_overwrite() { virtual bool exists(const void* key, size_t ksize) const override;
virtual bool erase(const ds_bulk_t &key) override;
virtual void set_in_memory(bool enable) override; // not supported, a no-op
virtual void set_comparison_function(const std::string& name, comparator_fn less) override;
virtual void set_no_overwrite() override {
_no_overwrite = true; _no_overwrite = true;
} }
virtual void sync(); virtual void sync() override;
#ifdef USE_REMI #ifdef USE_REMI
virtual remi_fileset_t create_and_populate_fileset() const; virtual remi_fileset_t create_and_populate_fileset() const override;
#endif #endif
protected: protected:
virtual std::vector<ds_bulk_t> vlist_keys( virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const; const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals( virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const; const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const override;
virtual std::vector<ds_bulk_t> vlist_key_range( virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const; const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range( virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const; const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const override;
leveldb::DB *_dbm = NULL; leveldb::DB *_dbm = NULL;
private: private:
std::string toString(const ds_bulk_t &key); static std::string toString(const ds_bulk_t &key) const;
ds_bulk_t fromString(const std::string &keystr); static std::string toString(const char* bug, size_t buf_size) const;
static ds_bulk_t fromString(const std::string &keystr) const;
AbstractDataStore::comparator_fn _less; AbstractDataStore::comparator_fn _less;
LevelDBDataStoreComparator _keycmp; LevelDBDataStoreComparator _keycmp;
}; };
......
...@@ -42,7 +42,7 @@ class MapDataStore : public AbstractDataStore { ...@@ -42,7 +42,7 @@ class MapDataStore : public AbstractDataStore {
ABT_rwlock_free(&_map_lock); ABT_rwlock_free(&_map_lock);
} }
virtual bool openDatabase(const std::string& db_name, const std::string& path) { virtual bool openDatabase(const std::string& db_name, const std::string& path) override {
_name = db_name; _name = db_name;
_path = path; _path = path;
ABT_rwlock_wrlock(_map_lock); ABT_rwlock_wrlock(_map_lock);
...@@ -51,9 +51,9 @@ class MapDataStore : public AbstractDataStore { ...@@ -51,9 +51,9 @@ class MapDataStore : public AbstractDataStore {
return true; return true;
} }
virtual void sync() {} virtual void sync() override {}
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) { virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) override {
ABT_rwlock_wrlock(_map_lock); ABT_rwlock_wrlock(_map_lock);
auto x = _map.count(key); auto x = _map.count(key);
if(_no_overwrite && (x != 0)) { if(_no_overwrite && (x != 0)) {
...@@ -69,7 +69,29 @@ class MapDataStore : public AbstractDataStore { ...@@ -69,7 +69,29 @@ class MapDataStore : public AbstractDataStore {
return true; return true;
} }
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) { virtual bool put(ds_bulk_t &&key, ds_bulk_t &&data) override {
ABT_rwlock_wrlock(_map_lock);
auto x = _map.count(key);
if(_no_overwrite && (x != 0)) {
ABT_rwlock_unlock(_map_lock);
return false;
}
if(_duplicates == Duplicates::IGNORE && (x != 0)) {
ABT_rwlock_unlock(_map_lock);
return false;
}
_map.insert(std::make_pair(std::move(key),std::move(data)));
ABT_rwlock_unlock(_map_lock);
return true;
}
virtual bool put(const void* key, size_t ksize, const void* value, size_t vsize) override {
ds_bulk_t k((const char*)key, ((const char*)key)+ksize);
ds_bulk_t v((const char*)value, ((const char*)value)+vsize);
return put(std::move(k), std::move(v));
}
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override {
ABT_rwlock_rdlock(_map_lock); ABT_rwlock_rdlock(_map_lock);
auto it = _map.find(key); auto it = _map.find(key);
if(it == _map.end()) { if(it == _map.end()) {
...@@ -81,20 +103,24 @@ class MapDataStore : public AbstractDataStore { ...@@ -81,20 +103,24 @@ class MapDataStore : public AbstractDataStore {
return true; return true;
} }
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t>& values) { virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t>& values) override {
values.clear(); values.clear();
values.resize(1); values.resize(1);
return get(key, values[0]); return get(key, values[0]);
} }
virtual bool exists(const ds_bulk_t& key) { virtual bool exists(const ds_bulk_t& key) const override {
ABT_rwlock_rdlock(_map_lock); ABT_rwlock_rdlock(_map_lock);
bool e = _map.count(key) > 0; bool e = _map.count(key) > 0;
ABT_rwlock_unlock(_map_lock); ABT_rwlock_unlock(_map_lock);
return e; return e;
} }
virtual bool erase(const ds_bulk_t &key) { virtual bool exists(const void* key, size_t ksize) const override {
return exists(ds_bulk_t((const char*)key, ((const char*)key)+ksize));
}
virtual bool erase(const ds_bulk_t &key) override {
ABT_rwlock_wrlock(_map_lock); ABT_rwlock_wrlock(_map_lock);
bool b = _map.find(key) != _map.end(); bool b = _map.find(key) != _map.end();
_map.erase(key); _map.erase(key);
...@@ -102,21 +128,21 @@ class MapDataStore : public AbstractDataStore { ...@@ -102,21 +128,21 @@ class MapDataStore : public AbstractDataStore {
return b; return b;
} }
virtual void set_in_memory(bool enable) { virtual void set_in_memory(bool enable) override {
_in_memory = enable; _in_memory = enable;
} }
virtual void set_comparison_function(const std::string& name, comparator_fn less) { virtual void set_comparison_function(const std::string& name, comparator_fn less) override {
_comp_fun_name = name; _comp_fun_name = name;
_less = less; _less = less;
} }
virtual void set_no_overwrite() { virtual void set_no_overwrite() override {
_no_overwrite = true; _no_overwrite = true;
} }
#ifdef USE_REMI #ifdef USE_REMI
virtual remi_fileset_t create_and_populate_fileset() const { virtual remi_fileset_t create_and_populate_fileset() const override {
return REMI_FILESET_NULL; return REMI_FILESET_NULL;
} }
#endif #endif
...@@ -124,7 +150,7 @@ class MapDataStore : public AbstractDataStore { ...@@ -124,7 +150,7 @@ class MapDataStore : public AbstractDataStore {
protected: protected:
virtual std::vector<ds_bulk_t> vlist_keys( virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const { const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const override {
ABT_rwlock_rdlock(_map_lock); ABT_rwlock_rdlock(_map_lock);
std::vector<ds_bulk_t> result; std::vector<ds_bulk_t> result;
decltype(_map.begin()) it; decltype(_map.begin()) it;
...@@ -149,7 +175,7 @@ class MapDataStore : public AbstractDataStore { ...@@ -149,7 +175,7 @@ class MapDataStore : public AbstractDataStore {
} }
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals( virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const { const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const override {
ABT_rwlock_rdlock(_map_lock); ABT_rwlock_rdlock(_map_lock);
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result; std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
decltype(_map.begin()) it; decltype(_map.begin()) it;
...@@ -174,7 +200,7 @@ class MapDataStore : public AbstractDataStore { ...@@ -174,7 +200,7 @@ class MapDataStore : public AbstractDataStore {
} }
virtual std::vector<ds_bulk_t> vlist_key_range( virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const { const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const override {
ABT_rwlock_rdlock(_map_lock); ABT_rwlock_rdlock(_map_lock);
std::vector<ds_bulk_t> result; std::vector<ds_bulk_t> result;
decltype(_map.begin()) it, ub; decltype(_map.begin()) it, ub;
...@@ -198,7 +224,7 @@ class MapDataStore : public AbstractDataStore { ...@@ -198,7 +224,7 @@ class MapDataStore : public AbstractDataStore {
} }
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range( virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const { const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const override {
ABT_rwlock_rdlock(_map_lock); ABT_rwlock_rdlock(_map_lock);
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result; std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
decltype(_map.begin()) it, ub; decltype(_map.begin()) it, ub;
......
...@@ -779,13 +779,17 @@ static void sdskv_put_multi_ult(hg_handle_t handle) ...@@ -779,13 +779,17 @@ static void sdskv_put_multi_ult(hg_handle_t handle)
/* go through the key/value pairs and insert them */ /* go through the key/value pairs and insert them */
uint64_t keys_offset = sizeof(hg_size_t)*in.num_keys; uint64_t keys_offset = sizeof(hg_size_t)*in.num_keys;
uint64_t vals_offset = sizeof(hg_size_t)*in.num_keys; uint64_t vals_offset = sizeof(hg_size_t)*in.num_keys;
std::vector<const void*> kptrs(in.num_keys);
std::vector<const void*> vptrs(in.num_keys);
for(unsigned i=0; i < in.num_keys; i++) { for(unsigned i=0; i < in.num_keys; i++) {
ds_bulk_t kdata(local_keys_buffer.data()+keys_offset, local_keys_buffer.data()+keys_offset+key_sizes[i]); kptrs[i] = local_keys_buffer.data()+keys_offset;
ds_bulk_t vdata(local_vals_buffer.data()+vals_offset, local_vals_buffer.data()+vals_offset+val_sizes[i]); vptrs[i] = local_vals_buffer.data()+vals_offset;
db->put(kdata, vdata);
keys_offset += key_sizes[i]; keys_offset += key_sizes[i];
vals_offset += val_sizes[i]; vals_offset += val_sizes[i];
} }
bool result = db->put_multi(in.num_keys, kptrs.data(), key_sizes, vptrs.data(), val_sizes);
if(not result)
out.ret = SDSKV_ERR_PUT;
return; return;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment