Commit 665e7609 authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-target-migration' into 'master'

Dev target migration

See merge request !4
parents ae3f4d88 535dae12
......@@ -80,6 +80,12 @@ LIBS="$UUID_LIBS $LIBS"
CPPFLAGS="$UUID_CFLAGS $CPPFLAGS"
CFLAGS="$UUID_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([REMI],[remi],[],
[AC_MSG_ERROR([Could not find working remi installation!])])
LIBS="$REMI_LIBS $LIBS"
CPPFLAGS="$REMI_CFLAGS $CPPFLAGS"
CFLAGS="$REMI_CFLAGS $CFLAGS"
AC_CONFIG_FILES([Makefile maint/bake-client.pc maint/bake-server.pc])
AC_OUTPUT
......@@ -334,7 +334,7 @@ int bake_proxy_read(
*
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_migrate(
int bake_migrate_region(
bake_provider_handle_t source,
bake_region_id_t source_rid,
int remove_source,
......@@ -343,6 +343,56 @@ int bake_migrate(
bake_target_id_t dest_target_id,
bake_region_id_t* dest_rid);
/**
* @brief Requests the source provider to migrate a particular
* region (source_rid) to a destination provider. After the call,
* the designated region will have been removed from the source
* and the dest_rid parameter will be set to the new region id
* in the destination provider.
*
* @param source Source provider.
* @param source_rid Region to migrate.
* @param remove_source Whether the source region should be removed.
* @param dest_addr Address of the destination provider.
* @param dest_provider_id Id of the destination provider.
* @param dest_target_id Destination target.
* @param dest_rid Resulting region id in the destination target.
*
* @return BAKE_SUCCESS or corresponding error code.
*/
__attribute__((deprecated("use bake_migrate_region instead")))
inline int bake_migrate(
bake_provider_handle_t source,
bake_region_id_t source_rid,
int remove_source,
const char* dest_addr,
uint16_t dest_provider_id,
bake_target_id_t dest_target_id,
bake_region_id_t* dest_rid)
{
return bake_migrate_region(source, source_rid, remove_source,
dest_addr, dest_provider_id, dest_target_id, dest_rid);
}
/**
* @brief Migrates a full target from a provider to another.
*
* @param source Provider initially managing the target
* @param src_target_id Source target it.
* @param remove_source Whether the source target should be removed.
* @param dest_addr Address of the destination provider.
* @param dest_provider_id Provider id of the destination provider.
*
* @return BAKE_SUCCESS or corresponding error code.
*/
int bake_migrate_target(
bake_provider_handle_t source,
bake_target_id_t src_target_id,
int remove_source,
const char* dest_addr,
uint16_t dest_provider_id,
const char* dest_root);
/**
* Shuts down a remote BAKE service (given an address).
* This will shutdown all the providers on the target address.
......
......@@ -36,6 +36,7 @@ typedef struct {
#define BAKE_ERR_UNKNOWN_PROVIDER (-7) /* Provider id could not be matched with a provider */
#define BAKE_ERR_UNKNOWN_REGION (-8) /* Region id could not be found */
#define BAKE_ERR_OUT_OF_BOUNDS (-9) /* Attempting an out of bound access */
#define BAKE_ERR_REMI (-10) /* Error related to REMI */
#ifdef __cplusplus
}
......
......@@ -34,7 +34,8 @@ struct bake_client
hg_id_t bake_read_id;
hg_id_t bake_noop_id;
hg_id_t bake_remove_id;
hg_id_t bake_migrate_id;
hg_id_t bake_migrate_region_id;
hg_id_t bake_migrate_target_id;
uint64_t num_provider_handles;
};
......@@ -70,7 +71,8 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
margo_registered_name(mid, "bake_read_rpc", &client->bake_read_id, &flag);
margo_registered_name(mid, "bake_noop_rpc", &client->bake_noop_id, &flag);
margo_registered_name(mid, "bake_remove_rpc", &client->bake_remove_id, &flag);
margo_registered_name(mid, "bake_migrate_rpc", &client->bake_migrate_id, &flag);
margo_registered_name(mid, "bake_migrate_region_rpc", &client->bake_migrate_region_id, &flag);
margo_registered_name(mid, "bake_migrate_target_rpc", &client->bake_migrate_target_id, &flag);
} else { /* RPCs not already registered */
......@@ -110,9 +112,12 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
client->bake_remove_id =
MARGO_REGISTER(mid, "bake_remove_rpc",
bake_remove_in_t, bake_remove_out_t, NULL);
client->bake_migrate_id =
MARGO_REGISTER(mid, "bake_migrate_rpc",
bake_migrate_in_t, bake_migrate_out_t, NULL);
client->bake_migrate_region_id =
MARGO_REGISTER(mid, "bake_migrate_region_rpc",
bake_migrate_region_in_t, bake_migrate_region_out_t, NULL);
client->bake_migrate_target_id =
MARGO_REGISTER(mid, "bake_migrate_target_rpc",
bake_migrate_target_in_t, bake_migrate_target_out_t, NULL);
}
return BAKE_SUCCESS;
......@@ -721,7 +726,7 @@ int bake_get_data(
return(ret);
}
int bake_migrate(
int bake_migrate_region(
bake_provider_handle_t source,
bake_region_id_t source_rid,
int remove_source,
......@@ -732,8 +737,8 @@ int bake_migrate(
{
hg_return_t hret;
hg_handle_t handle;
bake_migrate_in_t in;
bake_migrate_out_t out;
bake_migrate_region_in_t in;
bake_migrate_region_out_t out;
int ret;
in.source_rid = source_rid;
......@@ -743,7 +748,7 @@ int bake_migrate(
in.dest_target_id = dest_target_id;
hret = margo_create(source->client->mid, source->addr,
source->client->bake_migrate_id, &handle);
source->client->bake_migrate_region_id, &handle);
if(hret != HG_SUCCESS)
return BAKE_ERR_MERCURY;
......@@ -771,6 +776,52 @@ int bake_migrate(
return(ret);
}
int bake_migrate_target(
bake_provider_handle_t source,
bake_target_id_t src_target_id,
int remove_source,
const char* dest_addr,
uint16_t dest_provider_id,
const char* dest_root)
{
hg_return_t hret;
hg_handle_t handle;
bake_migrate_target_in_t in;
bake_migrate_target_out_t out;
int ret;
in.target_id = src_target_id;
in.remove_src = remove_source;
in.dest_remi_addr = dest_addr;
in.dest_remi_provider_id = dest_provider_id;
in.dest_root = dest_root;
hret = margo_create(source->client->mid, source->addr,
source->client->bake_migrate_target_id, &handle);
if(hret != HG_SUCCESS)
return BAKE_ERR_MERCURY;
hret = margo_provider_forward(source->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return BAKE_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return BAKE_ERR_MERCURY;
}
ret = out.ret;
margo_free_output(handle, &out);
margo_destroy(handle);
return(ret);
}
int bake_noop(bake_provider_handle_t provider)
{
hg_return_t hret;
......
......@@ -122,17 +122,27 @@ MERCURY_GEN_PROC(bake_remove_in_t,
MERCURY_GEN_PROC(bake_remove_out_t,
((int32_t)(ret)))
/* BAKE migrate */
MERCURY_GEN_PROC(bake_migrate_in_t,
/* BAKE migrate region */
MERCURY_GEN_PROC(bake_migrate_region_in_t,
((bake_region_id_t)(source_rid))\
((int32_t)(remove_src))\
((hg_const_string_t)(dest_addr))\
((uint16_t)(dest_provider_id))\
((bake_target_id_t)(dest_target_id)))
MERCURY_GEN_PROC(bake_migrate_out_t,
MERCURY_GEN_PROC(bake_migrate_region_out_t,
((int32_t)(ret))\
((bake_region_id_t)(dest_rid)))
/* BAKE migrate target */
MERCURY_GEN_PROC(bake_migrate_target_in_t,
((bake_target_id_t)(target_id))\
((int32_t)(remove_src))\
((hg_const_string_t)(dest_remi_addr))\
((uint16_t)(dest_remi_provider_id))\
((hg_const_string_t)(dest_root)))
MERCURY_GEN_PROC(bake_migrate_target_out_t,
((int32_t)(ret)))
static inline hg_return_t hg_proc_bake_region_id_t(hg_proc_t proc, bake_region_id_t *rid)
{
/* TODO: update later depending on final region_id_t type */
......
......@@ -8,7 +8,9 @@
#include <assert.h>
#include <libpmemobj.h>
#include <bake-server.h>
#include <remi/remi-client.h>
#include <remi/remi-server.h>
#include "bake-server.h"
#include "uthash.h"
#include "bake-rpc.h"
......@@ -25,7 +27,8 @@ DECLARE_MARGO_RPC_HANDLER(bake_eager_read_ult)
DECLARE_MARGO_RPC_HANDLER(bake_probe_ult)
DECLARE_MARGO_RPC_HANDLER(bake_noop_ult)
DECLARE_MARGO_RPC_HANDLER(bake_remove_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_region_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_target_ult)
/* definition of BAKE root data structure (just a uuid for now) */
typedef struct
......@@ -49,6 +52,8 @@ typedef struct
PMEMobjpool* pmem_pool;
bake_root_t* pmem_root;
bake_target_id_t target_id;
char* root;
char* filename;
UT_hash_handle hh;
} bake_pmem_entry_t;
......@@ -57,10 +62,14 @@ typedef struct bake_server_context_t
uint64_t num_targets;
bake_pmem_entry_t* targets;
hg_id_t bake_create_write_persist_id;
remi_client_t remi_client;
remi_provider_t remi_provider;
} bake_server_context_t;
static void bake_server_finalize_cb(void *data);
static void bake_target_migration_callback(remi_fileset_t fileset, void* provider);
int bake_makepool(
const char *pool_name,
size_t pool_size,
......@@ -163,8 +172,12 @@ int bake_provider_register(
bake_remove_in_t, bake_remove_out_t, bake_remove_ult,
provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_migrate_rpc",
bake_migrate_in_t, bake_migrate_out_t, bake_migrate_ult,
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_migrate_region_rpc",
bake_migrate_region_in_t, bake_migrate_region_out_t, bake_migrate_region_ult,
provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_migrate_target_rpc",
bake_migrate_target_in_t, bake_migrate_target_out_t, bake_migrate_target_ult,
provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
......@@ -179,6 +192,14 @@ int bake_provider_register(
bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
}
/* register a REMI client */
remi_client_init(mid, &(tmp_svr_ctx->remi_client));
/* register a REMI provider */
remi_provider_register(mid, provider_id, abt_pool, &(tmp_svr_ctx->remi_provider));
remi_provider_register_migration_class(tmp_svr_ctx->remi_provider,
"bake", bake_target_migration_callback, NULL, tmp_svr_ctx);
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);
......@@ -194,10 +215,19 @@ int bake_provider_add_storage_target(
bake_target_id_t* target_id)
{
bake_pmem_entry_t* new_entry = calloc(1, sizeof(*new_entry));
new_entry->root = NULL;
new_entry->filename = NULL;
char* tmp = strrchr(target_name, '/');
new_entry->filename = strdup(tmp);
ptrdiff_t d = tmp - target_name;
new_entry->root = strndup(target_name, d);
new_entry->pmem_pool = pmemobj_open(target_name, NULL);
if(!(new_entry->pmem_pool)) {
fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
free(new_entry->filename);
free(new_entry->root);
free(new_entry);
return BAKE_ERR_PMEM;
}
......@@ -212,6 +242,8 @@ int bake_provider_add_storage_target(
{
fprintf(stderr, "Error: BAKE pool %s is not properly initialized\n", target_name);
pmemobj_close(new_entry->pmem_pool);
free(new_entry->filename);
free(new_entry->root);
free(new_entry);
return BAKE_ERR_UNKNOWN_TARGET;
}
......@@ -224,6 +256,8 @@ int bake_provider_add_storage_target(
if(check_entry != new_entry) {
fprintf(stderr, "Error: BAKE could not insert new pmem pool into the hash\n");
pmemobj_close(new_entry->pmem_pool);
free(new_entry->filename);
free(new_entry->root);
free(new_entry);
return BAKE_ERR_ALLOCATION;
}
......@@ -251,6 +285,8 @@ int bake_provider_remove_storage_target(
if(!entry) return BAKE_ERR_UNKNOWN_TARGET;
pmemobj_close(entry->pmem_pool);
HASH_DEL(provider->targets, entry);
free(entry->filename);
free(entry->root);
free(entry);
return 0;
}
......@@ -262,6 +298,8 @@ int bake_provider_remove_all_storage_targets(
HASH_ITER(hh, provider->targets, p, tmp) {
HASH_DEL(provider->targets, p);
pmemobj_close(p->pmem_pool);
free(p->filename);
free(p->root);
free(p);
}
provider->num_targets = 0;
......@@ -1134,10 +1172,10 @@ static void bake_remove_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(bake_remove_ult)
static void bake_migrate_ult(hg_handle_t handle)
static void bake_migrate_region_ult(hg_handle_t handle)
{
bake_migrate_in_t in;
bake_migrate_out_t out;
bake_migrate_region_in_t in;
bake_migrate_region_out_t out;
hg_return_t hret;
pmemobj_region_id_t* prid;
......@@ -1277,7 +1315,94 @@ static void bake_migrate_ult(hg_handle_t handle)
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_migrate_ult)
DEFINE_MARGO_RPC_HANDLER(bake_migrate_region_ult)
static void bake_migrate_target_ult(hg_handle_t handle)
{
bake_migrate_target_in_t in;
in.dest_remi_addr = NULL;
in.dest_root = NULL;
bake_migrate_target_out_t out;
hg_addr_t dest_addr = HG_ADDR_NULL;
hg_return_t hret;
int ret;
remi_provider_handle_t remi_ph = REMI_PROVIDER_HANDLE_NULL;
remi_fileset_t local_fileset = REMI_FILESET_NULL;
memset(&out, 0, sizeof(out));
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
bake_provider_t svr_ctx = margo_registered_data(mid, info->id);
if(!svr_ctx) {
out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
goto finish;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
bake_pmem_entry_t* entry = find_pmem_entry(svr_ctx, in.target_id);
if(!entry) {
out.ret = BAKE_ERR_UNKNOWN_TARGET;
goto finish;
}
/* lookup the address of the destination REMI provider */
hret = margo_addr_lookup(mid, in.dest_remi_addr, &dest_addr);
if(hret != HG_SUCCESS) {
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
/* use the REMI client to create a REMI provider handle */
ret = remi_provider_handle_create(svr_ctx->remi_client,
dest_addr, in.dest_remi_provider_id, &remi_ph);
if(ret != REMI_SUCCESS) {
out.ret = BAKE_ERR_REMI;
goto finish;
}
/* create a fileset */
ret = remi_fileset_create("bake", entry->root, &local_fileset);
if(ret != REMI_SUCCESS) {
out.ret = BAKE_ERR_REMI;
goto finish;
}
/* fill the fileset */
ret = remi_fileset_register_file(local_fileset, entry->filename);
if(ret != REMI_SUCCESS) {
out.ret = BAKE_ERR_REMI;
goto finish;
}
/* issue the migration */
ret = remi_fileset_migrate(remi_ph, local_fileset, in.dest_root, in.remove_src);
if(ret != REMI_SUCCESS) {
out.ret = BAKE_ERR_REMI;
goto finish;
}
/* remove the target from the list of managed targets */
bake_provider_remove_storage_target(svr_ctx, in.target_id);
out.ret = BAKE_SUCCESS;
finish:
remi_fileset_free(local_fileset);
remi_provider_handle_release(remi_ph);
margo_addr_free(mid, dest_addr);
margo_free_input(handle, &in);
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_migrate_target_ult)
static void bake_server_finalize_cb(void *data)
{
......@@ -1286,8 +1411,34 @@ static void bake_server_finalize_cb(void *data)
bake_provider_remove_all_storage_targets(svr_ctx);
remi_client_finalize(svr_ctx->remi_client);
free(svr_ctx);
return;
}
typedef struct migration_cb_args {
char root[1024];
bake_server_context_t* provider;
} migration_cb_args;
static void migration_fileset_cb(const char* filename, void* arg)
{
migration_cb_args* mig_args = (migration_cb_args*)arg;
char fullname[1024];
fullname[0] = '\0';
strcat(fullname, mig_args->root);
strcat(fullname, filename);
bake_target_id_t tid;
bake_provider_add_storage_target(mig_args->provider, fullname, &tid);
}
static void bake_target_migration_callback(remi_fileset_t fileset, void* uarg)
{
migration_cb_args args;
args.provider = (bake_server_context_t *)uarg;
size_t root_size = 1024;
remi_fileset_get_root(fileset, args.root, &root_size);
remi_fileset_foreach_file(fileset, migration_fileset_cb, &args);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment