Commit 7ebaa3ea authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added map datastore and a get test

parent 2a7f2176
......@@ -8,7 +8,8 @@ AM_CPPFLAGS = -I${srcdir}/src
bin_PROGRAMS = bin/sdskv-server-daemon \
bin/sdskv-shutdown \
test/sdskv-open-test \
test/sdskv-put-test
test/sdskv-put-test \
test/sdskv-get-test
bin_sdskv_server_daemon_SOURCES = src/sdskv-server-daemon.c
bin_sdskv_server_daemon_DEPENDENCIES = lib/libsdskv-server.la
......@@ -81,7 +82,8 @@ check_PROGRAMS = test/sdskv-open-test
TESTS = test/basic.sh \
test/open-test.sh \
test/put-test.sh
test/put-test.sh \
test/get-test.sh
TESTS_ENVIRONMENT = TIMEOUT="$(TIMEOUT)" \
MKTEMP="$(MKTEMP)"
......@@ -94,6 +96,10 @@ test_sdskv_put_test_SOURCES = test/sdskv-put-test.cc
test_sdskv_put_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_put_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_get_test_SOURCES = test/sdskv-get-test.cc
test_sdskv_get_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_get_test_LDFLAGS = -Llib -lsdskv-client
#############################################################
## tests bellow correspond to old tests (see old-test folder)
#############################################################
......
......@@ -133,8 +133,8 @@ bool BerkeleyDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
// ALLOW case deals with actual duplicates (where key is the same but value is different).
// This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case).
if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) {
Dbt db_key(&(key[0]), uint32_t(key.size()));
Dbt db_data(&(data[0]), uint32_t(data.size()));
Dbt db_key((void*)&(key[0]), uint32_t(key.size()));
Dbt db_data((void*)&(data[0]), uint32_t(data.size()));
db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_USERMEM);
uint32_t flags = DB_NOOVERWRITE; // to simply overwrite value, don't use this flag
......@@ -162,7 +162,7 @@ bool BerkeleyDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
data.clear();
Dbt db_key(&(key[0]), uint32_t(key.size()));
Dbt db_key((void*)&(key[0]), uint32_t(key.size()));
Dbt db_data;
db_key.set_flags(DB_DBT_USERMEM);
db_data.set_flags(DB_DBT_REALLOC);
......@@ -213,9 +213,8 @@ std::vector<ds_bulk_t> BerkeleyDBDataStore::BerkeleyDBDataStore::list(const ds_b
std::vector<ds_bulk_t> keys;
Dbc * cursorp;
Dbt key, data;
int ret;
_dbm->cursor(NULL, &cursorp, 0);
while (ret = cursorp->get(&key, &data, DB_NEXT) == 0) {
while (cursorp->get(&key, &data, DB_NEXT) == 0) {
ds_bulk_t k(key.get_size() );
memcpy(k.data(), key.get_data(), key.get_size() );
/* I hope this is a deep copy! */
......
......@@ -4,7 +4,7 @@
#define bdb_datastore_h
#include "kv-config.h"
#include "sds-keyval.h"
#include "datastore/datastore.h"
#include <db_cxx.h>
#include <dbstl_map.h>
......
......@@ -9,6 +9,8 @@
#endif
#include "datastore.h"
#include "map_datastore.h"
#ifdef USE_BWTREE
#include "bwtree_datastore.h"
#endif
......@@ -23,6 +25,12 @@
class datastore_factory {
static AbstractDataStore* create_map_datastore(const std::string& name) {
auto db = new MapDataStore();
db->createDatabase(name);
return db;
}
static AbstractDataStore* create_bwtree_datastore(const std::string& name) {
#ifdef USE_BWTREE
auto db = new BwTreeDataStore();
......@@ -62,6 +70,8 @@ class datastore_factory {
#endif
{
switch(type) {
case KVDB_MAP:
return create_map_datastore(name);
case KVDB_BWTREE:
return create_bwtree_datastore(name);
case KVDB_LEVELDB:
......
......@@ -4,9 +4,9 @@
#define ldb_datastore_h
#include "kv-config.h"
#include "sds-keyval.h"
#include <leveldb/db.h>
#include <leveldb/env.h>
#include "datastore/datastore.h"
// may want to implement some caching for persistent stores like LevelDB
class LevelDBDataStore : public AbstractDataStore {
......@@ -23,8 +23,8 @@ public:
protected:
leveldb::DB *_dbm = NULL;
private:
std::string toString(ds_bulk_t &key);
ds_bulk_t fromString(std::string &keystr);
std::string toString(const ds_bulk_t &key);
ds_bulk_t fromString(const std::string &keystr);
};
#endif // ldb_datastore_h
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#ifndef map_datastore_h
#define map_datastore_h
#include <map>
#include "kv-config.h"
#include "bulk.h"
#include "datastore/datastore.h"
class MapDataStore : public AbstractDataStore {
public:
MapDataStore()
: AbstractDataStore() {}
MapDataStore(Duplicates duplicates, bool eraseOnGet, bool debug)
: AbstractDataStore(duplicates, eraseOnGet, debug) {}
~MapDataStore() = default;
virtual void createDatabase(std::string db_name) {
_map.clear();
}
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) {
if(_duplicates == Duplicates::IGNORE && _map.count(key)) {
return true;
}
_map.insert(std::make_pair(key,data));
return true;
}
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) {
auto it = _map.find(key);
if(it == _map.end()) return false;
data = it->second;
return true;
}
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t>& values) {
values.clear();
values.resize(1);
return get(key, values[0]);
}
virtual void set_in_memory(bool enable) {
_in_memory = enable;
}
virtual std::vector<ds_bulk_t> list(const ds_bulk_t &start_key, size_t count) {
std::vector<ds_bulk_t> result;
auto it = _map.lower_bound(start_key);
while(it != _map.end() && it->first == start_key) {
it++;
}
auto lastkey = start_key;
for(size_t i=0; it != _map.end() && i < count; it++) {
if(it->first != lastkey) {
result.push_back(lastkey);
i += 1;
lastkey = it->first;
}
}
return result;
}
private:
std::map<ds_bulk_t, ds_bulk_t> _map;
};
#endif
......@@ -11,6 +11,7 @@ extern "C" {
typedef enum kv_db_type_t
{
KVDB_MAP,
KVDB_BWTREE,
KVDB_LEVELDB,
KVDB_BERKELEYDB
......
......@@ -8,6 +8,7 @@ extern "C" {
typedef enum sdskv_db_type_t
{
KVDB_MAP,
KVDB_BWTREE,
KVDB_LEVELDB,
KVDB_BERKELEYDB
......
......@@ -27,7 +27,7 @@ struct options
static void usage(int argc, char **argv)
{
fprintf(stderr, "Usage: sdskv-server-daemon [OPTIONS] <listen_addr> <db name 1>[:bwt|:bdb|:ldb] <db name 2>[:bwt|:bdb|:ldb] ...\n");
fprintf(stderr, "Usage: sdskv-server-daemon [OPTIONS] <listen_addr> <db name 1>[:map|:bwt|:bdb|:ldb] <db name 2>[:map|:bwt|:bdb|:ldb] ...\n");
fprintf(stderr, " listen_addr is the Mercury address to listen on\n");
fprintf(stderr, " db name X are the names of the databases\n");
fprintf(stderr, " [-f filename] to write the server address to a file\n");
......@@ -38,10 +38,14 @@ static void usage(int argc, char **argv)
static sdskv_db_type_t parse_db_type(char* db_fullname) {
char* column = strstr(db_fullname, ":");
if(column == NULL) return KVDB_BWTREE;
if(column == NULL) {
return KVDB_MAP;
}
*column = '\0';
char* db_type = column + 1;
if(strcmp(db_type, "bwt") == 0) {
if(strcmp(db_type, "map") == 0) {
return KVDB_MAP;
} else if(strcmp(db_type, "bwt") == 0) {
return KVDB_BWTREE;
} else if(strcmp(db_type, "bdb") == 0) {
return KVDB_BERKELEYDB;
......@@ -175,9 +179,8 @@ int main(int argc, char **argv)
return(-1);
}
sdskv_db_type_t db_type = KVDB_BWTREE; // TODO get from argv
sdskv_database_id_t db_id;
ret = sdskv_provider_add_database(provider, opts.db_names[i], db_type, &db_id);
ret = sdskv_provider_add_database(provider, opts.db_names[i], opts.db_types[i], &db_id);
if(ret != 0)
{
......@@ -205,9 +208,8 @@ int main(int argc, char **argv)
}
for(i=0; i < opts.num_db; i++) {
sdskv_db_type_t db_type = KVDB_BWTREE; // TODO get from argv
sdskv_database_id_t db_id;
ret = sdskv_provider_add_database(provider, opts.db_names[i], db_type, &db_id);
ret = sdskv_provider_add_database(provider, opts.db_names[i], opts.db_types[i], &db_id);
if(ret != 0)
{
......
#!/bin/bash -x
if [ -z $srcdir ]; then
echo srcdir variable not set.
exit 1
fi
source $srcdir/test/test-util.sh
# start a server with 2 second wait,
# 20s timeout, and my_test_db as database
test_start_server 2 20 my_test_db
sleep 1
#####################
run_to 20 test/sdskv-get-test $svr_addr 1 my_test_db 10
if [ $? -ne 0 ]; then
wait
exit 1
fi
wait
echo cleaning up $TMPBASE
rm -rf $TMPBASE
exit 0
......@@ -14,7 +14,7 @@ sleep 1
#####################
run_to 20 test/sdskv-put-test $svr_addr 1 my_test_db 100
run_to 20 test/sdskv-put-test $svr_addr 1 my_test_db 10
if [ $? -ne 0 ]; then
wait
exit 1
......
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <string>
#include <vector>
#include <algorithm>
#include <map>
#include "sdskv-client.h"
static std::string gen_random_string(size_t len);
int main(int argc, char *argv[])
{
char cli_addr_prefix[64] = {0};
char *sdskv_svr_addr_str;
char *db_name;
margo_instance_id mid;
hg_addr_t svr_addr;
uint8_t mplex_id;
uint32_t num_keys;
sdskv_client_t kvcl;
sdskv_provider_handle_t kvph;
hg_return_t hret;
int ret;
if(argc != 5)
{
fprintf(stderr, "Usage: %s <sdskv_server_addr> <mplex_id> <db_name> <num_keys>\n", argv[0]);
fprintf(stderr, " Example: %s tcp://localhost:1234 1 foo 1000\n", argv[0]);
return(-1);
}
sdskv_svr_addr_str = argv[1];
mplex_id = atoi(argv[2]);
db_name = argv[3];
num_keys = atoi(argv[4]);
/* initialize Margo using the transport portion of the server
* address (i.e., the part before the first : character if present)
*/
for(unsigned i=0; (i<63 && sdskv_svr_addr_str[i] != '\0' && sdskv_svr_addr_str[i] != ':'); i++)
cli_addr_prefix[i] = sdskv_svr_addr_str[i];
/* start margo */
mid = margo_init(cli_addr_prefix, MARGO_SERVER_MODE, 0, 0);
if(mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
ret = sdskv_client_init(mid, &kvcl);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_client_init()\n");
margo_finalize(mid);
return -1;
}
/* look up the SDSKV server address */
hret = margo_addr_lookup(mid, sdskv_svr_addr_str, &svr_addr);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_addr_lookup()\n");
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* create a SDSKV provider handle */
ret = sdskv_provider_handle_create(kvcl, svr_addr, mplex_id, &kvph);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_provider_handle_create()\n");
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* open the database */
sdskv_database_id_t db_id;
ret = sdskv_open(kvph, db_name, &db_id);
if(ret == 0) {
printf("Successfuly open database %s, id is %ld\n", db_name, db_id);
} else {
fprintf(stderr, "Error: could not open database %s\n", db_name);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(-1);
}
/* **** put keys ***** */
std::vector<std::string> keys;
std::map<std::string, std::string> reference;
size_t max_value_size = 8000;
for(unsigned i=0; i < num_keys; i++) {
auto k = gen_random_string(16);
// half of the entries will be put using bulk
auto v = gen_random_string(i*max_value_size/num_keys);
ret = sdskv_put(kvph, db_id,
(const void *)k.data(), k.size()+1,
(const void *)v.data(), v.size()+1);
if(ret != 0) {
fprintf(stderr, "Error: sdskv_put() failed (iteration %d)\n", i);
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
reference[k] = v;
keys.push_back(k);
}
printf("Successfuly inserted %d keys\n", num_keys);
/* **** get keys **** */
for(unsigned i=0; i < num_keys; i++) {
auto k = keys[rand() % keys.size()];
size_t value_size = max_value_size;
std::vector<char> v(max_value_size);
ret = sdskv_get(kvph, db_id,
(const void *)k.data(), k.size()+1,
(void *)v.data(), &value_size);
if(ret != 0) {
fprintf(stderr, "Error: sdskv_get() failed (key was %s)\n", k.c_str());
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
std::string vstring((char*)(v.data()));
if(vstring != reference[k]) {
fprintf(stderr, "Error: sdskv_get() returned a value different from the reference\n");
sdskv_shutdown_service(kvcl, svr_addr);
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return -1;
}
}
/* shutdown the server */
ret = sdskv_shutdown_service(kvcl, svr_addr);
/**** cleanup ****/
sdskv_provider_handle_release(kvph);
margo_addr_free(mid, svr_addr);
sdskv_client_finalize(kvcl);
margo_finalize(mid);
return(ret);
}
static std::string gen_random_string(size_t len) {
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
std::string s(len, ' ');
for (unsigned i = 0; i < len; ++i) {
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
return s;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment