Commit a4766a33 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

it is now possible with the Map store to set custom comparison function

parent 5f8918aa
......@@ -14,13 +14,21 @@ bin_PROGRAMS = bin/sdskv-server-daemon \
test/sdskv-erase-test \
test/sdskv-list-keys-test \
test/sdskv-list-keyvals-test \
test/sdskv-list-keys-prefix-test
test/sdskv-list-keys-prefix-test \
test/sdskv-custom-cmp-test \
test/sdskv-custom-server-daemon
bin_sdskv_server_daemon_SOURCES = src/sdskv-server-daemon.c
bin_sdskv_server_daemon_DEPENDENCIES = lib/libsdskv-server.la
bin_sdskv_server_daemon_LDFLAGS = -Llib -lsdskv-server
bin_sdskv_server_daemon_LDADD = ${LIBS} -lsdskv-server ${SERVER_LIBS}
test_sdskv_custom_server_daemon_SOURCES = test/sdskv-custom-server-daemon.c
test_sdskv_custom_server_daemon_DEPENDENCIES = lib/libsdskv-server.la
test_sdskv_custom_server_daemon_LDFLAGS = -Llib -lsdskv-server
test_sdskv_custom_server_daemon_LDADD = ${LIBS} -lsdskv-server ${SERVER_LIBS}
bin_sdskv_shutdown_SOURCES = src/sdskv-shutdown.c
bin_sdskv_shutdown_DEPENDENCIES = lib/libsdskv-client.la
bin_sdskv_shutdown_LDFLAGS = -Llib -lsdskv-client
......@@ -98,7 +106,8 @@ TESTS = test/basic.sh \
test/erase-test.sh \
test/list-keys-test.sh \
test/list-keyvals-test.sh \
test/list-keys-prefix-test.sh
test/list-keys-prefix-test.sh \
test/custom-cmp-test.sh
TESTS_ENVIRONMENT = TIMEOUT="$(TIMEOUT)" \
MKTEMP="$(MKTEMP)"
......@@ -135,49 +144,9 @@ test_sdskv_erase_test_SOURCES = test/sdskv-erase-test.cc
test_sdskv_erase_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_erase_test_LDFLAGS = -Llib -lsdskv-client
#############################################################
## tests bellow correspond to old tests (see old-test folder)
#############################################################
#bin_PROGRAMS = test/bench-client \
# test/test-client \
# test/test-server
#test_bench_client_SOURCES = test/bench-client.cc
#test_bench_client_DEPENDENCIES = lib/libkvclient.la
#test_bench_client_LDFLAGS = -Llib -lkvclient
#test_bench_client_LDADD = ${LIBS} -lkvclient
#check_PROGRAMS = test/test-client \
# test/test-server \
# test/bench-client \
# test/test-mpi \
# test/test-mpi-group
#
#TESTS = test/test-client \
# test/test-server \
# test/bench-client \
# test/test-mpi \
# test/test-mpi-group
#test_test_client_SOURCES = test/test-client.cc
#test_test_client_DEPENDENCIES = lib/libkvclient.la
#test_test_client_LDFLAGS = -Llib -lkvclient
#test_test_server_SOURCES = test/test-server.cc
#test_test_server_DEPENDENCIES = lib/libkvserver.la
#test_test_server_LDFLAGS = -Llib -lkvserver ${SERVER_LIBS}
#test_test_mpi_SOURCES = test/test-mpi.cc
#test_test_mpi_DEPENDENCIES = lib/libkvserver.la lib/libkvclient.la
#test_test_mpi_LDFLAGS = -Llib -lkvclient -lkvserver ${SERVER_LIBS}
#test_test_mpi_group_SOURCES = test/test-mpi-group.cc
#test_test_mpi_group_DEPENDENCIES = lib/libkvgroupserver.la lib/libkvgroupclient.la lib/libkvserver.la lib/libkvclient.la
#test_test_mpi_group_LDFLAGS = -Llib -lkvgroupserver -lkvgroupclient -lkvclient -lkvserver ${SERVER_LIBS} ${GROUP_LIBS}
test_sdskv_custom_cmp_test_SOURCES = test/sdskv-custom-cmp-test.cc
test_sdskv_custom_cmp_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_custom_cmp_test_LDFLAGS = -Llib -lsdskv-client
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/sdskv-server.pc \
......
......@@ -17,8 +17,10 @@ extern "C" {
#define SDSKV_ABT_POOL_DEFAULT ABT_POOL_NULL
#define SDSKV_MPLEX_ID_DEFAULT 0
#define SDSKV_PROVIDER_IGNORE NULL
#define SDSKV_COMPARE_DEFAULT NULL
typedef struct sdskv_server_context_t* sdskv_provider_t;
typedef int (*sdskv_compare_fn)(const void*, size_t, const void*, size_t);
int sdskv_provider_register(
margo_instance_id mid,
......@@ -40,6 +42,7 @@ int sdskv_provider_add_database(
sdskv_provider_t provider,
const char* db_name,
sdskv_db_type_t db_type,
sdskv_compare_fn comp_fn,
sdskv_database_id_t* sb_id);
/**
......
......@@ -124,6 +124,10 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
// debugging support?
};
void BerkeleyDBDataStore::set_comparison_function(comparator_fn less) {
// TODO
}
bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
int status = 0;
bool success = false;
......
......@@ -20,6 +20,7 @@ public:
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual void set_comparison_function(comparator_fn less);
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &);
......
......@@ -39,6 +39,10 @@ void BwTreeDataStore::createDatabase(std::string db_name) {
_tree->AssignGCID(0);
};
void BwTreeDataStore::set_comparison_function(comparator_fn less) {
// TODO
}
bool BwTreeDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
std::vector<ds_bulk_t> values;
bool success = false;
......
......@@ -20,6 +20,7 @@ public:
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // a no-op
virtual void set_comparison_function(comparator_fn less);
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix);
......
......@@ -13,6 +13,9 @@ enum class Duplicates : int {ALLOW, IGNORE};
class AbstractDataStore {
public:
typedef int (*comparator_fn)(const void*, size_t, const void*, size_t);
AbstractDataStore();
AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~AbstractDataStore();
......@@ -22,6 +25,7 @@ public:
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool erase(const ds_bulk_t &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual void set_comparison_function(comparator_fn less)=0;
std::vector<ds_bulk_t> list_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix=ds_bulk_t()) {
......
......@@ -56,6 +56,10 @@ void LevelDBDataStore::createDatabase(std::string db_name) {
// debugging support?
};
void LevelDBDataStore::set_comparison_function(comparator_fn less) {
// TODO
}
bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
......
......@@ -20,6 +20,7 @@ public:
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // not supported, a no-op
virtual void set_comparison_function(comparator_fn less);
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix);
......
......@@ -4,19 +4,34 @@
#define map_datastore_h
#include <map>
#include "kv-config.h"
#include "bulk.h"
#include "datastore/datastore.h"
class MapDataStore : public AbstractDataStore {
private:
struct keycmp {
MapDataStore* _store;
keycmp(MapDataStore* store)
: _store(store) {}
bool operator()(const ds_bulk_t& a, const ds_bulk_t& b) const {
if(_store->_less)
return _store->_less((const void*)a.data(),
a.size(), (const void*)b.data(), b.size()) < 0;
else
return std::less<ds_bulk_t>()(a,b);
}
};
public:
MapDataStore()
: AbstractDataStore() {}
: AbstractDataStore(), _less(nullptr), _map(keycmp(this)) {}
MapDataStore(Duplicates duplicates, bool eraseOnGet, bool debug)
: AbstractDataStore(duplicates, eraseOnGet, debug) {}
: AbstractDataStore(duplicates, eraseOnGet, debug), _less(nullptr), _map(keycmp(this)) {}
~MapDataStore() = default;
......@@ -55,6 +70,10 @@ class MapDataStore : public AbstractDataStore {
_in_memory = enable;
}
virtual void set_comparison_function(comparator_fn less) {
_less = less;
}
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
......@@ -106,7 +125,8 @@ class MapDataStore : public AbstractDataStore {
}
private:
std::map<ds_bulk_t, ds_bulk_t> _map;
AbstractDataStore::comparator_fn _less;
std::map<ds_bulk_t, ds_bulk_t, keycmp> _map;
};
#endif
......@@ -180,7 +180,9 @@ int main(int argc, char **argv)
}
sdskv_database_id_t db_id;
ret = sdskv_provider_add_database(provider, opts.db_names[i], opts.db_types[i], &db_id);
ret = sdskv_provider_add_database(provider,
opts.db_names[i], opts.db_types[i], SDSKV_COMPARE_DEFAULT,
&db_id);
if(ret != 0)
{
......@@ -209,7 +211,10 @@ int main(int argc, char **argv)
for(i=0; i < opts.num_db; i++) {
sdskv_database_id_t db_id;
ret = sdskv_provider_add_database(provider, opts.db_names[i], opts.db_types[i], &db_id);
ret = sdskv_provider_add_database(provider,
opts.db_names[i], opts.db_types[i],
SDSKV_COMPARE_DEFAULT,
&db_id);
if(ret != 0)
{
......
......@@ -107,10 +107,12 @@ extern "C" int sdskv_provider_add_database(
sdskv_provider_t provider,
const char *db_name,
sdskv_db_type_t db_type,
sdskv_compare_fn comp_fn,
sdskv_database_id_t* db_id)
{
auto db = datastore_factory::create_datastore(db_type, std::string(db_name));
if(db == nullptr) return -1;
db->set_comparison_function(comp_fn);
sdskv_database_id_t id = (sdskv_database_id_t)(db);
provider->name2id[std::string(db_name)] = id;
......
#!/bin/bash -x
if [ -z $srcdir ]; then
echo srcdir variable not set.
exit 1
fi
source $srcdir/test/test-util.sh
find_db_name
# start a server with 2 second wait,
# 20s timeout, and my_test_db as database
test_start_custom_server 2 20 $test_db_full
sleep 1
#####################
run_to 20 test/sdskv-custom-cmp-test $svr_addr 1 $test_db_name 10
if [ $? -ne 0 ]; then
wait
exit 1
fi
wait
echo cleaning up $TMPBASE
rm -rf $TMPBASE
exit 0
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <iostream>
#include <string>
#include <vector>
#include <algorithm>
#include <map>
#include "sdskv-client.h"
static std::string gen_random_string(size_t len);
int main(int argc, char *argv[])
{
char cli_addr_prefix[64] = {0};
char *sdskv_svr_addr_str;
char *db_name;
margo_instance_id mid;
hg_addr_t svr_addr;
uint8_t mplex_id;
uint32_t num_keys;
sdskv_client_t kvcl;
sdskv_provider_handle_t kvph;
hg_return_t hret;
int ret;
if(argc != 5)
{
fprintf(stderr, "Usage: %s <sdskv_server_addr> <mplex_id> <db_name> <num_keys>\n", argv[0]);
fprintf(stderr, " Example: %s tcp://localhost:1234 1 foo 1000\n", argv[0]);
return(-1);
}
sdskv_svr_addr_str = argv[1];
mplex_id = atoi(argv[2]);
db_name = argv[3];
num_keys = atoi(argv[4]);
/* initialize Margo using the transport portion of the server
* address (i.e., the part before the first : character if present)
*/
for(unsigned i=0; (i<63 && sdskv_svr_addr_str[i] != '\0' && sdskv_svr_addr_str[i] != ':'); i++)
cli_addr_prefix[i] = sdskv_svr_addr_str[i];
/* start margo */
mid = margo_init(cli_addr_prefix, MARGO_SERVER_MODE, 0, 0);
if(mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
ret = sdskv_client_init(mid, &kvcl);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_client_init()\n");
margo_finalize(mid);
return -1;
}
/* look up the SDSKV server address */
hret = margo_addr_lookup(mid, sdskv_svr_addr_str, &svr_addr);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_addr_lookup()\n");
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* create a SDSKV provider handle */
ret = sdskv_provider_handle_create(kvcl, svr_addr, mplex_id, &kvph);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_provider_handle_create()\n");
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* open the database */
sdskv_database_id_t db_id;
ret = sdskv_open(kvph, db_name, &db_id);
if(ret == 0) {
printf("Successfuly open database %s, id is %ld\n", db_name, db_id);
} else {
fprintf(stderr, "Error: could not open database %s\n", db_name);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* **** put keys ***** */
std::vector<std::string> keys;
size_t max_key_size = 16;
size_t max_value_size = 16;
for(unsigned i=0; i < num_keys; i++) {
auto k = gen_random_string((max_key_size + (rand()%max_key_size))/2);
// half of the entries will be put using bulk
auto v = gen_random_string(i*max_value_size/num_keys);
ret = sdskv_put(kvph, db_id,
(const void *)k.data(), k.size()+1,
(const void *)v.data(), v.size()+1);
if(ret != 0) {
fprintf(stderr, "Error: sdskv_put() failed (iteration %d)\n", i);
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
keys.push_back(k);
std::cerr << k << "\t ===> " << v << std::endl;
}
printf("Successfuly inserted %d keys\n", num_keys);
/* **** list keys **** */
std::sort(keys.begin(), keys.end(), [](const std::string& lhs, const std::string& rhs){
auto s = std::min(lhs.size(), rhs.size());
for(unsigned i = 0; i < s+1; i++) {
auto l = lhs[i] > 0 ? tolower(lhs[i]) : 0;
auto r = rhs[i] > 0 ? tolower(rhs[i]) : 0;
if(l > r) return false;
if(l < r) return true;
}
return true;
});
auto i1 = keys.size()/3;
auto i2 = 2*keys.size()/3;
auto keys_after = keys[i1-1];
hg_size_t max_keys = i2-i1;
std::vector<std::vector<char>> result_strings(max_keys, std::vector<char>(max_key_size+1));
std::vector<void*> list_result(max_keys);
std::vector<hg_size_t> ksizes(max_keys, max_key_size+1);
std::cerr << "Keys are sorted as follows:" << std::endl;
for(auto k : keys)
std::cerr << k << std::endl;
for(unsigned i=0; i<max_keys; i++) {
list_result[i] = (void*)result_strings[i].data();
}
std::cout << "Expecting " << max_keys << " keys after " << keys_after << std::endl;
ret = sdskv_list_keys(kvph, db_id,
(const void*)keys_after.c_str(), keys_after.size()+1,
list_result.data(), ksizes.data(), &max_keys);
if(ret != 0) {
fprintf(stderr, "Error: sdskv_list_keys() failed\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
if(max_keys != i2-i1) {
fprintf(stderr, "Error: number of returned keys (%ld) is not the number requested (%ld)\n", max_keys, i2-i1);
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
/* put the returned strings in an array */
std::vector<std::string> res;
for(auto ptr : list_result) {
res.push_back(std::string((const char*)ptr));
std::cout << *res.rbegin() << std::endl;
}
/* check that the returned keys are correct */
for(unsigned i=0; i < max_keys; i++) {
if(res[i] != keys[i+i1]) {
fprintf(stderr, "Error: returned keys don't match expected keys\n");
fprintf(stderr, " key received: %s\n", res[i].c_str());
fprintf(stderr, " key expected: %s\n", keys[i+i1].c_str());
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
}
/* shutdown the server */
ret = sdskv_shutdown_service(kvcl, svr_addr);
/**** cleanup ****/
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(ret);
}
static std::string gen_random_string(size_t len) {
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
std::string s(len, ' ');
for (unsigned i = 0; i < len; ++i) {
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
return s;
}
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <sdskv-server.h>
typedef enum {
MODE_DATABASES = 0,
MODE_PROVIDERS = 1
} kv_mplex_mode_t;
struct options
{
char *listen_addr_str;
unsigned num_db;
char **db_names;
sdskv_db_type_t *db_types;
char *host_file;
kv_mplex_mode_t mplex_mode;
};
int custom_key_cmp(const void* x, size_t sx, const void* y, size_t sy) {
const char* a = (const char*)x;
const char* b = (const char*)y;
return strncasecmp(a, b, sx < sy ? sx : sy);
}
static void usage(int argc, char **argv)
{
fprintf(stderr, "Usage: sdskv-server-daemon [OPTIONS] <listen_addr> <db name 1>[:map|:bwt|:bdb|:ldb] <db name 2>[:map|:bwt|:bdb|:ldb] ...\n");
fprintf(stderr, " listen_addr is the Mercury address to listen on\n");
fprintf(stderr, " db name X are the names of the databases\n");
fprintf(stderr, " [-f filename] to write the server address to a file\n");
fprintf(stderr, " [-m mode] multiplexing mode (providers or databases) for managing multiple databases (default is databases)\n");
fprintf(stderr, "Example: ./sdskv-server-daemon tcp://localhost:1234 foo:bdb bar\n");
return;
}
static sdskv_db_type_t parse_db_type(char* db_fullname) {
char* column = strstr(db_fullname, ":");
if(column == NULL) {
return KVDB_MAP;
}
*column = '\0';
char* db_type = column + 1;
if(strcmp(db_type, "map") == 0) {
return KVDB_MAP;
} else if(strcmp(db_type, "bwt") == 0) {
return KVDB_BWTREE;
} else if(strcmp(db_type, "bdb") == 0) {
return KVDB_BERKELEYDB;
} else if(strcmp(db_type, "ldb") == 0) {
return KVDB_LEVELDB;
}
fprintf(stderr, "Unknown database type \"%s\"\n", db_type);
exit(-1);
}
static void parse_args(int argc, char **argv, struct options *opts)
{