Commit 3b5ce132 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

Merge branch 'dev-migrate-db' into 'master'

Implemented database migration

See merge request !7
parents 8f4dfc0a 092c8bfd
......@@ -153,6 +153,12 @@ if test "x${bwtree_backend}" == xyes ; then
CXXFLAGS="-pthread -g -Wall -mcx16 -Wno-invalid-offsetof ${CXXFLAGS}"
fi
PKG_CHECK_MODULES([REMI],[remi],[],
[AC_MSG_ERROR([Could not find working remi installation!])])
LIBS="$REMI_LIBS $LIBS"
CPPFLAGS="$REMI_CFLAGS $CPPFLAGS"
CFLAGS="$REMI_CFLAGS $CFLAGS"
AM_CONDITIONAL([BUILD_BDB], [test "x${berkelydb_backend}" == xyes])
AM_CONDITIONAL([BUILD_LEVELDB], [test "x${leveldb_backend}" == xyes])
AM_CONDITIONAL([BUILD_BWTREE], [test "x${bwtree_backend}" == xyes])
......
......@@ -517,6 +517,29 @@ int sdskv_migrate_all_keys(
sdskv_database_id_t target_db_id,
int flag);
/**
* @brief Migrate an entire database to a target destination provider.
* Note that the database will not have the same id at the destination
* so the user should call sdskv_open to re-open the database at its
* destination.
*
* @param[in] source Source provider.
* @param[in] source_db_id Source provider id.
* @param[in] dest_addr Address of the destination provider.
* @param[in] dest_provider_id Provider id of the destination provider.
* @param[in] dest_root Root path at the destination.
* @param[in] flag SDSKV_KEEP_ORIGINAL, or SDSKV_REMOVE_ORIGINAL
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_migrate_database(
sdskv_provider_handle_t source,
sdskv_database_id_t source_db_id,
const char* dest_addr,
uint16_t dest_provider_id,
const char* dest_root,
int flag);
/**
* Shuts down a remote SDSKV service (given an address).
* This will shutdown all the providers on the target address.
......
......@@ -17,8 +17,8 @@ typedef enum sdskv_db_type_t
typedef uint64_t sdskv_database_id_t;
#define SDSKV_DATABASE_ID_INVALID 0
#define SDSKV_KEEP_ORIGINAL 1 /* for migration operations, keep original */
#define SDSKV_REMOVE_ORIGINAL 2 /* for migration operations, remove the origin after migrating */
#define SDSKV_KEEP_ORIGINAL 0 /* for migration operations, keep original */
#define SDSKV_REMOVE_ORIGINAL 1 /* for migration operations, remove the origin after migrating */
#define SDSKV_SUCCESS 0 /* Success */
#define SDSKV_ERR_ALLOCATION -1 /* Error allocating something */
......@@ -35,6 +35,8 @@ typedef uint64_t sdskv_database_id_t;
#define SDSKV_ERR_MIGRATION -12 /* Error during data migration */
#define SDSKV_OP_NOT_IMPL -13 /* Operation not implemented for this backend */
#define SDSKV_ERR_COMP_FUNC -14 /* Comparison function does not exist */
#define SDSKV_ERR_REMI -15 /* REMI-related error */
#define SDSKV_ERR_ARGOBOTS -16 /* Argobots related error */
#if defined(__cplusplus)
}
......
......@@ -7,6 +7,6 @@ Name: sdskv-server
Description: services-based keyval server
Version: 0.1
URL: https://xgitlab.cels.anl.gov/sds/sds-keyval
Requires: margo @SERVER_DEPS_PKG@
Requires: margo remi @SERVER_DEPS_PKG@
Libs: -L${libdir} -lsdskv-server @SERVER_LIBS_EXT@
Cflags: -I${includedir}
......@@ -33,6 +33,9 @@ BerkeleyDBDataStore::~BerkeleyDBDataStore() {
bool BerkeleyDBDataStore::openDatabase(const std::string& db_name, const std::string& db_path) {
int status = 0;
_name = db_name;
_path = db_path;
if (!db_path.empty()) {
mkdirs(db_path.c_str());
}
......@@ -128,7 +131,8 @@ bool BerkeleyDBDataStore::openDatabase(const std::string& db_name, const std::st
return (status == 0);
};
void BerkeleyDBDataStore::set_comparison_function(comparator_fn less) {
void BerkeleyDBDataStore::set_comparison_function(const std::string& name, comparator_fn less) {
_comp_fun_name = name;
_wrapper->_less = less;
}
......@@ -176,7 +180,11 @@ bool BerkeleyDBDataStore::erase(const ds_bulk_t &key) {
Dbt db_key((void*)key.data(), key.size());
int status = _dbm->del(NULL, &db_key, 0);
return status == 0;
}
}
void BerkeleyDBDataStore::sync() {
_dbm->sync(0);
}
// In the case where Duplicates::ALLOW, this will return the first
// value found using key.
......@@ -369,3 +377,21 @@ int BerkeleyDBDataStore::compkeys(Db *db, const Dbt *dbt1, const Dbt *dbt2, size
return 0;
}
}
remi_fileset_t BerkeleyDBDataStore::create_and_populate_fileset() const {
remi_fileset_t fileset = REMI_FILESET_NULL;
std::string local_root = _path;
int ret;
if(_path[_path.size()-1] != '/')
local_root += "/";
remi_fileset_create("sdskv", local_root.c_str(), &fileset);
remi_fileset_register_file(fileset, _name.c_str());
remi_fileset_register_file(fileset, "log.0000000001");
remi_fileset_register_metadata(fileset, "database_type", "berkeleydb");
remi_fileset_register_metadata(fileset, "comparison_function", _comp_fun_name.c_str());
remi_fileset_register_metadata(fileset, "database_name", _name.c_str());
if(_no_overwrite) {
remi_fileset_register_metadata(fileset, "no_overwrite", "");
}
return fileset;
}
......@@ -35,10 +35,12 @@ class BerkeleyDBDataStore : public AbstractDataStore {
virtual bool exists(const ds_bulk_t &key);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // enable/disable in-memory mode
virtual void set_comparison_function(comparator_fn less);
virtual void set_comparison_function(const std::string& name, comparator_fn less);
virtual void set_no_overwrite() {
_no_overwrite = true;
}
virtual void sync();
remi_fileset_t create_and_populate_fileset() const;
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const;
......@@ -51,7 +53,6 @@ class BerkeleyDBDataStore : public AbstractDataStore {
DbEnv *_dbenv = nullptr;
Db *_dbm = nullptr;
DbWrapper* _wrapper = nullptr;
bool _no_overwrite = false;
};
#endif // bdb_datastore_h
......@@ -5,6 +5,7 @@
#include "kv-config.h"
#include "bulk.h"
#include "remi/remi-common.h"
#include <vector>
......@@ -25,8 +26,22 @@ class AbstractDataStore {
virtual bool exists(const ds_bulk_t &key) = 0;
virtual bool erase(const ds_bulk_t &key) = 0;
virtual void set_in_memory(bool enable)=0; // enable/disable in-memory mode (where supported)
virtual void set_comparison_function(comparator_fn less)=0;
virtual void set_comparison_function(const std::string& name, comparator_fn less)=0;
virtual void set_no_overwrite()=0;
virtual void sync() = 0;
virtual remi_fileset_t create_and_populate_fileset() const = 0;
const std::string& get_path() const {
return _path;
}
const std::string& get_name() const {
return _name;
}
const std::string& get_comparison_function_name() const {
return _comp_fun_name;
}
std::vector<ds_bulk_t> list_keys(
const ds_bulk_t &start_key, size_t count, const ds_bulk_t& prefix=ds_bulk_t()) const {
......@@ -49,7 +64,11 @@ class AbstractDataStore {
}
protected:
std::string _path;
std::string _name;
std::string _comp_fun_name;
Duplicates _duplicates;
bool _no_overwrite = false;
bool _eraseOnGet;
bool _debug;
bool _in_memory;
......
......@@ -35,7 +35,14 @@ LevelDBDataStore::~LevelDBDataStore() {
//leveldb::Env::Shutdown(); // Riak version only
};
void LevelDBDataStore::sync() {
}
bool LevelDBDataStore::openDatabase(const std::string& db_name, const std::string& db_path) {
_name = db_name;
_path = db_path;
leveldb::Options options;
leveldb::Status status;
......@@ -57,7 +64,8 @@ bool LevelDBDataStore::openDatabase(const std::string& db_name, const std::strin
return true;
};
void LevelDBDataStore::set_comparison_function(comparator_fn less) {
void LevelDBDataStore::set_comparison_function(const std::string& name, comparator_fn less) {
_comp_fun_name = name;
_less = less;
}
......@@ -233,3 +241,20 @@ std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyval_range
throw SDSKV_OP_NOT_IMPL;
return result;
}
remi_fileset_t LevelDBDataStore::create_and_populate_fileset() const {
remi_fileset_t fileset = REMI_FILESET_NULL;
std::string local_root = _path;
int ret;
if(_path[_path.size()-1] != '/')
local_root += "/";
remi_fileset_create("sdskv", local_root.c_str(), &fileset);
remi_fileset_register_directory(fileset, (_name+"/").c_str());
remi_fileset_register_metadata(fileset, "database_type", "leveldb");
remi_fileset_register_metadata(fileset, "comparison_function", _comp_fun_name.c_str());
remi_fileset_register_metadata(fileset, "database_name", _name.c_str());
if(_no_overwrite) {
remi_fileset_register_metadata(fileset, "no_overwrite", "");
}
return fileset;
}
......@@ -48,10 +48,12 @@ class LevelDBDataStore : public AbstractDataStore {
virtual bool exists(const ds_bulk_t &key);
virtual bool erase(const ds_bulk_t &key);
virtual void set_in_memory(bool enable); // not supported, a no-op
virtual void set_comparison_function(comparator_fn less);
virtual void set_comparison_function(const std::string& name, comparator_fn less);
virtual void set_no_overwrite() {
_no_overwrite = true;
}
virtual void sync();
remi_fileset_t create_and_populate_fileset() const;
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const;
......@@ -67,7 +69,6 @@ class LevelDBDataStore : public AbstractDataStore {
ds_bulk_t fromString(const std::string &keystr);
AbstractDataStore::comparator_fn _less;
LevelDBDataStoreComparator _keycmp;
bool _no_overwrite = false;
};
#endif // ldb_datastore_h
......@@ -29,19 +29,23 @@ class MapDataStore : public AbstractDataStore {
public:
MapDataStore()
: AbstractDataStore(), _less(nullptr), _map(keycmp(this)), _no_overwrite(false) {}
: AbstractDataStore(), _less(nullptr), _map(keycmp(this)) {}
MapDataStore(Duplicates duplicates, bool eraseOnGet, bool debug)
: AbstractDataStore(duplicates, eraseOnGet, debug), _less(nullptr), _map(keycmp(this)),
_no_overwrite(false) {}
: AbstractDataStore(duplicates, eraseOnGet, debug), _less(nullptr), _map(keycmp(this))
{}
~MapDataStore() = default;
virtual bool openDatabase(const std::string& db_name, const std::string& path) {
_name = db_name;
_path = path;
_map.clear();
return true;
}
virtual void sync() {}
virtual bool put(const ds_bulk_t &key, const ds_bulk_t &data) {
auto x = _map.count(key);
if(_no_overwrite && (x != 0)) {
......@@ -81,7 +85,8 @@ class MapDataStore : public AbstractDataStore {
_in_memory = enable;
}
virtual void set_comparison_function(comparator_fn less) {
virtual void set_comparison_function(const std::string& name, comparator_fn less) {
_comp_fun_name = name;
_less = less;
}
......@@ -89,6 +94,10 @@ class MapDataStore : public AbstractDataStore {
_no_overwrite = true;
}
remi_fileset_t create_and_populate_fileset() const {
return REMI_FILESET_NULL;
}
protected:
virtual std::vector<ds_bulk_t> vlist_keys(
......@@ -185,7 +194,6 @@ class MapDataStore : public AbstractDataStore {
private:
AbstractDataStore::comparator_fn _less;
std::map<ds_bulk_t, ds_bulk_t, keycmp> _map;
bool _no_overwrite;
};
#endif
......@@ -25,6 +25,7 @@ struct sdskv_client {
hg_id_t sdskv_migrate_key_range_id;
hg_id_t sdskv_migrate_keys_prefixed_id;
hg_id_t sdskv_migrate_all_keys_id;
hg_id_t sdskv_migrate_database_id;
uint64_t num_provider_handles;
};
......@@ -65,6 +66,7 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_migrate_key_range_rpc", &client->sdskv_migrate_key_range_id, &flag);
margo_registered_name(mid, "sdskv_migrate_keys_prefixed_rpc", &client->sdskv_migrate_keys_prefixed_id, &flag);
margo_registered_name(mid, "sdskv_migrate_all_keys_rpc", &client->sdskv_migrate_all_keys_id, &flag);
margo_registered_name(mid, "sdskv_migrate_database_rpc", &client->sdskv_migrate_database_id, &flag);
} else {
......@@ -104,6 +106,8 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_migrate_keys_prefixed_rpc", migrate_keys_prefixed_in_t, migrate_keys_out_t, NULL);
client->sdskv_migrate_all_keys_id =
MARGO_REGISTER(mid, "sdskv_migrate_all_keys_rpc", migrate_all_keys_in_t, migrate_keys_out_t, NULL);
client->sdskv_migrate_database_id =
MARGO_REGISTER(mid, "sdskv_migrate_database_rpc", migrate_database_in_t, migrate_database_out_t, NULL);
}
return SDSKV_SUCCESS;
......@@ -1450,6 +1454,51 @@ finish:
return ret;
}
int sdskv_migrate_database(
sdskv_provider_handle_t source,
sdskv_database_id_t source_db_id,
const char* dest_addr,
uint16_t dest_provider_id,
const char* dest_root,
int flag)
{
hg_return_t hret;
hg_handle_t handle;
migrate_database_in_t in;
migrate_database_out_t out;
int ret;
in.source_db_id = source_db_id;
in.remove_src = flag;
in.dest_remi_addr = dest_addr;
in.dest_remi_provider_id = dest_provider_id;
in.dest_root = dest_root;
hret = margo_create(source->client->mid, source->addr,
source->client->sdskv_migrate_database_id, &handle);
if(hret != HG_SUCCESS)
return SDSKV_ERR_MERCURY;
hret = margo_provider_forward(source->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_shutdown_service(sdskv_client_t client, hg_addr_t addr)
{
......
......@@ -191,5 +191,15 @@ MERCURY_GEN_PROC(migrate_all_keys_in_t,
((uint64_t)(target_db_id))\
((int32_t)(flag)))
// ------------- MIGRATE DATABASE ---------- //
MERCURY_GEN_PROC(migrate_database_in_t,
((uint64_t)(source_db_id))\
((int32_t)(remove_src))\
((hg_const_string_t)(dest_remi_addr))\
((uint16_t)(dest_remi_provider_id))\
((hg_const_string_t)(dest_root)))
MERCURY_GEN_PROC(migrate_database_out_t,
((int32_t)(ret)))
#endif
......@@ -7,6 +7,8 @@
#include <map>
#include <iostream>
#include <unordered_map>
#include <remi/remi-client.h>
#include <remi/remi-server.h>
#define SDSKV
#include "datastore/datastore_factory.h"
#include "sdskv-rpc-types.h"
......@@ -18,6 +20,12 @@ struct sdskv_server_context_t
std::map<std::string, sdskv_database_id_t> name2id;
std::map<sdskv_database_id_t, std::string> id2name;
std::map<std::string, sdskv_compare_fn> compfunctions;
remi_client_t remi_client;
remi_provider_t remi_provider;
ABT_rwlock lock; // write-locked during migration, read-locked by all other
// operations. There should be something better to avoid locking everything
// but we are going with that for simplicity for now.
hg_id_t sdskv_put_id;
hg_id_t sdskv_put_multi_id;
......@@ -71,9 +79,14 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_keys_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_key_range_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_keys_prefixed_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_all_keys_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_database_ult)
static void sdskv_server_finalize_cb(void *data);
static int sdskv_pre_migration_callback(remi_fileset_t fileset, void* uargs);
static int sdskv_post_migration_callback(remi_fileset_t fileset, void* uargs);
extern "C" int sdskv_provider_register(
margo_instance_id mid,
uint16_t provider_id,
......@@ -81,6 +94,7 @@ extern "C" int sdskv_provider_register(
sdskv_provider_t* provider)
{
sdskv_server_context_t *tmp_svr_ctx;
int ret;
/* check if a provider with the same multiplex id already exists */
{
......@@ -93,11 +107,31 @@ extern "C" int sdskv_provider_register(
}
}
/* check if a REMI provider exists with the same provider id */
{
int flag;
remi_provider_registered(mid, provider_id, &flag, NULL, NULL);
if(flag) {
fprintf(stderr, "sdskv_provider_register(): a REMI provider with the same (%d) already exists\n", provider_id);
return SDSKV_ERR_REMI;
}
}
/* allocate the resulting structure */
tmp_svr_ctx = new sdskv_server_context_t;
if(!tmp_svr_ctx)
return SDSKV_ERR_ALLOCATION;
tmp_svr_ctx->remi_client = REMI_CLIENT_NULL;
tmp_svr_ctx->remi_provider = REMI_PROVIDER_NULL;
/* Create rwlock */
ret = ABT_rwlock_create(&(tmp_svr_ctx->lock));
if(ret != ABT_SUCCESS) {
free(tmp_svr_ctx);
return SDSKV_ERR_ARGOBOTS;
}
/* register RPCs */
hg_id_t rpc_id;
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_put_rpc",
......@@ -191,6 +225,33 @@ extern "C" int sdskv_provider_register(
sdskv_migrate_all_keys_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_migrate_all_keys_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_migrate_database_rpc",
migrate_database_in_t, migrate_database_out_t,
sdskv_migrate_database_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_migrate_database_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
/* register a REMI client */
ret = remi_client_init(mid, &(tmp_svr_ctx->remi_client));
if(ret != REMI_SUCCESS) {
sdskv_server_finalize_cb(tmp_svr_ctx);
return SDSKV_ERR_REMI;
}
/* register a REMI provider */
ret = remi_provider_register(mid, provider_id, abt_pool, &(tmp_svr_ctx->remi_provider));
if(ret != REMI_SUCCESS) {
sdskv_server_finalize_cb(tmp_svr_ctx);
return SDSKV_ERR_REMI;
}
ret = remi_provider_register_migration_class(tmp_svr_ctx->remi_provider,
"sdskv", sdskv_pre_migration_callback,
sdskv_post_migration_callback, NULL, tmp_svr_ctx);
if(ret != REMI_SUCCESS) {
sdskv_server_finalize_cb(tmp_svr_ctx);
return SDSKV_ERR_REMI;
}
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &sdskv_server_finalize_cb, tmp_svr_ctx);
......@@ -230,7 +291,7 @@ extern "C" int sdskv_provider_attach_database(
std::string(config->db_name), std::string(config->db_path));
if(db == nullptr) return SDSKV_ERR_DB_CREATE;
if(comp_fn) {
db->set_comparison_function(comp_fn);
db->set_comparison_function(config->db_comp_fn_name, comp_fn);
}
sdskv_database_id_t id = (sdskv_database_id_t)(db);
if(config->db_no_overwrite) {
......@@ -250,6 +311,8 @@ extern "C" int sdskv_provider_remove_database(
sdskv_provider_t provider,
sdskv_database_id_t db_id)
{
ABT_rwlock_wrlock(provider->lock);
auto r = at_exit([provider]() { ABT_rwlock_unlock(provider->lock); });
if(provider->databases.count(db_id)) {
auto dbname = provider->id2name[db_id];
provider->id2name.erase(db_id);
......@@ -266,6 +329,8 @@ extern "C" int sdskv_provider_remove_database(
extern "C" int sdskv_provider_remove_all_databases(
sdskv_provider_t provider)
{
ABT_rwlock_wrlock(provider->lock);
auto r = at_exit([provider]() { ABT_rwlock_unlock(provider->lock); });
for(auto db : provider->databases) {
delete db.second;
}
......@@ -289,6 +354,8 @@ extern "C" int sdskv_provider_list_databases(
sdskv_database_id_t* targets)
{
unsigned i = 0;
ABT_rwlock_rdlock(provider->lock);
auto r = at_exit([provider]() { ABT_rwlock_unlock(provider->lock); });
for(auto p : provider->name2id) {
targets[i] = p.second;
i++;
......@@ -314,6 +381,9 @@ static void sdskv_put_ult(hg_handle_t handle)
margo_destroy(handle);
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto r = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -371,6 +441,9 @@ static void sdskv_put_multi_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -463,6 +536,9 @@ static void sdskv_length_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -519,6 +595,9 @@ static void sdskv_get_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -592,6 +671,9 @@ static void sdskv_get_multi_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
/* deserialize input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -716,6 +798,9 @@ static void sdskv_length_multi_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
/* deserialize input */
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
......@@ -817,6 +902,9 @@ static void sdskv_open_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto unlock = at_exit([svr_ctx]() { ABT_rwlock_unlock(svr_ctx->lock); });
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
......@@ -865,6 +953,9 @@ static void sdskv_bulk_put_ult(hg_handle_t handle)
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);