...
 
Commits (3)
......@@ -131,6 +131,18 @@ bool BwTreeDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
void BwTreeDataStore::BwTreeDataStore::set_in_memory(bool enable)
{};
std::vector<ds_bulk_t>* BwTreeDataStore::BwTreeDataStore::list(ds_bulk_t &start, size_t count)
{
auto keys = new std::vector<ds_bulk_t>;
auto it = _tree->Begin(start);
while (it.IsEnd() == false) {
/* BUG: bwtree doesn't support "list keys" or "get a key" */
//keys->push_back(it.GetLeafNode());
}
return keys;
}
#elif USE_LEVELDB
LevelDBDataStore::LevelDBDataStore() :
AbstractDataStore(Duplicates::IGNORE, false, false) {
......@@ -244,6 +256,20 @@ bool LevelDBDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
void LevelDBDataStore::LevelDBDataStore::set_in_memory(bool enable)
{};
std::vector<ds_bulk_t>* LevelDBDataStore::LevelDBDataStore::list(ds_bulk_t &start, size_t count)
{
auto keys = new std::vector<ds_bulk_t>;
leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
ds_bulk_t k;
for (it->SeekToFirst(); it->Valid(); it->Next() ) {
k.resize(it->key().size());
memcpy(&(k[0]), it->key().data(), it->key().size() );
keys->push_back(k);
}
return keys;
}
#elif USE_BDB
BerkeleyDBDataStore::BerkeleyDBDataStore() :
AbstractDataStore(Duplicates::IGNORE, false, false) {
......@@ -444,6 +470,23 @@ bool BerkeleyDBDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
void BerkeleyDBDataStore::BerkeleyDBDataStore::set_in_memory(bool enable) {
_in_memory = enable;
};
std::vector<ds_bulk_t>* BerkeleyDBDataStore::BerkeleyDBDataStore::list(ds_bulk_t &start, size_t count)
{
auto keys = new std::vector<ds_bulk_t>;
Dbc * cursorp;
Dbt key, data;
ds_bulk_t k;
int ret;
_dbm->cursor(NULL, &cursorp, DB_SET_RANGE);
while (ret = cursorp->get(&key, &data, DB_NEXT) == 0) {
k.resize(key.get_size());
memcpy(&(k[0]), key.get_data(), key.get_size());
/* I hope this is a deep copy! */
keys->push_back(k);
}
return keys;
}
#else
#error "No backend for datastore selected"
#endif
......@@ -59,6 +59,7 @@ public:
virtual bool get(ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual std::vector<ds_bulk_t>* list(ds_bulk_t &start, size_t count)=0;
protected:
Duplicates _duplicates;
bool _eraseOnGet;
......@@ -77,6 +78,7 @@ public:
virtual bool get(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // a no-op
virtual std::vector<ds_bulk_t>* list(ds_bulk_t &start, size_t count);
protected:
BwTree<ds_bulk_t, ds_bulk_t,
my_less, my_equal, my_hash,
......@@ -94,6 +96,7 @@ public:
virtual bool get(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // not supported, a no-op
virtual std::vector<ds_bulk_t>* list(ds_bulk_t &start, size_t count);
protected:
leveldb::DB *_dbm = NULL;
private:
......@@ -112,6 +115,7 @@ public:
virtual bool get(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual std::vector<ds_bulk_t>* list(ds_bulk_t &start, size_t count);
protected:
DbEnv *_dbenv = NULL;
Db *_dbm = NULL;
......
......@@ -35,6 +35,7 @@ typedef struct kv_context_s {
hg_id_t close_id;
hg_id_t bench_id;
hg_id_t shutdown_id;
hg_id_t list_id;
kv_id kv;
} kv_context_t;
......@@ -51,6 +52,7 @@ typedef struct kv_database_s {
hg_handle_t bulk_get_handle;
hg_handle_t shutdown_handle;
hg_handle_t bench_handle;
hg_handle_t list_handle;
} kv_database_t;
......@@ -83,6 +85,9 @@ static inline hg_return_t hg_proc_hg_return_t(hg_proc_t proc, void *data)
return hg_proc_hg_int32_t(proc, data);
}
/* the put_in, get_in, put_out, get_out, list_in, list_out code is repetitive. candidate for a template? */
static inline hg_return_t hg_proc_kv_put_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
......@@ -194,6 +199,92 @@ static inline hg_return_t hg_proc_kv_get_out_t(hg_proc_t proc, void *data)
return HG_SUCCESS;
}
typedef struct {
kv_data_t start_key;
hg_size_t start_ksize;
hg_size_t max_keys;
} kv_list_in_t;
typedef struct {
hg_size_t nkeys;
kv_data_t *keys;
hg_size_t *ksizes;
hg_return_t ret;
} kv_list_out_t;
static inline hg_return_t hg_proc_kv_list_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
kv_list_in_t *in = (kv_list_in_t*)data;
ret = hg_proc_hg_size_t(proc, &in->start_ksize);
assert(ret == HG_SUCCESS);
if (in->start_ksize) {
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
assert(ret == HG_SUCCESS);
break;
case HG_DECODE:
in->start_key = (kv_data_t)malloc(in->start_ksize);
ret = hg_proc_raw(proc, in->start_key, in->start_ksize);
assert(ret == HG_SUCCESS);
break;
case HG_FREE:
free(in->start_key);
default:
break;
}
}
ret = hg_proc_hg_size_t(proc, &in->max_keys);
assert(ret == HG_SUCCESS);
return ret;
}
static inline hg_return_t hg_proc_kv_list_out_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
int i;
kv_list_out_t *out = (kv_list_out_t*)data;
/* typedef struct {
hg_size_t nkeys;
kv_data_t *keys;
hg_size_t *ksizes;
} kv_list_out_t; */
ret = hg_proc_hg_size_t(proc, &out->nkeys);
assert (ret == HG_SUCCESS);
if (out->nkeys) {
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
assert(ret == HG_SUCCESS);
}
break;
case HG_DECODE:
for (i=0; i<out->nkeys; i++) {
out->keys[i] = (kv_data_t)malloc(out->ksizes[i]);
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
assert(ret == HG_SUCCESS);
}
break;
case HG_FREE:
for (i=0; i<out->nkeys; i++) {
free(out->keys[i]);
}
break;
default:
break;
}
}
ret = hg_proc_hg_return_t(proc, &out->ret);
assert (ret == HG_SUCCESS);
return ret;
}
MERCURY_GEN_PROC(put_in_t, ((kv_put_in_t)(pi)))
MERCURY_GEN_PROC(put_out_t, ((hg_return_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(put_handler)
......@@ -251,6 +342,10 @@ static inline hg_return_t hg_proc_kv_bulk_t(hg_proc_t proc, void *data)
return HG_SUCCESS;
}
MERCURY_GEN_PROC(list_in_t, ((kv_list_in_t)(list_in)))
MERCURY_GEN_PROC(list_out_t, ((kv_list_out_t)(list_out)))
MERCURY_GEN_PROC(bulk_put_in_t, ((kv_bulk_t)(bulk)))
MERCURY_GEN_PROC(bulk_put_out_t, ((hg_return_t)(ret)))
DECLARE_MARGO_RPC_HANDLER(bulk_put_handler)
......@@ -261,6 +356,7 @@ DECLARE_MARGO_RPC_HANDLER(bulk_get_handler)
DECLARE_MARGO_RPC_HANDLER(shutdown_handler)
DECLARE_MARGO_RPC_HANDLER(list_keys_handler)
// some setup to support simple benchmarking
static inline hg_return_t hg_proc_double(hg_proc_t proc, void *data)
......
......@@ -38,6 +38,9 @@ kv_context_t *kv_client_register(const margo_instance_id mid) {
context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
void, void, NULL);
context->list_id = MARGO_REGISTER(context->mid, "list",
list_in_t, list_out_t, NULL);
return context;
}
......@@ -96,6 +99,9 @@ kv_database_t * kv_open(kv_context_t *context,
ret = margo_create(context->mid, db->svr_addr,
context->bench_id, &(db->bench_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->list_id, &(db->list_handle));
assert(ret == HG_SUCCESS);
......@@ -272,6 +278,34 @@ hg_return_t kv_get(kv_database_t *db,
return ret;
}
hg_return_t kv_list_keys(kv_database_t *db,
const void *start_key, hg_size_t start_ksize,
void **keys, hg_size_t *ksizes, hg_size_t *max_keys)
{
list_in_t list_in;
list_out_t list_out;
int ret = HG_SUCCESS;
int i;
list_in.list_in.start_key = (kv_data_t) start_key;
list_in.list_in.start_ksize = start_ksize;
list_in.list_in.max_keys = *max_keys;
ret = margo_forward(db->list_handle, &list_in);
ret = margo_get_output(db->list_handle, &list_out);
*max_keys = list_out.list_out.nkeys;
for (i=0; i<list_out.list_out.nkeys; i++) {
ksizes[i] = list_out.list_out.ksizes[i];
memcpy(keys[i], list_out.list_out.keys[i], list_out.list_out.ksizes[i]);
}
margo_free_output(db->list_handle, &list_out);
return ret;
}
hg_return_t kv_close(kv_database_t *db)
{
hg_return_t ret;
......
......@@ -349,6 +349,31 @@ static hg_return_t bulk_get_handler(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bulk_get_handler)
static hg_return_t list_handler(hg_handle_t handle)
{
hg_return_t ret;
margo_instance_id mid;
list_in_t list_in;
list_out_t list_out;
std::vector<char> start{};
margo_get_input(handle, &list_in);
std::cout << "max_keys: " << list_in.list_in.max_keys;
auto keys = datastore->list(start, list_in.list_in.max_keys);
std::cout << " found " << start.size() << " keys" << std::endl;
for (auto i: *keys) {
std::cout << "as string" << std::string(i.data()) << " ";
std::cout << "as int" << *(int *)(i.data()) << " ";
}
ret = margo_respond(handle, &list_out);
margo_free_input(handle, &list_in);
margo_destroy(handle);
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(list_handler)
static void shutdown_handler(hg_handle_t handle)
{
hg_return_t ret;
......@@ -555,6 +580,10 @@ kv_context_t *kv_server_register(const margo_instance_id mid)
context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
void, void, shutdown_handler);
context->list_id = MARGO_REGISTER(context->mid, "list",
list_in_t, list_out_t, list_handler);
return context;
}
......
......@@ -59,6 +59,28 @@ hg_return_t kv_get(kv_database_t *db, void *key, hg_size_t ksize,
void *value, hg_size_t *vsize);
hg_return_t kv_close(kv_database_t * db);
hg_return_t kv_list_keys(kv_database_t *db, // db instance
const void *start_key, // we want keys strictly after this start_key
hg_size_t start_ksize, // size of the start_key
void **keys, // pointer to an array of void* pointers,
// this array has size *max_keys
hg_size_t* ksizes, // pointer to an array of hg_size_t sizes
// representing sizes allocated in
// keys for each key
hg_size_t* max_keys); // maximum number of keys requested
hg_return_t kv_list_keys_with_prefix(kv_database_t *db,
const void *start_key, // we want keys strictly after this start_key
hg_size_t start_ksize, // size of the start_key
const void *prefix, // return only keys that begin with 'prefix'
hg_size_t prefix_size,
void **keys, // pointer to an array of void* pointers,
// this array has size *max_keys
hg_size_t* ksizes, // pointer to an array of hg_size_t sizes
// representing sizes allocated in
// keys for each key
hg_size_t* max_keys); // maximum number of keys requested
// benchmark routine
bench_result_t *kv_benchmark(kv_database_t *context, int32_t count);
......
......@@ -66,6 +66,22 @@ int main(int argc, char **argv) {
std::cout << "remote_bulk_val: " << remote_bulk_val[0] << std::endl;
}
/* listing all keys in DB */
int i, start_key=0;
hg_size_t max_keys=10;
void **keys;
hg_size_t *sizes;
keys = (void **)calloc(max_keys, sizeof(void *));
for (i=0; i< max_keys; i++)
keys[i] = calloc(1, sizeof(int));
sizes = (hg_size_t *)calloc(max_keys, sizeof(*sizes));
ret = kv_list_keys(db, &start_key, sizeof(start_key),
keys, sizes, &max_keys);
for(int i=0; i< max_keys; i++) {
printf("found: %d of %d: %d (%zd)\n", i+1, max_keys, ((int*)keys)[i], sizes[i]);
}
bench_result_t *output;
output = kv_benchmark(db, 1000);
printf("inserts: %zd keys in %f seconds: %f Million-inserts per sec\n",
......