Commit c7a8dd4b authored by Matthieu Dorier's avatar Matthieu Dorier

implementated migration functionality

parent 8f4dfc0a
......@@ -153,6 +153,12 @@ if test "x${bwtree_backend}" == xyes ; then
CXXFLAGS="-pthread -g -Wall -mcx16 -Wno-invalid-offsetof ${CXXFLAGS}"
fi
PKG_CHECK_MODULES([REMI],[remi],[],
[AC_MSG_ERROR([Could not find working remi installation!])])
LIBS="$REMI_LIBS $LIBS"
CPPFLAGS="$REMI_CFLAGS $CPPFLAGS"
CFLAGS="$REMI_CFLAGS $CFLAGS"
AM_CONDITIONAL([BUILD_BDB], [test "x${berkelydb_backend}" == xyes])
AM_CONDITIONAL([BUILD_LEVELDB], [test "x${leveldb_backend}" == xyes])
AM_CONDITIONAL([BUILD_BWTREE], [test "x${bwtree_backend}" == xyes])
......
......@@ -517,6 +517,29 @@ int sdskv_migrate_all_keys(
sdskv_database_id_t target_db_id,
int flag);
/**
* @brief Migrate an entire database to a target destination provider.
* Note that the database will not have the same id at the destination
* so the user should call sdskv_open to re-open the database at its
* destination.
*
* @param[in] source Source provider.
* @param[in] source_db_id Source provider id.
* @param[in] dest_addr Address of the destination provider.
* @param[in] dest_provider_id Provider id of the destination provider.
* @param[in] dest_root Root path at the destination.
* @param[in] flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_ORIGINAL
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_migrate_database(
sdskv_provider_handle_t source,
sdskv_database_id_t source_db_id,
const char* dest_addr,
uint16_t dest_provider_id,
const char* dest_root,
int flag);
/**
* Shuts down a remote SDSKV service (given an address).
* This will shutdown all the providers on the target address.
......
......@@ -35,6 +35,7 @@ typedef uint64_t sdskv_database_id_t;
#define SDSKV_ERR_MIGRATION -12 /* Error during data migration */
#define SDSKV_OP_NOT_IMPL -13 /* Operation not implemented for this backend */
#define SDSKV_ERR_COMP_FUNC -14 /* Comparison function does not exist */
#define SDSKV_ERR_REMI -15 /* REMI-related error */
#if defined(__cplusplus)
}
......
......@@ -7,6 +7,6 @@ Name: sdskv-server
Description: services-based keyval server
Version: 0.1
URL: https://xgitlab.cels.anl.gov/sds/sds-keyval
Requires: margo @SERVER_DEPS_PKG@
Requires: margo remi @SERVER_DEPS_PKG@
Libs: -L${libdir} -lsdskv-server @SERVER_LIBS_EXT@
Cflags: -I${includedir}
......@@ -33,6 +33,9 @@ BerkeleyDBDataStore::~BerkeleyDBDataStore() {
bool BerkeleyDBDataStore::openDatabase(const std::string& db_name, const std::string& db_path) {
int status = 0;
_name = db_name;
_path = db_path;
if (!db_path.empty()) {
mkdirs(db_path.c_str());
}
......@@ -128,7 +131,8 @@ bool BerkeleyDBDataStore::openDatabase(const std::string& db_name, const std::st
return (status == 0);
};
void BerkeleyDBDataStore::set_comparison_function(comparator_fn less) {
void BerkeleyDBDataStore::set_comparison_function(const std::string& name, comparator_fn less) {
_comp_fun_name = name;
_wrapper->_less = less;
}
......@@ -369,3 +373,20 @@ int BerkeleyDBDataStore::compkeys(Db *db, const Dbt *dbt1, const Dbt *dbt2, size
return 0;
}
}
remi_fileset_t BerkeleyDBDataStore::create_and_populate_fileset() const {
remi_fileset_t fileset = REMI_FILESET_NULL;
std::string local_root = _path;
int ret;
if(_path[_path.size()-1] != '/')
local_root += "/";
remi_fileset_create("sdskv", local_root.c_str(), &fileset);
remi_fileset_register_file(fileset, _name.c_str());
remi_fileset_register_metadata(fileset, "database_type", "berkeleydb");
remi_fileset_register_metadata(fileset, "comparison_function", _comp_fun_name.c_str());
remi_fileset_register_metadata(fileset, "database_name", _name.c_str());
if(_no_overwrite) {
remi_fileset_register_metadata(fileset, "no_overwrite", "");
}
return fileset;
}
......@@ -35,10 +35,11 @@ class BerkeleyDBDataStore : public AbstractDataStore {
virtual bool exists(const ds_bulk_t &key);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual void set_comparison_function(comparator_fn less);
virtual void set_comparison_function(const std::string& name, comparator_fn less);
virtual void set_no_overwrite() {
_no_overwrite = true;
}
remi_fileset_t create_and_populate_fileset() const;
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const;
......@@ -51,7 +52,6 @@ class BerkeleyDBDataStore : public AbstractDataStore {
DbEnv *_dbenv = nullptr;
Db *_dbm = nullptr;
DbWrapper* _wrapper = nullptr;
bool _no_overwrite = false;
};
#endif // bdb_datastore_h
......@@ -5,6 +5,7 @@
#include "kv-config.h"
#include "bulk.h"
#include "remi/remi-common.h"
#include <vector>
......@@ -25,8 +26,21 @@ class AbstractDataStore {
virtual bool exists(const ds_bulk_t &key) = 0;
virtual bool erase(const ds_bulk_t &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual void set_comparison_function(comparator_fn less)=0;
virtual void set_comparison_function(const std::string& name, comparator_fn less)=0;
virtual void set_no_overwrite()=0;
virtual remi_fileset_t create_and_populate_fileset() const = 0;
const std::string& get_path() const {
return _path;
}
const std::string& get_name() const {
return _name;
}
const std::string& get_comparison_function_name() const {
return _comp_fun_name;
}
std::vector<ds_bulk_t> list_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix=ds_bulk_t()) const {
......@@ -49,7 +63,11 @@ class AbstractDataStore {
}
protected:
std::string _path;
std::string _name;
std::string _comp_fun_name;
Duplicates _duplicates;
bool _no_overwrite = false;
bool _eraseOnGet;
bool _debug;
bool _in_memory;
......
......@@ -36,6 +36,9 @@ LevelDBDataStore::~LevelDBDataStore() {
};
bool LevelDBDataStore::openDatabase(const std::string& db_name, const std::string& db_path) {
_name = db_name;
_path = db_path;
leveldb::Options options;
leveldb::Status status;
......@@ -57,7 +60,8 @@ bool LevelDBDataStore::openDatabase(const std::string& db_name, const std::strin
return true;
};
void LevelDBDataStore::set_comparison_function(comparator_fn less) {
void LevelDBDataStore::set_comparison_function(const std::string& name, comparator_fn less) {
_comp_fun_name = name;
_less = less;
}
......@@ -233,3 +237,20 @@ std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyval_range
throw SDSKV_OP_NOT_IMPL;
return result;
}
remi_fileset_t LevelDBDataStore::create_and_populate_fileset() const {
remi_fileset_t fileset = REMI_FILESET_NULL;
std::string local_root = _path;
int ret;
if(_path[_path.size()-1] != '/')
local_root += "/";
remi_fileset_create("sdskv", local_root.c_str(), &fileset);
remi_fileset_register_directory(fileset, (_name+"/").c_str());
remi_fileset_register_metadata(fileset, "database_type", "leveldb");
remi_fileset_register_metadata(fileset, "comparison_function", _comp_fun_name.c_str());
remi_fileset_register_metadata(fileset, "database_name", _name.c_str());
if(_no_overwrite) {
remi_fileset_register_metadata(fileset, "no_overwrite", "");
}
return fileset;
}
......@@ -48,10 +48,11 @@ class LevelDBDataStore : public AbstractDataStore {
virtual bool exists(const ds_bulk_t &key);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // not supported, a no-op
virtual void set_comparison_function(comparator_fn less);
virtual void set_comparison_function(const std::string& name, comparator_fn less);
virtual void set_no_overwrite() {
_no_overwrite = true;
}
remi_fileset_t create_and_populate_fileset() const;
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const;
......@@ -67,7 +68,6 @@ class LevelDBDataStore : public AbstractDataStore {
ds_bulk_t fromString(const std::string &keystr);
AbstractDataStore::comparator_fn _less;
LevelDBDataStoreComparator _keycmp;
bool _no_overwrite = false;
};
#endif // ldb_datastore_h
......@@ -29,15 +29,17 @@ class MapDataStore : public AbstractDataStore {
public:
MapDataStore()
: AbstractDataStore(), _less(nullptr), _map(keycmp(this)), _no_overwrite(false) {}
: AbstractDataStore(), _less(nullptr), _map(keycmp(this)) {}
MapDataStore(Duplicates duplicates, bool eraseOnGet, bool debug)
: AbstractDataStore(duplicates, eraseOnGet, debug), _less(nullptr), _map(keycmp(this)),
_no_overwrite(false) {}
: AbstractDataStore(duplicates, eraseOnGet, debug), _less(nullptr), _map(keycmp(this))
{}
~MapDataStore() = default;
virtual bool openDatabase(const std::string& db_name, const std::string& path) {
_name = db_name;
_path = path;
_map.clear();
return true;
}
......@@ -81,7 +83,8 @@ class MapDataStore : public AbstractDataStore {
_in_memory = enable;
}
virtual void set_comparison_function(comparator_fn less) {
virtual void set_comparison_function(const std::string& name, comparator_fn less) {
_comp_fun_name = name;
_less = less;
}
......@@ -89,6 +92,10 @@ class MapDataStore : public AbstractDataStore {
_no_overwrite = true;
}
remi_fileset_t create_and_populate_fileset() const {
return REMI_FILESET_NULL;
}
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
......@@ -185,7 +192,6 @@ class MapDataStore : public AbstractDataStore {
private:
AbstractDataStore::comparator_fn _less;
std::map<ds_bulk_t, ds_bulk_t, keycmp> _map;
bool _no_overwrite;
};
#endif
......@@ -25,6 +25,7 @@ struct sdskv_client {
hg_id_t sdskv_migrate_key_range_id;
hg_id_t sdskv_migrate_keys_prefixed_id;
hg_id_t sdskv_migrate_all_keys_id;
hg_id_t sdskv_migrate_database_id;
uint64_t num_provider_handles;
};
......@@ -65,6 +66,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_migrate_key_range_rpc", &client->sdskv_migrate_key_range_id, &flag);
margo_registered_name(mid, "sdskv_migrate_keys_prefixed_rpc", &client->sdskv_migrate_keys_prefixed_id, &flag);
margo_registered_name(mid, "sdskv_migrate_all_keys_rpc", &client->sdskv_migrate_all_keys_id, &flag);
margo_registered_name(mid, "sdskv_migrate_database_rpc", &client->sdskv_migrate_database_id, &flag);
} else {
......@@ -104,6 +106,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_migrate_keys_prefixed_rpc", migrate_keys_prefixed_in_t, migrate_keys_out_t, NULL);
client->sdskv_migrate_all_keys_id =
MARGO_REGISTER(mid, "sdskv_migrate_all_keys_rpc", migrate_all_keys_in_t, migrate_keys_out_t, NULL);
client->sdskv_migrate_database_id =
MARGO_REGISTER(mid, "sdskv_migrate_database_rpc", migrate_database_in_t, migrate_database_out_t, NULL);
}
return SDSKV_SUCCESS;
......@@ -1450,6 +1454,51 @@ finish:
return ret;
}
int sdskv_migrate_database(
sdskv_provider_handle_t source,
sdskv_database_id_t source_db_id,
const char* dest_addr,
uint16_t dest_provider_id,
const char* dest_root,
int flag)
{
hg_return_t hret;
hg_handle_t handle;
migrate_database_in_t in;
migrate_database_out_t out;
int ret;
in.source_db_id = source_db_id;
in.remove_src = flag;
in.dest_remi_addr = dest_addr;
in.dest_remi_provider_id = dest_provider_id;
in.dest_root = dest_root;
hret = margo_create(source->client->mid, source->addr,
source->client->sdskv_migrate_database_id, &handle);
if(hret != HG_SUCCESS)
return SDSKV_ERR_MERCURY;
hret = margo_provider_forward(source->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_shutdown_service(sdskv_client_t client, hg_addr_t addr)
{
......
......@@ -191,5 +191,15 @@ MERCURY_GEN_PROC(migrate_all_keys_in_t,
((uint64_t)(target_db_id))\
((int32_t)(flag)))
// ------------- MIGRATE DATABASE ---------- //
MERCURY_GEN_PROC(migrate_database_in_t,
((uint64_t)(source_db_id))\
((int32_t)(remove_src))\
((hg_const_string_t)(dest_remi_addr))\
((uint16_t)(dest_remi_provider_id))\
((hg_const_string_t)(dest_root)))
MERCURY_GEN_PROC(migrate_database_out_t,
((int32_t)(ret)))
#endif
......@@ -7,6 +7,8 @@
#include <map>
#include <iostream>
#include <unordered_map>
#include <remi/remi-client.h>
#include <remi/remi-server.h>
#define SDSKV
#include "datastore/datastore_factory.h"
#include "sdskv-rpc-types.h"
......@@ -18,6 +20,8 @@ struct sdskv_server_context_t
std::map<std::string, sdskv_database_id_t> name2id;
std::map<sdskv_database_id_t, std::string> id2name;
std::map<std::string, sdskv_compare_fn> compfunctions;
remi_client_t remi_client;
remi_provider_t remi_provider;
hg_id_t sdskv_put_id;
hg_id_t sdskv_put_multi_id;
......@@ -71,9 +75,14 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_keys_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_key_range_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_keys_prefixed_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_all_keys_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_database_ult)
static void sdskv_server_finalize_cb(void *data);
static int sdskv_pre_migration_callback(remi_fileset_t fileset, void* uargs);
static int sdskv_post_migration_callback(remi_fileset_t fileset, void* uargs);
extern "C" int sdskv_provider_register(
margo_instance_id mid,
uint16_t provider_id,
......@@ -93,6 +102,16 @@ extern "C" int sdskv_provider_register(
}
}
/* check if a REMI provider exists with the same provider id */
{
int flag;
remi_provider_registered(mid, provider_id, &flag, NULL, NULL);
if(flag) {
fprintf(stderr, "sdskv_provider_register(): a REMI provider with the same (%d) already exists\n", provider_id);
return SDSKV_ERR_REMI;
}
}
/* allocate the resulting structure */
tmp_svr_ctx = new sdskv_server_context_t;
if(!tmp_svr_ctx)
......@@ -191,9 +210,33 @@ extern "C" int sdskv_provider_register(
sdskv_migrate_all_keys_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_migrate_all_keys_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_migrate_database_rpc",
migrate_database_in_t, migrate_database_out_t,
sdskv_migrate_database_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_migrate_database_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &sdskv_server_finalize_cb, tmp_svr_ctx);
/* register a REMI client */
int ret = remi_client_init(mid, &(tmp_svr_ctx->remi_client));
if(ret != REMI_SUCCESS) {
return SDSKV_ERR_REMI;
}
/* register a REMI provider */
ret = remi_provider_register(mid, provider_id, abt_pool, &(tmp_svr_ctx->remi_provider));
if(ret != REMI_SUCCESS) {
return SDSKV_ERR_REMI;
}
ret = remi_provider_register_migration_class(tmp_svr_ctx->remi_provider,
"bake", sdskv_pre_migration_callback,
sdskv_post_migration_callback, NULL, tmp_svr_ctx);
if(ret != REMI_SUCCESS) {
return SDSKV_ERR_REMI;
}
if(provider != SDSKV_PROVIDER_IGNORE)
*provider = tmp_svr_ctx;
......@@ -230,7 +273,7 @@ extern "C" int sdskv_provider_attach_database(
std::string(config->db_name), std::string(config->db_path));
if(db == nullptr) return SDSKV_ERR_DB_CREATE;
if(comp_fn) {
db->set_comparison_function(comp_fn);
db->set_comparison_function(config->db_comp_fn_name, comp_fn);
}
sdskv_database_id_t id = (sdskv_database_id_t)(db);
if(config->db_no_overwrite) {
......@@ -1934,6 +1977,91 @@ static void sdskv_migrate_all_keys_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(sdskv_migrate_all_keys_ult)
static void sdskv_migrate_database_ult(hg_handle_t handle)
{
migrate_database_in_t in;
in.dest_remi_addr = NULL;
in.dest_root = NULL;
migrate_database_out_t out;
hg_addr_t dest_addr = HG_ADDR_NULL;
hg_return_t hret;
margo_instance_id mid;
int ret;
remi_provider_handle_t remi_ph = REMI_PROVIDER_HANDLE_NULL;
remi_fileset_t local_fileset = REMI_FILESET_NULL;
memset(&out, 0, sizeof(out));
do {
mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx = static_cast<sdskv_provider_t>(margo_registered_data(mid, info->id));
if(!svr_ctx) {
out.ret = SDSKV_ERR_UNKNOWN_PR;
break;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = SDSKV_ERR_MERCURY;
break;
}
// find the database that needs to be migrated
auto it = svr_ctx->databases.find(in.source_db_id);
if(it == svr_ctx->databases.end()) {
out.ret = SDSKV_ERR_UNKNOWN_DB;
break;
}
auto database = it->second;
/* lookup the address of the destination REMI provider */
hret = margo_addr_lookup(mid, in.dest_remi_addr, &dest_addr);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
break;
}
/* use the REMI client to create a REMI provider handle */
ret = remi_provider_handle_create(svr_ctx->remi_client,
dest_addr, in.dest_remi_provider_id, &remi_ph);
if(ret != REMI_SUCCESS) {
out.ret = SDSKV_ERR_REMI;
break;
}
/* create a fileset */
remi_fileset_t fileset = database->create_and_populate_fileset();
if(fileset == REMI_FILESET_NULL) {
out.ret = SDSKV_OP_NOT_IMPL;
break;
}
/* issue the migration */
int status = 0;
ret = remi_fileset_migrate(remi_ph, local_fileset, in.dest_root, in.remove_src, &status);
if(ret != REMI_SUCCESS) {
out.ret = SDSKV_ERR_REMI;
break;
}
/* remove the target from the list of managed targets */
sdskv_provider_remove_database(svr_ctx, in.source_db_id);
out.ret = SDSKV_SUCCESS;
} while(false);
remi_fileset_free(local_fileset);
remi_provider_handle_release(remi_ph);
margo_addr_free(mid, dest_addr);
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_migrate_database_ult)
static void sdskv_server_finalize_cb(void *data)
{
sdskv_provider_t svr_ctx = (sdskv_provider_t)data;
......@@ -1946,3 +2074,82 @@ static void sdskv_server_finalize_cb(void *data)
return;
}
struct migration_metadata {
std::unordered_map<std::string,std::string> _metadata;
};
static void get_metadata(const char* key, const char* value, void* uargs) {
auto md = static_cast<migration_metadata*>(uargs);
md->_metadata[key] = value;
}
static int sdskv_pre_migration_callback(remi_fileset_t fileset, void* uargs)
{
sdskv_provider_t provider = (sdskv_provider_t)uargs;
migration_metadata md;
remi_fileset_foreach_metadata(fileset, get_metadata, static_cast<void*>(&md));
// (1) check the metadata
if(md._metadata.find("database_type") == md._metadata.end()
|| md._metadata.find("database_name") == md._metadata.end()
|| md._metadata.find("comparison_function") == md._metadata.end()) {
return -1;
}
std::string db_name = md._metadata["database_name"];
std::string db_type = md._metadata["database_type"];
std::string comp_fn = md._metadata["comparison_function"];
// (2) check that there isn't a database with the same name
if(provider->name2id.find(db_name) != provider->name2id.end()) {
return -1;
}
// (3) check that the type of database is ok to migrate
if(db_type != "berkeleydb" && db_type != "leveldb") {
return -1;
}
// (4) check that the comparison function exists
if(provider->compfunctions.find(comp_fn) == provider->compfunctions.end()) {
return -1;
}
// all is fine
return 0;
}
static int sdskv_post_migration_callback(remi_fileset_t fileset, void* uargs)
{
sdskv_provider_t provider = (sdskv_provider_t)uargs;
migration_metadata md;
remi_fileset_foreach_metadata(fileset, get_metadata, static_cast<void*>(&md));
// (1) check the metadata
if(md._metadata.find("database_type") == md._metadata.end()
|| md._metadata.find("database_name") == md._metadata.end()
|| md._metadata.find("comparison_function") == md._metadata.end()) {
return -1;
}
std::string db_name = md._metadata["database_name"];
std::string db_type = md._metadata["database_type"];
std::string comp_fn = md._metadata["comparison_function"];
std::vector<char> db_root;
size_t root_size = 0;
remi_fileset_get_root(fileset, NULL, &root_size);
db_root.resize(root_size+1);
remi_fileset_get_root(fileset, db_root.data(), &root_size);
sdskv_config_t config;
config.db_name = db_name.c_str();
config.db_path = db_root.data();
if(db_type == "berkeleydb")
config.db_type = KVDB_BERKELEYDB;
else if(db_type == "leveldb")
config.db_type = KVDB_LEVELDB;
config.db_comp_fn_name = comp_fn.c_str();
if(md._metadata.find("no_overwrite") != md._metadata.end())
config.db_no_overwrite = 1;
else
config.db_no_overwrite = 0;
sdskv_database_id_t db_id;
int ret = sdskv_provider_attach_database(provider, &config, &db_id);
if(ret != SDSKV_SUCCESS)
return -1;
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment