Commit 4ec0955b authored by Matthieu Dorier's avatar Matthieu Dorier

made REMI optional

parent 48e93ac4
......@@ -153,11 +153,27 @@ if test "x${bwtree_backend}" == xyes ; then
CXXFLAGS="-pthread -g -Wall -mcx16 -Wno-invalid-offsetof ${CXXFLAGS}"
fi
PKG_CHECK_MODULES([REMI],[remi],[],
[AC_MSG_ERROR([Could not find working remi installation!])])
LIBS="$REMI_LIBS $LIBS"
CPPFLAGS="$REMI_CFLAGS $CPPFLAGS"
CFLAGS="$REMI_CFLAGS $CFLAGS"
AC_ARG_ENABLE(remi,
[AS_HELP_STRING([--enable-remi],[Enable REMI (migration) support @<:@default=no@:>@])],
[case "${enableval}" in
yes) enable_remi="yes" ;;
no) enable_remi="no" ;;
*) AC_MSG_ERROR(bad value ${enableval} for --enable-remi) ;;
esac],
[enable_remi="no"]
)
AM_CONDITIONAL(ENABLE_REMI, test x$enable_remi = xyes)
if test "$enable_remi" = "yes"; then
PKG_CHECK_MODULES(REMI, remi)
AC_DEFINE(USE_REMI, 1, [REMI support enabled.])
USE_REMI=1
LIBS="$REMI_LIBS $LIBS"
CPPFLAGS="$REMI_CFLAGS $CPPFLAGS"
CFLAGS="$REMI_CFLAGS $CFLAGS"
else
USE_REMI=0
fi
AC_SUBST(USE_REMI)
AM_CONDITIONAL([BUILD_BDB], [test "x${berkelydb_backend}" == xyes])
AM_CONDITIONAL([BUILD_LEVELDB], [test "x${leveldb_backend}" == xyes])
......
......@@ -3,12 +3,14 @@
*
* See COPYRIGHT in top-level directory.
*/
#include "kv-config.h"
#include <map>
#include <iostream>
#include <unordered_map>
#ifdef USE_REMI
#include <remi/remi-client.h>
#include <remi/remi-server.h>
#endif
#define SDSKV
#include "datastore/datastore_factory.h"
#include "sdskv-rpc-types.h"
......@@ -20,11 +22,14 @@ struct sdskv_server_context_t
std::map<std::string, sdskv_database_id_t> name2id;
std::map<sdskv_database_id_t, std::string> id2name;
std::map<std::string, sdskv_compare_fn> compfunctions;
#ifdef USE_REMI
remi_client_t remi_client;
remi_provider_t remi_provider;
sdskv_pre_migration_callback_fn pre_migration_callback;
sdskv_post_migration_callback_fn post_migration_callback;
void* migration_uargs;
#endif
ABT_rwlock lock; // write-locked during migration, read-locked by all other
// operations. There should be something better to avoid locking everything
......@@ -90,10 +95,14 @@ DECLARE_MARGO_RPC_HANDLER(sdskv_migrate_database_ult)
static void sdskv_server_finalize_cb(void *data);
#ifdef USE_REMI
static int sdskv_pre_migration_callback(remi_fileset_t fileset, void* uargs);
static int sdskv_post_migration_callback(remi_fileset_t fileset, void* uargs);
#endif
extern "C" int sdskv_provider_register(
margo_instance_id mid,
uint16_t provider_id,
......@@ -120,11 +129,13 @@ extern "C" int sdskv_provider_register(
if(!tmp_svr_ctx)
return SDSKV_ERR_ALLOCATION;
#ifdef USE_REMI
tmp_svr_ctx->remi_client = REMI_CLIENT_NULL;
tmp_svr_ctx->remi_provider = REMI_PROVIDER_NULL;
tmp_svr_ctx->pre_migration_callback = NULL;
tmp_svr_ctx->post_migration_callback = NULL;
tmp_svr_ctx->migration_uargs = NULL;
#endif
/* Create rwlock */
ret = ABT_rwlock_create(&(tmp_svr_ctx->lock));
......@@ -242,6 +253,7 @@ extern "C" int sdskv_provider_register(
tmp_svr_ctx->sdskv_migrate_database_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
#ifdef USE_REMI
/* register a REMI client */
ret = remi_client_init(mid, ABT_IO_INSTANCE_NULL, &(tmp_svr_ctx->remi_client));
if(ret != REMI_SUCCESS) {
......@@ -272,6 +284,7 @@ extern "C" int sdskv_provider_register(
return SDSKV_ERR_REMI;
}
}
#endif
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &sdskv_server_finalize_cb, tmp_svr_ctx);
......@@ -394,6 +407,7 @@ extern "C" int sdskv_provider_compute_database_size(
sdskv_database_id_t database_id,
size_t* size)
{
#ifdef USE_REMI
int ret;
// find the database
ABT_rwlock_rdlock(provider->lock);
......@@ -419,6 +433,10 @@ extern "C" int sdskv_provider_compute_database_size(
return SDSKV_ERR_REMI;
}
return SDSKV_SUCCESS;
#else
// TODO: implement this without REMI
return SDSKV_OP_NOT_IMPL;
#endif
}
extern "C" int sdskv_provider_set_migration_callbacks(
......@@ -427,10 +445,14 @@ extern "C" int sdskv_provider_set_migration_callbacks(
sdskv_post_migration_callback_fn post_cb,
void* uargs)
{
#ifdef USE_REMI
provider->pre_migration_callback = pre_cb;
provider->post_migration_callback = post_cb;
provider->migration_uargs = uargs;
return SDSKV_SUCCESS;
#else
return SDSKV_OP_NOT_IMPL;
#endif
}
static void sdskv_open_ult(hg_handle_t handle)
......@@ -2266,8 +2288,10 @@ static void sdskv_migrate_database_ult(hg_handle_t handle)
hg_return_t hret;
margo_instance_id mid;
int ret;
#ifdef USE_REMI
remi_provider_handle_t remi_ph = REMI_PROVIDER_HANDLE_NULL;
remi_fileset_t local_fileset = REMI_FILESET_NULL;
#endif
memset(&out, 0, sizeof(out));
......@@ -2289,6 +2313,7 @@ static void sdskv_migrate_database_ult(hg_handle_t handle)
break;
}
#ifdef USE_REMI
ABT_rwlock_rdlock(svr_ctx->lock);
// find the database that needs to be migrated
auto it = svr_ctx->databases.find(in.source_db_id);
......@@ -2340,22 +2365,18 @@ static void sdskv_migrate_database_ult(hg_handle_t handle)
if(in.remove_src) {
ret = sdskv_provider_remove_database(svr_ctx, in.source_db_id);
out.ret = ret;
#if 0
ABT_rwlock_wrlock(svr_ctx->lock);
/* remove the target from the list of managed targets */
auto dbname = svr_ctx->id2name[in.source_db_id];
svr_ctx->id2name.erase(in.source_db_id);
svr_ctx->name2id.erase(dbname);
delete database;
svr_ctx->databases.erase(in.source_db_id);
ABT_rwlock_unlock(svr_ctx->lock);
#endif
}
#else
out.ret = SDSKV_OP_NOT_IMPL;
#endif
} while(false);
#ifdef USE_REMI
remi_fileset_free(local_fileset);
remi_provider_handle_release(remi_ph);
#endif
margo_addr_free(mid, dest_addr);
margo_free_input(handle, &in);
margo_respond(handle, &out);
......@@ -2388,6 +2409,8 @@ static void get_metadata(const char* key, const char* value, void* uargs) {
md->_metadata[key] = value;
}
#ifdef USE_REMI
static int sdskv_pre_migration_callback(remi_fileset_t fileset, void* uargs)
{
sdskv_provider_t provider = (sdskv_provider_t)uargs;
......@@ -2492,15 +2515,19 @@ static int sdskv_post_migration_callback(remi_fileset_t fileset, void* uargs)
return 0;
}
#endif
extern "C" int sdskv_provider_set_abtio_instance(
sdskv_provider_t provider,
abt_io_instance_id abtio)
{
#ifdef USE_REMI
remi_provider_set_abt_io_instance(
provider->remi_provider,
abtio);
remi_client_set_abt_io_instance(
provider->remi_client,
abtio);
#endif
return SDSKV_SUCCESS;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment