Commit ab646472 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added sdskv_provider_destroy function

parent 4ec0955b
......@@ -52,6 +52,15 @@ int sdskv_provider_register(
ABT_pool pool,
sdskv_provider_t* provider);
/**
* @brief Deregister the provider's RPCs and destroys the provider.
*
* @param provider Provider to destroy.
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_provider_destroy(sdskv_provider_t provider);
/**
* @brief Registers a comparison function for databases to use.
*
......
......@@ -18,12 +18,15 @@
struct sdskv_server_context_t
{
margo_instance_id mid;
std::unordered_map<sdskv_database_id_t, AbstractDataStore*> databases;
std::map<std::string, sdskv_database_id_t> name2id;
std::map<sdskv_database_id_t, std::string> id2name;
std::map<std::string, sdskv_compare_fn> compfunctions;
#ifdef USE_REMI
int owns_remi_provider;
remi_client_t remi_client;
remi_provider_t remi_provider;
sdskv_pre_migration_callback_fn pre_migration_callback;
......@@ -129,7 +132,10 @@ extern "C" int sdskv_provider_register(
if(!tmp_svr_ctx)
return SDSKV_ERR_ALLOCATION;
tmp_svr_ctx->mid = mid;
#ifdef USE_REMI
tmp_svr_ctx->owns_remi_provider = 0;
tmp_svr_ctx->remi_client = REMI_CLIENT_NULL;
tmp_svr_ctx->remi_provider = REMI_PROVIDER_NULL;
tmp_svr_ctx->pre_migration_callback = NULL;
......@@ -151,102 +157,122 @@ extern "C" int sdskv_provider_register(
sdskv_open_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_open_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_count_databases_rpc",
void, count_db_out_t,
sdskv_count_db_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_count_databases_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_list_databases_rpc",
list_db_in_t, list_db_out_t,
sdskv_list_db_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_list_databases_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_put_rpc",
put_in_t, put_out_t,
sdskv_put_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_put_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_put_multi_rpc",
put_multi_in_t, put_multi_out_t,
sdskv_put_multi_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_put_multi_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_bulk_put_rpc",
bulk_put_in_t, bulk_put_out_t,
sdskv_bulk_put_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_bulk_put_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_get_rpc",
get_in_t, get_out_t,
sdskv_get_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_get_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_get_multi_rpc",
get_multi_in_t, get_multi_out_t,
sdskv_get_multi_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_get_multi_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_length_rpc",
length_in_t, length_out_t,
sdskv_length_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_length_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_length_multi_rpc",
length_multi_in_t, length_multi_out_t,
sdskv_length_multi_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_length_multi_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_exists_rpc",
exists_in_t, exists_out_t,
sdskv_exists_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_exists_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_bulk_get_rpc",
bulk_get_in_t, bulk_get_out_t,
sdskv_bulk_get_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_bulk_get_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_list_keys_rpc",
list_keys_in_t, list_keys_out_t,
sdskv_list_keys_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_list_keys_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_list_keyvals_rpc",
list_keyvals_in_t, list_keyvals_out_t,
sdskv_list_keyvals_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_list_keyvals_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_erase_rpc",
erase_in_t, erase_out_t,
sdskv_erase_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_erase_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_erase_multi_rpc",
erase_multi_in_t, erase_multi_out_t,
sdskv_erase_multi_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_erase_multi_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
/* migration RPC */
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_migrate_keys_rpc",
migrate_keys_in_t, migrate_keys_out_t,
sdskv_migrate_keys_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_migrate_keys_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_migrate_key_range_rpc",
migrate_key_range_in_t, migrate_keys_out_t,
sdskv_migrate_key_range_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_migrate_key_range_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_migrate_keys_prefixed_rpc",
migrate_keys_prefixed_in_t, migrate_keys_out_t,
sdskv_migrate_keys_prefixed_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_migrate_keys_prefixed_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_migrate_all_keys_rpc",
migrate_all_keys_in_t, migrate_keys_out_t,
sdskv_migrate_all_keys_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_migrate_all_keys_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_migrate_database_rpc",
migrate_database_in_t, migrate_database_out_t,
sdskv_migrate_database_ult, provider_id, abt_pool);
......@@ -268,6 +294,7 @@ extern "C" int sdskv_provider_register(
remi_provider_registered(mid, provider_id, &flag, NULL, NULL, &remi_provider);
if(flag) { /* a REMI provider exists */
tmp_svr_ctx->remi_provider = remi_provider;
tmp_svr_ctx->owns_remi_provider = 0;
} else {
/* register a REMI provider because it does not exist */
ret = remi_provider_register(mid, ABT_IO_INSTANCE_NULL, provider_id, abt_pool, &(tmp_svr_ctx->remi_provider));
......@@ -275,6 +302,7 @@ extern "C" int sdskv_provider_register(
sdskv_server_finalize_cb(tmp_svr_ctx);
return SDSKV_ERR_REMI;
}
tmp_svr_ctx->owns_remi_provider = 1;
}
ret = remi_provider_register_migration_class(tmp_svr_ctx->remi_provider,
"sdskv", sdskv_pre_migration_callback,
......@@ -287,7 +315,7 @@ extern "C" int sdskv_provider_register(
#endif
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &sdskv_server_finalize_cb, tmp_svr_ctx);
margo_provider_push_finalize_callback(mid, tmp_svr_ctx, &sdskv_server_finalize_cb, tmp_svr_ctx);
if(provider != SDSKV_PROVIDER_IGNORE)
*provider = tmp_svr_ctx;
......@@ -295,6 +323,13 @@ extern "C" int sdskv_provider_register(
return SDSKV_SUCCESS;
}
extern "C" int sdskv_provider_destroy(sdskv_provider_t provider)
{
margo_provider_pop_finalize_callback(provider->mid, provider);
sdskv_server_finalize_cb(provider);
return SDSKV_SUCCESS;
}
extern "C" int sdskv_provider_add_comparison_function(
sdskv_provider_t provider,
const char* function_name,
......@@ -2388,14 +2423,44 @@ DEFINE_MARGO_RPC_HANDLER(sdskv_migrate_database_ult)
static void sdskv_server_finalize_cb(void *data)
{
sdskv_provider_t svr_ctx = (sdskv_provider_t)data;
assert(svr_ctx);
sdskv_provider_remove_all_databases(svr_ctx);
sdskv_provider_t provider = (sdskv_provider_t)data;
assert(provider);
margo_instance_id mid = provider->mid;
ABT_rwlock_free(&(svr_ctx->lock));
#ifdef USE_REMI
if(provider->owns_remi_provider) {
remi_provider_destroy(provider->remi_provider);
}
remi_client_finalize(provider->remi_client);
#endif
delete svr_ctx;
sdskv_provider_remove_all_databases(provider);
margo_deregister(mid, provider->sdskv_open_id);
margo_deregister(mid, provider->sdskv_count_databases_id);
margo_deregister(mid, provider->sdskv_list_databases_id);
margo_deregister(mid, provider->sdskv_put_id);
margo_deregister(mid, provider->sdskv_put_multi_id);
margo_deregister(mid, provider->sdskv_bulk_put_id);
margo_deregister(mid, provider->sdskv_get_id);
margo_deregister(mid, provider->sdskv_get_multi_id);
margo_deregister(mid, provider->sdskv_exists_id);
margo_deregister(mid, provider->sdskv_erase_id);
margo_deregister(mid, provider->sdskv_erase_multi_id);
margo_deregister(mid, provider->sdskv_length_id);
margo_deregister(mid, provider->sdskv_length_multi_id);
margo_deregister(mid, provider->sdskv_bulk_get_id);
margo_deregister(mid, provider->sdskv_list_keys_id);
margo_deregister(mid, provider->sdskv_list_keyvals_id);
margo_deregister(mid, provider->sdskv_migrate_keys_id);
margo_deregister(mid, provider->sdskv_migrate_key_range_id);
margo_deregister(mid, provider->sdskv_migrate_keys_prefixed_id);
margo_deregister(mid, provider->sdskv_migrate_all_keys_id);
margo_deregister(mid, provider->sdskv_migrate_database_id);
ABT_rwlock_free(&(provider->lock));
delete provider;
return;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment