Commit d7f75d76 authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Did a "spack install leveldb" to pull stock leveldb into sds-repo. Decoupled...

Did a "spack install leveldb" to pull stock leveldb into sds-repo. Decoupled sds-keyval from ParSplice build environment.
parent 948463c4
...@@ -28,6 +28,18 @@ if test "x$PKG_CONFIG" == "x"; then ...@@ -28,6 +28,18 @@ if test "x$PKG_CONFIG" == "x"; then
AC_MSG_ERROR([Could not find pkg-config utility!]) AC_MSG_ERROR([Could not find pkg-config utility!])
fi fi
PKG_CHECK_MODULES([MARGO],[margo],[],
AC_MSG_ERROR([Could not find working margo installation!]) )
LIBS="$MARGO_LIBS $LIBS"
CPPFLAGS="$MARGO_CFLAGS $CPPFLAGS"
CFLAGS="$MARGO_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([LEVELDB],[leveldb],[],
AC_MSG_ERROR([Could not find working leveldb installation!]) )
LIBS="$LEVELDB_LIBS $LIBS"
CPPFLAGS="$LEVELDB_CFLAGS $CPPFLAGS"
CFLAGS="$LEVELDB_CFLAGS $CFLAGS"
# Checks for typedefs, structures, and compiler characteristics. # Checks for typedefs, structures, and compiler characteristics.
AC_C_INLINE AC_C_INLINE
AC_TYPE_INT64_T AC_TYPE_INT64_T
......
...@@ -135,7 +135,7 @@ kv_key_t LevelDBDataStore::string2key(std::string &keystr) { ...@@ -135,7 +135,7 @@ kv_key_t LevelDBDataStore::string2key(std::string &keystr) {
LevelDBDataStore::~LevelDBDataStore() { LevelDBDataStore::~LevelDBDataStore() {
delete _dbm; delete _dbm;
leveldb::Env::Shutdown(); //leveldb::Env::Shutdown(); // Riak version only
}; };
void LevelDBDataStore::createDatabase(std::string db_name) { void LevelDBDataStore::createDatabase(std::string db_name) {
......
...@@ -22,32 +22,34 @@ static hg_return_t open_handler(hg_handle_t handle) ...@@ -22,32 +22,34 @@ static hg_return_t open_handler(hg_handle_t handle)
ret = margo_get_input(handle, &in); ret = margo_get_input(handle, &in);
std::string in_name(in.name); std::string in_name(in.name);
#ifdef KV_DEBUG
std::cout << "SERVER: OPEN " << in_name << std::endl; std::cout << "SERVER: OPEN " << in_name << std::endl;
#endif
if (!datastore) { if (!datastore) {
//datastore = new BwTreeDataStore(); // testing BwTree //datastore = new BwTreeDataStore(); // testing BwTree
//datastore = new LevelDBDataStore(); // testing LevelDB datastore = new LevelDBDataStore(); // testing LevelDB
datastore = new BerkeleyDBDataStore(); // testing BerkeleyDB //datastore = new BerkeleyDBDataStore(); // testing BerkeleyDB
db_name = in_name; db_name = in_name;
datastore->createDatabase(db_name); datastore->createDatabase(db_name);
#ifdef KV_DEBUG
std::cout << "SERVER OPEN: DataStore initialized and ready for " << db_name << std::endl; std::cout << "SERVER OPEN: DataStore initialized and ready for " << db_name << std::endl;
#endif
out.ret = HG_SUCCESS; out.ret = HG_SUCCESS;
} }
else { else {
if (db_name == in_name) { if (db_name == in_name) {
#ifdef KV_DEBUG
std::cout << "SERVER OPEN: DataStore initialized and ready for " << db_name << std::endl; std::cout << "SERVER OPEN: DataStore initialized and ready for " << db_name << std::endl;
#endif
out.ret = HG_SUCCESS; out.ret = HG_SUCCESS;
} }
else { else {
if (db_name == in_name) { #ifdef KV_DEBUG
std::cout << "SERVER OPEN: DataStore initialized and ready for " << db_name << std::endl; std::cout << "SERVER OPEN failed: currently managing " << db_name
out.ret = HG_SUCCESS;
}
else {
std::cout << "SERVER OPEN failed: currently managing " << db_name
<< " and unable to process OPEN request for " << in_name << std::endl; << " and unable to process OPEN request for " << in_name << std::endl;
out.ret = HG_OTHER_ERROR; #endif
} out.ret = HG_OTHER_ERROR;
} }
ret = margo_respond(handle, &out); ret = margo_respond(handle, &out);
...@@ -90,13 +92,17 @@ static hg_return_t put_handler(hg_handle_t handle) ...@@ -90,13 +92,17 @@ static hg_return_t put_handler(hg_handle_t handle)
memcpy(data.data(), &in.value, sizeof(in.value)); memcpy(data.data(), &in.value, sizeof(in.value));
if (datastore->put(in.key, data)) { if (datastore->put(in.key, data)) {
#ifdef KV_DEBUG
std::cout << "SERVER: PUT succeeded for key = " << in.key std::cout << "SERVER: PUT succeeded for key = " << in.key
<< " value = " << in.value << std::endl; << " value = " << in.value << std::endl;
#endif
out.ret = HG_SUCCESS; out.ret = HG_SUCCESS;
} }
else { else {
#ifdef KV_DEBUG
std::cout << "SERVER: PUT failed for key = " << in.key std::cout << "SERVER: PUT failed for key = " << in.key
<< " value = " << in.value << std::endl; << " value = " << in.value << std::endl;
#endif
out.ret = HG_OTHER_ERROR; out.ret = HG_OTHER_ERROR;
} }
...@@ -137,14 +143,18 @@ static hg_return_t bulk_put_handler(hg_handle_t handle) ...@@ -137,14 +143,18 @@ static hg_return_t bulk_put_handler(hg_handle_t handle)
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
if (datastore->put(bpin.key, data)) { if (datastore->put(bpin.key, data)) {
#ifdef KV_DEBUG
std::cout << "SERVER: BULK PUT succeeded for key = " << bpin.key std::cout << "SERVER: BULK PUT succeeded for key = " << bpin.key
<< " size = " << bpin.size << std::endl; << " size = " << bpin.size << std::endl;
#endif
bpout.ret = HG_SUCCESS; bpout.ret = HG_SUCCESS;
} }
else { else {
// e.g. put returns false if the key-value pair already // e.g. put returns false if the key-value pair already
// exists in the DB and duplicates are not allowed or ignored // exists in the DB and duplicates are not allowed or ignored
#ifdef KV_DEBUG
std::cout << "SERVER: BULK PUT failed for key = " << bpin.key << std::endl; std::cout << "SERVER: BULK PUT failed for key = " << bpin.key << std::endl;
#endif
bpout.ret = HG_OTHER_ERROR; bpout.ret = HG_OTHER_ERROR;
} }
...@@ -174,20 +184,26 @@ static hg_return_t get_handler(hg_handle_t handle) ...@@ -174,20 +184,26 @@ static hg_return_t get_handler(hg_handle_t handle)
kv_value_t value; kv_value_t value;
if (data.size() <= sizeof(value)) { if (data.size() <= sizeof(value)) {
memcpy(&value, data.data(), data.size()); memcpy(&value, data.data(), data.size());
#ifdef KV_DEBUG
std::cout << "SERVER: GET succeeded for key = " << in.key std::cout << "SERVER: GET succeeded for key = " << in.key
<< " value = " << value << std::endl; << " value = " << value << std::endl;
#endif
out.value = value; out.value = value;
out.ret = HG_SUCCESS; out.ret = HG_SUCCESS;
} }
else { else {
#ifdef KV_DEBUG
std::cout << "SERVER: GET failed for key = " << in.key std::cout << "SERVER: GET failed for key = " << in.key
<< " value returned too large for kv_value_t" << std::endl; << " value returned too large for kv_value_t" << std::endl;
#endif
out.ret = HG_SIZE_ERROR; // caller should be checking return value out.ret = HG_SIZE_ERROR; // caller should be checking return value
} }
} }
else { else {
// get on key did not succeed // get on key did not succeed
#ifdef KV_DEBUG
std::cout << "SERVER: GET failed for key = " << in.key << std::endl; std::cout << "SERVER: GET failed for key = " << in.key << std::endl;
#endif
out.ret = HG_OTHER_ERROR; // caller should be checking return value out.ret = HG_OTHER_ERROR; // caller should be checking return value
} }
...@@ -215,7 +231,9 @@ static hg_return_t bulk_get_handler(hg_handle_t handle) ...@@ -215,7 +231,9 @@ static hg_return_t bulk_get_handler(hg_handle_t handle)
ds_bulk_t data; ds_bulk_t data;
if (datastore->get(bgin.key, data)) { if (datastore->get(bgin.key, data)) {
#ifdef KV_DEBUG
std::cout << "SERVER: BULK GET succeeded for key = " << bgin.key << std::endl; std::cout << "SERVER: BULK GET succeeded for key = " << bgin.key << std::endl;
#endif
// will the transfer fit on the client side? // will the transfer fit on the client side?
bgout.size = data.size(); bgout.size = data.size();
if (bgout.size <= bgin.size) { if (bgout.size <= bgin.size) {
...@@ -234,14 +252,18 @@ static hg_return_t bulk_get_handler(hg_handle_t handle) ...@@ -234,14 +252,18 @@ static hg_return_t bulk_get_handler(hg_handle_t handle)
bgout.ret = HG_SUCCESS; bgout.ret = HG_SUCCESS;
} }
else { else {
#ifdef KV_DEBUG
std::cout << "SERVER: BULK GET failed for key = " << bgin.key std::cout << "SERVER: BULK GET failed for key = " << bgin.key
<< " value returned too large for kv_value_t" << std::endl; << " value returned too large for kv_value_t" << std::endl;
#endif
bgout.ret = HG_SIZE_ERROR; bgout.ret = HG_SIZE_ERROR;
} }
} }
else { else {
// get on key did not find a value (return 0 for size) // get on key did not find a value (return 0 for size)
#ifdef KV_DEBUG
std::cout << "SERVER: BULK GET failed for key = " << bgin.key << std::endl; std::cout << "SERVER: BULK GET failed for key = " << bgin.key << std::endl;
#endif
bgout.size = 0; // assuming caller will check return code bgout.size = 0; // assuming caller will check return code
bgout.ret = HG_OTHER_ERROR; bgout.ret = HG_OTHER_ERROR;
} }
...@@ -316,7 +338,7 @@ static void RandomInsertSpeedTest(int32_t key_num, bench_result *results) ...@@ -316,7 +338,7 @@ static void RandomInsertSpeedTest(int32_t key_num, bench_result *results)
std::chrono::duration<double> elapsed_seconds = end - start; std::chrono::duration<double> elapsed_seconds = end - start;
results->nkeys = (size_t)key_num; results->nkeys = (hg_size_t)key_num;
results->insert_time = elapsed_seconds.count(); results->insert_time = elapsed_seconds.count();
std::cout << "BwTree: at least " << (key_num * 2.0 / (1024 * 1024)) / elapsed_seconds.count() std::cout << "BwTree: at least " << (key_num * 2.0 / (1024 * 1024)) / elapsed_seconds.count()
......
#include <stdint.h> #include <stdint.h>
#include <assert.h>
#include <mercury.h> #include <mercury.h>
#include <mercury_macros.h> #include <mercury_macros.h>
...@@ -11,6 +12,9 @@ ...@@ -11,6 +12,9 @@
#ifndef sds_keyval_h #ifndef sds_keyval_h
#define sds_keyval_h #define sds_keyval_h
// uncomment to re-enable print statements
//#define KV_DEBUG
#if defined(__cplusplus) #if defined(__cplusplus)
extern "C" { extern "C" {
#endif #endif
...@@ -82,16 +86,32 @@ DECLARE_MARGO_RPC_HANDLER(close_handler) ...@@ -82,16 +86,32 @@ DECLARE_MARGO_RPC_HANDLER(close_handler)
MERCURY_GEN_PROC(bench_in_t, ((int32_t)(count)) ) MERCURY_GEN_PROC(bench_in_t, ((int32_t)(count)) )
typedef struct { typedef struct {
size_t nkeys; hg_size_t nkeys;
double insert_time; double insert_time;
double read_time; double read_time;
double overhead; double overhead;
} bench_result; } bench_result;
static inline hg_return_t hg_proc_double(hg_proc_t proc, void *data)
{
return hg_proc_memcpy(proc, data, sizeof(double));
}
static inline hg_return_t hg_proc_bench_result(hg_proc_t proc, void *data) static inline hg_return_t hg_proc_bench_result(hg_proc_t proc, void *data)
{ {
/* TODO: needs a portable encoding */ hg_return_t ret;
return(hg_proc_memcpy(proc, data, sizeof(bench_result))); bench_result *in = (bench_result*)data;
ret = hg_proc_hg_size_t(proc, &in->nkeys);
assert(ret == HG_SUCCESS);
ret = hg_proc_double(proc, &in->insert_time);
assert(ret == HG_SUCCESS);
ret = hg_proc_double(proc, &in->read_time);
assert(ret == HG_SUCCESS);
ret = hg_proc_double(proc, &in->overhead);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
} }
DECLARE_MARGO_RPC_HANDLER(bench_handler) DECLARE_MARGO_RPC_HANDLER(bench_handler)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment