Commit 98fbe554 authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Simplify API. Handle arbitrary key/value types. Code still has explicit...

Simplify API. Handle arbitrary key/value types. Code still has explicit handling of bulk transfers. Unless this is a performance win, just let HG handle it all implicitly?
parent d7f75d76
......@@ -2,6 +2,7 @@ ACLOCAL_AMFLAGS="-Im4"
bin_PROGRAMS = test/bench-client
test_bench_client_SOURCES = test/bench-client.cc
test_bench_client_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_bench_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_bench_client_LDADD = -lkvclient
......@@ -37,18 +38,20 @@ TESTS = test/test-client \
test/bench-client \
test/test-mpi
test_test_client_SOURCES = test/test-client.cc
test_test_client_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_client_LDADD = -lkvclient
test_test_server_SOURCES = test/test-server.cc
test_test_server_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_server_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src -I${srcdir}/src/BwTree/src
test_test_server_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_server_LDADD = -lkvserver -ldb -ldb_cxx -ldb_stl -lleveldb -lsnappy -lboost_filesystem -lboost_system
test_test_mpi_SOURCES = test/test-mpi.cc
test_test_mpi_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_mpi_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src -I${srcdir}/src/BwTree/src
test_test_mpi_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_mpi_LDADD = -lkvclient -lkvserver -ldb -ldb_cxx -ldb_stl -lleveldb -lsnappy -lboost_filesystem -lboost_system
pkgconfigdir = $(libdir)/pkgconfig
......
......@@ -34,9 +34,9 @@ BwTreeDataStore::~BwTreeDataStore() {
};
void BwTreeDataStore::createDatabase(std::string db_name) {
_tree = new BwTree<kv_key_t, ds_bulk_t, std::less<kv_key_t>,
std::equal_to<kv_key_t>, std::hash<kv_key_t>,
my_equal_to/*ds_bulk_t*/, my_hash/*ds_bulk_t*/>();
_tree = new BwTree<ds_bulk_t, ds_bulk_t,
my_less, my_equal, my_hash,
my_equal, my_hash>();
if (_debug) {
_tree->SetDebugLogging(1);
}
......@@ -47,7 +47,7 @@ void BwTreeDataStore::createDatabase(std::string db_name) {
_tree->AssignGCID(0);
};
bool BwTreeDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
bool BwTreeDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
std::vector<ds_bulk_t> values;
bool success = false;
......@@ -70,7 +70,7 @@ bool BwTreeDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
return success;
};
bool BwTreeDataStore::get(const kv_key_t &key, ds_bulk_t &data) {
bool BwTreeDataStore::get(ds_bulk_t &key, ds_bulk_t &data) {
std::vector<ds_bulk_t> values;
bool success = false;
......@@ -90,7 +90,7 @@ bool BwTreeDataStore::get(const kv_key_t &key, ds_bulk_t &data) {
return success;
};
bool BwTreeDataStore::get(const kv_key_t &key, std::vector<ds_bulk_t> &data) {
bool BwTreeDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
std::vector<ds_bulk_t> values;
bool success = false;
......@@ -121,16 +121,14 @@ LevelDBDataStore::LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool
_dbm = NULL;
};
std::string LevelDBDataStore::key2string(const kv_key_t &key) {
kv_key_t k = key; // grab a copy to work with
char *c = reinterpret_cast<char *>(&k);
std::string keystr(c, sizeof(k));
return keystr;
// Byte-for-byte conversion of a bulk buffer into a std::string,
// suitable for use as a LevelDB key or value.
std::string LevelDBDataStore::toString(ds_bulk_t &bulk_val) {
    return std::string(bulk_val.begin(), bulk_val.end());
};
kv_key_t LevelDBDataStore::string2key(std::string &keystr) {
kv_key_t *key = reinterpret_cast<kv_key_t*>(&(keystr[0]));
return *key;
// Byte-for-byte conversion of a std::string (as returned by LevelDB)
// back into a bulk buffer; inverse of toString().
ds_bulk_t LevelDBDataStore::fromString(std::string &str_val) {
    return ds_bulk_t(str_val.begin(), str_val.end());
};
LevelDBDataStore::~LevelDBDataStore() {
......@@ -161,12 +159,12 @@ void LevelDBDataStore::createDatabase(std::string db_name) {
// debugging support?
};
bool LevelDBDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
bool LevelDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
std::string value;
status = _dbm->Get(leveldb::ReadOptions(), key2string(key), &value);
status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
bool duplicate_key = false;
if (status.ok()) {
duplicate_key = true;
......@@ -184,14 +182,13 @@ bool LevelDBDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
success = true;
}
else {
std::cerr << "LevelDBDataStore::put: duplicate key " << key
std::cerr << "LevelDBDataStore::put: duplicate key detected "
<< ", duplicates not supported" << std::endl;
}
}
if (insert_key) {
std::string datastr(data.begin(), data.end());
status = _dbm->Put(leveldb::WriteOptions(), key2string(key), datastr);
status = _dbm->Put(leveldb::WriteOptions(), toString(key), toString(data));
if (status.ok()) {
success = true;
}
......@@ -203,15 +200,15 @@ bool LevelDBDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
return success;
};
bool LevelDBDataStore::get(const kv_key_t &key, ds_bulk_t &data) {
bool LevelDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) {
leveldb::Status status;
bool success = false;
data.clear();
std::string value;
status = _dbm->Get(leveldb::ReadOptions(), key2string(key), &value);
status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
if (status.ok()) {
data = ds_bulk_t(value.begin(), value.end());
data = fromString(value);
success = true;
}
else if (!status.IsNotFound()) {
......@@ -221,7 +218,7 @@ bool LevelDBDataStore::get(const kv_key_t &key, ds_bulk_t &data) {
return success;
};
bool LevelDBDataStore::get(const kv_key_t &key, std::vector<ds_bulk_t> &data) {
bool LevelDBDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
bool success = false;
data.clear();
......@@ -246,18 +243,6 @@ BerkeleyDBDataStore::BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet,
_dbenv = NULL;
};
// Serialize a fixed-size key into a ds_bulk_t byte buffer of exactly
// sizeof(kv_key_t) bytes.
//
// Fix: the original wrote through a uint64_t* cast while sizing the
// buffer with sizeof(kv_key_t); if kv_key_t is ever not a 64-bit type,
// the store width and the buffer size disagree (overrun or partial
// write). Use kv_key_t consistently so the write always matches the
// allocation.
//
// NOTE(review): the encoding is host-endian, so stored keys are not
// portable across machines of different byte order — confirm this is
// acceptable for the deployment.
ds_bulk_t BerkeleyDBDataStore::key2ds_bulk(const kv_key_t &key) {
    ds_bulk_t keydata(sizeof(kv_key_t), 0);
    kv_key_t *p = reinterpret_cast<kv_key_t*>(&(keydata[0]));
    *p = key;
    return keydata;
};
// Reinterpret the leading bytes of a bulk buffer as a kv_key_t;
// inverse of key2ds_bulk().
// NOTE(review): assumes keydata holds at least sizeof(kv_key_t) bytes
// and that &keydata[0] is suitably aligned for kv_key_t — a memcpy
// would avoid the alignment/strict-aliasing concern; confirm callers
// only pass buffers produced by key2ds_bulk().
kv_key_t BerkeleyDBDataStore::ds_bulk2key(ds_bulk_t &keydata) {
kv_key_t *key = reinterpret_cast<kv_key_t*>(&(keydata[0]));
return *key;
};
BerkeleyDBDataStore::~BerkeleyDBDataStore() {
delete _dbm;
delete _dbenv;
......@@ -317,11 +302,11 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
// debugging support?
};
bool BerkeleyDBDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
bool BerkeleyDBDataStore::put(ds_bulk_t &key, ds_bulk_t &data) {
int status = 0;
bool success = false;
ds_bulk_t keydata = key2ds_bulk(key);
ds_bulk_t keydata;
Dbt db_key(&(keydata[0]), uint32_t(keydata.size()));
Dbt get_data;
status = _dbm->get(NULL, &db_key, &get_data, 0);
......@@ -338,7 +323,7 @@ bool BerkeleyDBDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
success = true;
}
else {
std::cerr << "BerkeleyDBDataStore::put: duplicate key " << key
std::cerr << "BerkeleyDBDataStore::put: duplicate key detected "
<< ", duplicates not supported" << std::endl;
}
}
......@@ -358,13 +343,13 @@ bool BerkeleyDBDataStore::put(const kv_key_t &key, ds_bulk_t &data) {
return success;
};
bool BerkeleyDBDataStore::get(const kv_key_t &key, ds_bulk_t &data) {
bool BerkeleyDBDataStore::get(ds_bulk_t &key, ds_bulk_t &data) {
int status = 0;
bool success = false;
data.clear();
ds_bulk_t keydata = key2ds_bulk(key);
ds_bulk_t keydata;
Dbt db_key(&(keydata[0]), uint32_t(keydata.size()));
Dbt db_data;
status = _dbm->get(NULL, &db_key, &db_data, 0);
......@@ -381,7 +366,7 @@ bool BerkeleyDBDataStore::get(const kv_key_t &key, ds_bulk_t &data) {
return success;
};
bool BerkeleyDBDataStore::get(const kv_key_t &key, std::vector<ds_bulk_t> &data) {
bool BerkeleyDBDataStore::get(ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
bool success = false;
data.clear();
......
......@@ -29,21 +29,27 @@ struct my_hash {
}
};
struct my_equal_to {
struct my_equal {
bool operator()(const ds_bulk_t &v1, const ds_bulk_t &v2) const {
return (v1 == v2);
}
};
// Strict weak ordering over ds_bulk_t for ordered containers/indexes;
// delegates to the container's lexicographic operator<.
struct my_less {
    bool operator()(const ds_bulk_t &lhs, const ds_bulk_t &rhs) const {
        return lhs < rhs;
    }
};
class AbstractDataStore {
public:
AbstractDataStore();
AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~AbstractDataStore();
virtual void createDatabase(std::string db_name)=0;
virtual bool put(const kv_key_t &key, ds_bulk_t &data)=0;
virtual bool get(const kv_key_t &key, ds_bulk_t &data)=0;
virtual bool get(const kv_key_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool put(ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
protected:
Duplicates _duplicates;
bool _eraseOnGet;
......@@ -56,13 +62,13 @@ public:
BwTreeDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BwTreeDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, std::vector<ds_bulk_t> &data);
virtual bool put(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data);
protected:
BwTree<kv_key_t, ds_bulk_t, std::less<kv_key_t>,
std::equal_to<kv_key_t>, std::hash<kv_key_t>,
my_equal_to/*ds_bulk_t*/, my_hash/*ds_bulk_t*/> *_tree = NULL;
BwTree<ds_bulk_t, ds_bulk_t,
my_less, my_equal, my_hash,
my_equal, my_hash> *_tree = NULL;
};
// may want to implement some caching for persistent stores like LevelDB
......@@ -72,14 +78,14 @@ public:
LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~LevelDBDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, std::vector<ds_bulk_t> &data);
virtual bool put(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data);
protected:
leveldb::DB *_dbm = NULL;
private:
std::string key2string(const kv_key_t &key);
kv_key_t string2key(std::string &keystr);
std::string toString(ds_bulk_t &key);
ds_bulk_t fromString(std::string &keystr);
};
// may want to implement some caching for persistent stores like BerkeleyDB
......@@ -89,15 +95,12 @@ public:
BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~BerkeleyDBDataStore();
virtual void createDatabase(std::string db_name);
virtual bool put(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, ds_bulk_t &data);
virtual bool get(const kv_key_t &key, std::vector<ds_bulk_t> &data);
virtual bool put(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, ds_bulk_t &data);
virtual bool get(ds_bulk_t &key, std::vector<ds_bulk_t> &data);
protected:
DbEnv *_dbenv = NULL;
Db *_dbm = NULL;
private:
ds_bulk_t key2ds_bulk(const kv_key_t &key);
kv_key_t ds_bulk2key(ds_bulk_t &keydata);
};
#endif // datastore_h
......@@ -8,10 +8,11 @@
// pass in NULL pointer to get default behavior
kv_context *kv_client_register(const char *addr_str) {
kv_context_t *kv_client_register(const char *addr_str) {
hg_return_t ret;
kv_context * context;
context = malloc(sizeof(kv_context));
kv_context_t * context;
context = (kv_context_t*)malloc(sizeof(kv_context_t));
memset(context, 0, sizeof(kv_context_t));
/* client side: no custom xstreams */
......@@ -50,7 +51,7 @@ kv_context *kv_client_register(const char *addr_str) {
return context;
}
hg_return_t kv_open(kv_context *context, const char *server_addr, const char *db_name) {
hg_return_t kv_open(kv_context_t *context, const char *server_addr, const char *db_name) {
hg_return_t ret = HG_SUCCESS;
hg_handle_t handle;
open_in_t open_in;
......@@ -103,101 +104,144 @@ hg_return_t kv_open(kv_context *context, const char *server_addr, const char *db
/* we gave types in the open call. Will need to maintain in 'context' the
* size. */
hg_return_t kv_put(kv_context *context, void *key, void *value) {
hg_return_t ret;
put_in_t put_in;
put_in.key = *(kv_key_t*)key;
put_in.value = *(kv_value_t*)value;
ret = margo_forward(context->put_handle, &put_in);
hg_return_t kv_put(kv_context_t *context,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize) {
hg_return_t ret;
hg_size_t msize;
msize = ksize + vsize + 2*sizeof(hg_size_t);
/*
* If total payload is large, we'll do our own
* explicit transfer of the value data.
*/
if (msize <= MAX_RPC_MESSAGE_SIZE) {
put_in_t pin;
put_out_t pout;
pin.pi.key = (kv_data_t)key;
pin.pi.ksize = ksize;
pin.pi.value = (kv_data_t)value;
pin.pi.vsize = vsize;
ret = margo_forward(context->put_handle, &pin);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->put_handle, &put_out);
ret = margo_get_output(context->put_handle, &pout);
assert(ret == HG_SUCCESS);
assert(put_out.ret == HG_SUCCESS);
ret = put_out.ret;
margo_free_output(context->put_handle, &put_out);
return ret;
}
hg_return_t kv_bulk_put(kv_context *context, void *key, void *data, size_t *data_size) {
hg_return_t ret;
bulk_put_in_t bpin;
bulk_put_out_t bpout;
bpin.key = *(kv_key_t*)key;
bpin.size = *(size_t*)data_size;
ret = margo_bulk_create(context->mid, 1, &data, data_size,
HG_BULK_READ_ONLY, &bpin.bulk_handle);
assert(ret == HG_SUCCESS);
ret = margo_forward(context->bulk_put_handle, &bpin);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bulk_put_handle, &bpout);
assert(ret == HG_SUCCESS);
ret = bpout.ret; // make sure the server side says all is OK
margo_free_output(context->bulk_put_handle, &bpout);
return ret;
}
hg_return_t kv_get(kv_context *context, void *key, void *value)
{
hg_return_t ret;
get_in_t get_in;
get_out_t get_out;
get_in.key = *(kv_key_t*)key;
ret = margo_forward(context->get_handle, &get_in);
ret = pout.ret;
margo_free_output(context->put_handle, &pout);
}
else {
// use bulk transfer method to move value
bulk_put_in_t bpin;
bulk_put_out_t bpout;
/*
* If (ksize + sizeof(hg_size_t) is too large
* we'll simply rely on HG to handle it rather
* than do multiple bulk transfers. Most likely
* key payload size is << value payload size
*/
bpin.bulk.key = (kv_data_t)key;
bpin.bulk.ksize = ksize;
bpin.bulk.vsize = vsize;
ret = margo_bulk_create(context->mid, 1, &value, &bpin.bulk.vsize,
HG_BULK_READ_ONLY, &bpin.bulk.handle);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->get_handle, &get_out);
ret = margo_forward(context->bulk_put_handle, &bpin);
assert(ret == HG_SUCCESS);
assert(get_out.ret == HG_SUCCESS);
ret = get_out.ret;
*(kv_value_t*)value = get_out.value;
ret = margo_get_output(context->bulk_put_handle, &bpout);
assert(ret == HG_SUCCESS);
ret = bpout.ret; // make sure the server side says all is OK
margo_free_output(context->get_handle, &get_out);
margo_free_output(context->bulk_put_handle, &bpout);
}
return ret;
}
hg_return_t kv_bulk_get(kv_context *context, void *key, void *data, size_t *data_size)
// vsize is in/out
hg_return_t kv_get(kv_context_t *context,
void *key, hg_size_t ksize,
void *value, hg_size_t *vsize)
{
hg_return_t ret;
bulk_get_in_t bgin;
bulk_get_out_t bgout;
bgin.key = *(kv_key_t*)key;
bgin.size = *(size_t*)data_size;
hg_size_t size;
hg_size_t msize;
size = *(hg_size_t*)vsize;
msize = size + sizeof(hg_size_t);
/*
* If return payload is large, we'll do our own
* explicit transfer of the value data.
*/
if (msize <= MAX_RPC_MESSAGE_SIZE) {
get_in_t gin;
get_out_t gout;
gin.gi.key = (kv_data_t)key;
gin.gi.ksize = ksize;
gin.gi.vsize = size;
ret = margo_forward(context->get_handle, &gin);
assert(ret == HG_SUCCESS);
ret = margo_bulk_create(context->mid, 1, &data, data_size,
HG_BULK_WRITE_ONLY, &bgin.bulk_handle);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->get_handle, &gout);
assert(ret == HG_SUCCESS);
ret = gout.go.ret;
/*
* Return size of data transferred. Note that
* size may be zero if there was a problem
* with the transfer.
*/
*vsize = (hg_size_t)gout.go.vsize;
if (gout.go.vsize > 0) {
memcpy(value, gout.go.value, gout.go.vsize);
}
margo_free_output(context->get_handle, &gout);
}
else {
bulk_get_in_t bgin;
bulk_get_out_t bgout;
bgin.bulk.key = (kv_data_t)key;
bgin.bulk.ksize = ksize;
bgin.bulk.vsize = size;
ret = margo_bulk_create(context->mid, 1, &value, &bgin.bulk.vsize,
HG_BULK_WRITE_ONLY, &bgin.bulk.handle);
assert(ret == HG_SUCCESS);
ret = margo_forward(context->bulk_get_handle, &bgin);
assert(ret == HG_SUCCESS);
ret = margo_forward(context->bulk_get_handle, &bgin);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bulk_get_handle, &bgout);
assert(ret == HG_SUCCESS);
ret = bgout.ret; // make sure the server side says all is OK
ret = margo_get_output(context->bulk_get_handle, &bgout);
assert(ret == HG_SUCCESS);
ret = bgout.ret; // make sure the server side says all is OK
*data_size = (size_t)bgout.size; // report actual size of data transferred to caller
/*
* Return size of data transferred. Note that
* size may be zero if there was a problem
* with the transfer.
*/
*vsize = (hg_size_t)bgout.size;
margo_free_output(context->bulk_get_handle, &bgout);
margo_free_output(context->bulk_get_handle, &bgout);
}
return ret;
}
hg_return_t kv_close(kv_context *context)
hg_return_t kv_close(kv_context_t *context)
{
hg_return_t ret;
hg_handle_t handle;
......@@ -220,32 +264,33 @@ hg_return_t kv_close(kv_context *context)
return HG_SUCCESS;
}
bench_result *kv_benchmark(kv_context *context, int32_t count) {
bench_result_t *kv_benchmark(kv_context *context, int32_t count) {
hg_return_t ret;
hg_handle_t handle;
bench_in_t bench_in;
bench_out_t bench_out;
bench_result_t *result = NULL;
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &handle);
assert(ret == HG_SUCCESS);
bench_in.count = count;
ret = margo_forward(handle, &bench_in);
assert(ret == HG_SUCCESS);
ret = margo_get_output(handle, &bench_out);
assert(ret == HG_SUCCESS);
result = malloc(sizeof(bench_result));
result->nkeys = bench_out.result.nkeys;
result->insert_time = bench_out.result.insert_time;
result->read_time = bench_out.result.read_time;
result->overhead = bench_out.result.overhead;
margo_free_output(handle, &bench_out);
margo_destroy(handle);
ret = margo_get_output(handle, &bench_out);
assert(ret == HG_SUCCESS);
result = malloc(sizeof(bench_result_t));
result->nkeys = bench_out.result.nkeys;
result->insert_time = bench_out.result.insert_time;
result->read_time = bench_out.result.read_time;
result->overhead = bench_out.result.overhead;
margo_free_output(handle, &bench_out);
margo_destroy(handle);
return result;
}
hg_return_t kv_client_deregister(kv_context *context) {
hg_return_t kv_client_deregister(kv_context_t *context) {
hg_return_t ret;
margo_destroy(context->put_handle);
......@@ -265,7 +310,7 @@ hg_return_t kv_client_deregister(kv_context *context) {
return HG_SUCCESS;
}
hg_return_t kv_client_signal_shutdown(kv_context *context) {
hg_return_t kv_client_signal_shutdown(kv_context_t *context) {
hg_return_t ret;
ret = margo_forward(context->shutdown_handle, NULL);
......
......@@ -7,7 +7,9 @@
#include <abt.h>
#include <assert.h>
#include <random>
//#include <random>
#include <stdlib.h>