Commit a811a244 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added erase functionality

parent cd332650
......@@ -5,13 +5,14 @@ CLIENT_LiBS=@CLIENT_LIBS@
AM_CPPFLAGS = -I${srcdir}/src
bin_PROGRAMS = bin/sdskv-server-daemon \
bin/sdskv-shutdown \
test/sdskv-open-test \
test/sdskv-put-test \
test/sdskv-length-test \
test/sdskv-get-test \
test/sdskv-list-keys-test
bin_PROGRAMS = bin/sdskv-server-daemon \
bin/sdskv-shutdown \
test/sdskv-open-test \
test/sdskv-put-test \
test/sdskv-length-test \
test/sdskv-get-test \
test/sdskv-list-keys-test \
test/sdskv-erase-test
bin_sdskv_server_daemon_SOURCES = src/sdskv-server-daemon.c
bin_sdskv_server_daemon_DEPENDENCIES = lib/libsdskv-server.la
......@@ -88,7 +89,8 @@ TESTS = test/basic.sh \
test/put-test.sh \
test/length-test.sh \
test/get-test.sh \
test/list-keys-test.sh
test/list-keys-test.sh \
test/erase-test.sh
TESTS_ENVIRONMENT = TIMEOUT="$(TIMEOUT)" \
MKTEMP="$(MKTEMP)"
......@@ -113,6 +115,10 @@ test_sdskv_list_keys_test_SOURCES = test/sdskv-list-keys-test.cc
test_sdskv_list_keys_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_list_keys_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_erase_test_SOURCES = test/sdskv-erase-test.cc
test_sdskv_erase_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_erase_test_LDFLAGS = -Llib -lsdskv-client
#############################################################
## tests bellow correspond to old tests (see old-test folder)
#############################################################
......
......@@ -154,6 +154,12 @@ bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
return success;
};
bool BerkeleyDBDataStore::erase(const ds_bulk_t &key) {
Dbt db_key((void*)key.data(), key.size());
int status = _dbm->del(NULL, &db_key, 0);
return status == 0;
}
// In the case where Duplicates::ALLOW, this will return the first
// value found using key.
bool BerkeleyDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
......
......@@ -18,6 +18,7 @@ public:
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
protected:
......
......@@ -94,6 +94,12 @@ bool BwTreeDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
return success;
};
bool BwTreeDataStore::erase(const ds_bulk_t &key) {
ds_bulk_t data;
if(!get(key,data)) return false;
return _tree->Delete(key,data);
}
bool BwTreeDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
std::vector<ds_bulk_t> values;
bool success = false;
......
......@@ -18,6 +18,7 @@ public:
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // a no-op
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
protected:
......
......@@ -20,6 +20,7 @@ public:
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool erase(const ds_bulk_t &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start_key, size_t count)=0;
protected:
......
......@@ -84,6 +84,12 @@ bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
return success;
};
bool LevelDBDataStore::erase(const ds_bulk_t &key) {
leveldb::Status status;
status = _dbm->Delete(leveldb::WriteOptions(), toString(key));
return status.ok();
}
bool LevelDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
......
......@@ -18,6 +18,7 @@ public:
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // not supported, a no-op
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start, size_t count);
protected:
......
......@@ -45,6 +45,12 @@ class MapDataStore : public AbstractDataStore {
return get(key, values[0]);
}
virtual bool erase(const ds_bulk_t &key) {
bool b = _map.find(key) != _map.end();
_map.erase(key);
return b;
}
virtual void set_in_memory(bool enable) {
_in_memory = enable;
}
......
......@@ -9,6 +9,7 @@ struct sdskv_client {
hg_id_t sdskv_put_id;
hg_id_t sdskv_bulk_put_id;
hg_id_t sdskv_get_id;
hg_id_t sdskv_erase_id;
hg_id_t sdskv_length_id;
hg_id_t sdskv_bulk_get_id;
hg_id_t sdskv_open_id;
......@@ -38,6 +39,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_put_rpc", &client->sdskv_put_id, &flag);
margo_registered_name(mid, "sdskv_bulk_put_rpc", &client->sdskv_bulk_put_id, &flag);
margo_registered_name(mid, "sdskv_get_rpc", &client->sdskv_get_id, &flag);
margo_registered_name(mid, "sdskv_erase_rpc", &client->sdskv_erase_id, &flag);
margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag);
margo_registered_name(mid, "sdskv_bulk_get_rpc", &client->sdskv_bulk_get_id, &flag);
margo_registered_name(mid, "sdskv_open_rpc", &client->sdskv_open_id, &flag);
......@@ -51,6 +53,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_bulk_put_rpc", bulk_put_in_t, bulk_put_out_t, NULL);
client->sdskv_get_id =
MARGO_REGISTER(mid, "sdskv_get_rpc", get_in_t, get_out_t, NULL);
client->sdskv_erase_id =
MARGO_REGISTER(mid, "sdskv_erase_rpc", erase_in_t, erase_out_t, NULL);
client->sdskv_length_id =
MARGO_REGISTER(mid, "sdskv_length_rpc", length_in_t, length_out_t, NULL);
client->sdskv_bulk_get_id =
......@@ -475,6 +479,54 @@ int sdskv_length(sdskv_provider_handle_t provider,
return ret;
}
int sdskv_erase(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize)
{
hg_return_t hret;
int ret;
hg_handle_t handle;
erase_in_t in;
erase_out_t out;
in.db_id = db_id;
in.key = (kv_data_t)key;
in.ksize = ksize;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_erase_id,
&handle);
if(hret != HG_SUCCESS) return -1;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
ret = out.ret;
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_list_keys(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, // db instance
const void *start_key, // we want keys strictly after this start_key
......
......@@ -49,6 +49,10 @@ int sdskv_length(sdskv_provider_handle_t db,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize, hg_size_t* vsize);
int sdskv_erase(sdskv_provider_handle_t db,
sdskv_database_id_t db_id, const void *key,
hg_size_t ksize);
int sdskv_list_keys(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id, // db instance
const void *start_key, // we want keys strictly after this start_key
......
......@@ -190,6 +190,46 @@ static inline hg_return_t hg_proc_get_out_t(hg_proc_t proc, void *data)
return HG_SUCCESS;
}
typedef struct {
uint64_t db_id;
kv_data_t key;
hg_size_t ksize;
} erase_in_t;
static inline hg_return_t hg_proc_erase_in_t(hg_proc_t proc, void *data)
{
hg_return_t ret;
erase_in_t *in = (erase_in_t*)data;
ret = hg_proc_uint64_t(proc, &in->db_id);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_size_t(proc, &in->ksize);
if(ret != HG_SUCCESS) return ret;
if (in->ksize) {
switch (hg_proc_get_op(proc)) {
case HG_ENCODE:
ret = hg_proc_raw(proc, in->key, in->ksize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_DECODE:
in->key = (kv_data_t)malloc(in->ksize);
ret = hg_proc_raw(proc, in->key, in->ksize);
if(ret != HG_SUCCESS) return ret;
break;
case HG_FREE:
free(in->key);
break;
default:
break;
}
}
return HG_SUCCESS;
}
MERCURY_GEN_PROC(erase_out_t, ((int32_t)(ret)))
typedef struct {
uint64_t db_id;
kv_data_t start_key;
......@@ -285,7 +325,6 @@ static inline hg_return_t hg_proc_list_out_t(hg_proc_t proc, void *data)
return ret;
}
MERCURY_GEN_PROC(put_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(length_out_t, ((hg_size_t)(size)) ((int32_t)(ret)))
......
......@@ -26,6 +26,7 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_open_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_put_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_erase_ult)
static void sdskv_server_finalize_cb(void *data);
......@@ -83,6 +84,10 @@ extern "C" int sdskv_provider_register(
list_in_t, list_out_t,
sdskv_list_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_erase_rpc",
erase_in_t, erase_out_t,
sdskv_erase_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &sdskv_server_finalize_cb, tmp_svr_ctx);
......@@ -552,6 +557,59 @@ static void sdskv_bulk_get_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(sdskv_bulk_get_ult)
static void sdskv_erase_ult(hg_handle_t handle)
{
hg_return_t hret;
erase_in_t in;
erase_out_t out;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
out.ret = -1;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = -1;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
auto it = svr_ctx->databases.find(in.db_id);
if(it == svr_ctx->databases.end()) {
out.ret = -1;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
ds_bulk_t kdata(in.key, in.key+in.ksize);
if(it->second->erase(kdata)) {
out.ret = 0;
} else {
out.ret = -1;
}
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_erase_ult)
static void sdskv_list_ult(hg_handle_t handle)
{
......
#!/bin/bash -x
if [ -z $srcdir ]; then
echo srcdir variable not set.
exit 1
fi
source $srcdir/test/test-util.sh
# start a server with 2 second wait,
# 20s timeout, and my_test_db as database
test_start_server 2 20 my_test_db
sleep 1
#####################
run_to 20 test/sdskv-erase-test $svr_addr 1 my_test_db 10
if [ $? -ne 0 ]; then
wait
exit 1
fi
wait
echo cleaning up $TMPBASE
rm -rf $TMPBASE
exit 0
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <string>
#include <vector>
#include <algorithm>
#include <map>
#include "sdskv-client.h"
static std::string gen_random_string(size_t len);
int main(int argc, char *argv[])
{
char cli_addr_prefix[64] = {0};
char *sdskv_svr_addr_str;
char *db_name;
margo_instance_id mid;
hg_addr_t svr_addr;
uint8_t mplex_id;
uint32_t num_keys;
sdskv_client_t kvcl;
sdskv_provider_handle_t kvph;
hg_return_t hret;
int ret;
if(argc != 5)
{
fprintf(stderr, "Usage: %s <sdskv_server_addr> <mplex_id> <db_name> <num_keys>\n", argv[0]);
fprintf(stderr, " Example: %s tcp://localhost:1234 1 foo 1000\n", argv[0]);
return(-1);
}
sdskv_svr_addr_str = argv[1];
mplex_id = atoi(argv[2]);
db_name = argv[3];
num_keys = atoi(argv[4]);
/* initialize Margo using the transport portion of the server
* address (i.e., the part before the first : character if present)
*/
for(unsigned i=0; (i<63 && sdskv_svr_addr_str[i] != '\0' && sdskv_svr_addr_str[i] != ':'); i++)
cli_addr_prefix[i] = sdskv_svr_addr_str[i];
/* start margo */
mid = margo_init(cli_addr_prefix, MARGO_SERVER_MODE, 0, 0);
if(mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
ret = sdskv_client_init(mid, &kvcl);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_client_init()\n");
margo_finalize(mid);
return -1;
}
/* look up the SDSKV server address */
hret = margo_addr_lookup(mid, sdskv_svr_addr_str, &svr_addr);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_addr_lookup()\n");
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* create a SDSKV provider handle */
ret = sdskv_provider_handle_create(kvcl, svr_addr, mplex_id, &kvph);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_provider_handle_create()\n");
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* open the database */
sdskv_database_id_t db_id;
ret = sdskv_open(kvph, db_name, &db_id);
if(ret == 0) {
printf("Successfuly open database %s, id is %ld\n", db_name, db_id);
} else {
fprintf(stderr, "Error: could not open database %s\n", db_name);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* **** put keys ***** */
std::vector<std::string> keys;
std::map<std::string, std::string> reference;
size_t max_value_size = 8000;
for(unsigned i=0; i < num_keys; i++) {
auto k = gen_random_string(16);
// half of the entries will be put using bulk
auto v = gen_random_string(i*max_value_size/num_keys);
ret = sdskv_put(kvph, db_id,
(const void *)k.data(), k.size()+1,
(const void *)v.data(), v.size()+1);
if(ret != 0) {
fprintf(stderr, "Error: sdskv_put() failed (iteration %d)\n", i);
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
reference[k] = v;
keys.push_back(k);
}
printf("Successfuly inserted %d keys\n", num_keys);
/* **** erase half of the keys **** */
for(unsigned i=0; i < num_keys; i += 2) {
const auto& k = keys[i];
ret = sdskv_erase(kvph, db_id,
(const void *)k.data(), k.size()+1);
if(ret != 0) {
fprintf(stderr, "Error: sdskv_erase() failed (key was %s)\n", k.c_str());
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
}
/* **** get keys **** */
for(unsigned i=0; i < num_keys; i++) {
auto k = keys[i];
size_t value_size = max_value_size;
std::vector<char> v(max_value_size);
ret = sdskv_get(kvph, db_id,
(const void *)k.data(), k.size()+1,
(void *)v.data(), &value_size);
if(i % 2 == 0) { /* key is supposed to be erased */
if(ret == 0) {
fprintf(stderr, "Error: sdskv_get() retrieved a key that was erased (key was %s)\n", k.c_str());
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
} else {
if(ret != 0) {
fprintf(stderr, "Error: sdskv_get() failed (key was %s)\n", k.c_str());
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
std::string vstring((char*)(v.data()));
if(vstring != reference[k]) {
fprintf(stderr, "Error: sdskv_get() returned a value different from the reference\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
}
}
/* shutdown the server */
ret = sdskv_shutdown_service(kvcl, svr_addr);
/**** cleanup ****/
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(ret);
}
static std::string gen_random_string(size_t len) {
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
std::string s(len, ' ');
for (unsigned i = 0; i < len; ++i) {
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
return s;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment