...
 
Commits (32)
......@@ -95,7 +95,7 @@ include_HEADERS = include/sdskv-client.h \
include/sdskv-server.hpp \
include/sdskv-common.hpp
noinst_HEADERS = src/bulk.h \
noinst_HEADERS = src/data_slice.h \
src/sdskv-rpc-types.h \
src/datastore/datastore.h \
src/datastore/map_datastore.h \
......
......@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ([2.69])
AC_INIT([sds-keyval], [0.1.4], [robl@mcs.anl.gov])
AC_INIT([sds-keyval], [0.1.6], [robl@mcs.anl.gov])
AM_INIT_AUTOMAKE([1.13.4 -Wall -Werror foreign subdir-objects silent-rules])
AM_SILENT_RULES([yes])
AC_CONFIG_MACRO_DIR([m4])
......
......@@ -31,6 +31,36 @@ extern int32_t sdskv_remi_errno;
*/
int sdskv_client_init(margo_instance_id mid, sdskv_client_t* client);
/**
* @brief Configures the poolset of a client so that it can reuse bulk handles.
*
* @param client SDSKV client
* @param npools Number of pools to create
* @param nbufs Number of buffers in each pool
* @param first_size Size of the buffers in the first pool
* @param size_multiple Size factor from a pool to the next one
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_client_configure_bulk_poolset(
sdskv_client_t client,
hg_size_t npools,
hg_size_t nbufs,
hg_size_t first_size,
hg_size_t size_multiple);
/**
* @brief Get the usage of the poolset by the client.
*
* @param client SDSKV client.
* @param usage Poolset usage.
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_client_get_poolset_usage(
sdskv_client_t client,
sdskv_poolset_usage_t* usage);
/**
* @brief Finalizes a SDSKV client.
*
......
This diff is collapsed.
......@@ -18,6 +18,12 @@ typedef enum sdskv_db_type_t
typedef uint64_t sdskv_database_id_t;
#define SDSKV_DATABASE_ID_INVALID 0
typedef struct sdskv_poolset_usage_t {
uint64_t bulks_in_use; // number of bulk handles currently out of the poolset
uint64_t cache_hits; // number of times a bulk handle was succesfully obtained from the poolset
uint64_t cache_miss; // number of times the provider/client had to fallback to creating a bulk
} sdskv_poolset_usage_t;
#define SDSKV_KEEP_ORIGINAL 0 /* for migration operations, keep original */
#define SDSKV_REMOVE_ORIGINAL 1 /* for migration operations, remove the origin after migrating */
......@@ -40,7 +46,8 @@ typedef uint64_t sdskv_database_id_t;
#define SDSKV_ERR_REMI -15 /* REMI-related error */
#define SDSKV_ERR_ARGOBOTS -16 /* Argobots related error */
#define SDSKV_ERR_KEYEXISTS -17 /* Put operation would override data */
#define SDSKV_ERR_END -18 /* End of range for valid error codes */
#define SDSKV_ERR_POOLSET -18 /* Poolset cannot be changed because it is currently used */
#define SDSKV_ERR_END -19 /* End of range for valid error codes */
const char* const sdskv_error_messages[] = {
"",
......@@ -60,7 +67,8 @@ const char* const sdskv_error_messages[] = {
"Invalid comparison function",
"REMI error",
"Argobots error",
"Key exists"
"Key exists",
"Poolset related error"
};
#if defined(__cplusplus)
......
......@@ -52,6 +52,35 @@ int sdskv_provider_register(
ABT_pool pool,
sdskv_provider_t* provider);
/**
* @brief Configures the poolset of a provider so that it can reuse bulk handles.
*
* @param provider SDSKV provider
* @param npools Number of pools to create
* @param nbufs Number of buffers in each pool
* @param first_size Size of the buffers in the first pool
* @param size_multiple Size factor from a pool to the next one
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_provider_configure_bulk_poolset(
sdskv_provider_t provider,
hg_size_t npools,
hg_size_t nbufs,
hg_size_t first_size,
hg_size_t size_multiple);
/**
* @brief Get statistics on the poolset usage.
*
* @param[in] provider SDSKV provider.
* @param[out] usage Pointer to usage structure.
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_provider_get_poolset_usage(
sdskv_provider_t provider,
sdskv_poolset_usage_t* usage);
/**
* @brief Deregister the provider's RPCs and destroys the provider.
*
......
......@@ -87,6 +87,35 @@ class provider {
sdskv_provider_destroy(m_provider);
}
/**
* @brief Configures a poolset to be used for all transfers
* from this provider.
*
* @param npools Number of pools
* @param nbufs Number of buffers per pool
* @param first_size Size of buffers in the first pool
* @param size_multiple Buffer size multiple from a pool to the next one
*/
void configure_bulk_poolset(
size_t npools,
size_t nbufs,
size_t first_size,
size_t size_multiple) {
int ret = sdskv_provider_configure_bulk_poolset(m_provider,
npools, nbufs, first_size, size_multiple);
_CHECK_RET(ret);
}
/**
* @brief Equivalent of sdskv_provider_get_poolset_usage.
*
* @param usage Pointer to an sdskv_poolset_usage_t structure.
*/
void get_poolset_usage(sdskv_poolset_usage_t* usage) const {
int ret = sdskv_provider_get_poolset_usage(m_provider, usage);
_CHECK_RET(ret);
}
/**
* @brief Add a comparison function.
*
......
prefix=@prefix@
exec_prefix=@exec_prefix@
libdir=@libdir@
includedir=@includedir@
Name: kv-group
Description: services-based keyval, group logic
Version: 0.1
URL: https://xgitlab.cels.anl.gov/sds/sds-keyval
Requires: margo
Libs: -L${libdir} @GROUP_LIBS@
Cflags: -I${includedir}
......@@ -5,7 +5,7 @@ includedir=@includedir@
Name: sdskv-client
Description: services-based keyval client
Version: 0.1.4
Version: 0.1.6
URL: https://xgitlab.cels.anl.gov/sds/sds-keyval
Requires: margo
Libs: -L${libdir} -lsdskv-client
......
......@@ -5,7 +5,7 @@ includedir=@includedir@
Name: sdskv-server
Description: services-based keyval server
Version: 0.1.4
Version: 0.1.6
URL: https://xgitlab.cels.anl.gov/sds/sds-keyval
Requires: margo abt-io @SERVER_DEPS_PKG@
Libs: -L${libdir} -lsdskv-server @SERVER_LIBS_EXT@
......
{
"protocol" : "tcp",
"seed" : 0,
"client" : {
"poolset" : {
"pools" : 8,
"buffers" : 8,
"min_size" : 1024,
"size_factor" : 2
}
},
"server" : {
"use-progress-thread" : false,
"rpc-thread-count" : 0,
......@@ -8,6 +16,12 @@
"type" : "map",
"name" : "benchmark-db",
"path" : "/dev/shm"
},
"poolset" : {
"pools" : 8,
"buffers" : 8,
"min_size" : 1024,
"size_factor" : 2
}
},
"benchmarks" : [
......
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#ifndef bulk_h
#define bulk_h
#include <stddef.h>
#include "kv-config.h"
//#include <boost/functional/hash.hpp>
#include <vector>
#include <string>
// implementation is std::vector<char> specific
// typedef is for convenience
typedef std::vector<char> ds_bulk_t;
struct ds_bulk_hash {
size_t operator()(const ds_bulk_t &v) const {
size_t hash = 0;
hash = std::hash<std::string>() ( std::string( v.begin(), v.end() ) );
//boost::hash_range(hash, v.begin(), v.end());
return hash;
}
};
struct ds_bulk_equal {
bool operator()(const ds_bulk_t &v1, const ds_bulk_t &v2) const {
return (v1 == v2);
}
};
struct ds_bulk_less {
bool operator()(const ds_bulk_t &v1, const ds_bulk_t &v2) const {
return (v1 < v2);
}
};
#endif // bulk_h
#ifndef _data_slice_h
#define _data_slice_h
#include <stddef.h>
#include "kv-config.h"
#include "fnv1a.h"
#include <vector>
#include <string>
class data_slice {
bool _owns_data = false;
size_t _size = 0;
char* _data = nullptr;
public:
data_slice() = default;
data_slice(const char* begin, const char* end)
: _size(std::distance(begin, end))
, _data(const_cast<char*>(begin)) {}
data_slice(const char* data, size_t size, bool owner=false)
: _size(size)
, _data(const_cast<char*>(data))
, _owns_data(owner) {}
data_slice(size_t size)
: _owns_data(true)
, _size(size)
, _data((char*)malloc(size)) {}
~data_slice() {
if(_owns_data) free(_data);
}
data_slice(const data_slice& other)
: _owns_data(true)
, _size(other._size) {
_data = (char*)malloc(_size);
memcpy(_data, other._data, _size);
}
data_slice(data_slice&& other)
: _owns_data(other._owns_data)
, _size(other._size)
, _data(other._data) {
other._owns_data = false;
other._size = 0;
other._data = nullptr;
}
data_slice& operator=(const data_slice& other) {
if(&other == this) return *this;
if(_owns_data) free(_data);
_owns_data = true;
_size = other._size;
_data = (char*)malloc(_size);
memcpy(_data, other._data, _size);
return *this;
}
data_slice& operator=(data_slice&& other) {
if(&other == this) return *this;
if(_owns_data) free(_data);
_owns_data = other._owns_data;
_size = other._size;
_data = other._data;
other._owns_data = false;
other._size = 0;
other._data = nullptr;
return *this;
}
const char* data() const {
return _data;
}
char* data() {
return _data;
}
size_t size() const {
return _size;
}
bool operator==(const data_slice& other) const {
if(_size != other._size) return false;
if(_data == other._data) return true;
return 0 == memcmp(_data, other._data, _size);
}
bool operator!=(const data_slice& other) const {
return !(*this == other);
}
bool operator<(const data_slice& other) const {
if(_size == other._size) {
if(_data == other._data) return false;
int r = memcmp(_data, other._data, _size);
return r < 0;
} else {
auto s = std::min(_size, other._size);
if(_data == other._data)
return _size < other._size;
int r = memcmp(_data, other._data, _size);
if(r < 0) return true;
if(r > 0) return false;
return _size < other._size;
}
}
void resize(size_t newsize) {
if(_owns_data) {
if(newsize > _size) {
_data = (char*)realloc(_data, newsize);
}
_size = newsize;
} else {
if(newsize > _size) {
char* tmp = (char*)malloc(newsize);
memcpy(tmp, _data, _size);
_data = tmp;
_owns_data = true;
}
_size = newsize;
}
}
};
struct data_slice_hash {
size_t operator()(const data_slice &v) const {
auto hashfn = fnv1a_t<8 * sizeof(std::size_t)> {};
hashfn.update(v.data(), v.size());
return hashfn.digest();
}
};
struct data_slice_equal {
bool operator()(const data_slice &v1, const data_slice &v2) const {
return (v1 == v2);
}
};
struct data_slice_less {
bool operator()(const data_slice &v1, const data_slice &v2) const {
return (v1 < v2);
}
};
#endif // bulk_h
This diff is collapsed.
......@@ -35,10 +35,17 @@ class BerkeleyDBDataStore : public AbstractDataStore {
const hg_size_t* ksizes,
const void* const* values,
const hg_size_t* vsizes) override;
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) override;
virtual int get(const void* kdata, hg_size_t ksize, void* vdata, hg_size_t *vsize) override {
data_slice key((const char*)kdata, ksize);
data_slice val((const char*)vdata, *vsize);
int ret = get(key, val);
if(ret == SDSKV_SUCCESS)
*vsize = val.size();
return ret;
}
virtual int get(const data_slice &key, data_slice &data) override;
virtual bool exists(const void* key, hg_size_t ksize) const override;
virtual bool erase(const ds_bulk_t &key) override;
virtual bool erase(const data_slice &key) override;
virtual void set_in_memory(bool enable) override; // enable/disable in-memory mode
virtual void set_comparison_function(const std::string& name, comparator_fn less) override;
virtual void set_no_overwrite() override {
......@@ -49,14 +56,27 @@ class BerkeleyDBDataStore : public AbstractDataStore {
virtual remi_fileset_t create_and_populate_fileset() const override;
#endif
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, hg_size_t count, const ds_bulk_t &prefix) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, hg_size_t count, const ds_bulk_t &) const override;
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, hg_size_t max_keys) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, hg_size_t max_keys) const override;
virtual void vlist_keys(
uint64_t max_count,
const data_slice &start,
const data_slice &prefix,
std::vector<data_slice>& result) const override;
virtual void vlist_keyvals(
uint64_t max_count,
const data_slice &start_key,
const data_slice& prefix,
std::vector<std::pair<data_slice,data_slice>>& result) const override;
virtual void vlist_key_range(
const data_slice &lower_bound, const data_slice &upper_bound,
std::vector<data_slice>& result) const override;
virtual void vlist_keyval_range(
const data_slice &lower_bound, const data_slice& upper_bound,
std::vector<std::pair<data_slice,data_slice>>& result) const override;
DbEnv *_dbenv = nullptr;
Db *_dbm = nullptr;
DbWrapper* _wrapper = nullptr;
......
......@@ -4,11 +4,12 @@
#define datastore_h
#include "kv-config.h"
#include "bulk.h"
#include "data_slice.h"
#include <margo.h>
#ifdef USE_REMI
#include "remi/remi-common.h"
#endif
#include "sdskv-common.h"
#include <vector>
......@@ -22,10 +23,7 @@ class AbstractDataStore {
virtual ~AbstractDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path)=0;
virtual int put(const void* kdata, hg_size_t ksize, const void* vdata, hg_size_t vsize)=0;
virtual int put(const ds_bulk_t &key, const ds_bulk_t &data) {
return put(key.data(), key.size(), data.data(), data.size());
}
virtual int put(ds_bulk_t&& key, ds_bulk_t&& data) {
virtual int put(const data_slice &key, const data_slice &data) {
return put(key.data(), key.size(), data.data(), data.size());
}
virtual int put_multi(hg_size_t num_items,
......@@ -41,13 +39,40 @@ class AbstractDataStore {
}
return ret;
}
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual int get(const data_slice &key, data_slice &data)=0;
virtual int get(const void* kdata, hg_size_t ksize, void* vdata, hg_size_t *vsize) {
data_slice key((const char*)kdata, ksize);
data_slice val((const char*)vdata, *vsize);
int ret = get(key, val);
if(ret == SDSKV_SUCCESS)
*vsize = val.size();
return ret;
}
virtual int get_multi(hg_size_t num_items,
const void* const* keys,
const hg_size_t* ksizes,
void** values,
hg_size_t* vsizes) {
int ret = 0;
for(hg_size_t i=0; i < num_items; i++) {
int r = get(keys[i], ksizes[i], values[i], &vsizes[i]);
ret = ret == 0 ? r : 0;
}
return ret;
}
virtual bool length(const data_slice &key, size_t& result) {
data_slice value;
if(get(key, value) == SDSKV_SUCCESS) {
result = value.size();
return true;
}
return false;
}
virtual bool exists(const void* key, hg_size_t ksize) const = 0;
virtual bool exists(const ds_bulk_t &key) const {
virtual bool exists(const data_slice &key) const {
return exists(key.data(), key.size());
}
virtual bool erase(const ds_bulk_t &key) = 0;
virtual bool erase(const data_slice &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual void set_comparison_function(const std::string& name, comparator_fn less)=0;
virtual void set_no_overwrite()=0;
......@@ -69,24 +94,32 @@ class AbstractDataStore {
return _comp_fun_name;
}
std::vector<ds_bulk_t> list_keys(
const ds_bulk_t &start_key, hg_size_t count, const ds_bulk_t& prefix=ds_bulk_t()) const {
return vlist_keys(start_key, count, prefix);
void list_keys(uint64_t max_count,
const data_slice &start_key,
std::vector<data_slice>& result,
const data_slice& prefix=data_slice()) const {
vlist_keys(max_count, start_key, prefix, result);
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> list_keyvals(
const ds_bulk_t &start_key, hg_size_t count, const ds_bulk_t& prefix=ds_bulk_t()) const {
return vlist_keyvals(start_key, count, prefix);
void list_keyvals(uint64_t max_count,
const data_slice &start_key,
std::vector<std::pair<data_slice,data_slice>>& result,
const data_slice& prefix=data_slice()) const {
vlist_keyvals(max_count, start_key, prefix, result);
}
std::vector<ds_bulk_t> list_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, hg_size_t max_keys=0) const {
return vlist_key_range(lower_bound, upper_bound, max_keys);
void list_key_range(
const data_slice &lower_bound,
const data_slice &upper_bound,
std::vector<data_slice>& result) const {
vlist_key_range(lower_bound, upper_bound, result);
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> list_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, hg_size_t max_keys=0) const {
return vlist_keyval_range(lower_bound, upper_bound, max_keys);
void list_keyval_range(
const data_slice &lower_bound,
const data_slice& upper_bound,
std::vector<std::pair<data_slice,data_slice>>& result) const {
vlist_keyval_range(lower_bound, upper_bound, result);
}
protected:
......@@ -98,14 +131,27 @@ class AbstractDataStore {
bool _debug;
bool _in_memory;
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start_key, hg_size_t count, const ds_bulk_t& prefix) const = 0;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, hg_size_t count, const ds_bulk_t& prefix) const = 0;
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, hg_size_t max_keys) const = 0;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, hg_size_t max_keys) const = 0;
virtual void vlist_keys(
uint64_t max_count,
const data_slice &start_key,
const data_slice& prefix,
std::vector<data_slice>& result) const = 0;
virtual void vlist_keyvals(
uint64_t max_count,
const data_slice &start_key,
const data_slice& prefix,
std::vector<std::pair<data_slice,data_slice>>& result) const = 0;
virtual void vlist_key_range(
const data_slice &lower_bound,
const data_slice &upper_bound,
std::vector<data_slice>& result) const = 0;
virtual void vlist_keyval_range(
const data_slice &lower_bound,
const data_slice& upper_bound,
std::vector<std::pair<data_slice,data_slice>>& result) const = 0;
};
#endif // datastore_h
......@@ -20,8 +20,8 @@ LevelDBDataStore::LevelDBDataStore(bool eraseOnGet, bool debug) :
_dbm = NULL;
};
std::string LevelDBDataStore::toString(const ds_bulk_t &bulk_val) {
std::string str_val(bulk_val.begin(), bulk_val.end());
std::string LevelDBDataStore::toString(const data_slice &bulk_val) {
std::string str_val(bulk_val.data(), bulk_val.size());
return str_val;
};
......@@ -30,9 +30,10 @@ std::string LevelDBDataStore::toString(const char* buf, hg_size_t buf_size) {
return str_val;
};
ds_bulk_t LevelDBDataStore::fromString(const std::string &str_val) {
ds_bulk_t bulk_val(str_val.begin(), str_val.end());
return bulk_val;
data_slice LevelDBDataStore::fromString(const std::string &str_val) {
data_slice bulk_val(str_val.data(), str_val.size());
data_slice copy = bulk_val; // force a copy to take ownership
return copy;
};
LevelDBDataStore::~LevelDBDataStore() {
......@@ -89,7 +90,7 @@ int LevelDBDataStore::put(const void* key, hg_size_t ksize, const void* value, h
return SDSKV_ERR_PUT;
};
bool LevelDBDataStore::erase(const ds_bulk_t &key) {
bool LevelDBDataStore::erase(const data_slice &key) {
leveldb::Status status;
status = _dbm->Delete(leveldb::WriteOptions(), toString(key));
return status.ok();
......@@ -102,47 +103,43 @@ bool LevelDBDataStore::exists(const void* key, hg_size_t ksize) const {
return status.ok();
}
bool LevelDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
int LevelDBDataStore::get(const data_slice &key, data_slice &data) {
leveldb::Status status;
bool success = false;
int ret = SDSKV_SUCCESS;
//high_resolution_clock::time_point start = high_resolution_clock::now();
data.clear();
std::string value;
status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
leveldb::Slice key_slice(key.data(), key.size());
status = _dbm->Get(leveldb::ReadOptions(), key_slice, &value);
if (status.ok()) {
data = fromString(value);
success = true;
}
else if (!status.IsNotFound()) {
std::cerr << "LevelDBDataStore::get: LevelDB error on Get = " << status.ToString() << std::endl;
if(data.size() == 0) {
data = fromString(value);
} else {
if(data.size() < value.size()) {
ret = SDSKV_ERR_SIZE;
} else {
memcpy(data.data(), value.data(), value.size());
data.resize(value.size());
}
}
}
// uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
// std::cout << "LevelDBDataStore::get time = " << elapsed << " microseconds" << std::endl;
return success;
};
bool LevelDBDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
bool success = false;
data.clear();
ds_bulk_t value;
if (get(key, value)) {
data.push_back(value);
success = true;
else if (status.IsNotFound()) {
ret = SDSKV_ERR_UNKNOWN_KEY;
data.resize(0);
}
return success;
return ret;
};
void LevelDBDataStore::set_in_memory(bool enable)
{};
std::vector<ds_bulk_t> LevelDBDataStore::vlist_keys(
const ds_bulk_t &start, hg_size_t count, const ds_bulk_t &prefix) const
void LevelDBDataStore::vlist_keys(
uint64_t max_count,
const data_slice &start,
const data_slice &prefix, std::vector<data_slice>& result) const
{
std::vector<ds_bulk_t> keys;
bool usermem = result.size() != 0;
auto count = usermem ? result.size() : max_count;
leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
leveldb::Slice start_slice(start.data(), start.size());
......@@ -161,24 +158,42 @@ std::vector<ds_bulk_t> LevelDBDataStore::vlist_keys(
it->SeekToFirst();
}
/* note: iterator initialized above, not in for loop */
for (; it->Valid() && keys.size() < count; it->Next() ) {
ds_bulk_t k(it->key().size());
memcpy(k.data(), it->key().data(), it->key().size() );
unsigned i = 0;
bool size_error = false;
for (; it->Valid() && i < count; it->Next() ) {
const auto& k = it->key();
c = std::memcmp(prefix.data(), k.data(), prefix.size());
if(c == 0) {
keys.push_back(std::move(k));
if(c > 0) {
continue;
} else if(c < 0) {
break;
}
if(usermem) {
if(k.size() > result[i].size() || size_error) {
size_error = true;
} else {
std::memcpy(result[i].data(), k.data(), k.size());
std::cerr << "Key " << std::string(k.data(), k.size()) << std::endl;
}
result[i].resize(k.size());
} else {
result.push_back(data_slice(k.data(), k.size()));
}
++i;
}
delete it;
return keys;
result.resize(i);
if(size_error) throw SDSKV_ERR_SIZE;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyvals(
const ds_bulk_t &start, hg_size_t count, const ds_bulk_t &prefix) const
void LevelDBDataStore::vlist_keyvals(
uint64_t max_count,
const data_slice &start,
const data_slice &prefix,
std::vector<std::pair<data_slice,data_slice>>& result) const
{
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
bool usermem = result.size() != 0;
auto count = usermem ? result.size() : max_count;
leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
leveldb::Slice start_slice(start.data(), start.size());
......@@ -196,38 +211,59 @@ std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyvals(
} else {
it->SeekToFirst();
}
unsigned i = 0;
bool size_error = false;
/* note: iterator initialized above, not in for loop */
for (; it->Valid() && result.size() < count; it->Next() ) {
ds_bulk_t k(it->key().size());
ds_bulk_t v(it->value().size());
memcpy(k.data(), it->key().data(), it->key().size());
memcpy(v.data(), it->value().data(), it->value().size());
for (; it->Valid() && i < count; it->Next() ) {
const auto& k = it->key();
const auto& v = it->value();
c = std::memcmp(prefix.data(), k.data(), prefix.size());
if(c == 0) {
result.push_back(std::make_pair(std::move(k), std::move(v)));
if(c > 0) {
continue;
} else if(c < 0) {
break;
}
if(usermem) {
if(size_error
|| k.size() > result[i].first.size()
|| v.size() > result[i].second.size()) {
size_error = true;
} else {
std::memcpy(result[i].first.data(), k.data(), k.size());
if(v.size()) std::memcpy(result[i].second.data(), v.data(), v.size());
}
result[i].first.resize(k.size());
result[i].second.resize(v.size());
} else {
result.push_back(
std::make_pair(
data_slice(k.data(), k.size()),
data_slice(v.data(), v.size())
)
);
}
++i;
}
delete it;
return result;
result.resize(i);
if(size_error) throw SDSKV_ERR_SIZE;
}
std::vector<ds_bulk_t> LevelDBDataStore::vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, hg_size_t max_keys) const {
std::vector<ds_bulk_t> result;
void LevelDBDataStore::vlist_key_range(
const data_slice &lower_bound,
const data_slice &upper_bound,
std::vector<data_slice>& result) const {
// TODO implement this function
throw SDSKV_OP_NOT_IMPL;
return result;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, hg_size_t max_keys) const {
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
void LevelDBDataStore::vlist_keyval_range(
const data_slice &lower_bound,
const data_slice &upper_bound,
std::vector<std::pair<data_slice,data_slice>>& result) const {
// TODO implement this function
throw SDSKV_OP_NOT_IMPL;
return result;
}
#ifdef USE_REMI
......
......@@ -43,10 +43,9 @@ class LevelDBDataStore : public AbstractDataStore {
virtual ~LevelDBDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path) override;
virtual int put(const void* key, hg_size_t ksize, const void* kdata, hg_size_t dsize) override;
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data) override;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) override;
virtual int get(const data_slice &key, data_slice &data) override;
virtual bool exists(const void* key, hg_size_t ksize) const override;
virtual bool erase(const ds_bulk_t &key) override;
virtual bool erase(const data_slice &key) override;
virtual void set_in_memory(bool enable) override; // not supported, a no-op
virtual void set_comparison_function(const std::string& name, comparator_fn less) override;
virtual void set_no_overwrite() override {
......@@ -57,19 +56,33 @@ class LevelDBDataStore : public AbstractDataStore {
virtual remi_fileset_t create_and_populate_fileset() const override;
#endif
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, hg_size_t count, const ds_bulk_t &prefix) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, hg_size_t count, const ds_bulk_t &prefix) const override;
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, hg_size_t max_keys) const override;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, hg_size_t max_keys) const override;
virtual void vlist_keys(
uint64_t max_count,
const data_slice &start,
const data_slice &prefix, std::vector<data_slice>& result) const override;
virtual void vlist_keyvals(
uint64_t max_count,
const data_slice &start_key,
const data_slice &prefix,
std::vector<std::pair<data_slice,data_slice>>& result) const override;
virtual void vlist_key_range(
const data_slice &lower_bound,
const data_slice &upper_bound,
std::vector<data_slice>& result) const override;
virtual void vlist_keyval_range(
const data_slice &lower_bound,
const data_slice& upper_bound,
std::vector<std::pair<data_slice,data_slice>>& result) const override;
leveldb::DB *_dbm = NULL;
private:
static std::string toString(const ds_bulk_t &key);
static std::string toString(const data_slice &key);
static std::string toString(const char* bug, hg_size_t buf_size);
static ds_bulk_t fromString(const std::string &keystr);
static data_slice fromString(const std::string &keystr);
AbstractDataStore::comparator_fn _less;
LevelDBDataStoreComparator _keycmp;
};
......
This diff is collapsed.
#ifndef __FNV1A_H
#define __FNV1A_H
/**
* thank you to 5gon12eder on StackOverflow for this code
* https://stackoverflow.com/questions/34597260/stdhash-value-on-char-value-and-not-on-memory-address
*/
#include <cstdint>
#include <cstring>
#include <type_traits>
template <typename ResultT, ResultT OffsetBasis, ResultT Prime>
class basic_fnv1a final
{
static_assert(std::is_unsigned<ResultT>::value, "need unsigned integer");
public:
using result_type = ResultT;
private:
result_type state_ {};
public:
constexpr
basic_fnv1a() noexcept : state_ {OffsetBasis}
{
}
constexpr void
update(const void *const data, const std::size_t size) noexcept
{
const auto cdata = static_cast<const unsigned char *>(data);
auto acc = this->state_;
for (auto i = std::size_t {}; i < size; ++i)
{
const auto next = std::size_t {cdata[i]};
acc = (acc ^ next) * Prime;
}
this->state_ = acc;
}
constexpr result_type
digest() const noexcept
{
return this->state_;
}
};
using fnv1a_32 = basic_fnv1a<std::uint32_t,
UINT32_C(2166136261),
UINT32_C(16777619)>;
using fnv1a_64 = basic_fnv1a<std::uint64_t,
UINT64_C(14695981039346656037),
UINT64_C(1099511628211)>;
template <std::size_t Bits>
struct fnv1a;
template <>
struct fnv1a<32>
{
using type = fnv1a_32;
};
template <>
struct fnv1a<64>
{
using type = fnv1a_64;
};
template <std::size_t Bits>
using fnv1a_t = typename fnv1a<Bits>::type;
#endif
......@@ -241,10 +241,9 @@ class PutMultiBenchmark : public PutBenchmark {
m_vsizes.resize(count);
m_vptrs.resize(count);
}
db.put(m_kptrs, m_ksizes, m_vptrs, m_vsizes);
db.put_multi(m_kptrs, m_ksizes, m_vptrs, m_vsizes);
remaining -= count;
}
db.put(m_keys, m_vals);
}
virtual void teardown() override {
......@@ -386,7 +385,7 @@ class GetMultiBenchmark : public GetBenchmark {
m_vsizes.resize(count);
m_vptrs.resize(count);
}
db.get(m_kptrs, m_ksizes, m_vptrs, m_vsizes);
db.get_multi(m_kptrs, m_ksizes, m_vptrs, m_vsizes);
if(!m_reuse_buffer)
k += count;
j += count;
......@@ -468,7 +467,7 @@ class LengthMultiBenchmark : public LengthBenchmark {
m_ksizes[i] = m_keys[i+j].size();
m_kptrs[i] = (const void*)m_keys[i+j].data();
}
db.length(count, m_kptrs.data(), m_ksizes.data(), m_vsizes.data()+k);
db.length_multi(count, m_kptrs.data(), m_ksizes.data(), m_vsizes.data()+k);
remaining -= count;
j += count;
if(!m_reuse_buffer)
......@@ -738,6 +737,12 @@ int main(int argc, char** argv) {
return 0;
}
static sdskv_poolset_usage_t server_poolset_usage;
static void get_server_poolset_usage(void* ctx) {
sdskv::provider* provider = static_cast<sdskv::provider*>(ctx);
provider->get_poolset_usage(&server_poolset_usage);
}
static void run_server(MPI_Comm comm, Json::Value& config) {
// initialize Margo
margo_instance_id mid = MARGO_INSTANCE_NULL;
......@@ -759,6 +764,15 @@ static void run_server(MPI_Comm comm, Json::Value& config) {
MPI_Bcast(server_addr_str.data(), buf_size, MPI_BYTE, 0, MPI_COMM_WORLD);
// initialize sdskv provider
auto provider = sdskv::provider::create(mid);
// initialize poolset if present
if(server_config["poolset"]) {
auto& poolset_config = server_config["poolset"];
int npools = poolset_config.get("pools",1).asInt();
int nbufs = poolset_config.get("buffers",10).asInt();
int min_size = poolset_config.get("min_size", 1024).asInt();
int size_factor = poolset_config.get("size_factor", 2).asInt();
provider->configure_bulk_poolset(npools, nbufs, min_size, size_factor);
}
// initialize database
auto& database_config = server_config["database"];
std::string db_name = database_config["name"].asString();
......@@ -772,10 +786,14 @@ static void run_server(MPI_Comm comm, Json::Value& config) {
.db_no_overwrite = 0
};
provider->attach_database(db_config);
margo_push_finalize_callback(mid, get_server_poolset_usage, provider);
// notify clients that the database is ready
MPI_Barrier(MPI_COMM_WORLD);
// wait for finalize
margo_wait_for_finalize(mid);
// send poolset usage to client 0
MPI_Send(&server_poolset_usage, sizeof(server_poolset_usage), MPI_BYTE, 1, 0, MPI_COMM_WORLD);
}
static void run_client(MPI_Comm comm, Json::Value& config) {
......@@ -799,8 +817,21 @@ static void run_client(MPI_Comm comm, Json::Value& config) {
// wait for server to have initialize the database
MPI_Barrier(MPI_COMM_WORLD);
{
// open remote database
// create client
sdskv::client client(mid);
// setup poolset
if(config["client"]) {
auto& client_config = config["client"];
if(client_config["poolset"]) {
auto& poolset_config = client_config["poolset"];
int npools = poolset_config.get("pools",1).asInt();
int nbufs = poolset_config.get("buffers",10).asInt();
int min_size = poolset_config.get("min_size", 1024).asInt();
int size_factor = poolset_config.get("size_factor", 2).asInt();
client.configure_bulk_poolset(npools, nbufs, min_size, size_factor);
}
}
// open database
sdskv::provider_handle ph(client, server_addr);
std::string db_name = config["server"]["database"]["name"].asString();
RemoteDatabase db = client.open(ph, db_name);
......@@ -883,6 +914,28 @@ static void run_client(MPI_Comm comm, Json::Value& config) {
// shutdown server and finalize margo
if(rank == 0)
margo_shutdown_remote_instance(mid, server_addr);
// collect poolset usage
sdskv_poolset_usage_t usage;
std::vector<sdskv_poolset_usage_t> usages;
client.get_poolset_usage(&usage);
if(rank == 0) usages.resize(num_clients);
MPI_Gather(&usage, sizeof(usage), MPI_BYTE,
usages.data(), sizeof(usage), MPI_BYTE, 0, comm);
if(rank == 0) {
std::cout << "##############################################" << std::endl;
std::cout << "Poolset usage" << std::endl;
unsigned int c = 0;
for(const auto& u : usages) {
std::cout << "Client " << c << " : "
<< u.cache_hits << " hits, "
<< u.cache_miss << " miss" << std::endl;
}
sdskv_poolset_usage_t server_usage;
MPI_Recv(&server_usage, sizeof(server_usage), MPI_BYTE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::cout << "Server : "
<< server_usage.cache_hits << " hits, "
<< server_usage.cache_miss << " miss" << std::endl;
}
}
margo_addr_free(mid, server_addr);
margo_finalize(mid);
......
This diff is collapsed.
......@@ -113,6 +113,25 @@ int main(int argc, char **argv)
parse_args(argc, argv, &opts);
int use_poolset = 0;
hg_size_t poolset_npools = 6;
hg_size_t poolset_nbufs = 8;
hg_size_t poolset_first_size = 128;
hg_size_t poolset_size_multiple = 2;
char* var = getenv("SDSKV_SERVER_USE_POOLSET");
if(var != NULL) {
use_poolset = 1;
var = getenv("SDSKV_SERVER_POOLSET_NPOOLS");
if(var != NULL) poolset_npools = atol(var);
var = getenv("SDSKV_SERVER_POOLSET_NBUFS");
if(var != NULL) poolset_nbufs = atol(var);
var = getenv("SDSKV_SERVER_POOLSET_FIRST_SIZE");
if(var != NULL) poolset_first_size = atol(var);
var = getenv("SDSKV_SERVER_POOLSET_SIZE_MULTIPLE");
if(var != NULL) poolset_size_multiple = atol(var);
}
/* start margo */
/* use the main xstream for driving progress and executing rpc handlers */
mid = margo_init(opts.listen_addr_str, MARGO_SERVER_MODE, 0, -1);
......@@ -204,6 +223,10 @@ int main(int argc, char **argv)
return(-1);
}
if(use_poolset) {
sdskv_provider_configure_bulk_poolset(provider,
poolset_npools, poolset_nbufs, poolset_first_size, poolset_size_multiple);
}
printf("Provider %d managing database \"%s\" at multiplex id %d\n", i, opts.db_names[i], i+1);
}
......@@ -247,6 +270,10 @@ int main(int argc, char **argv)
return(-1);
}
if(use_poolset) {
sdskv_provider_configure_bulk_poolset(provider,
poolset_npools, poolset_nbufs, poolset_first_size, poolset_size_multiple);
}
printf("Provider 0 managing database \"%s\" at multiplex id %d\n", opts.db_names[i] , 1);
}
}
......
This diff is collapsed.
......@@ -32,6 +32,25 @@ int main(int argc, char *argv[])
hg_return_t hret;
int ret;
int use_poolset = 0;
hg_size_t poolset_npools = 4;
hg_size_t poolset_nbufs = 8;
hg_size_t poolset_first_size = 10240;
hg_size_t poolset_size_multiple = 2;
char* var = getenv("SDSKV_USE_POOLSET");
if(var != NULL) {
use_poolset = 1;
var = getenv("SDSKV_POOLSET_NPOOLS");
if(var != NULL) poolset_npools = atol(var);
var = getenv("SDSKV_POOLSET_NBUFS");
if(var != NULL) poolset_nbufs = atol(var);
var = getenv("SDSKV_POOLSET_FIRST_SIZE");
if(var != NULL) poolset_first_size = atol(var);
var = getenv("SDSKV_POOLSET_SIZE_MULTIPLE");
if(var != NULL) poolset_size_multiple = atol(var);
}
if(argc != 5)
{
fprintf(stderr, "Usage: %s <sdskv_server_addr> <mplex_id> <db_name> <num_keys>\n", argv[0]);
......@@ -65,6 +84,18 @@ int main(int argc, char *argv[])
return -1;
}
/* setup poolset if requested */
if(use_poolset) {
ret = sdskv_client_configure_bulk_poolset(
kvcl, poolset_npools, poolset_nbufs,
poolset_first_size, poolset_size_multiple);
if(ret != 0) {
fprintf(stderr, "Error: sdskv_client_configure_bulk_poolset()\n");
margo_finalize(mid);
return -1;
}
}
/* look up the SDSKV server address */
hret = margo_addr_lookup(mid, sdskv_svr_addr_str, &svr_addr);
if(hret != HG_SUCCESS)
......
......@@ -35,6 +35,26 @@ int main(int argc, char *argv[])
hg_return_t hret;
int use_poolset = 0;
hg_size_t poolset_npools = 4;
hg_size_t poolset_nbufs = 8;
hg_size_t poolset_first_size = 10240;
hg_size_t poolset_size_multiple = 2;
char* var = getenv("SDSKV_USE_POOLSET");
if(var != NULL) {
use_poolset = 1;
var = getenv("SDSKV_POOLSET_NPOOLS");
if(var != NULL) poolset_npools = atol(var);
var = getenv("SDSKV_POOLSET_NBUFS");
if(var != NULL) poolset_nbufs = atol(var);
var = getenv("SDSKV_POOLSET_FIRST_SIZE");
if(var != NULL) poolset_first_size = atol(var);
var = getenv("SDSKV_POOLSET_SIZE_MULTIPLE");
if(var != NULL) poolset_size_multiple = atol(var);
}
if(argc != 5)
{
fprintf(stderr, "Usage: %s <sdskv_server_addr> <mplex_id> <db_name> <num_keys>\n", argv[0]);
......@@ -169,7 +189,7 @@ static int put_get_erase_multi_test(sdskv::database& DB, uint32_t num_keys) {
values.push_back(v);
std::cout << "Inserting " << k << "===>\t" << v << std::endl;
}
DB.put(keys, values);
DB.put_multi(keys, values);
std::cout << "Successfuly inserted " << num_keys << " keys" << std::endl;
/* **** get keys **** */
......@@ -187,7 +207,7 @@ static int put_get_erase_multi_test(sdskv::database& DB, uint32_t num_keys) {
vals_subset.push_back(std::string(max_value_size, 0));
}
DB.get(keys_subset, vals_subset);
DB.get_multi(keys_subset, vals_subset);
for(unsigned i=0; i < keys_subset.size(); i++) {
if(vals_subset[i] != reference[keys_subset[i]]) {
......
......@@ -31,6 +31,25 @@ int main(int argc, char *argv[])
hg_return_t hret;
int ret;
int use_poolset = 0;
hg_size_t poolset_npools = 4;
hg_size_t poolset_nbufs = 8;
hg_size_t poolset_first_size = 10240;
hg_size_t poolset_size_multiple = 2;
char* var = getenv("SDSKV_USE_POOLSET");
if(var != NULL) {
use_poolset = 1;
var = getenv("SDSKV_POOLSET_NPOOLS");
if(var != NULL) poolset_npools = atol(var);
var = getenv("SDSKV_POOLSET_NBUFS");
if(var != NULL) poolset_nbufs = atol(var);
var = getenv("SDSKV_POOLSET_FIRST_SIZE");
if(var != NULL) poolset_first_size = atol(var);
var = getenv("SDSKV_POOLSET_SIZE_MULTIPLE");
if(var != NULL) poolset_size_multiple = atol(var);
}
if(argc != 5)
{
fprintf(stderr, "Usage: %s <sdskv_server_addr> <mplex_id> <db_name> <num_keys>\n", argv[0]);
......@@ -64,6 +83,18 @@ int main(int argc, char *argv[])
return -1;
}
/* setup poolset if requested */
if(use_poolset) {
ret = sdskv_client_configure_bulk_poolset(
kvcl, poolset_npools, poolset_nbufs,
poolset_first_size, poolset_size_multiple);
if(ret != 0) {
fprintf(stderr, "Error: sdskv_client_configure_bulk_poolset()\n");
margo_finalize(mid);
return -1;
}
}
/* look up the SDSKV server address */
hret = margo_addr_lookup(mid, sdskv_svr_addr_str, &svr_addr);
if(hret != HG_SUCCESS)
......
......@@ -32,6 +32,25 @@ int main(int argc, char *argv[])
hg_return_t hret;
int ret;
int use_poolset = 0;
hg_size_t poolset_npools = 4;
hg_size_t poolset_nbufs = 8;
hg_size_t poolset_first_size = 10240;
hg_size_t poolset_size_multiple = 2;
char* var = getenv("SDSKV_USE_POOLSET");
if(var != NULL) {
use_poolset = 1;
var = getenv("SDSKV_POOLSET_NPOOLS");
if(var != NULL) poolset_npools = atol(var);
var = getenv("SDSKV_POOLSET_NBUFS");
if(var != NULL) poolset_nbufs = atol(var);
var = getenv("SDSKV_POOLSET_FIRST_SIZE");
if(var != NULL) poolset_first_size = atol(var);
var = getenv("SDSKV_POOLSET_SIZE_MULTIPLE");
if(var != NULL) poolset_size_multiple = atol(var);
}
if(argc != 5)
{
fprintf(stderr, "Usage: %s <sdskv_server_addr> <mplex_id> <db_name> <num_keys>\n", argv[0]);
......@@ -65,6 +84,18 @@ int main(int argc, char *argv[])
return -1;
}
/* setup poolset if requested */
if(use_poolset) {
ret = sdskv_client_configure_bulk_poolset(
kvcl, poolset_npools, poolset_nbufs,
poolset_first_size, poolset_size_multiple);
if(ret != 0) {
fprintf(stderr, "Error: sdskv_client_configure_bulk_poolset()\n");
margo_finalize(mid);
return -1;
}
}
/* look up the SDSKV server address */
hret = margo_addr_lookup(mid, sdskv_svr_addr_str, &svr_addr);
if(hret != HG_SUCCESS)
......
......@@ -31,6 +31,25 @@ int main(int argc, char *argv[])
hg_return_t hret;
int ret;
int use_poolset = 0;
hg_size_t poolset_npools = 4;
hg_size_t poolset_nbufs = 8;
hg_size_t poolset_first_size = 10240;
hg_size_t poolset_size_multiple = 2;
char* var = getenv("SDSKV_USE_POOLSET");
if(var != NULL) {
use_poolset = 1;
var = getenv("SDSKV_POOLSET_NPOOLS");
if(var != NULL) poolset_npools = atol(var);
var = getenv("SDSKV_POOLSET_NBUFS");
if(var != NULL) poolset_nbufs = atol(var);
var = getenv("SDSKV_POOLSET_FIRST_SIZE");
if(var != NULL) poolset_first_size = atol(var);
var = getenv("SDSKV_POOLSET_SIZE_MULTIPLE");
if(var != NULL) poolset_size_multiple = atol(var);
}
if(argc != 5)
{
fprintf(stderr, "Usage: %s <sdskv_server_addr> <mplex_id> <db_name> <num_keys>\n", argv[0]);
......@@ -64,6 +83,18 @@ int main(int argc, char *argv[])
return -1;
}
/* setup poolset if requested */
if(use_poolset) {
ret = sdskv_client_configure_bulk_poolset(
kvcl, poolset_npools, poolset_nbufs,