Commit 6e142876 authored by Matthieu Dorier's avatar Matthieu Dorier

added user-defined migration callbacks

parent 14223202
......@@ -32,6 +32,9 @@ typedef struct sdskv_config_t {
#define SDSKV_CONFIG_DEFAULT { "", "", KVDB_MAP, SDSKV_COMPARE_DEFAULT, 0 }
typedef void (*sdskv_pre_migration_callback_fn)(sdskv_provider_t, const sdskv_config_t*, void*);
typedef void (*sdskv_post_migration_callback_fn)(sdskv_provider_t, const sdskv_config_t*, sdskv_database_id_t, void*);
/**
* @brief Creates a new provider.
*
......@@ -143,6 +146,23 @@ int sdskv_provider_compute_database_size(
sdskv_database_id_t database_id,
size_t* size);
/**
* @brief Register custom migration callbacks to call before and
* after a database is migrated to this provider.
*
* @param provider Provider in which to register the callbacks.
* @param pre_cb Pre-migration callback.
* @param post_cb Post-migration callback.
* @param uargs User arguments.
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_provider_set_migration_callbacks(
sdskv_provider_t provider,
sdskv_pre_migration_callback_fn pre_cb,
sdskv_post_migration_callback_fn post_cb,
void* uargs);
#ifdef __cplusplus
}
#endif
......
......@@ -22,6 +22,9 @@ struct sdskv_server_context_t
std::map<std::string, sdskv_compare_fn> compfunctions;
remi_client_t remi_client;
remi_provider_t remi_provider;
sdskv_pre_migration_callback_fn pre_migration_callback;
sdskv_post_migration_callback_fn post_migration_callback;
void* migration_uargs;
ABT_rwlock lock; // write-locked during migration, read-locked by all other
// operations. There should be something better to avoid locking everything
......@@ -124,6 +127,9 @@ extern "C" int sdskv_provider_register(
tmp_svr_ctx->remi_client = REMI_CLIENT_NULL;
tmp_svr_ctx->remi_provider = REMI_PROVIDER_NULL;
tmp_svr_ctx->pre_migration_callback = NULL;
tmp_svr_ctx->post_migration_callback = NULL;
tmp_svr_ctx->migration_uargs = NULL;
/* Create rwlock */
ret = ABT_rwlock_create(&(tmp_svr_ctx->lock));
......@@ -263,7 +269,7 @@ extern "C" int sdskv_provider_register(
return SDSKV_SUCCESS;
}
int sdskv_provider_add_comparison_function(
extern "C" int sdskv_provider_add_comparison_function(
sdskv_provider_t provider,
const char* function_name,
sdskv_compare_fn comp_fn)
......@@ -365,6 +371,45 @@ extern "C" int sdskv_provider_list_databases(
return SDSKV_SUCCESS;
}
extern "C" int sdskv_provider_compute_database_size(
sdskv_provider_t provider,
sdskv_database_id_t database_id,
size_t* size)
{
int ret;
// find the database
auto it = provider->databases.find(database_id);
if(it == provider->databases.end()) {
return SDSKV_ERR_UNKNOWN_DB;
}
auto database = it->second;
database->sync();
/* create a fileset */
remi_fileset_t fileset = database->create_and_populate_fileset();
if(fileset == REMI_FILESET_NULL) {
return SDSKV_OP_NOT_IMPL;
}
/* issue the migration */
ret = remi_fileset_compute_size(fileset, 0, size);
if(ret != REMI_SUCCESS) {
return SDSKV_ERR_REMI;
}
return SDSKV_SUCCESS;
}
extern "C" int sdskv_provider_set_migration_callbacks(
sdskv_provider_t provider,
sdskv_pre_migration_callback_fn pre_cb,
sdskv_post_migration_callback_fn post_cb,
void* uargs)
{
provider->pre_migration_callback = pre_cb;
provider->post_migration_callback = post_cb;
provider->migration_uargs = uargs;
return SDSKV_SUCCESS;
}
static void sdskv_put_ult(hg_handle_t handle)
{
hg_return_t hret;
......@@ -2188,6 +2233,11 @@ static int sdskv_pre_migration_callback(remi_fileset_t fileset, void* uargs)
std::string db_name = md._metadata["database_name"];
std::string db_type = md._metadata["database_type"];
std::string comp_fn = md._metadata["comparison_function"];
std::vector<char> db_root;
size_t root_size = 0;
remi_fileset_get_root(fileset, NULL, &root_size);
db_root.resize(root_size+1);
remi_fileset_get_root(fileset, db_root.data(), &root_size);
// (2) check that there isn't a database with the same name
if(provider->name2id.find(db_name) != provider->name2id.end()) {
return -102;
......@@ -2202,6 +2252,25 @@ static int sdskv_pre_migration_callback(remi_fileset_t fileset, void* uargs)
return -104;
}
}
// (5) fill up a config structure and call the user-defined pre-migration callback
if(provider->pre_migration_callback) {
sdskv_config_t config;
config.db_name = db_name.c_str();
config.db_path = db_root.data();
if(db_type == "berkeleydb")
config.db_type = KVDB_BERKELEYDB;
else if(db_type == "leveldb")
config.db_type = KVDB_LEVELDB;
if(comp_fn.size() != 0)
config.db_comp_fn_name = comp_fn.c_str();
else
config.db_comp_fn_name = NULL;
if(md._metadata.find("no_overwrite") != md._metadata.end())
config.db_no_overwrite = 1;
else
config.db_no_overwrite = 0;
(provider->pre_migration_callback)(provider, &config, provider->migration_uargs);
}
// all is fine
return 0;
}
......@@ -2242,32 +2311,10 @@ static int sdskv_post_migration_callback(remi_fileset_t fileset, void* uargs)
int ret = sdskv_provider_attach_database(provider, &config, &db_id);
if(ret != SDSKV_SUCCESS)
return -106;
return 0;
}
int sdskv_provider_compute_database_size(
sdskv_provider_t provider,
sdskv_database_id_t database_id,
size_t* size)
{
int ret;
// find the database
auto it = provider->databases.find(database_id);
if(it == provider->databases.end()) {
return SDSKV_ERR_UNKNOWN_DB;
}
auto database = it->second;
database->sync();
/* create a fileset */
remi_fileset_t fileset = database->create_and_populate_fileset();
if(fileset == REMI_FILESET_NULL) {
return SDSKV_OP_NOT_IMPL;
if(provider->post_migration_callback) {
(provider->post_migration_callback)(provider, &config, db_id, provider->migration_uargs);
}
/* issue the migration */
ret = remi_fileset_compute_size(fileset, 0, size);
if(ret != REMI_SUCCESS) {
return SDSKV_ERR_REMI;
}
return SDSKV_SUCCESS;
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment