Commit b7a45a2d authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

berkeley db now support custom comparison function

parent e4370c62
......@@ -17,8 +17,18 @@ typedef enum sdskv_db_type_t
typedef uint64_t sdskv_database_id_t;
#define SDSKV_DATABASE_ID_INVALID 0
#define SDSKV_SUCCESS 0
#define SDSKV_ERROR -1
#define SDSKV_SUCCESS 0 /* Success */
#define SDSKV_ERR_ALLOCATION -1 /* Error allocating something */
#define SDSKV_ERR_INVALID_ARG -2 /* An argument is invalid */
#define SDSKV_ERR_MERCURY -3 /* An error happened calling a Mercury function */
#define SDSKV_ERR_DB_CREATE -4 /* Could not create database */
#define SDSKV_ERR_DB_NAME -5 /* Invalid database name */
#define SDSKV_ERR_UNKNOWN_DB -6 /* Database refered to by id is not known to provider */
#define SDSKV_ERR_UNKNOWN_PR -7 /* Mplex id could not be matched with a provider */
#define SDSKV_ERR_PUT -8 /* Could not put into the database */
#define SDSKV_ERR_UNKNOWN_KEY -9 /* Key requested does not exist */
#define SDSKV_ERR_SIZE -10 /* Client did not allocate enough for the requested data */
#define SDSKV_ERR_ERASE -11 /* Could not erase the given key */
#if defined(__cplusplus)
}
......
......@@ -3,6 +3,7 @@
#include "berkeleydb_datastore.h"
#include "kv-config.h"
#include <chrono>
#include <cstring>
#include <iostream>
#include <boost/filesystem.hpp>
......@@ -21,7 +22,8 @@ BerkeleyDBDataStore::BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet,
};
BerkeleyDBDataStore::~BerkeleyDBDataStore() {
delete _dbm;
// delete _dbm;
delete _wrapper;
delete _dbenv;
};
......@@ -88,11 +90,14 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
}
if (status == 0) {
_dbm = new Db(_dbenv, DB_CXX_NO_EXCEPTIONS);
_wrapper = new DbWrapper(_dbenv, DB_CXX_NO_EXCEPTIONS);
_dbm = &(_wrapper->_db);
if (_duplicates == Duplicates::ALLOW) {
_dbm->set_flags(DB_DUP); // Allow duplicate keys
}
_dbm->set_bt_compare(&(BerkeleyDBDataStore::compkeys));
uint32_t flags = DB_CREATE | DB_AUTO_COMMIT | DB_THREAD; // Allow database creation
if (_in_memory) {
......@@ -125,10 +130,7 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
};
void BerkeleyDBDataStore::set_comparison_function(comparator_fn less) {
if(less) {
fprintf(stderr, "Error: BerkeleyDBDataStore's comparison function cannot be changed\n");
exit(-1);
}
_wrapper->_less = less;
}
bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
......@@ -176,9 +178,10 @@ bool BerkeleyDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
data.clear();
Dbt db_key((void*)&(key[0]), uint32_t(key.size()));
db_key.set_ulen(uint32_t(key.size()));
Dbt db_data;
db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_REALLOC);
db_data.set_flags(DB_DBT_MALLOC);
status = _dbm->get(NULL, &db_key, &db_data, 0);
if (status != DB_NOTFOUND && status != DB_KEYEMPTY && db_data.get_size() > 0) {
......@@ -322,3 +325,18 @@ std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BerkeleyDBDataStore::vlist_keyvals(c
cursorp->close();
return result;
}
int BerkeleyDBDataStore::compkeys(Db *db, const Dbt *dbt1, const Dbt *dbt2, size_t *locp) {
DbWrapper* _wrapper = (DbWrapper*)(((char*)db) - offsetof(BerkeleyDBDataStore::DbWrapper, _db));
if(_wrapper->_less) {
return (_wrapper->_less)(dbt1->get_data(), dbt1->get_size(),
dbt2->get_data(), dbt2->get_size());
} else {
size_t s = dbt1->get_size() > dbt2->get_size() ? dbt2->get_size() : dbt1->get_size();
int c = std::memcmp(dbt1->get_data(), dbt2->get_data(), s);
if(c != 0) return c;
if(dbt1->get_size() < dbt2->get_size()) return -1;
if(dbt1->get_size() > dbt2->get_size()) return 1;
return 0;
}
}
......@@ -10,22 +10,37 @@
// may want to implement some caching for persistent stores like BerkeleyDB
class BerkeleyDBDataStore : public AbstractDataStore {
public:
BerkeleyDBDataStore();
BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BerkeleyDBDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual void set_comparison_function(comparator_fn less);
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &);
DbEnv *_dbenv = NULL;
Db *_dbm = NULL;
private:
struct DbWrapper {
Db _db;
AbstractDataStore::comparator_fn _less;
template<typename ... T>
DbWrapper(T&&... args) :
_db(std::forward<T>(args)...), _less(nullptr) {}
};
static int compkeys(Db *db, const Dbt *dbt1, const Dbt *dbt2, size_t *locp);
public:
BerkeleyDBDataStore();
BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BerkeleyDBDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual void set_comparison_function(comparator_fn less);
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &);
DbEnv *_dbenv = nullptr;
Db *_dbm = nullptr;
DbWrapper* _wrapper = nullptr;
};
#endif // bdb_datastore_h
......@@ -69,13 +69,13 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_list_keyvals_rpc", list_keyvals_in_t, list_keyvals_out_t, NULL);
}
return 0;
return SDSKV_SUCCESS;
}
int sdskv_client_init(margo_instance_id mid, sdskv_client_t* client)
{
sdskv_client_t c = (sdskv_client_t)calloc(1, sizeof(*c));
if(!c) return -1;
if(!c) return SDSKV_ERR_ALLOCATION;
c->num_provider_handles = 0;
......@@ -83,7 +83,7 @@ int sdskv_client_init(margo_instance_id mid, sdskv_client_t* client)
if(ret != 0) return ret;
*client = c;
return 0;
return SDSKV_SUCCESS;
}
int sdskv_client_finalize(sdskv_client_t client)
......@@ -94,7 +94,7 @@ int sdskv_client_finalize(sdskv_client_t client)
client->num_provider_handles);
}
free(client);
return 0;
return SDSKV_SUCCESS;
}
int sdskv_provider_handle_create(
......@@ -103,17 +103,18 @@ int sdskv_provider_handle_create(
uint8_t mplex_id,
sdskv_provider_handle_t* handle)
{
if(client == SDSKV_CLIENT_NULL) return -1;
if(client == SDSKV_CLIENT_NULL)
return SDSKV_ERR_INVALID_ARG;
sdskv_provider_handle_t provider =
(sdskv_provider_handle_t)calloc(1, sizeof(*provider));
if(!provider) return -1;
if(!provider) return SDSKV_ERR_ALLOCATION;
hg_return_t ret = margo_addr_dup(client->mid, addr, &(provider->addr));
if(ret != HG_SUCCESS) {
free(provider);
return -1;
return SDSKV_ERR_MERCURY;
}
provider->client = client;
......@@ -123,15 +124,15 @@ int sdskv_provider_handle_create(
client->num_provider_handles += 1;
*handle = provider;
return 0;
return SDSKV_SUCCESS;
}
int sdskv_provider_handle_ref_incr(
sdskv_provider_handle_t handle)
{
if(handle == SDSKV_PROVIDER_HANDLE_NULL) return -1;
if(handle == SDSKV_PROVIDER_HANDLE_NULL) return SDSKV_ERR_INVALID_ARG;
handle->refcount += 1;
return 0;
return SDSKV_SUCCESS;
}
int sdskv_provider_handle_release(sdskv_provider_handle_t handle)
......@@ -143,7 +144,7 @@ int sdskv_provider_handle_release(sdskv_provider_handle_t handle)
handle->client->num_provider_handles -= 1;
free(handle);
}
return 0;
return SDSKV_SUCCESS;
}
int sdskv_open(
......@@ -163,13 +164,13 @@ int sdskv_open(
provider->addr,
provider->client->sdskv_open_id,
&handle);
if(hret != HG_SUCCESS) return -1;
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
in.name = (char*)db_name;
......@@ -177,13 +178,13 @@ int sdskv_open(
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
......@@ -225,7 +226,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
&handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_put()\n");
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_set_target_id(handle, provider->mplex_id);
......@@ -233,21 +234,21 @@ int sdskv_put(sdskv_provider_handle_t provider,
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_set_target_id() failed in sdskv_put()\n");
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_put()\n");
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_put()\n");
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
......@@ -268,7 +269,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
HG_BULK_READ_ONLY, &in.handle);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_bulk_create() failed in sdskv_put()\n");
return -1;
return SDSKV_ERR_MERCURY;
}
/* create handle */
......@@ -280,7 +281,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_create() failed in sdskv_put()\n");
margo_bulk_free(in.handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_set_target_id(handle, provider->mplex_id);
......@@ -289,7 +290,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
fprintf(stderr,"[SDSKV] margo_set_target_id() failed in sdskv_put()\n");
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
......@@ -297,7 +298,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_put()\n");
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
......@@ -305,7 +306,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
fprintf(stderr,"[SDSKV] margo_get_output() failed in sdskv_put()\n");
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
......@@ -314,7 +315,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
}
margo_destroy(handle);
return ret;
return SDSKV_SUCCESS;
}
int sdskv_get(sdskv_provider_handle_t provider,
......@@ -347,25 +348,25 @@ int sdskv_get(sdskv_provider_handle_t provider,
provider->addr,
provider->client->sdskv_get_id,
&handle);
if(hret != HG_SUCCESS) return -1;
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
......@@ -389,7 +390,7 @@ int sdskv_get(sdskv_provider_handle_t provider,
hret = margo_bulk_create(provider->client->mid, 1, &value, &in.vsize,
HG_BULK_WRITE_ONLY, &in.handle);
if(hret != HG_SUCCESS) return -1;
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
/* create handle */
hret = margo_create(
......@@ -399,28 +400,28 @@ int sdskv_get(sdskv_provider_handle_t provider,
&handle);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.handle);
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
......@@ -445,8 +446,8 @@ int sdskv_length(sdskv_provider_handle_t provider,
length_in_t in;
length_out_t out;
in.db_id = db_id;
in.key.data = (kv_ptr_t)key;
in.db_id = db_id;
in.key.data = (kv_ptr_t)key;
in.key.size = ksize;
/* create handle */
......@@ -455,24 +456,24 @@ int sdskv_length(sdskv_provider_handle_t provider,
provider->addr,
provider->client->sdskv_length_id,
&handle);
if(hret != HG_SUCCESS) return -1;
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
......@@ -504,24 +505,24 @@ int sdskv_erase(sdskv_provider_handle_t provider,
provider->addr,
provider->client->sdskv_erase_id,
&handle);
if(hret != HG_SUCCESS) return -1;
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
......@@ -570,7 +571,7 @@ int sdskv_list_keys_with_prefix(sdskv_provider_handle_t provider,
list_keys_out_t out;
hg_return_t hret = HG_SUCCESS;
hg_handle_t handle = HG_HANDLE_NULL;
int ret = 0;
int ret = SDSKV_SUCCESS;
int i;
in.db_id = db_id;
......@@ -590,7 +591,7 @@ int sdskv_list_keys_with_prefix(sdskv_provider_handle_t provider,
HG_BULK_READWRITE,
&in.ksizes_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
......@@ -600,7 +601,7 @@ int sdskv_list_keys_with_prefix(sdskv_provider_handle_t provider,
HG_BULK_WRITE_ONLY,
&in.keys_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
......@@ -612,28 +613,28 @@ int sdskv_list_keys_with_prefix(sdskv_provider_handle_t provider,
provider->client->sdskv_list_keys_id,
&handle);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* set target id */
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* forward to provider */
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* get the output from provider */
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
......@@ -703,7 +704,7 @@ int sdskv_list_keyvals_with_prefix(sdskv_provider_handle_t provider,
list_keyvals_out_t out;
hg_return_t hret = HG_SUCCESS;
hg_handle_t handle = HG_HANDLE_NULL;
int ret = 0;
int ret = SDSKV_SUCCESS;
int i;
in.db_id = db_id;
......@@ -725,7 +726,7 @@ int sdskv_list_keyvals_with_prefix(sdskv_provider_handle_t provider,
HG_BULK_READWRITE,
&in.ksizes_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
......@@ -737,7 +738,7 @@ int sdskv_list_keyvals_with_prefix(sdskv_provider_handle_t provider,
HG_BULK_READWRITE,
&in.vsizes_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
......@@ -747,7 +748,7 @@ int sdskv_list_keyvals_with_prefix(sdskv_provider_handle_t provider,
HG_BULK_WRITE_ONLY,
&in.keys_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
......@@ -757,7 +758,7 @@ int sdskv_list_keyvals_with_prefix(sdskv_provider_handle_t provider,
HG_BULK_WRITE_ONLY,
&in.vals_bulk_handle);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
......@@ -768,28 +769,28 @@ int sdskv_list_keyvals_with_prefix(sdskv_provider_handle_t provider,
provider->client->sdskv_list_keyvals_id,
&handle);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* set target id */
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* forward to provider */
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* get the output from provider */
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
ret = -1;
ret = SDSKV_ERR_MERCURY;
goto finish;
}
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment