Commit 092c8bfd authored by Matthieu Dorier's avatar Matthieu Dorier

added pretection mechanism for migration

parent 1324afec
......@@ -36,6 +36,7 @@ typedef uint64_t sdskv_database_id_t;
#define SDSKV_OP_NOT_IMPL -13 /* Operation not implemented for this backend */
#define SDSKV_ERR_COMP_FUNC -14 /* Comparison function does not exist */
#define SDSKV_ERR_REMI -15 /* REMI-related error */
#define SDSKV_ERR_ARGOBOTS -16 /* Argobots related error */
#if defined(__cplusplus)
}
......
......@@ -23,6 +23,10 @@ struct sdskv_server_context_t
remi_client_t remi_client;
remi_provider_t remi_provider;
ABT_rwlock lock; // write-locked during migration, read-locked by all other
// operations. There should be something better to avoid locking everything
// but we are going with that for simplicity for now.
hg_id_t sdskv_put_id;
hg_id_t sdskv_put_multi_id;
hg_id_t sdskv_bulk_put_id;
......@@ -90,6 +94,7 @@ extern "C" int sdskv_provider_register(
sdskv_provider_t* provider)
{
sdskv_server_context_t *tmp_svr_ctx;
int ret;
/* check if a provider with the same multiplex id already exists */
{
......@@ -117,6 +122,16 @@ extern "C" int sdskv_provider_register(
if(!tmp_svr_ctx)
return SDSKV_ERR_ALLOCATION;
tmp_svr_ctx->remi_client = REMI_CLIENT_NULL;
tmp_svr_ctx->remi_provider = REMI_PROVIDER_NULL;
/* Create rwlock */
ret = ABT_rwlock_create(&(tmp_svr_ctx->lock));
if(ret != ABT_SUCCESS) {
free(tmp_svr_ctx);
return SDSKV_ERR_ARGOBOTS;
}
/* register RPCs */
hg_id_t rpc_id;
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_put_rpc",
......@@ -216,27 +231,30 @@ extern "C" int sdskv_provider_register(
tmp_svr_ctx->sdskv_migrate_database_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &sdskv_server_finalize_cb, tmp_svr_ctx);
/* register a REMI client */
int ret = remi_client_init(mid, &(tmp_svr_ctx->remi_client));
ret = remi_client_init(mid, &(tmp_svr_ctx->remi_client));
if(ret != REMI_SUCCESS) {
sdskv_server_finalize_cb(tmp_svr_ctx);
return SDSKV_ERR_REMI;
}
/* register a REMI provider */
ret = remi_provider_register(mid, provider_id, abt_pool, &(tmp_svr_ctx->remi_provider));
if(ret != REMI_SUCCESS) {
sdskv_server_finalize_cb(tmp_svr_ctx);
return SDSKV_ERR_REMI;
}
ret = remi_provider_register_migration_class(tmp_svr_ctx->remi_provider,
"sdskv", sdskv_pre_migration_callback,
sdskv_post_migration_callback, NULL, tmp_svr_ctx);
if(ret != REMI_SUCCESS) {
sdskv_server_finalize_cb(tmp_svr_ctx);
return SDSKV_ERR_REMI;
}
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &sdskv_server_finalize_cb, tmp_svr_ctx);
if(provider != SDSKV_PROVIDER_IGNORE)
*provider = tmp_svr_ctx;
......@@ -293,6 +311,8 @@ extern "C" int sdskv_provider_remove_database(
sdskv_provider_t provider,
sdskv_database_id_t db_id)
{
ABT_rwlock_wrlock(provider->lock);
auto r = at_exit([provider]() { ABT_rwlock_unlock(provider->lock); });
if(provider->databases.count(db_id)) {
auto dbname = provider->id2name[db_id];
provider->id2name.erase(db_id);
......@@ -309,6 +329,8 @@ extern "C" int sdskv_provider_remove_database(
extern "C" int sdskv_provider_remove_all_databases(
sdskv_provider_t provider)
{
ABT_rwlock_wrlock(provider->lock);
auto r = at_exit([provider]() { ABT_rwlock_unlock(provider->lock); });
for(auto db : provider->databases) {
delete db.second;
}
......@@ -332,6 +354,8 @@ extern "C" int sdskv_provider_list_databases(
sdskv_database_id_t* targets)
{
unsigned i = 0;
ABT_rwlock_rdlock(provider->lock);
auto r = at_exit([provider]() { ABT_rwlock_unlock(provider->lock); });
for(auto p : provider->name2id) {
targets[i] = p.second;
i++;
......@@ -357,6 +381,9 @@ static void sdskv_put_ult(hg_handle_t handle)
margo_destroy(handle);
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto r = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -414,6 +441,9 @@ static void sdskv_put_multi_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -506,6 +536,9 @@ static void sdskv_length_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -562,6 +595,9 @@ static void sdskv_get_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -635,6 +671,9 @@ static void sdskv_get_multi_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
/* deserialize input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -759,6 +798,9 @@ static void sdskv_length_multi_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
/* deserialize input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -860,6 +902,9 @@ static void sdskv_open_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -908,6 +953,9 @@ static void sdskv_bulk_put_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -995,6 +1043,9 @@ static void sdskv_bulk_get_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -1086,6 +1137,9 @@ static void sdskv_erase_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -1142,6 +1196,9 @@ static void sdskv_erase_multi_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
/* deserialize input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -1207,6 +1264,9 @@ static void sdskv_exists_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -1262,6 +1322,9 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
/* get the input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -1416,6 +1479,9 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
/* get the input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -1637,6 +1703,10 @@ static void sdskv_migrate_keys_ult(hg_handle_t handle)
out.ret = SDSKV_ERR_UNKNOWN_PR;
return;
}
ABT_rwlock_rdlock(provider->lock);
auto unlock = at_exit([provider]() { ABT_rwlock_unlock(provider->lock); });
/* get the input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -1751,6 +1821,10 @@ static void sdskv_migrate_key_range_ult(hg_handle_t handle)
margo_destroy(handle);
return;
}
ABT_rwlock_rdlock(provider->lock);
auto unlock = at_exit([provider]() { ABT_rwlock_unlock(provider->lock); });
/* get the input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -1789,6 +1863,10 @@ static void sdskv_migrate_keys_prefixed_ult(hg_handle_t handle)
out.ret = SDSKV_ERR_UNKNOWN_PR;
return;
}
ABT_rwlock_rdlock(provider->lock);
auto unlock = at_exit([provider]() { ABT_rwlock_unlock(provider->lock); });
/* get the input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -1893,6 +1971,10 @@ static void sdskv_migrate_all_keys_ult(hg_handle_t handle)
out.ret = SDSKV_ERR_UNKNOWN_PR;
return;
}
ABT_rwlock_rdlock(provider->lock);
auto unlock = at_exit([provider]() { ABT_rwlock_unlock(provider->lock); });
/* get the input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -2002,6 +2084,8 @@ static void sdskv_migrate_database_ult(hg_handle_t handle)
out.ret = SDSKV_ERR_UNKNOWN_PR;
break;
}
ABT_rwlock_wrlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS)
......@@ -2072,6 +2156,8 @@ static void sdskv_server_finalize_cb(void *data)
sdskv_provider_remove_all_databases(svr_ctx);
ABT_rwlock_free(&(svr_ctx->lock));
delete svr_ctx;
return;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment