Commit 7dd4849a authored by Matthieu Dorier's avatar Matthieu Dorier

fixed many bugs and remove the Duplicate enum

parent ad51d8e5
......@@ -314,11 +314,11 @@ class client {
std::vector<hg_size_t> vsizes; vsizes.reserve(count);
for(auto it = kbegin; it != kend; it++) {
ksizes.push_back(object_size(*it));
kdata.push_back((const void*)(it->data()));
kdata.push_back(object_data(*it));
}
for(auto it = vbegin; it != vend; it++) {
vsizes.push_back(object_size(*it));
vdata.push_back((const void*)(it->data()));
vdata.push_back(object_data(*it));
}
put(db, kdata, ksizes, vdata, vsizes);
}
......@@ -606,11 +606,11 @@ class client {
std::vector<hg_size_t> vsizes; vsizes.reserve(count);
for(auto it = kbegin; it != kend; it++) {
ksizes.push_back(object_size(*it));
kdata.push_back((const void*)(it->data()));
kdata.push_back(object_data(*it));
}
for(auto it = vbegin; it != vend; it++) {
vsizes.push_back(object_size(*it));
vdata.push_back((void*)(it->data()));
vdata.push_back(object_data(*it));
}
return get(db, kdata, ksizes, vdata, vsizes);
}
......@@ -667,7 +667,7 @@ class client {
* @param key Key.
*/
template<typename K>
inline void erase( const database& db,
inline void erase(const database& db,
const K& key) const {
erase(db, object_data(key), object_size(key));
}
......@@ -684,7 +684,7 @@ class client {
* @param keys Array of keys.
* @param ksizes Array of key sizes.
*/
void erase(const database& db,
void erase_multi(const database& db,
hg_size_t num, const void* const* keys,
const hg_size_t* ksizes) const;
......@@ -698,7 +698,7 @@ class client {
* @param keys Vector of keys to erase.
*/
template<typename K>
inline void erase(const database& db,
inline void erase_multi(const database& db,
const std::vector<K>& keys) const {
std::vector<const void*> kdata; kdata.reserve(keys.size());
std::vector<hg_size_t> ksizes; ksizes.reserve(keys.size());
......@@ -706,7 +706,7 @@ class client {
kdata.push_back(object_data(k));
ksizes.push_back(object_size(k));
}
return erase(db, keys.size(), kdata.data(), ksizes.data());
return erase_multi(db, keys.size(), kdata.data(), ksizes.data());
}
//////////////////////////
......@@ -1321,6 +1321,14 @@ class database {
m_ph.m_client->erase(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::erase_multi.
*/
template<typename ... T>
void erase_multi(T&& ... args) const {
m_ph.m_client->erase_multi(*this, std::forward<T>(args)...);
}
/**
* @brief @see client::list_keys.
*/
......@@ -1449,7 +1457,7 @@ inline void client::erase(const database& db,
_CHECK_RET(ret);
}
inline void client::erase(const database& db,
inline void client::erase_multi(const database& db,
hg_size_t num, const void* const* keys,
const hg_size_t* ksizes) const {
int ret = sdskv_erase_multi(db.m_ph.m_ph, db.m_db_id,
......
......@@ -38,6 +38,7 @@ typedef uint64_t sdskv_database_id_t;
#define SDSKV_ERR_COMP_FUNC -14 /* Comparison function does not exist */
#define SDSKV_ERR_REMI -15 /* REMI-related error */
#define SDSKV_ERR_ARGOBOTS -16 /* Argobots related error */
#define SDSKV_ERR_KEYEXISTS -17 /* Put operation would override data */
#if defined(__cplusplus)
}
......
......@@ -11,14 +11,14 @@
using namespace std::chrono;
BerkeleyDBDataStore::BerkeleyDBDataStore() :
AbstractDataStore(Duplicates::IGNORE, false, false) {
AbstractDataStore(false, false) {
_dbm = NULL;
_dbenv = NULL;
_in_memory = false;
};
BerkeleyDBDataStore::BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) :
AbstractDataStore(duplicates, eraseOnGet, debug) {
BerkeleyDBDataStore::BerkeleyDBDataStore(bool eraseOnGet, bool debug) :
AbstractDataStore(eraseOnGet, debug) {
_dbm = NULL;
_dbenv = NULL;
_in_memory = false;
......@@ -102,10 +102,6 @@ bool BerkeleyDBDataStore::openDatabase(const std::string& db_name, const std::st
_wrapper = new DbWrapper(_dbenv, DB_CXX_NO_EXCEPTIONS);
_dbm = &(_wrapper->_db);
if (_duplicates == Duplicates::ALLOW) {
_dbm->set_flags(DB_DUP); // Allow duplicate keys
}
_dbm->set_bt_compare(&(BerkeleyDBDataStore::compkeys));
uint32_t flags = DB_CREATE | DB_AUTO_COMMIT | DB_THREAD; // Allow database creation
......@@ -143,40 +139,21 @@ void BerkeleyDBDataStore::set_comparison_function(const std::string& name, compa
_wrapper->_less = less;
}
bool BerkeleyDBDataStore::put(const void* key, size_t ksize, const void* val, size_t vsize) {
int BerkeleyDBDataStore::put(const void* key, size_t ksize, const void* val, size_t vsize) {
int status = 0;
bool success = false;
if(_no_overwrite) {
if(exists(key, ksize)) return false;
}
// IGNORE case deals with redundant puts (where key/value is the same). In BerkeleyDB a
// redundant may overwrite previous value which is fine when key/value is the same.
// ALLOW case deals with actual duplicates (where key is the same but value is different).
// This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case).
if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) {
Dbt db_key((void*)key, ksize);
Dbt db_data((void*)val, vsize);
db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_USERMEM);
status = _dbm->put(NULL, &db_key, &db_data, 0);
if (status == 0 ||
(_duplicates == Duplicates::IGNORE && status == DB_KEYEXIST)) {
success = true;
}
else {
std::cerr << "BerkeleyDBDataStore::put: BerkeleyDB error on put = " << status << std::endl;
}
}
else {
std::cerr << "BerkeleyDBDataStore::put: Unexpected Duplicates option = " << int32_t(_duplicates) << std::endl;
}
return success;
Dbt db_key((void*)key, ksize);
Dbt db_data((void*)val, vsize);
db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_USERMEM);
int flag = _no_overwrite ? DB_NOOVERWRITE : 0;
status = _dbm->put(NULL, &db_key, &db_data, flag);
if(status == 0) return SDSKV_SUCCESS;
if(status == DB_KEYEXIST) return SDSKV_ERR_KEYEXISTS;
return SDSKV_ERR_PUT;
};
bool BerkeleyDBDataStore::put_multi(size_t num_items,
int BerkeleyDBDataStore::put_multi(size_t num_items,
const void* const* keys,
const size_t* ksizes,
const void* const* values,
......@@ -212,9 +189,12 @@ bool BerkeleyDBDataStore::put_multi(size_t num_items,
keybuilder.append((void*)keys[i], ksizes[i]);
databuilder.append((void*)values[i], vsizes[i]);
}
int status = _dbm->put(NULL, &mkey, &mdata, DB_MULTIPLE);
return status == 0;
int flag = DB_MULTIPLE;
if(_no_overwrite) flag |= DB_NOOVERWRITE;
int status = _dbm->put(NULL, &mkey, &mdata, flag);
if(status == 0) return SDSKV_SUCCESS;
if(status == DB_KEYEXIST) return SDSKV_ERR_KEYEXISTS;
return SDSKV_ERR_PUT;
}
bool BerkeleyDBDataStore::exists(const void* key, size_t size) const {
......
......@@ -26,17 +26,15 @@ class BerkeleyDBDataStore : public AbstractDataStore {
public:
BerkeleyDBDataStore();
BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
BerkeleyDBDataStore(bool eraseOnGet, bool debug);
virtual ~BerkeleyDBDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path) override;
virtual bool put(const void* key, size_t ksize, const void* value, size_t vsize) override;
virtual bool put_multi(size_t num_items,
virtual int put(const void* key, size_t ksize, const void* value, size_t vsize) override;
virtual int put_multi(size_t num_items,
const void* const* keys,
const size_t* ksizes,
const void* const* values,
const size_t* vsizes) override;
/* virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) override;
virtual bool put(ds_bulk_t &&key, ds_bulk_t &&data) override; */
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) override;
virtual bool exists(const void* key, size_t ksize) const override;
......
......@@ -8,14 +8,12 @@
using namespace std::chrono;
AbstractDataStore::AbstractDataStore() {
_duplicates = Duplicates::IGNORE;
_eraseOnGet = false;
_debug = false;
_in_memory = false;
};
AbstractDataStore::AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) {
_duplicates = duplicates;
AbstractDataStore::AbstractDataStore(bool eraseOnGet, bool debug) {
_eraseOnGet = eraseOnGet;
_debug = debug;
_in_memory = false;
......
......@@ -12,36 +12,34 @@
#include <vector>
enum class Duplicates : int {ALLOW, IGNORE};
class AbstractDataStore {
public:
typedef int (*comparator_fn)(const void*, size_t, const void*, size_t);
AbstractDataStore();
AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
AbstractDataStore(bool eraseOnGet, bool debug);
virtual ~AbstractDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path)=0;
virtual bool put(const void* kdata, size_t ksize, const void* vdata, size_t vsize)=0;
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) {
virtual int put(const void* kdata, size_t ksize, const void* vdata, size_t vsize)=0;
virtual int put(const ds_bulk_t &key, const ds_bulk_t &data) {
return put(key.data(), key.size(), data.data(), data.size());
}
virtual bool put(ds_bulk_t&& key, ds_bulk_t&& data) {
virtual int put(ds_bulk_t&& key, ds_bulk_t&& data) {
return put(key.data(), key.size(), data.data(), data.size());
}
virtual bool put_multi(size_t num_items,
virtual int put_multi(size_t num_items,
const void* const* keys,
const size_t* ksizes,
const void* const* values,
const size_t* vsizes)
{
bool b = true;
int ret = 0;
for(size_t i=0; i < num_items; i++) {
bool b2 = put(keys[i], ksizes[i], values[i], vsizes[i]);
b = b && b2;
int r = put(keys[i], ksizes[i], values[i], vsizes[i]);
ret = ret == 0 ? r : 0;
}
return b;
return ret;
}
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
......@@ -95,7 +93,6 @@ class AbstractDataStore {
std::string _path;
std::string _name;
std::string _comp_fun_name;
Duplicates _duplicates;
bool _no_overwrite = false;
bool _eraseOnGet;
bool _debug;
......
......@@ -11,12 +11,12 @@
using namespace std::chrono;
LevelDBDataStore::LevelDBDataStore() :
AbstractDataStore(Duplicates::IGNORE, false, false), _less(nullptr), _keycmp(this) {
AbstractDataStore(false, false), _less(nullptr), _keycmp(this) {
_dbm = NULL;
};
LevelDBDataStore::LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) :
AbstractDataStore(duplicates, eraseOnGet, debug), _less(nullptr), _keycmp(this) {
LevelDBDataStore::LevelDBDataStore(bool eraseOnGet, bool debug) :
AbstractDataStore(eraseOnGet, debug), _less(nullptr), _keycmp(this) {
_dbm = NULL;
};
......@@ -74,38 +74,19 @@ void LevelDBDataStore::set_comparison_function(const std::string& name, comparat
_less = less;
}
bool LevelDBDataStore::put(const void* key, size_t ksize, const void* value, size_t vsize) {
int LevelDBDataStore::put(const void* key, size_t ksize, const void* value, size_t vsize) {
leveldb::Status status;
bool success = false;
if(_no_overwrite) {
if(exists(key, ksize)) return false;
if(exists(key, ksize)) return SDSKV_ERR_KEYEXISTS;
}
//high_resolution_clock::time_point start = high_resolution_clock::now();
// IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
// redundant put simply overwrites previous value which is fine when key/value is the same.
if (_duplicates == Duplicates::IGNORE) {
status = _dbm->Put(leveldb::WriteOptions(),
status = _dbm->Put(leveldb::WriteOptions(),
leveldb::Slice((const char*)key, ksize),
leveldb::Slice((const char*)value, vsize));
if (status.ok()) {
success = true;
}
else {
std::cerr << "LevelDBDataStore::put: LevelDB error on Put = " << status.ToString() << std::endl;
}
}
else if (_duplicates == Duplicates::ALLOW) {
std::cerr << "LevelDBDataStore::put: Duplicates::ALLOW set, LevelDB does not support duplicates" << std::endl;
}
else {
std::cerr << "LevelDBDataStore::put: Unexpected Duplicates option = " << int32_t(_duplicates) << std::endl;
}
// uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
// std::cout << "LevelDBDataStore::put time = " << elapsed << " microseconds" << std::endl;
return success;
if (status.ok()) return SDSKV_SUCCESS;
return SDSKV_ERR_PUT;
};
bool LevelDBDataStore::erase(const ds_bulk_t &key) {
......
......@@ -39,14 +39,10 @@ class LevelDBDataStore : public AbstractDataStore {
public:
LevelDBDataStore();
LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
LevelDBDataStore(bool eraseOnGet, bool debug);
virtual ~LevelDBDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path) override;
/*
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) override;
virtual bool put(ds_bulk_t &&key, ds_bulk_t &&data) override;
*/
virtual bool put(const void* key, size_t ksize, const void* kdata, size_t dsize) override;
virtual int put(const void* key, size_t ksize, const void* kdata, size_t dsize) override;
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) override;
virtual bool exists(const void* key, size_t ksize) const override;
......
......@@ -33,8 +33,8 @@ class MapDataStore : public AbstractDataStore {
ABT_rwlock_create(&_map_lock);
}
MapDataStore(Duplicates duplicates, bool eraseOnGet, bool debug)
: AbstractDataStore(duplicates, eraseOnGet, debug), _less(nullptr), _map(keycmp(this)){
MapDataStore(bool eraseOnGet, bool debug)
: AbstractDataStore(eraseOnGet, debug), _less(nullptr), _map(keycmp(this)){
ABT_rwlock_create(&_map_lock);
}
......@@ -53,39 +53,31 @@ class MapDataStore : public AbstractDataStore {
virtual void sync() override {}
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) override {
virtual int put(const ds_bulk_t &key, const ds_bulk_t &data) override {
ABT_rwlock_wrlock(_map_lock);
auto x = _map.count(key);
if(_no_overwrite && (x != 0)) {
ABT_rwlock_unlock(_map_lock);
return false;
}
if(_duplicates == Duplicates::IGNORE && (x != 0)) {
ABT_rwlock_unlock(_map_lock);
return false;
return SDSKV_ERR_KEYEXISTS;
}
_map.insert(std::make_pair(key,data));
ABT_rwlock_unlock(_map_lock);
return true;
return SDSKV_SUCCESS;
}
virtual bool put(ds_bulk_t &&key, ds_bulk_t &&data) override {
virtual int put(ds_bulk_t &&key, ds_bulk_t &&data) override {
ABT_rwlock_wrlock(_map_lock);
auto x = _map.count(key);
if(_no_overwrite && (x != 0)) {
ABT_rwlock_unlock(_map_lock);
return false;
}
if(_duplicates == Duplicates::IGNORE && (x != 0)) {
ABT_rwlock_unlock(_map_lock);
return false;
return SDSKV_ERR_KEYEXISTS;
}
_map.insert(std::make_pair(std::move(key),std::move(data)));
ABT_rwlock_unlock(_map_lock);
return true;
return SDSKV_SUCCESS;
}
virtual bool put(const void* key, size_t ksize, const void* value, size_t vsize) override {
virtual int put(const void* key, size_t ksize, const void* value, size_t vsize) override {
ds_bulk_t k((const char*)key, ((const char*)key)+ksize);
ds_bulk_t v((const char*)value, ((const char*)value)+vsize);
return put(std::move(k), std::move(v));
......
......@@ -676,12 +676,7 @@ static void sdskv_put_ult(hg_handle_t handle)
ds_bulk_t kdata(in.key.data, in.key.data+in.key.size);
ds_bulk_t vdata(in.value.data, in.value.data+in.value.size);
if(db->put(kdata, vdata)) {
out.ret = SDSKV_SUCCESS;
} else {
fprintf(stderr, "Error (sdskv_put_ult): put failed\n");
out.ret = SDSKV_ERR_PUT;
}
out.ret = db->put(kdata, vdata);
margo_respond(handle, &out);
margo_free_input(handle, &in);
......@@ -713,7 +708,6 @@ static void sdskv_put_multi_ult(hg_handle_t handle)
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -787,9 +781,7 @@ static void sdskv_put_multi_ult(hg_handle_t handle)
keys_offset += key_sizes[i];
vals_offset += val_sizes[i];
}
bool result = db->put_multi(in.num_keys, kptrs.data(), key_sizes, vptrs.data(), val_sizes);
if(not result)
out.ret = SDSKV_ERR_PUT;
out.ret = db->put_multi(in.num_keys, kptrs.data(), key_sizes, vptrs.data(), val_sizes);
return;
}
......@@ -1029,6 +1021,7 @@ static void sdskv_get_multi_ult(hg_handle_t handle)
ds_bulk_t vdata;
size_t client_allocated_value_size = val_sizes[i];
if(db->get(kdata, vdata)) {
size_t old_vsize = val_sizes[i];
if(vdata.size() > val_sizes[i]) {
val_sizes[i] = 0;
} else {
......@@ -1039,7 +1032,7 @@ static void sdskv_get_multi_ult(hg_handle_t handle)
val_sizes[i] = 0;
}
packed_keys += key_sizes[i];
packed_values += val_sizes[i]; //client_allocated_value_size;
packed_values += val_sizes[i];
}
/* do a PUSH operation to push back the values to the client */
......@@ -1240,13 +1233,7 @@ static void sdskv_bulk_put_ult(hg_handle_t handle)
ds_bulk_t kdata(in.key.data, in.key.data+in.key.size);
auto b = db->put(kdata, vdata);
if(b) {
out.ret = SDSKV_SUCCESS;
} else {
out.ret = SDSKV_ERR_PUT;
}
out.ret = db->put(kdata, vdata);
margo_respond(handle, &out);
margo_free_input(handle, &in);
......@@ -1465,6 +1452,14 @@ static void sdskv_erase_multi_ult(hg_handle_t handle)
}
auto r6 = at_exit([&local_keys_bulk_handle]() { margo_bulk_free(local_keys_bulk_handle); });
/* transfer keys */
hret = margo_bulk_transfer(mid, HG_BULK_PULL, info->addr, in.keys_bulk_handle, 0,
local_keys_bulk_handle, 0, in.keys_bulk_size);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
return;
}
/* interpret beginning of the key buffer as a list of key sizes */
hg_size_t* key_sizes = (hg_size_t*)local_keys_buffer.data();
/* find beginning of packed keys */
......
......@@ -182,7 +182,11 @@ static int put_get_erase_multi_test(sdskv::database& DB, uint32_t num_keys) {
std::cout << " " << k << std::endl;
keys_subset.push_back(k);
}
std::vector<std::string> vals_subset(keys_subset.size(), std::string(max_value_size, 0));
std::vector<std::string> vals_subset;
for(unsigned i=0; i < keys_subset.size(); i++) {
vals_subset.push_back(std::string(max_value_size, 0));
}
DB.get(keys_subset, vals_subset);
for(unsigned i=0; i < keys_subset.size(); i++) {
......@@ -196,10 +200,11 @@ static int put_get_erase_multi_test(sdskv::database& DB, uint32_t num_keys) {
/* erase keys */
for(unsigned i=0; i < num_keys; i++) {
DB.erase(keys[i]);
}
/*
for(auto& k : keys)
DB.erase(k);
*/
DB.erase_multi(keys);
return 0;
}
......
......@@ -105,10 +105,10 @@ int main(int argc, char *argv[])
// half of the entries will be put using bulk
auto v = gen_random_string(i*8000/num_keys);
ret = sdskv_put(kvph, db_id,
(const void *)k.data(), k.size()+1,
(const void *)v.data(), v.size()+1);
(const void *)k.data(), k.size(),
(const void *)v.data(), v.size());
if(ret != 0) {
fprintf(stderr, "Error: sdskv_put() failed (iteration %d)\n", i);
fprintf(stderr, "Error: sdskv_put() failed (iteration %d, ret = %d)\n", i, ret);
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment