Commit bd3254cf authored by Matthieu Dorier's avatar Matthieu Dorier

added a callback called before migration and an error-reporting system

parent 3bfa6afb
......@@ -86,9 +86,10 @@ int main(int argc, char** argv)
remi_fileset_register_metadata(fileset, "AERED", "qerqwer");
// migrate the fileset
ret = remi_fileset_migrate(remi_ph, fileset, remote_root, REMI_REMOVE_SOURCE);
int status = 0;
ret = remi_fileset_migrate(remi_ph, fileset, remote_root, REMI_REMOVE_SOURCE, &status);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_fileset_migrate() returned %d\n", ret);
fprintf(stderr, "ERROR: remi_fileset_migrate() returned %d (status = %d)\n", ret, status);
ret = -1;
goto error;
}
......
......@@ -86,9 +86,13 @@ int main(int argc, char** argv)
remi_fileset_register_metadata(fileset, "AERED", "qerqwer");
// migrate the fileset
ret = remi_fileset_migrate(remi_ph, fileset, remote_root, REMI_KEEP_SOURCE);
int status = 0;
ret = remi_fileset_migrate(remi_ph, fileset, remote_root, REMI_KEEP_SOURCE, &status);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_fileset_migrate() returned %d\n", ret);
if(ret == REMI_ERR_USER) {
fprintf(stderr, "----- user error: %d\n", status);
}
ret = -1;
goto error;
}
......
......@@ -12,12 +12,18 @@ void my_metadata_printer(const char* key, const char* value, void* uarg) {
fprintf(stdout," - %s\t==>\t%s\n", key, value);
}
void my_migration_callback(remi_fileset_t fileset, void* uargs) {
int my_pre_migration_callback(remi_fileset_t fileset, void* uargs) {
fprintf(stdout, "Migration starting.\n");
return 0;
}
int my_post_migration_callback(remi_fileset_t fileset, void* uargs) {
fprintf(stdout, "Migration terminated.\n");
fprintf(stdout, "The following files were transfered:\n");
remi_fileset_foreach_file(fileset, my_fileset_printer, NULL);
fprintf(stdout, "Associated metadata:\n");
remi_fileset_foreach_metadata(fileset, my_metadata_printer, NULL);
return 0;
}
int main(int argc, char** argv)
......@@ -71,7 +77,9 @@ int main(int argc, char** argv)
// create migration class
ret = remi_provider_register_migration_class(remi_prov,
"my_migration_class", my_migration_callback, NULL, NULL);
"my_migration_class",
my_pre_migration_callback,
my_post_migration_callback, NULL, NULL);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_provider_register_migration_class() returned %d\n", ret);
ret = -1;
......
......@@ -101,6 +101,7 @@ int remi_shutdown_service(remi_client_t client, hg_addr_t addr);
* @param fileset Fileset to migrate.
* @param remote_root Root of the fileset when migrated.
* @param flag REMI_REMOVE_SOURCE or REMI_KEEP_SOURCE.
* @param status Value returned by the user-defined migration callbacks.
*
* @return REMI_SUCCESS or error code defined in remi-common.h.
*/
......@@ -108,7 +109,8 @@ int remi_fileset_migrate(
remi_provider_handle_t handle,
remi_fileset_t fileset,
const char* remote_root,
int flag);
int flag,
int* status);
#if defined(__cplusplus)
}
......
......@@ -28,6 +28,7 @@ extern "C" {
#define REMI_ERR_CLASS_EXISTS -10 /* Migration class already registered */
#define REMI_ERR_FILE_EXISTS -11 /* File already exists */
#define REMI_ERR_IO -12 /* Error in I/O (stat, open, etc.) call */
#define REMI_ERR_USER -13 /* User-defined error reported in "status" argument */
/**
* @brief Fileset type.
......
......@@ -23,11 +23,18 @@ typedef struct remi_provider* remi_provider_t;
#define REMI_PROVIDER_NULL ((remi_provider_t)0)
/**
* @brief Callback called when a migration completes. This callback
* takes the fileset that was newly created on the provider, as well
* as a pointer to user-provided arguments.
* @brief Callback called when a migration starts or completes.
* This callback takes the fileset that will or has been migrated, as well
* as a pointer to user-provided arguments corresponding to the void*
* argument given when registering the migration class.
* The function should return 0 when successful, a non-zero value
* otherwise. If a callback called before migration returns a non-zero
* value, this will abort the migration. If a callback called after the
* migration returns a non-zero value, the source of the migration will
* not erase its original files. In all cases, the return value of this
* function is propagated back to the source.
*/
typedef void (*remi_migration_callback_t)(remi_fileset_t, void*);
typedef int (*remi_migration_callback_t)(remi_fileset_t, void*);
#define REMI_MIGRATION_CALLBACK_NULL ((remi_migration_callback_t)0)
......@@ -83,7 +90,8 @@ int remi_provider_registered(
*
* @param[in] provider Provider in which to register a migration class.
* @param[in] class_name Migration class name.
* @param[in] callback Callback to call after migration of a fileset of this class.
* @param[in] before_migration_cb Callback to call before migration of a fileset of this class.
* @param[in] after_migration_cb Callback to call after migration of a fileset of this class.
* @param[in] free_fn Function to call on uargs when the provider is destroyed.
* @param[in] uargs User-argument to pass to the callback.
*
......@@ -92,7 +100,8 @@ int remi_provider_registered(
int remi_provider_register_migration_class(
remi_provider_t provider,
const char* class_name,
remi_migration_callback_t callback,
remi_migration_callback_t before_migration_cb,
remi_migration_callback_t after_migration_cb,
remi_uarg_free_t free_fn,
void* uargs);
......
......@@ -11,6 +11,7 @@
#include <fcntl.h>
#include <sys/mman.h>
#include <thallium.hpp>
#include <thallium/serialization/stl/pair.hpp>
#include "fs-util.hpp"
#include "remi/remi-client.h"
#include "remi-fileset.hpp"
......@@ -109,7 +110,8 @@ extern "C" int remi_fileset_migrate(
remi_provider_handle_t ph,
remi_fileset_t fileset,
const char* remote_root,
int flag)
int flag,
int* status)
{
if(ph == REMI_PROVIDER_HANDLE_NULL
......@@ -189,7 +191,9 @@ extern "C" int remi_fileset_migrate(
fileset->m_root = theRemoteRoot;
// send the RPC
int32_t result = ph->m_client->m_migrate_rpc.on(*ph)(*fileset, theSizes, localBulk);
std::pair<int32_t, int32_t> result = ph->m_client->m_migrate_rpc.on(*ph)(*fileset, theSizes, localBulk);
int ret = result.first;
*status = result.second;
// put back the fileset's original members
fileset->m_root = std::move(tmp_root);
......@@ -198,8 +202,12 @@ extern "C" int remi_fileset_migrate(
cleanup();
if(result != REMI_SUCCESS) {
return result;
if(ret != REMI_SUCCESS) {
return ret;
}
if(*status != 0) {
return REMI_ERR_USER;
}
if(flag == REMI_REMOVE_SOURCE) {
......
......@@ -15,6 +15,7 @@
#include <iostream>
#include <unordered_map>
#include <thallium.hpp>
#include <thallium/serialization/stl/pair.hpp>
#include "remi/remi-server.h"
#include "remi-fileset.hpp"
#include "fs-util.hpp"
......@@ -22,7 +23,8 @@
namespace tl = thallium;
struct migration_class {
remi_migration_callback_t m_callback;
remi_migration_callback_t m_before_callback;
remi_migration_callback_t m_after_callback;
remi_uarg_free_t m_free;
void* m_uargs;
};
......@@ -40,10 +42,12 @@ struct remi_provider : public tl::provider<remi_provider> {
const std::vector<std::size_t>& filesizes,
tl::bulk& remote_bulk)
{
int32_t result = 0;
// pair of <returnvalue, status>
std::pair<int32_t,int32_t> result = {0, 0};
// check that the class of the fileset exists
if(m_migration_classes.count(fileset.m_class) == 0) {
result = REMI_ERR_UNKNOWN_CLASS;
result.first = REMI_ERR_UNKNOWN_CLASS;
req.respond(result);
return;
}
......@@ -53,13 +57,24 @@ struct remi_provider : public tl::provider<remi_provider> {
for(const auto& filename : fileset.m_files) {
auto theFilename = fileset.m_root + filename;
if(access(theFilename.c_str(), F_OK) != -1) {
result = REMI_ERR_FILE_EXISTS;
result.first = REMI_ERR_FILE_EXISTS;
req.respond(result);
return;
}
}
// alright, none of the files already exist
// call the "before migration" callback
auto& klass = m_migration_classes[fileset.m_class];
if(klass.m_before_callback != nullptr) {
result.second = klass.m_before_callback(&fileset, klass.m_uargs);
}
if(result.second != 0) {
result.first = REMI_ERR_USER;
req.respond(result);
return;
}
std::vector<std::pair<void*,std::size_t>> theData;
// function to cleanup the segments
......@@ -81,7 +96,7 @@ struct remi_provider : public tl::provider<remi_provider> {
int fd = open(theFilename.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0600);
if(fd == -1) {
cleanup();
result = REMI_ERR_IO;
result.first = REMI_ERR_IO;
req.respond(result);
return;
}
......@@ -92,7 +107,7 @@ struct remi_provider : public tl::provider<remi_provider> {
}
if(ftruncate(fd, filesizes[i]) == -1) {
cleanup();
result = REMI_ERR_IO;
result.first = REMI_ERR_IO;
req.respond(result);
return;
}
......@@ -100,7 +115,7 @@ struct remi_provider : public tl::provider<remi_provider> {
if(segment == NULL) {
close(fd);
cleanup();
result = REMI_ERR_IO;
result.first = REMI_ERR_IO;
req.respond(result);
return;
}
......@@ -116,7 +131,7 @@ struct remi_provider : public tl::provider<remi_provider> {
if(transferred != totalSize) {
// XXX we should cleanup the files that were created
result = REMI_ERR_MIGRATION;
result.first = REMI_ERR_MIGRATION;
req.respond(result);
return;
}
......@@ -125,7 +140,7 @@ struct remi_provider : public tl::provider<remi_provider> {
if(msync(seg.first, seg.second, MS_SYNC) == -1) {
// XXX we should cleanup the files that were created
cleanup();
result = REMI_ERR_IO;
result.first = REMI_ERR_IO;
req.respond(result);
return;
}
......@@ -133,12 +148,11 @@ struct remi_provider : public tl::provider<remi_provider> {
cleanup();
// call the migration callback associated with the class of fileset
auto& klass = m_migration_classes[fileset.m_class];
if(klass.m_callback != nullptr) {
klass.m_callback(&fileset, klass.m_uargs);
// call the "after" migration callback associated with the class of fileset
if(klass.m_after_callback != nullptr) {
result.second = klass.m_after_callback(&fileset, klass.m_uargs);
}
result = REMI_SUCCESS;
result.first = result.second == 0 ? REMI_SUCCESS : REMI_ERR_USER;
req.respond(result);
return;
}
......@@ -206,7 +220,8 @@ extern "C" int remi_provider_registered(
extern "C" int remi_provider_register_migration_class(
remi_provider_t provider,
const char* class_name,
remi_migration_callback_t callback,
remi_migration_callback_t before_callback,
remi_migration_callback_t after_callback,
remi_uarg_free_t free_fn,
void* uargs)
{
......@@ -215,7 +230,8 @@ extern "C" int remi_provider_register_migration_class(
if(provider->m_migration_classes.count(class_name) != 0)
return REMI_ERR_CLASS_EXISTS;
auto& klass = provider->m_migration_classes[class_name];
klass.m_callback = callback;
klass.m_before_callback = before_callback;
klass.m_after_callback = after_callback;
klass.m_uargs = uargs;
klass.m_free = free_fn;
return REMI_SUCCESS;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment