Commit a506507d authored by Matthieu Dorier's avatar Matthieu Dorier

implemented and tested some migration functions

parent bca811c9
......@@ -17,6 +17,7 @@ check_PROGRAMS = test/sdskv-open-test \
test/sdskv-list-keyvals-test \
test/sdskv-list-keys-prefix-test \
test/sdskv-custom-cmp-test \
test/sdskv-migrate-test \
test/sdskv-custom-server-daemon
bin_sdskv_server_daemon_SOURCES = src/sdskv-server-daemon.c
......@@ -105,6 +106,7 @@ TESTS = test/basic.sh \
test/list-keys-test.sh \
test/list-keyvals-test.sh \
test/list-keys-prefix-test.sh \
test/migrate-test.sh \
test/custom-cmp-test.sh
TESTS_ENVIRONMENT = TIMEOUT="$(TIMEOUT)" \
......@@ -151,6 +153,10 @@ test_sdskv_custom_server_daemon_DEPENDENCIES = lib/libsdskv-server.la
test_sdskv_custom_server_daemon_LDFLAGS = -Llib -lsdskv-server
test_sdskv_custom_server_daemon_LDADD = ${LIBS} -lsdskv-server ${SERVER_LIBS}
test_sdskv_migrate_test_SOURCES = test/sdskv-migrate-test.cc
test_sdskv_migrate_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_migrate_test_LDFLAGS = -Llib -lsdskv-client
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/sdskv-server.pc \
maint/sdskv-client.pc
......
......@@ -292,6 +292,174 @@ int sdskv_list_keyvals_with_prefix(
hg_size_t* vsizes,
hg_size_t* max_items);
/**
* @brief Migrates a set of keys/values from a source provider/database
* to a target provider/database.
*
* @param source_provider source provider
* @param source_db_id source database id
* @param target_addr target address
* @param target_provider_id target provider id
* @param target_db_id target database id
* @param num_keys number of keys
* @param keys array of keys
* @param key_sizes array of key sizes
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_migrate_keys(
sdskv_provider_handle_t source_provider,
sdskv_database_id_t source_db_id,
const char* target_addr,
uint16_t target_provider_id,
sdskv_database_id_t target_db_id,
hg_size_t num_keys,
const void** keys,
const hg_size_t* key_sizes,
int flag);
/**
* @brief Migrates a single key/value from a source provider/database
* to a target provider/database.
*
* @param source_provider source provider
* @param source_db_id source database id
* @param target_addr target address
* @param target_provider_id target provider id
* @param target_db_id target database id
* @param key key to migrate
* @param key_size size of the key
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
inline int sdskv_migrate_key(
sdskv_provider_handle_t source_provider,
sdskv_database_id_t source_db_id,
const char* target_addr,
uint16_t target_provider_id,
sdskv_database_id_t target_db_id,
const void* key,
hg_size_t key_size,
int flag)
{
return sdskv_migrate_keys(
source_provider,
source_db_id,
target_addr,
target_provider_id,
target_db_id,
1,
&key,
&key_size,
flag);
}
/**
* @brief Migrates a set of keys/values from a source provider/database
* to a target provider/database based on a range. The range is
* expressed by the array key_range, which contains two elements.
* key_range[0] must be a lower bound lb.
* key_range[1] must be an upper bound ub.
* The set of keys migrated are within the range [lb, ub[ (i.e. lb
* included, ub is not included).
*
* @param source_provider source provider
* @param source_db_id source database id
* @param target_addr target address
* @param target_provider_id target provider id
* @param target_db_id target database id
* @param key_range range of keys to migrate
* @param key_range_sizes size of the keys provided for the range
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_migrate_key_range(
sdskv_provider_handle_t source_provider,
sdskv_database_id_t source_db_id,
const char* target_addr,
uint16_t target_provider_id,
sdskv_database_id_t target_db_id,
const void* key_range[],
const hg_size_t key_range_sizes[],
int flag);
/**
* @brief Migrates a set of keys/values from a source provider/database
* to a target provider/database based on a prefix.
* All key matching the provided prefix will be migrated.
*
* @param source_provider source provider
* @param source_db_id source database id
* @param target_addr target address
* @param target_provider_id target provider id
* @param target_db_id target database id
* @param key_prefix prefix of keys to migrate
* @param key_prefix_size size of the prefix provided
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_migrate_keys_prefixed(
sdskv_provider_handle_t source_provider,
sdskv_database_id_t source_db_id,
const char* target_addr,
uint16_t target_provider_id,
sdskv_database_id_t target_db_id,
const void* key_prefix,
hg_size_t key_prefix_size,
int flag);
/**
* @brief Migrates all the keys/values from a source provider/database
* to a target provider/database based on a prefix.
*
* @param source_provider source provider
* @param source_db_id source database id
* @param target_addr target address
* @param target_provider_id target provider id
* @param target_db_id target database id
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_migrate_all_keys(
sdskv_provider_handle_t source_provider,
sdskv_database_id_t source_db_id,
const char* target_addr,
uint16_t target_provider_id,
sdskv_database_id_t target_db_id,
int flag);
/**
* @brief Migrates a database from a source provider
* to a target provider. The difference with sdskv_migrate_all_keys is
* that the target database does not exist yet and the id of the newly
* created database will be returned to the called.
* Contrary to sdskv_migrate_all_keys, if SDSKV_REMOVE_BEFORE or
* SDSKV_REMOVE_AFTER are used as flag, the source database is deleted
* on its provider (while sdskv_migrate_all_keys only removes all the keys,
* leaving the database present).
*
* @param source_provider source provider
* @param source_db_id source database id
* @param target_addr target address
* @param target_provider_id target provider id
* @param target_db_id resulting target database id
* @param flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_BEFORE, or SDSKV_REMOVE_AFTER
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_migrate_database(
sdskv_provider_handle_t source_provider,
sdskv_database_id_t source_db_id,
const char* target_addr,
uint16_t target_provider_id,
sdskv_database_id_t* target_db_id,
int flag);
/**
* Shuts down a remote SDSKV service (given an address).
* This will shutdown all the providers on the target address.
......
......@@ -17,6 +17,9 @@ typedef enum sdskv_db_type_t
typedef uint64_t sdskv_database_id_t;
#define SDSKV_DATABASE_ID_INVALID 0
#define SDSKV_KEEP_ORIGINAL 1 /* for migration operations, keep original */
#define SDSKV_REMOVE_ORIGINAL 2 /* for migration operations, remove the origin after migrating */
#define SDSKV_SUCCESS 0 /* Success */
#define SDSKV_ERR_ALLOCATION -1 /* Error allocating something */
#define SDSKV_ERR_INVALID_ARG -2 /* An argument is invalid */
......@@ -29,6 +32,8 @@ typedef uint64_t sdskv_database_id_t;
#define SDSKV_ERR_UNKNOWN_KEY -9 /* Key requested does not exist */
#define SDSKV_ERR_SIZE -10 /* Client did not allocate enough for the requested data */
#define SDSKV_ERR_ERASE -11 /* Could not erase the given key */
#define SDSKV_ERR_MIGRATION -12 /* Error during data migration */
#define SDSKV_OP_NOT_IMPL -13 /* Operation not implemented for this backend */
#if defined(__cplusplus)
}
......
......@@ -235,7 +235,8 @@ void BerkeleyDBDataStore::set_in_memory(bool enable) {
_in_memory = enable;
};
std::vector<ds_bulk_t> BerkeleyDBDataStore::vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix)
std::vector<ds_bulk_t> BerkeleyDBDataStore::vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
{
std::vector<ds_bulk_t> keys;
Dbc * cursorp;
......@@ -284,7 +285,8 @@ std::vector<ds_bulk_t> BerkeleyDBDataStore::vlist_keys(const ds_bulk_t &start, s
return keys;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BerkeleyDBDataStore::vlist_keyvals(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix)
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BerkeleyDBDataStore::vlist_keyvals(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
{
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
Dbc * cursorp;
......@@ -337,6 +339,22 @@ std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BerkeleyDBDataStore::vlist_keyvals(c
return result;
}
std::vector<ds_bulk_t> BerkeleyDBDataStore::vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const {
std::vector<ds_bulk_t> result;
// TODO implement this function
throw SDSKV_OP_NOT_IMPL;
return result;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BerkeleyDBDataStore::vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const {
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
// TODO implement this function
throw SDSKV_OP_NOT_IMPL;
return result;
}
int BerkeleyDBDataStore::compkeys(Db *db, const Dbt *dbt1, const Dbt *dbt2, size_t *locp) {
DbWrapper* _wrapper = (DbWrapper*)(((char*)db) - offsetof(BerkeleyDBDataStore::DbWrapper, _db));
if(_wrapper->_less) {
......
......@@ -7,6 +7,7 @@
#include "datastore/datastore.h"
#include <db_cxx.h>
#include <dbstl_map.h>
#include "sdskv-common.h"
// may want to implement some caching for persistent stores like BerkeleyDB
class BerkeleyDBDataStore : public AbstractDataStore {
......@@ -39,8 +40,14 @@ class BerkeleyDBDataStore : public AbstractDataStore {
_no_overwrite = true;
}
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &);
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &) const;
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const;
DbEnv *_dbenv = nullptr;
Db *_dbm = nullptr;
DbWrapper* _wrapper = nullptr;
......
......@@ -131,9 +131,10 @@ bool BwTreeDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
void BwTreeDataStore::set_in_memory(bool enable)
{};
std::vector<ds_bulk_t> BwTreeDataStore::vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix)
std::vector<ds_bulk_t> BwTreeDataStore::vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
{
std::vector<ds_bulk_t> keys;
throw SDSKV_OP_NOT_IMPL;
#if 0
auto it = _tree->Begin(start);
while (it.IsEnd() == false) {
......@@ -144,9 +145,10 @@ std::vector<ds_bulk_t> BwTreeDataStore::vlist_keys(const ds_bulk_t &start, size_
return keys;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BwTreeDataStore::vlist_keyvals(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix)
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BwTreeDataStore::vlist_keyvals(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
{
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> keyvals;
throw SDSKV_OP_NOT_IMPL;
#if 0
auto it = _tree->Begin(start);
while (it.IsEnd() == false) {
......@@ -156,3 +158,19 @@ std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BwTreeDataStore::vlist_keyvals(const
return keyvals;
}
std::vector<ds_bulk_t> BwTreeDataStore::vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const {
std::vector<ds_bulk_t> result;
throw SDSKV_OP_NOT_IMPL;
// TODO implement this function
return result;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BwTreeDataStore::vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const {
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
throw SDSKV_OP_NOT_IMPL;
// TODO implement this function
return result;
}
......@@ -6,6 +6,7 @@
#include "kv-config.h"
#include "bwtree.h"
#include "datastore.h"
#include "sdskv-common.h"
using namespace wangziqi2013::bwtree;
......@@ -29,8 +30,14 @@ public:
_no_overwrite = true;
}
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix);
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const;
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const;
BwTree<ds_bulk_t, ds_bulk_t,
ds_bulk_less, ds_bulk_equal, ds_bulk_hash,
ds_bulk_equal, ds_bulk_hash> *_tree = NULL;
......
......@@ -11,43 +11,57 @@
enum class Duplicates : int {ALLOW, IGNORE};
class AbstractDataStore {
public:
typedef int (*comparator_fn)(const void*, size_t, const void*, size_t);
AbstractDataStore();
AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~AbstractDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path)=0;
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool exists(const ds_bulk_t &key) = 0;
virtual bool erase(const ds_bulk_t &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual void set_comparison_function(comparator_fn less)=0;
virtual void set_no_overwrite()=0;
std::vector<ds_bulk_t> list_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix=ds_bulk_t()) {
return vlist_keys(start_key, count, prefix);
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> list_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix=ds_bulk_t()) {
return vlist_keyvals(start_key, count, prefix);
}
protected:
Duplicates _duplicates;
bool _eraseOnGet;
bool _debug;
bool _in_memory;
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix) = 0;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix) = 0;
public:
typedef int (*comparator_fn)(const void*, size_t, const void*, size_t);
AbstractDataStore();
AbstractDataStore(Duplicates duplicates, bool eraseOnGet, bool debug);
virtual ~AbstractDataStore();
virtual bool openDatabase(const std::string& db_name, const std::string& path)=0;
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, ds_bulk_t &data)=0;
virtual bool get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data)=0;
virtual bool exists(const ds_bulk_t &key) = 0;
virtual bool erase(const ds_bulk_t &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual void set_comparison_function(comparator_fn less)=0;
virtual void set_no_overwrite()=0;
std::vector<ds_bulk_t> list_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix=ds_bulk_t()) const {
return vlist_keys(start_key, count, prefix);
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> list_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix=ds_bulk_t()) const {
return vlist_keyvals(start_key, count, prefix);
}
std::vector<ds_bulk_t> list_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys=0) const {
return vlist_key_range(lower_bound, upper_bound, max_keys);
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> list_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys=0) const {
return vlist_keyval_range(lower_bound, upper_bound, max_keys);
}
protected:
Duplicates _duplicates;
bool _eraseOnGet;
bool _debug;
bool _in_memory;
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix) const = 0;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix) const = 0;
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const = 0;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const = 0;
};
#endif // datastore_h
......@@ -143,7 +143,8 @@ bool LevelDBDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
void LevelDBDataStore::set_in_memory(bool enable)
{};
std::vector<ds_bulk_t> LevelDBDataStore::vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix)
std::vector<ds_bulk_t> LevelDBDataStore::vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
{
std::vector<ds_bulk_t> keys;
......@@ -178,7 +179,8 @@ std::vector<ds_bulk_t> LevelDBDataStore::vlist_keys(const ds_bulk_t &start, size
return keys;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyvals(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix)
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyvals(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
{
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
......@@ -215,21 +217,19 @@ std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyvals(cons
delete it;
return result;
}
/*
{
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> keyvals;
leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
size_t i=0;
for (it->SeekToFirst(); it->Valid(); it->Next() ) {
ds_bulk_t k(it->key().size());
ds_bulk_t v(it->value().size());
memcpy(k.data(), it->key().data(), it->key().size() );
memcpy(v.data(), it->value().data(), it->value().size() );
keyvals.push_back(std::make_pair(std::move(k), std::move(v)));
if (i++ > count) break;
}
delete it;
return keyvals;
std::vector<ds_bulk_t> LevelDBDataStore::vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const {
std::vector<ds_bulk_t> result;
// TODO implement this function
throw SDSKV_OP_NOT_IMPL;
return result;
}
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const {
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
// TODO implement this function
throw SDSKV_OP_NOT_IMPL;
return result;
}
*/
......@@ -7,8 +7,10 @@
#include <leveldb/db.h>
#include <leveldb/comparator.h>
#include <leveldb/env.h>
#include "sdskv-common.h"
#include "datastore/datastore.h"
// may want to implement some caching for persistent stores like LevelDB
class LevelDBDataStore : public AbstractDataStore {
private:
......@@ -51,8 +53,14 @@ class LevelDBDataStore : public AbstractDataStore {
_no_overwrite = true;
}
protected:
virtual std::vector<ds_bulk_t> vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix);
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix);
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const;
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const;
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const;
leveldb::DB *_dbm = NULL;
private:
std::string toString(const ds_bulk_t &key);
......
......@@ -92,12 +92,11 @@ class MapDataStore : public AbstractDataStore {
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) {
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const {
std::vector<ds_bulk_t> result;
decltype(_map.begin()) it;
if(start_key.size() > 0) {
it = _map.lower_bound(start_key);
while(it != _map.end() && it->first == start_key) it++;
it = _map.upper_bound(start_key);
} else {
it = _map.begin();
}
......@@ -116,12 +115,11 @@ class MapDataStore : public AbstractDataStore {
}
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyvals(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) {
const ds_bulk_t &start_key, size_t count, const ds_bulk_t &prefix) const {
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
decltype(_map.begin()) it;
if(start_key.size() > 0) {
it = _map.lower_bound(start_key);
while(it != _map.end() && it->first == start_key) it++;
it = _map.upper_bound(start_key);
} else {
it = _map.begin();
}
......@@ -139,6 +137,51 @@ class MapDataStore : public AbstractDataStore {
return result;
}
virtual std::vector<ds_bulk_t> vlist_key_range(
const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const {
std::vector<ds_bulk_t> result;
decltype(_map.begin()) it, ub;
// get the first element that goes immediately after lower_bound
it = _map.upper_bound(lower_bound);
if(it == _map.end()) {
return result;
}
// get the element that goes immediately before upper bound
ub = _map.lower_bound(upper_bound);
if(ub->first != upper_bound) ub++;
while(it != ub) {
result.push_back(it->second);
it++;
if(max_keys != 0 && result.size() == max_keys)
break;
}
return result;
}
virtual std::vector<std::pair<ds_bulk_t,ds_bulk_t>> vlist_keyval_range(
const ds_bulk_t &lower_bound, const ds_bulk_t& upper_bound, size_t max_keys) const {
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
decltype(_map.begin()) it, ub;
// get the first element that goes immediately after lower_bound
it = _map.upper_bound(lower_bound);
if(it == _map.end()) {
return result;
}
// get the element that goes immediately before upper bound
ub = _map.lower_bound(upper_bound);
if(ub->first != upper_bound) ub++;
while(it != ub) {
result.emplace_back(it->first,it->second);
it++;
if(max_keys != 0 && result.size() == max_keys)
break;
}
return result;
}
private:
AbstractDataStore::comparator_fn _less;
std::map<ds_bulk_t, ds_bulk_t, keycmp> _map;
......
This diff is collapsed.
......@@ -113,4 +113,56 @@ MERCURY_GEN_PROC(bulk_get_in_t, ((uint64_t)(db_id))\
((hg_bulk_t)(handle)))
MERCURY_GEN_PROC(bulk_get_out_t, ((hg_size_t)(size)) ((int32_t)(ret)))
// ------------- MIGRATE KEYS ----------- //
MERCURY_GEN_PROC(migrate_keys_in_t,
((uint64_t)(source_db_id))\
((hg_string_t)(target_addr))\
((uint16_t)(target_provider_id))\
((uint64_t)(target_db_id))\
((hg_size_t)(num_keys))\
((hg_size_t)(bulk_size))\
((hg_bulk_t)(keys_bulk))\
((int32_t)(flag)))
MERCURY_GEN_PROC(migrate_keys_out_t,
((int32_t)(ret)))
// ------------- MIGRATE KEY RANGE ----------- //
MERCURY_GEN_PROC(migrate_key_range_in_t,
((uint64_t)(source_db_id))\
((hg_string_t)(target_addr))\
((uint16_t)(target_provider_id))\
((uint64_t)(target_db_id))\
((kv_data_t)(key_lb))\
((kv_data_t)(key_ub))\
((int32_t)(flag)))
// ------------- MIGRATE KEY PREFIXED -------- //
MERCURY_GEN_PROC(migrate_keys_prefixed_in_t,
((uint64_t)(source_db_id))\
((hg_string_t)(target_addr))\
((uint16_t)(target_provider_id))\
((uint64_t)(target_db_id))\
((kv_data_t)(key_prefix))\
((int32_t)(flag)))
// ------------- MIGRATE ALL KEYS ----------- //
MERCURY_GEN_PROC(migrate_all_keys_in_t,
((uint64_t)(source_db_id))\
((hg_string_t)(target_addr))\
((uint16_t)(target_provider_id))\
((uint64_t)(target_db_id))\
((int32_t)(flag)))
// ------------- MIGRATE DATABASE ----------- //
MERCURY_GEN_PROC(migrate_database_in_t,
((uint64_t)(source_db_id))\
((hg_string_t)(target_addr))\
((uint16_t)(target_provider_id))\
((int32_t)(flag)))
MERCURY_GEN_PROC(migrate_database_out_t,
((int32_t)(ret))\
((uint64_t)(db_id)))
#endif
This diff is collapsed.
#!/bin/bash -x
if [ -z $srcdir ]; then
echo srcdir variable not set.
exit 1
fi
source $srcdir/test/test-util.sh
find_db_name
# start a server with 2 second wait,
# 20s timeout, and my_test_db as database
a="A"
test_db_nameA=${test_db_name}$a
test_db_full="${test_db_nameA}:${test_db_type}"
test_start_server 2 20 $test_db_full
svr_addrA=$svr_addr
b="B"
test_db_nameB=${test_db_name}$b
test_db_full="${test_db_nameB}:${test_db_type}"
test_start_server 2 20 $test_db_full
svr_addrB=$svr_addr