Commit c954c5a3 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added list kv and corresponding tests

parent caff79d8
......@@ -5,13 +5,14 @@ CLIENT_LiBS=@CLIENT_LIBS@
AM_CPPFLAGS = -I${srcdir}/src -I${srcdir}/include
bin_PROGRAMS = bin/sdskv-server-daemon \
bin/sdskv-shutdown \
test/sdskv-open-test \
test/sdskv-put-test \
test/sdskv-length-test \
test/sdskv-get-test \
test/sdskv-list-keys-test \
bin_PROGRAMS = bin/sdskv-server-daemon \
bin/sdskv-shutdown \
test/sdskv-open-test \
test/sdskv-put-test \
test/sdskv-length-test \
test/sdskv-get-test \
test/sdskv-list-keys-test \
test/sdskv-list-keyvals-test \
test/sdskv-erase-test
bin_sdskv_server_daemon_SOURCES = src/sdskv-server-daemon.c
......@@ -94,6 +95,7 @@ TESTS = test/basic.sh \
test/length-test.sh \
test/get-test.sh \
test/list-keys-test.sh \
test/list-keyvals-test.sh \
test/erase-test.sh
TESTS_ENVIRONMENT = TIMEOUT="$(TIMEOUT)" \
......@@ -119,6 +121,10 @@ test_sdskv_list_keys_test_SOURCES = test/sdskv-list-keys-test.cc
test_sdskv_list_keys_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_list_keys_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_list_keyvals_test_SOURCES = test/sdskv-list-kv-test.cc
test_sdskv_list_keyvals_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_list_keyvals_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_erase_test_SOURCES = test/sdskv-erase-test.cc
test_sdskv_erase_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_erase_test_LDFLAGS = -Llib -lsdskv-client
......
......@@ -210,25 +210,47 @@ bool BerkeleyDBDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data
return success;
};
void BerkeleyDBDataStore::BerkeleyDBDataStore::set_in_memory(bool enable) {
void BerkeleyDBDataStore::set_in_memory(bool enable) {
_in_memory = enable;
};
std::vector<ds_bulk_t> BerkeleyDBDataStore::BerkeleyDBDataStore::list(const ds_bulk_t &start, size_t count)
std::vector<ds_bulk_t> BerkeleyDBDataStore::list_keys(const ds_bulk_t &start, size_t count)
{
std::vector<ds_bulk_t> keys;
Dbc * cursorp;
Dbt key, data;
_dbm->cursor(NULL, &cursorp, 0);
for (size_t i=0; i< count; i++) {
int ret = cursorp->get(&key, &data, DB_NEXT);
if (ret !=0 ) break;
int ret = cursorp->get(&key, &data, DB_NEXT);
if (ret !=0 ) break;
ds_bulk_t k(key.get_size() );
memcpy(k.data(), key.get_data(), key.get_size() );
/* I hope this is a deep copy! */
keys.push_back(std::move(k));
ds_bulk_t k(key.get_size() );
memcpy(k.data(), key.get_data(), key.get_size() );
/* I hope this is a deep copy! */
keys.push_back(std::move(k));
}
cursorp->close();
return keys;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BerkeleyDBDataStore::list_keyvals(const ds_bulk_t &start_key, size_t count)
{
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> keyvals;
Dbc * cursorp;
Dbt key, data;
_dbm->cursor(NULL, &cursorp, 0);
for (size_t i=0; i< count; i++) {
int ret = cursorp->get(&key, &data, DB_NEXT);
if (ret !=0 ) break;
ds_bulk_t k(key.get_size());
ds_bulk_t v(data.get_size());
memcpy(k.data(), key.get_data(), key.get_size());
memcpy(v.data(), data.get_data(), data.get_size());
keyvals.push_back(std::make_pair(std::move(k), std::move(v)));
}
cursorp->close();
return keyvals;
}
......@@ -20,7 +20,8 @@ public:
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
virtual std::vector<ds_bulk_t> list_keys(const ds_bulk_t &start, size_t count);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> list_keyvals(const ds_bulk_t &start_key, size_t count);
protected:
DbEnv *_dbenv = NULL;
Db *_dbm = NULL;
......
......@@ -120,17 +120,31 @@ bool BwTreeDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
return success;
};
void BwTreeDataStore::BwTreeDataStore::set_in_memory(bool enable)
void BwTreeDataStore::set_in_memory(bool enable)
{};
std::vector<ds_bulk_t> BwTreeDataStore::BwTreeDataStore::list(const ds_bulk_t &start, size_t count)
std::vector<ds_bulk_t> BwTreeDataStore::list_keys(const ds_bulk_t &start, size_t count)
{
std::vector<ds_bulk_t> keys;
#if 0
auto it = _tree->Begin(start);
while (it.IsEnd() == false) {
/* BUG: bwtree doesn't support "list keys" or "get a key" */
//keys.push_back(it.GetLeafNode());
}
#endif
return keys;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BwTreeDataStore::list_keyvals(const ds_bulk_t &start, size_t count)
{
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> keyvals;
#if 0
auto it = _tree->Begin(start);
while (it.IsEnd() == false) {
/* BUG: bwtree doesn't support "list keys" or "get a key" */
}
#endif
return keyvals;
}
......@@ -20,7 +20,8 @@ public:
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // a no-op
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
virtual std::vector<ds_bulk_t> list_keys(const ds_bulk_t &start, size_t count);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> list_keyvals(const ds_bulk_t &start_key, size_t count);
protected:
BwTree<ds_bulk_t, ds_bulk_t,
ds_bulk_less, ds_bulk_equal, ds_bulk_hash,
......
......@@ -22,7 +22,8 @@ public:
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool erase(const ds_bulk_t &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start_key, size_t count)=0;
virtual std::vector<ds_bulk_t> list_keys(const ds_bulk_t &start_key, size_t count)=0;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> list_keyvals(const ds_bulk_t &start_key, size_t count)=0;
protected:
Duplicates _duplicates;
bool _eraseOnGet;
......
......@@ -124,10 +124,10 @@ bool LevelDBDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
return success;
};
void LevelDBDataStore::LevelDBDataStore::set_in_memory(bool enable)
void LevelDBDataStore::set_in_memory(bool enable)
{};
std::vector<ds_bulk_t> LevelDBDataStore::LevelDBDataStore::list(const ds_bulk_t &start, size_t count)
std::vector<ds_bulk_t> LevelDBDataStore::list_keys(const ds_bulk_t &start, size_t count)
{
std::vector<ds_bulk_t> keys;
......@@ -137,8 +137,26 @@ std::vector<ds_bulk_t> LevelDBDataStore::LevelDBDataStore::list(const ds_bulk_t
ds_bulk_t k(it->key().size());
memcpy(k.data(), it->key().data(), it->key().size() );
keys.push_back(k);
if (i++ > count) break;
if (i++ > count) break;
}
delete it;
return keys;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::list_keyvals(const ds_bulk_t &start_key, size_t count)
{
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> keyvals;
leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
size_t i=0;
for (it->SeekToFirst(); it->Valid(); it->Next() ) {
ds_bulk_t k(it->key().size());
ds_bulk_t v(it->value().size());
memcpy(k.data(), it->key().data(), it->key().size() );
memcpy(v.data(), it->value().data(), it->value().size() );
keyvals.push_back(std::make_pair(std::move(k), std::move(v)));
if (i++ > count) break;
}
delete it;
return keyvals;
}
......@@ -20,7 +20,8 @@ public:
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // not supported, a no-op
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
virtual std::vector<ds_bulk_t> list_keys(const ds_bulk_t &start, size_t count);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> list_keyvals(const ds_bulk_t &start_key, size_t count);
protected:
leveldb::DB *_dbm = NULL;
private:
......
......@@ -55,19 +55,26 @@ class MapDataStore : public AbstractDataStore {
_in_memory = enable;
}
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start_key, size_t count) {
virtual std::vector<ds_bulk_t> list_keys(const ds_bulk_t &start_key, size_t count) {
std::vector<ds_bulk_t> result;
auto it = _map.lower_bound(start_key);
while(it != _map.end() && it->first == start_key) {
it++;
}
auto lastkey = start_key;
for(size_t i=0; it != _map.end() && i < count; it++) {
if(it->first != lastkey) {
result.push_back(lastkey);
i += 1;
lastkey = it->first;
}
for(size_t i=0; it != _map.end() && i < count; it++, i++) {
result.push_back(it->first);
}
return result;
}
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> list_keyvals(const ds_bulk_t &start_key, size_t count) {
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
auto it = _map.lower_bound(start_key);
while(it != _map.end() && it->first == start_key) {
it++;
}
for(size_t i=0; it != _map.end() && i < count; it++, i++) {
result.push_back(*it);
}
return result;
}
......
......@@ -360,7 +360,7 @@ static hg_return_t list_handler(hg_handle_t handle)
std::vector<char> start{};
margo_get_input(handle, &list_in);
auto keys = datastore->list(start, list_in.list_in.max_keys);
auto keys = datastore->list_keys(start, list_in.list_in.max_keys);
list_out.list_out.nkeys = keys.size();
/* we have a C++ vector but will serialize it before shipping over wire.
......
......@@ -14,6 +14,7 @@ struct sdskv_client {
hg_id_t sdskv_bulk_get_id;
hg_id_t sdskv_open_id;
hg_id_t sdskv_list_keys_id;
hg_id_t sdskv_list_keyvals_id;
uint64_t num_provider_handles;
};
......@@ -44,6 +45,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_bulk_get_rpc", &client->sdskv_bulk_get_id, &flag);
margo_registered_name(mid, "sdskv_open_rpc", &client->sdskv_open_id, &flag);
margo_registered_name(mid, "sdskv_list_keys_rpc", &client->sdskv_list_keys_id, &flag);
margo_registered_name(mid, "sdskv_list_keyvals_rpc", &client->sdskv_list_keyvals_id, &flag);
} else {
......@@ -63,6 +65,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_open_rpc", open_in_t, open_out_t, NULL);
client->sdskv_list_keys_id =
MARGO_REGISTER(mid, "sdskv_list_keys_rpc", list_in_t, list_out_t, NULL);
client->sdskv_list_keyvals_id =
MARGO_REGISTER(mid, "sdskv_list_keyvals_rpc", list_in_t, list_out_t, NULL);
}
return 0;
......@@ -590,20 +594,80 @@ int sdskv_list_keys(sdskv_provider_handle_t provider,
return ret;
}
int sdskv_list_keys_with_prefix(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, // db instance
const void *start_key, // we want keys strictly after this start_key
hg_size_t start_ksize, // size of the start_key
const void *prefix, // return only keys that begin with 'prefix'
hg_size_t prefix_size,
void **keys, // pointer to an array of void* pointers,
// this array has size *max_keys
hg_size_t* ksizes, // pointer to an array of hg_size_t sizes
// representing sizes allocated in
// keys for each key
hg_size_t* max_keys) // maximum number of keys requested
int sdskv_list_keyvals(
sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
const void *start_key,
hg_size_t start_ksize,
void **keys,
hg_size_t* ksizes,
void** values,
hg_size_t* vsizes,
hg_size_t* max_keys)
{
// TODO
list_in_t in;
list_out_t out;
hg_return_t hret = HG_SUCCESS;
hg_handle_t handle;
int ret;
int i;
in.db_id = db_id;
in.start_key = (kv_data_t) start_key;
in.start_ksize = start_ksize;
in.max_keys = *max_keys;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_list_keyvals_id,
&handle);
if(hret != HG_SUCCESS) return -1;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
*max_keys = out.nkeys;
ret = out.ret;
if(ret == 0) {
hg_size_t s;
for (i=0; i < out.nkeys; i++) {
s = ksizes[i] > out.ksizes[i] ? out.ksizes[i] : ksizes[i];
ksizes[i] = s;
memcpy(keys[i], out.keys[i], s);
if(s < out.ksizes[i]) ret = -1; // truncated key
}
for(i=0; i< out.nvalues; i++) {
s = vsizes[i] > out.vsizes[i] ? out.vsizes[i] : vsizes[i];
vsizes[i] = s;
memcpy(values[i], out.values[i], s);
if(s < out.vsizes[i]) return -1; // truncated value
}
}
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_shutdown_service(sdskv_client_t client, hg_addr_t addr)
......
......@@ -287,14 +287,13 @@ static inline hg_return_t hg_proc_list_out_t(hg_proc_t proc, void *data)
ret = hg_proc_hg_size_t(proc, &out->nkeys);
if(ret != HG_SUCCESS) return ret;
/* encode/decode the number values */
/* encode/decode the number of values */
ret = hg_proc_hg_size_t(proc, &out->nvalues);
if(ret != HG_SUCCESS) return ret;
if (out->nkeys) {
switch(hg_proc_get_op(proc)) {
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
fprintf(stderr,"In HG_ENCODE out->nkeys = %ld\n", out->nkeys);
/* encode the size of each key */
for (i=0; i<out->nkeys; i++) {
ret = hg_proc_hg_size_t(proc, &(out->ksizes[i]));
......@@ -305,23 +304,10 @@ static inline hg_return_t hg_proc_list_out_t(hg_proc_t proc, void *data)
ret = hg_proc_raw(proc, out->keys[i], out->ksizes[i]);
if(ret != HG_SUCCESS) return ret;
}
/* encode the size of values, if present */
if(out->vsizes) {
for(i=0; i<out->nvalues; i++) {
ret = hg_proc_hg_size_t(proc, &(out->vsizes[i]));
if(ret != HG_SUCCESS) return ret;
}
}
/* encode the values, if present */
if(out->values) {
for(i=0; i<out->nvalues; i++) {
ret = hg_proc_raw(proc, out->values[i], out->vsizes[i]);
if(ret != HG_SUCCESS) return ret;
}
}
break;
case HG_DECODE:
fprintf(stderr,"In HG_DECODE out->nkeys = %ld\n", out->nkeys);
if(out->nkeys) {
/* decode the size of each key */
out->ksizes = (hg_size_t*)malloc(out->nkeys*sizeof(*out->ksizes));
......@@ -344,16 +330,45 @@ static inline hg_return_t hg_proc_list_out_t(hg_proc_t proc, void *data)
out->ksizes = NULL;
out->keys = NULL;
}
if(out->nvalues) {
break;
case HG_FREE:
for (i=0; i<out->nkeys; i++) {
free(out->keys[i]);
}
free(out->keys);
free(out->ksizes);
break;
default:
break;
}
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
/* encode the size of values, if present */
for(i=0; i <out->nvalues; i++) {
ret = hg_proc_hg_size_t(proc, &(out->vsizes[i]));
if(ret != HG_SUCCESS) return ret;
}
/* encode the values, if present */
for(i=0; i < out->nvalues; i++) {
ret = hg_proc_raw(proc, out->values[i], out->vsizes[i]);
if(ret != HG_SUCCESS) return ret;
}
break;
case HG_DECODE:
if(out->nvalues != 0) {
/* decode the size of each value */
out->vsizes = (hg_size_t*)malloc(out->nvalues*sizeof(*out->vsizes));
for( i=0; i<out->nvalues; i++) {
for( i=0; i < out->nvalues; i++) {
ret = hg_proc_hg_size_t(proc, &(out->vsizes[i]));
if(ret != HG_SUCCESS) return ret;
}
/* decode each key */
/* decode each value */
out->values = (kv_data_t *)malloc(out->nvalues*sizeof(kv_data_t));
for(i=0; i<out->nvalues; i++) {
for(i=0; i < out->nvalues; i++) {
if(out->vsizes[i] == 0) {
out->values[i] = NULL;
continue;
......@@ -369,22 +384,17 @@ static inline hg_return_t hg_proc_list_out_t(hg_proc_t proc, void *data)
break;
case HG_FREE:
for (i=0; i<out->nkeys; i++) {
free(out->keys[i]);
}
for(i=0; i<out->nvalues; i++) {
for(i=0; i < out->nvalues; i++) {
free(out->values[i]);
}
free(out->keys);
free(out->ksizes);
free(out->values);
free(out->vsizes);
break;
default:
break;
}
}
}
/* encode/decode the return value */
ret = hg_proc_int32_t(proc, &out->ret);
return ret;
......
......@@ -26,6 +26,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_open_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_put_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_keys_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_keyvals_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_erase_ult)
static void sdskv_server_finalize_cb(void *data);
......@@ -84,6 +85,10 @@ extern "C" int sdskv_provider_register(
list_in_t, list_out_t,
sdskv_list_keys_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_list_keyvals_rpc",
list_in_t, list_out_t,
sdskv_list_keyvals_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_erase_rpc",
erase_in_t, erase_out_t,
sdskv_erase_ult, mplex_id, abt_pool);
......@@ -653,7 +658,7 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
}
ds_bulk_t start_kdata(in.start_key, in.start_key+in.start_ksize);
auto keys = it->second->list(start_kdata, in.max_keys);
auto keys = it->second->list_keys(start_kdata, in.max_keys);
out.nkeys = keys.size();
/* create the array of sizes */
std::vector<hg_size_t> sizes(out.nkeys);
......@@ -677,6 +682,87 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(sdskv_list_keys_ult)
static void sdskv_list_keyvals_ult(hg_handle_t handle)
{
hg_return_t hret;
list_in_t in;
list_out_t out;
out.ret = -1;
out.nkeys = 0;
out.ksizes = nullptr;
out.keys = nullptr;
out.nvalues = 0;
out.vsizes = nullptr;
out.values = nullptr;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
ds_bulk_t start_kdata(in.start_key, in.start_key+in.start_ksize);
auto keyvals = it->second->list_keyvals(start_kdata, in.max_keys);
out.nkeys = keyvals.size();
out.nvalues = keyvals.size();
/* create the array of sizes */
std::vector<hg_size_t> sizes(out.nkeys);
for(unsigned i = 0; i < out.nkeys; i++) {
sizes[i] = keyvals[i].first.size();
}
out.ksizes = sizes.data();
/* create the packed data */
std::vector<kv_data_t> packed_keys(out.nkeys);
for(unsigned i = 0; i < out.nkeys; i++) {
packed_keys[i] = (char*)(keyvals[i].first.data());
}
out.keys = packed_keys.data();
/* create the array of value sizes */
std::vector<hg_size_t> vsizes(out.nvalues);
for(unsigned i = 0; i < out.nvalues; i++) {
vsizes[i] = keyvals[i].second.size();
}
out.vsizes = vsizes.data();
/* create the packed value data */
std::vector<kv_data_t> packed_vals(out.nvalues);