Commit d1fb9292 authored by David Rich's avatar David Rich Committed by Rob Latham

Adding DataStore. Compiles but has not been tested. Still need to update test-mpi test app.

parent 848219f0
......@@ -6,7 +6,7 @@ lib_LTLIBRARIES = libkvclient.la \
libkvclient_la_SOURCES = src/kv-client.c
libkvserver_la_SOURCES = src/kv-server.cc \
src/BwTree/src/bwtree.cpp
src/BwTree/src/bwtree.cpp
libkvserver_la_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
libkvserver_la_CPPFLAGS = -I${srcdir}/src/BwTree/src
......
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "bwtree.h"
#include <vector>
using namespace wangziqi2013::bwtree;
#ifndef datastore_cc
#define datastore_cc
template <typename KeyType, typename ValueType> class AbstractDataStore {
public:
virtual void createDatabase(std::string homeDir, std::string baseName)=0;
virtual bool put(const KeyType &key, ValueType &data)=0;
virtual bool get(const KeyType &key, ValueType &data)=0;
virtual bool get(const KeyType &key, std::vector<ValueType> &data)=0;
};
template <typename KeyType,
typename ValueType,
typename KeyComparator = std::less<KeyType>,
typename KeyEqualityChecker = std::equal_to<KeyType>,
typename KeyHashFunc = std::hash<KeyType>,
typename ValueEqualityChecker = std::equal_to<ValueType>,
typename ValueHashFunc = std::hash<ValueType>>
class BwTreeDataStore : public AbstractDataStore<KeyType, ValueType> {
public:
BwTreeDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) {
_duplicates = duplicates;
_eraseOnGet = eraseOnGet;
_debug = debug;
_tree = NULL;
};
virtual ~BwTreeDataStore() {
delete _tree;
};
virtual void createDatabase(std::string homeDir, std::string baseName) {
_tree = new BwTree<KeyType, ValueType,
KeyComparator, KeyEqualityChecker, KeyHashFunc,
ValueEqualityChecker, ValueHashFunc>();
if (_debug) {
_tree->SetDebugLogging(1);
}
else {
_tree->SetDebugLogging(0);
}
_tree->UpdateThreadLocal(1);
_tree->AssignGCID(0);
};
virtual bool put(const KeyType &key, ValueType &data) {
std::vector<ValueType> values;
bool success = false;
_tree->GetValue(key, values);
bool duplicate_key = (values.size() != 0);
if (duplicate_key) {
if (_duplicates == Duplicates::IGNORE) {
success = true;
}
else { // Duplicates::ALLOW
success = _tree->Insert(key, data);
}
}
else {
success = _tree->Insert(key, data);
}
return success;
};
virtual bool get(const KeyType &key, ValueType &data) {
std::vector<ValueType> values;
bool success = false;
_tree->GetValue(key, values);
if (values.size() == 1) {
data = values.front();
success = true;
}
else if (values.size() > 1) {
// this should only happen if duplicates are allowed
if (_duplicates == Duplicates::ALLOW) {
data = values.front(); // caller is asking for just 1
success = true;
}
}
return success;
};
virtual bool get(const KeyType &key, std::vector<ValueType> &data) {
std::vector<ValueType> values;
bool success = false;
_tree->GetValue(key, values);
if (values.size() > 1) {
// this should only happen if duplicates are allowed
if (_duplicates == Duplicates::ALLOW) {
data = values;
success = true;
}
}
else {
data = values;
success = true;
}
return success;
};
protected:
BwTree<KeyType, ValueType,
KeyComparator, KeyEqualityChecker, KeyHashFunc,
ValueEqualityChecker, ValueHashFunc> *_tree = NULL;
Duplicates _duplicates;
bool _eraseOnGet;
bool _debug;
};
#endif // datastore_cc
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#ifndef datastore_h
#define datastore_h
enum class Duplicates : int {ALLOW, IGNORE};
#endif // datastore_h
#include "datastore.cc"
#if 0
#include "bwtree.h"
#endif
#include "sds-keyval.h"
#include "datastore.h"
#include <mercury.h>
#include <margo.h>
#include <abt-snoozer.h>
......@@ -131,7 +130,6 @@ DEFINE_MARGO_RPC_HANDLER(bench_handler)
#else
/* keyval-specific stuff can go here */
#include <bwtree.h>
#include <vector>
#include <boost/functional/hash.hpp>
......@@ -153,18 +151,15 @@ using namespace wangziqi2013::bwtree;
// since this is global, we're assuming this server instance
// will manage a single DB
BwTree<uint64_t, std::vector<char>,
std::less<uint64_t>,
std::equal_to<uint64_t>,
std::hash<uint64_t>,
my_equal_to,
my_hash> *TREE = NULL;
BwTreeDataStore<uint64_t, std::vector<char>,
std::less<uint64_t>,
std::equal_to<uint64_t>,
std::hash<uint64_t>,
my_equal_to,
my_hash> *dbm = NULL;
const char *my_db = "minima_store";
// these should probably be passed in put/get calls
uint16_t db_options = KV_IGNOREDUPKEY;
static hg_return_t open_handler(hg_handle_t handle)
{
hg_return_t ret;
......@@ -175,7 +170,17 @@ static hg_return_t open_handler(hg_handle_t handle)
printf("SERVER: OPEN %s, k-type %d, v-type %d\n", in.name, in.keytype, in.valtype);
if (strcmp(in.name, my_db) == 0) {
printf("SERVER: BwTree initialized and ready for %s\n", my_db);
if (!dbm) {
printf("SERVER: initializing DataStore instance to manage %s\n", my_db);
dbm = new BwTreeDataStore<uint64_t, std::vector<char>,
std::less<uint64_t>,
std::equal_to<uint64_t>,
std::hash<uint64_t>,
my_equal_to,
my_hash>(Duplicates::IGNORE, false, false);
dbm->createDatabase(std::string(""), std::string(my_db));
}
printf("SERVER: DataStore initialized and ready for %s\n", my_db);
out.ret = HG_SUCCESS;
}
else {
......@@ -232,8 +237,13 @@ static hg_return_t put_handler(hg_handle_t handle)
std::vector<char> data;
data.resize(sizeof(in.value));
memcpy(data.data(), &in.value, sizeof(in.value));
// TODO: check return value here (see bulk_put_handler)
TREE->Insert(in.key, data);
if (dbm->put(in.key, data)) {
printf("SERVER: PUT succeeded for key = %d value = %d\n", in.key, in.value);
}
else {
printf("SERVER: PUT failed for key = %d value = %d\n", in.key, in.value);
}
ret = margo_respond(handle, &out);
assert(ret == HG_SUCCESS);
......@@ -256,7 +266,6 @@ static hg_return_t bulk_put_handler(hg_handle_t handle)
ret = margo_get_input(handle, &bpin);
assert(ret == HG_SUCCESS);
printf("SERVER: BULK PUT key = %lu size = %lu\n", bpin.key, bpin.size);
/* get handle info and margo instance */
hgi = margo_get_info(handle);
......@@ -271,37 +280,16 @@ static hg_return_t bulk_put_handler(hg_handle_t handle)
assert(ret == HG_SUCCESS);
ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, bpin.bulk_handle, 0, bulk_handle, 0, bpin.size);
assert(ret == HG_SUCCESS);
std::vector<std::vector<char>> values;
TREE->GetValue(bpin.key, values);
bool duplicate_key = (values.size() != 0);
// only option currently implemented is to ignore duplicate Insert attempts
if (duplicate_key) {
if ((db_options & KV_IGNOREDUPKEY) == KV_IGNOREDUPKEY) {
printf("SERVER: TREE ignoring duplicate Insert attempt for key = %lu\n", bpin.key);
bpout.ret = HG_SUCCESS;
}
else if ((db_options & KV_ALLOWDUPKEY) == KV_ALLOWDUPKEY) {
printf("SERVER: TREE duplicate Insert support not implemented, key = %lu\n", bpin.key);
bpout.ret = HG_OTHER_ERROR;
}
else {
printf("SERVER: TREE unhandled duplicate Insert attempt for key = %lu\n", bpin.key);
bpout.ret = HG_OTHER_ERROR;
}
if (dbm->put(bpin.key, data)) {
printf("SERVER: BULK PUT succeeded for key = %lu size = %lu\n", bpin.key, bpin.size);
bpout.ret = HG_SUCCESS;
}
else {
if (TREE->Insert(bpin.key, data)) {
printf("SERVER: TREE Insert succeeded for key = %lu\n", bpin.key);
bpout.ret = HG_SUCCESS;
}
else {
// BwTree Insert returns False if the key-value pair already
// exists in the DB.
printf("SERVER: TREE unexpected duplicate Insert failed for key = %lu\n", bpin.key);
bpout.ret = HG_OTHER_ERROR; // shouldn't get here, but...
}
// put returns False if the key-value pair already
// exists in the DB.
printf("SERVER: BULK PUT failed for key = %lu\n", bpin.key);
bpout.ret = HG_OTHER_ERROR; // shouldn't get here, but...
}
bpout.ret = ret;
......@@ -326,26 +314,17 @@ static hg_return_t get_handler(hg_handle_t handle)
ret = margo_get_input(handle, &in);
assert(ret == HG_SUCCESS);
/*void GetValue (const KeyType &search_key, std::vector< ValueType > &value_list) */
std::vector<std::vector<char>> values;
TREE->GetValue(in.key, values);
int value = 0;
if (values.size() == 1) {
std::vector<char> data = values.front();
std::vector<char> data;
if (dbm->get(in.key, data)) {
int value = 0;
memcpy(&value, data.data(), data.size());
printf("SERVER: GET succeeded for key=%d, value=%d\n", in.key, value);
out.value = value;
out.ret = HG_SUCCESS;
}
else if (values.size() > 1) {
// get on key returned more than 1 value (return number found)
printf("SERVER: GET: found %lu values for key=%d\n", values.size(), in.key);
out.value = values.size(); // assuming caller will check return code
out.ret = HG_OTHER_ERROR;
}
else {
// get on key did not find a value (return 0 for number found)
printf("SERVER: GET: found 0 values for key=%d\n", in.key);
// get on key did not succeed
printf("SERVER: GET failed for key=%d\n", in.key);
out.value = 0; // assuming caller will check return code
out.ret = HG_OTHER_ERROR;
}
......@@ -372,15 +351,9 @@ static hg_return_t bulk_get_handler(hg_handle_t handle)
ret = margo_get_input(handle, &bgin);
assert(ret == HG_SUCCESS);
/* void GetValue (const KeyType &search_key, std::vector< ValueType > &value_list) */
std::vector<std::vector<char>> values;
TREE->GetValue(bgin.key, values); // is this right for values?
// what do we do if we get more than 1 value?
// perhaps > 1 or 0 results in an error return value?
if (values.size() == 1) {
printf("SERVER: BULK GET: found 1 value for key=%lu\n", bgin.key);
std::vector<char> data = values.front();
std::vector<char> data;
if (dbm->get(bgin.key, data)) {
printf("SERVER: BULK GET succeeded for key=%lu\n", bgin.key);
// will the transfer fit on the client side?
bgout.size = data.size();
if (bgout.size <= bgin.size) {
......@@ -402,15 +375,9 @@ static hg_return_t bulk_get_handler(hg_handle_t handle)
bgout.ret = HG_SIZE_ERROR;
}
}
else if (values.size() > 1) {
// get on key returned more than 1 value (return number found)
printf("SERVER: BULK GET: found %lu values for key=%lu\n", values.size(), bgin.key);
bgout.size = values.size(); // assuming caller will check return code
bgout.ret = HG_OTHER_ERROR;
}
else {
// get on key did not find a value (return 0 for number found)
printf("SERVER: BULK GET: found 0 values for key=%lu\n", bgin.key);
printf("SERVER: BULK GET failed for key=%lu\n", bgin.key);
bgout.size = 0; // assuming caller will check return code
bgout.ret = HG_OTHER_ERROR;
}
......@@ -652,9 +619,8 @@ hg_return_t kv_server_wait_for_shutdown(kv_context *context) {
/* this is the same as client. should be moved to common utility library */
hg_return_t kv_server_deregister(kv_context *context) {
free(context);
//delete TREE; // there seems to be a bug somewhere in BwTree
//printf("SERVER: deregistered, cleaned up BwTree instance\n");
printf("SERVER: deregistered\n");
delete dbm;
printf("SERVER: deregistered, cleaned up DataStore instance\n");
return HG_SUCCESS;
}
......@@ -26,12 +26,6 @@ typedef enum {
KV_BULK,
} kv_type;
typedef enum {
KV_ALLOWDUPKEY = 0x0001,
KV_IGNOREDUPKEY = 0x0002,
KV_ERASEONGETKEY = 0x0004,
} kv_options;
/* do we need one for server, one for client? */
typedef struct kv_context_s {
margo_instance_id mid;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment