Commit 012de19d authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added sdskv_exists and a new way of adding databases

parent 90bd4f4d
......@@ -140,6 +140,20 @@ int sdskv_length(sdskv_provider_handle_t handle,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize, hg_size_t* vsize);
/**
* @brief Checks if the given key exists in the database.
*
* @param[in] handle provider handle
* @param[in] db_id database id
* @param[in] key key to lookup
* @param[in] ksize size of the key
* @param[out] flag 1 if the key exists, 0 otherwise
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_exists(sdskv_provider_handle_t handle,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize, int* flag);
/**
* @brief Erases the key/value pair pointed by the given key.
*
......
......@@ -8,7 +8,7 @@ extern "C" {
typedef enum sdskv_db_type_t
{
KVDB_MAP, /* Datastore implementation using std::map */
KVDB_MAP = 0, /* Datastore implementation using std::map */
KVDB_BWTREE, /* Datastore implementation using a BwTree */
KVDB_LEVELDB, /* Datastore implementation using LevelDB */
KVDB_BERKELEYDB /* Datasotre implementation using BerkeleyDB */
......
......@@ -22,6 +22,19 @@ extern "C" {
typedef struct sdskv_server_context_t* sdskv_provider_t;
typedef int (*sdskv_compare_fn)(const void*, size_t, const void*, size_t);
typedef struct sdskv_server_context_t* sdskv_provider_t;
typedef int (*sdskv_compare_fn)(const void*, size_t, const void*, size_t);
typedef struct sdskv_config_t {
const char* db_name; // name of the database
const char* db_path; // path to the database
sdskv_db_type_t db_type; // type of database
sdskv_compare_fn db_comparison_fn; // comparison function (can be NULL)
int db_no_overwrite; // prevents overwritting data if set to 1
} sdskv_config_t;
#define SDSKV_CONFIG_DEFAULT { "", "", KVDB_MAP, SDSKV_COMPARE_DEFAULT, 0 }
/**
* @brief Creates a new provider.
*
......@@ -58,6 +71,23 @@ int sdskv_provider_add_database(
const char* db_path,
sdskv_db_type_t db_type,
sdskv_compare_fn comp_fn,
sdskv_database_id_t* sb_id)
__attribute__((deprecated("use sdskv_provider_attach_database instead")));
/**
* Makes the provider start managing a database. The database will
* be created if it does not exist. Otherwise, the provider will start
* to manage the existing database.
*
* @param[in] provider provider
* @param[in] config configuration object to use for the database
* @param[out] db_id resulting id identifying the database
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_provider_attach_database(
sdskv_provider_t provider,
const sdskv_config_t* config,
sdskv_database_id_t* sb_id);
/**
......
......@@ -30,7 +30,7 @@ BerkeleyDBDataStore::~BerkeleyDBDataStore() {
delete _dbenv;
};
void BerkeleyDBDataStore::createDatabase(const std::string& db_name, const std::string& db_path) {
bool BerkeleyDBDataStore::openDatabase(const std::string& db_name, const std::string& db_path) {
int status = 0;
if (!db_path.empty()) {
......@@ -125,9 +125,7 @@ void BerkeleyDBDataStore::createDatabase(const std::string& db_name, const std::
std::cerr << "status = " << status << std::endl;
}
}
assert(status == 0); // fall over
// debugging support?
return (status == 0);
};
void BerkeleyDBDataStore::set_comparison_function(comparator_fn less) {
......@@ -137,7 +135,11 @@ void BerkeleyDBDataStore::set_comparison_function(comparator_fn less) {
bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
int status = 0;
bool success = false;
if(_no_overwrite) {
if(exists(key)) return false;
}
// IGNORE case deals with redundant puts (where key/value is the same). In BerkeleyDB a
// redundant may overwrite previous value which is fine when key/value is the same.
// ALLOW case deals with actual duplicates (where key is the same but value is different).
......@@ -164,6 +166,12 @@ bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
return success;
};
bool BerkeleyDBDataStore::exists(const ds_bulk_t &key) {
Dbt db_key((void*)key.data(), key.size());
int status = _dbm->exists(NULL, &db_key, 0);
return status == 0;
}
bool BerkeleyDBDataStore::erase(const ds_bulk_t &key) {
Dbt db_key((void*)key.data(), key.size());
int status = _dbm->del(NULL, &db_key, 0);
......@@ -194,14 +202,14 @@ bool BerkeleyDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
success = true;
}
else {
std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on Get = " << status << std::endl;
//std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on Get = " << status << std::endl;
}
if (success && _eraseOnGet) {
status = _dbm->del(NULL, &db_key, 0);
if (status != 0) {
success = false;
std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on delete (eraseOnGet) = " << status << std::endl;
//std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on delete (eraseOnGet) = " << status << std::endl;
}
}
......
......@@ -27,20 +27,24 @@ class BerkeleyDBDataStore : public AbstractDataStore {
BerkeleyDBDataStore();
BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BerkeleyDBDataStore();
virtual void createDatabase(const std::string& db_name, const std::string& path);
virtual bool openDatabase(const std::string& db_name, const std::string& path);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool exists(const ds_bulk_t &key);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual void set_comparison_function(comparator_fn less);
virtual void set_no_overwrite() {
_no_overwrite = true;
}
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &);
DbEnv *_dbenv = nullptr;
Db *_dbm = nullptr;
DbWrapper* _wrapper = nullptr;
bool _no_overwrite = false;
};
#endif // bdb_datastore_h
......@@ -24,7 +24,7 @@ BwTreeDataStore::~BwTreeDataStore() {
#endif
};
void BwTreeDataStore::createDatabase(const std::string& db_name, const std::string& path) {
bool BwTreeDataStore::openDatabase(const std::string& db_name, const std::string& path) {
_tree = new BwTree<ds_bulk_t, ds_bulk_t,
ds_bulk_less, ds_bulk_equal, ds_bulk_hash,
ds_bulk_equal, ds_bulk_hash>();
......@@ -36,6 +36,7 @@ void BwTreeDataStore::createDatabase(const std::string& db_name, const std::stri
}
_tree->UpdateThreadLocal(1);
_tree->AssignGCID(0);
return true;
};
void BwTreeDataStore::set_comparison_function(comparator_fn less) {
......@@ -46,6 +47,10 @@ bool BwTreeDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
std::vector<ds_bulk_t> values;
bool success = false;
if(_no_overwrite) {
if(exists(key)) return false;
}
if(!_tree) return false;
if (_duplicates == Duplicates::ALLOW) {
......
......@@ -14,19 +14,27 @@ public:
BwTreeDataStore();
BwTreeDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BwTreeDataStore();
virtual void createDatabase(const std::string& db_name, const std::string& path);
virtual bool openDatabase(const std::string& db_name, const std::string& path);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool exists(const ds_bulk_t &key) {
ds_bulk_t data;
return get(key,data);
}
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // a no-op
virtual void set_comparison_function(comparator_fn less);
virtual void set_no_overwrite() {
_no_overwrite = true;
}
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix);
BwTree<ds_bulk_t, ds_bulk_t,
ds_bulk_less, ds_bulk_equal, ds_bulk_hash,
ds_bulk_equal, ds_bulk_hash> *_tree = NULL;
bool _no_overwrite = false;
};
#endif // bwtree_datastore_h
......@@ -18,13 +18,15 @@ public:
AbstractDataStore();
AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~AbstractDataStore();
virtual void createDatabase(const std::string& db_name, const std::string& path)=0;
virtual bool openDatabase(const std::string& db_name, const std::string& path)=0;
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool exists(const ds_bulk_t &key) = 0;
virtual bool erase(const ds_bulk_t &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual void set_comparison_function(comparator_fn less)=0;
virtual void set_no_overwrite()=0;
std::vector<ds_bulk_t> list_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix=ds_bulk_t()) {
......
......@@ -25,41 +25,57 @@
class datastore_factory {
static AbstractDataStore* create_map_datastore(
static AbstractDataStore* open_map_datastore(
const std::string& name, const std::string& path) {
auto db = new MapDataStore();
db->createDatabase(name, path);
return db;
if(db->openDatabase(name, path)) {
return db;
} else {
delete db;
return nullptr;
}
}
static AbstractDataStore* create_bwtree_datastore(
static AbstractDataStore* open_bwtree_datastore(
const std::string& name, const std::string& path) {
#ifdef USE_BWTREE
auto db = new BwTreeDataStore();
db->createDatabase(name, path);
return db;
if(db->openDatabase(name, path)) {
return db;
} else {
delete db;
return nullptr;
}
#else
return nullptr;
#endif
}
static AbstractDataStore* create_berkeleydb_datastore(
static AbstractDataStore* open_berkeleydb_datastore(
const std::string& name, const std::string& path) {
#ifdef USE_BDB
auto db = new BerkeleyDBDataStore();
db->createDatabase(name, path);
return db;
if(db->openDatabase(name, path)) {
return db;
} else {
delete db;
return nullptr;
}
#else
return nullptr;
#endif
}
static AbstractDataStore* create_leveldb_datastore(
static AbstractDataStore* open_leveldb_datastore(
const std::string& name, const std::string& path) {
#ifdef USE_LEVELDB
auto db = new LevelDBDataStore();
db->createDatabase(name, path);
return db;
if(db->openDatabase(name, path)) {
return db;
} else {
delete db;
return nullptr;
}
#else
return nullptr;
#endif
......@@ -68,12 +84,12 @@ class datastore_factory {
public:
#ifdef SDSKV
static AbstractDataStore* create_datastore(
static AbstractDataStore* open_datastore(
sdskv_db_type_t type,
const std::string& name,
const std::string& path)
#else
static AbstractDataStore* create_datastore(
static AbstractDataStore* open_datastore(
kv_db_type_t type,
const std::string& name="db",
const std::string& path="db")
......@@ -81,13 +97,13 @@ class datastore_factory {
{
switch(type) {
case KVDB_MAP:
return create_map_datastore(name, path);
return open_map_datastore(name, path);
case KVDB_BWTREE:
return create_bwtree_datastore(name, path);
return open_bwtree_datastore(name, path);
case KVDB_LEVELDB:
return create_leveldb_datastore(name, path);
return open_leveldb_datastore(name, path);
case KVDB_BERKELEYDB:
return create_berkeleydb_datastore(name, path);
return open_berkeleydb_datastore(name, path);
}
return nullptr;
};
......
......@@ -35,7 +35,7 @@ LevelDBDataStore::~LevelDBDataStore() {
//leveldb::Env::Shutdown(); // Riak version only
};
void LevelDBDataStore::createDatabase(const std::string& db_name, const std::string& db_path) {
bool LevelDBDataStore::openDatabase(const std::string& db_name, const std::string& db_path) {
leveldb::Options options;
leveldb::Status status;
......@@ -52,10 +52,9 @@ void LevelDBDataStore::createDatabase(const std::string& db_name, const std::str
if (!status.ok()) {
// error
std::cerr << "LevelDBDataStore::createDatabase: LevelDB error on Open = " << status.ToString() << std::endl;
return false;
}
assert(status.ok()); // fall over
// debugging support?
return true;
};
void LevelDBDataStore::set_comparison_function(comparator_fn less) {
......@@ -65,8 +64,12 @@ void LevelDBDataStore::set_comparison_function(comparator_fn less) {
bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
high_resolution_clock::time_point start = high_resolution_clock::now();
if(_no_overwrite) {
if(exists(key)) return false;
}
//high_resolution_clock::time_point start = high_resolution_clock::now();
// IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
// redundant put simply overwrites previous value which is fine when key/value is the same.
if (_duplicates == Duplicates::IGNORE) {
......@@ -96,11 +99,18 @@ bool LevelDBDataStore::erase(const ds_bulk_t &key) {
return status.ok();
}
bool LevelDBDataStore::exists(const ds_bulk_t &key) {
leveldb::Status status;
std::string value;
status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
return status.ok();
}
bool LevelDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
high_resolution_clock::time_point start = high_resolution_clock::now();
//high_resolution_clock::time_point start = high_resolution_clock::now();
data.clear();
std::string value;
status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
......
......@@ -39,13 +39,17 @@ class LevelDBDataStore : public AbstractDataStore {
LevelDBDataStore();
LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~LevelDBDataStore();
virtual void createDatabase(const std::string& db_name, const std::string& path);
virtual bool openDatabase(const std::string& db_name, const std::string& path);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool exists(const ds_bulk_t &key);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // not supported, a no-op
virtual void set_comparison_function(comparator_fn less);
virtual void set_no_overwrite() {
_no_overwrite = true;
}
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix);
......@@ -55,6 +59,7 @@ class LevelDBDataStore : public AbstractDataStore {
ds_bulk_t fromString(const std::string &keystr);
AbstractDataStore::comparator_fn _less;
LevelDBDataStoreComparator _keycmp;
bool _no_overwrite = false;
};
#endif // ldb_datastore_h
......@@ -29,20 +29,26 @@ class MapDataStore : public AbstractDataStore {
public:
MapDataStore()
: AbstractDataStore(), _less(nullptr), _map(keycmp(this)) {}
: AbstractDataStore(), _less(nullptr), _map(keycmp(this)), _no_overwrite(false) {}
MapDataStore(Duplicates duplicates, bool eraseOnGet, bool debug)
: AbstractDataStore(duplicates, eraseOnGet, debug), _less(nullptr), _map(keycmp(this)) {}
: AbstractDataStore(duplicates, eraseOnGet, debug), _less(nullptr), _map(keycmp(this)),
_no_overwrite(false) {}
~MapDataStore() = default;
virtual void createDatabase(const std::string& db_name, const std::string& path) {
virtual bool openDatabase(const std::string& db_name, const std::string& path) {
_map.clear();
return true;
}
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) {
if(_duplicates == Duplicates::IGNORE && _map.count(key)) {
return true;
auto x = _map.count(key);
if(_no_overwrite && (x != 0)) {
return false;
}
if(_duplicates == Duplicates::IGNORE && (x != 0)) {
return false;
}
_map.insert(std::make_pair(key,data));
return true;
......@@ -61,6 +67,10 @@ class MapDataStore : public AbstractDataStore {
return get(key, values[0]);
}
virtual bool exists(const ds_bulk_t& key) {
return _map.count(key) > 0;
}
virtual bool erase(const ds_bulk_t &key) {
bool b = _map.find(key) != _map.end();
_map.erase(key);
......@@ -75,6 +85,10 @@ class MapDataStore : public AbstractDataStore {
_less = less;
}
virtual void set_no_overwrite() {
_no_overwrite = true;
}
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
......@@ -128,6 +142,7 @@ class MapDataStore : public AbstractDataStore {
private:
AbstractDataStore::comparator_fn _less;
std::map<ds_bulk_t, ds_bulk_t, keycmp> _map;
bool _no_overwrite;
};
#endif
......@@ -9,6 +9,7 @@ struct sdskv_client {
hg_id_t sdskv_put_id;
hg_id_t sdskv_bulk_put_id;
hg_id_t sdskv_get_id;
hg_id_t sdskv_exists_id;
hg_id_t sdskv_erase_id;
hg_id_t sdskv_length_id;
hg_id_t sdskv_bulk_get_id;
......@@ -41,6 +42,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_bulk_put_rpc", &client->sdskv_bulk_put_id, &flag);
margo_registered_name(mid, "sdskv_get_rpc", &client->sdskv_get_id, &flag);
margo_registered_name(mid, "sdskv_erase_rpc", &client->sdskv_erase_id, &flag);
margo_registered_name(mid, "sdskv_exists_rpc", &client->sdskv_exists_id, &flag);
margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag);
margo_registered_name(mid, "sdskv_bulk_get_rpc", &client->sdskv_bulk_get_id, &flag);
margo_registered_name(mid, "sdskv_open_rpc", &client->sdskv_open_id, &flag);
......@@ -57,6 +59,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_get_rpc", get_in_t, get_out_t, NULL);
client->sdskv_erase_id =
MARGO_REGISTER(mid, "sdskv_erase_rpc", erase_in_t, erase_out_t, NULL);
client->sdskv_exists_id =
MARGO_REGISTER(mid, "sdskv_exists_rpc", exists_in_t, exists_out_t, NULL);
client->sdskv_length_id =
MARGO_REGISTER(mid, "sdskv_length_rpc", length_in_t, length_out_t, NULL);
client->sdskv_bulk_get_id =
......@@ -397,6 +401,49 @@ int sdskv_get(sdskv_provider_handle_t provider,
return ret;
}
int sdskv_exists(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize, int* flag)
{
hg_return_t hret;
int ret;
hg_handle_t handle;
exists_in_t in;
exists_out_t out;
in.db_id = db_id;
in.key.data = (kv_ptr_t)key;
in.key.size = ksize;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_exists_id,
&handle);
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
if(ret == 0) *flag = out.flag;
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_length(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize, hg_size_t* vsize)
......
......@@ -71,6 +71,10 @@ MERCURY_GEN_PROC(get_out_t, ((int32_t)(ret))\
MERCURY_GEN_PROC(length_in_t, ((uint64_t)(db_id))((kv_data_t)(key)))
MERCURY_GEN_PROC(length_out_t, ((hg_size_t)(size)) ((int32_t)(ret)))
// ------------- EXISTS ------------- //
MERCURY_GEN_PROC(exists_in_t, ((uint64_t)(db_id))((kv_data_t)(key)))
MERCURY_GEN_PROC(exists_out_t, ((int32_t)(flag)) ((int32_t)(ret)))
// ------------- ERASE ------------- //
MERCURY_GEN_PROC(erase_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(erase_in_t, ((uint64_t)(db_id))((kv_data_t)(key)))
......
......@@ -28,6 +28,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_keys_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_keyvals_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_erase_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_exists_ult)
static void sdskv_server_finalize_cb(void *data);
......@@ -73,6 +74,10 @@ extern "C" int sdskv_provider_register(
length_in_t, length_out_t,
sdskv_length_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_exists_rpc",
exists_in_t, exists_out_t,
sdskv_exists_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_bulk_get_rpc",
bulk_get_in_t, bulk_get_out_t,
sdskv_bulk_get_ult, provider_id, abt_pool);
......@@ -103,22 +108,22 @@ extern "C" int sdskv_provider_register(
return SDSKV_SUCCESS;
}
extern "C" int sdskv_provider_add_database(
extern "C" int sdskv_provider_attach_database(
sdskv_provider_t provider,
const char *db_name,
const char *db_path,
sdskv_db_type_t db_type,
sdskv_compare_fn comp_fn,
const sdskv_config_t* config,
sdskv_database_id_t* db_id)