Commit 3bfa6afb authored by Matthieu Dorier's avatar Matthieu Dorier

finished implementing directory support

parent c8d4cdb7
add_executable(remi-test-client client.c)
target_link_libraries(remi-test-client remi)
add_executable(remi-test-client-dir client-dir.c)
target_link_libraries(remi-test-client-dir remi)
add_executable(remi-test-server server.c)
target_link_libraries(remi-test-server remi)
#include <margo.h>
#include <remi/remi-client.h>
int main(int argc, char** argv)
{
if(argc < 5) {
fprintf(stderr, "Usage: %s <server-address> <local-root> <dest-root> dir1 [ dir2 ...\n", argv[0]);
return -1;
}
int ret = 0;
hg_return_t hret = HG_SUCCESS;
char** dirnames = argv+4;
unsigned num_dirs = argc-4;
char* server_addr_str = argv[1];
char* local_root = argv[2];
char* remote_root = argv[3];
margo_instance_id mid = MARGO_INSTANCE_NULL;
remi_client_t remi_clt = REMI_CLIENT_NULL;
remi_provider_handle_t remi_ph = REMI_PROVIDER_HANDLE_NULL;
hg_addr_t svr_addr = HG_ADDR_NULL;
remi_fileset_t fileset = REMI_FILESET_NULL;
// initialize margo
unsigned i;
char cli_addr_prefix[64] = {0};
for(i=0; (i<63 && server_addr_str[i] != '\0' && server_addr_str[i] != ':'); i++)
cli_addr_prefix[i] = server_addr_str[i];
mid = margo_init(cli_addr_prefix, MARGO_CLIENT_MODE, 0, 0);
if(mid == MARGO_INSTANCE_NULL) {
fprintf(stderr, "ERROR: margo_initialize()\n");
ret = -1;
goto error;
}
// initialize REMI client
ret = remi_client_init(mid, &remi_clt);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_client_init() returned %d\n", ret);
ret = -1;
goto error;
}
// lookup server address
hret = margo_addr_lookup(mid, server_addr_str, &svr_addr);
if(hret != HG_SUCCESS) {
fprintf(stderr, "ERROR: margo_addr_lookup() returned %d\n", hret);
ret = -1;
goto error;
}
// create REMI provider handle
ret = remi_provider_handle_create(remi_clt, svr_addr, 1, &remi_ph);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_provider_handle_create() returned %d\n", ret);
ret = -1;
goto error;
}
// create a fileset
ret = remi_fileset_create("my_migration_class", local_root, &fileset);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_fileset_create() returned %d\n", ret);
ret = -1;
goto error;
}
// fill the fileset with directory paths
for(i=0; i < num_dirs; i++) {
ret = remi_fileset_register_directory(fileset, dirnames[i]);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_fileset_register_directory() returned %d\n", ret);
ret = -1;
goto error;
}
}
// fill the fileset with some metadata
ret = remi_fileset_register_metadata(fileset, "ABC", "DEF");
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_fileset_register_metadata() returned %d\n", ret);
ret = -1;
goto error;
}
remi_fileset_register_metadata(fileset, "AERED", "qerqwer");
// migrate the fileset
ret = remi_fileset_migrate(remi_ph, fileset, remote_root, REMI_REMOVE_SOURCE);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_fileset_migrate() returned %d\n", ret);
ret = -1;
goto error;
}
// shut down the server
remi_shutdown_service(remi_clt, svr_addr);
finish:
// cleanup
remi_fileset_free(fileset);
remi_provider_handle_release(remi_ph);
margo_addr_free(mid, svr_addr);
remi_client_finalize(remi_clt);
margo_finalize(mid);
return ret;
error:
goto finish;
}
......@@ -117,8 +117,8 @@ int remi_fileset_get_root(
/**
* @brief Registers a file in the fileset. The provided path
* should be relative to the fileset's root. The file need not
* exist at the moment it is being registered (it needs to exist
* should be relative to the fileset's root. The file does not need
* to exist at the moment it is being registered (it needs to exist
* when migrating the fileset).
*
* @param fileset Fileset in which to register the file.
......@@ -130,6 +130,22 @@ int remi_fileset_register_file(
remi_fileset_t fileset,
const char* filename);
/**
* @brief Registers a directory in the fileset. The provided path
* should be relative to the fileset's root. The directory does not need
* to exist at the moment it is being registered (it needs to exist when
* migrating the fileset). When migrating the fileset, all the files and
* subdirectories in this directory will be migrated.
*
* @param fileset Fileset in which to register the directory.
* @param dirname Directory path.
*
* @return REMI_SUCCESS or error code defined in remi-common.h.
*/
int remi_fileset_register_directory(
remi_fileset_t fileset,
const char* dirname);
/**
* @brief Deregisters a file from the fileset. This deregisters
* the file only if the file has been added using remi_fileset_register_file.
......@@ -145,6 +161,21 @@ int remi_fileset_deregister_file(
remi_fileset_t fileset,
const char* filename);
/**
* @brief Deregisters a directory from the fileset. This deregisters
* the directory only if the directory has been added using
* remi_fileset_register_directory. If individual files in the directory
* have been added independently, this will not deregister them.
*
* @param fileset Fileset from which to deregister the directory.
* @param dirname Directory name.
*
* @return REMI_SUCCESS or error code defined in remi-common.h.
*/
int remi_fileset_deregister_directory(
remi_fileset_t fileset,
const char* dirname);
/**
* @brief Iterates over all the files in a fileset in alphabetical
* order and call the provided callback on the file's name and the
......@@ -161,6 +192,37 @@ int remi_fileset_foreach_file(
remi_fileset_callback_t callback,
void* uargs);
/**
* @brief Iterates over all the directories registered in a fileset
* in alphabetical order and call the provided callback on the directory's
* name and the provided user-arguments.
*
* @param fileset Fileset on which to iterate.
* @param callback Callback to call on each directory.
* @param uargs User-provided arguments (may be NULL).
*
* @return REMI_SUCCESS or error code defined in remi-common.h.
*/
int remi_fileset_foreach_directory(
remi_fileset_t fileset,
remi_fileset_callback_t callback,
void* uargs);
/**
* @brief Iterate over all the files and directories registered in
* a fileset and call the callback on all files that actually exist.
*
* @param fileset Fileset on which to iterate.
* @param callback Callback to call on each existing file.
* @param uargs User-provided arguments (may be NULL).
*
* @return REMI_SUCCESS or error code defined in remi-common.h.
*/
int remi_fileset_walkthrough(
remi_fileset_t fileset,
remi_fileset_callback_t callback,
void* uargs);
/**
* @brief Registers a metadata (key/value pair of strings) in the
* fileset. If a metadata already exists with the provided key,
......
......@@ -8,6 +8,10 @@
#include <sys/stat.h>
#include <limits.h>
#include <string>
#include <iostream>
#include <functional>
#include <dirent.h>
inline void mkdirs(const char *dir) {
std::string tmp(dir);
......@@ -25,4 +29,34 @@ inline void mkdirs(const char *dir) {
mkdir(tmp.c_str(), S_IRWXU);
}
template<typename F>
void listFiles(const std::string& root, const std::string &path, F&& cb) {
auto fullpath = root + "/" + path;
if(auto dir = opendir(fullpath.c_str())) {
while(auto f = readdir(dir)) {
if(!f->d_name || f->d_name[0] == '.') continue;
if(f->d_type == DT_DIR)
listFiles(root, path + f->d_name + "/", std::forward<F>(cb));
if(f->d_type == DT_REG)
cb(path + f->d_name);
}
closedir(dir);
}
}
inline void removeRec(const std::string &path) {
if(auto dir = opendir(path.c_str())) {
while(auto f = readdir(dir)) {
if(!f->d_name || f->d_name[0] == '.') continue;
if(f->d_type == DT_DIR) {
removeRec(path + f->d_name + "/");
} else {
remove((path+f->d_name).c_str());
}
}
closedir(dir);
}
remove(path.c_str());
}
#endif
......@@ -11,6 +11,7 @@
#include <fcntl.h>
#include <sys/mman.h>
#include <thallium.hpp>
#include "fs-util.hpp"
#include "remi/remi-client.h"
#include "remi-fileset.hpp"
......@@ -99,6 +100,11 @@ extern "C" int remi_shutdown_service(remi_client_t client, hg_addr_t addr)
return margo_shutdown_remote_instance(client->m_mid, addr);
}
static void list_existing_files(const char* filename, void* uargs) {
auto files = static_cast<std::set<std::string>*>(uargs);
files->emplace(filename);
}
extern "C" int remi_fileset_migrate(
remi_provider_handle_t ph,
remi_fileset_t fileset,
......@@ -121,14 +127,20 @@ extern "C" int remi_fileset_migrate(
std::vector<std::pair<void*,std::size_t>> theData;
std::vector<std::size_t> theSizes;
// prepare lambda for cleanin up mapped files
// prepare lambda for cleaning up mapped files
auto cleanup = [&theData]() {
for(auto& seg : theData) {
munmap(seg.first, seg.second);
}
};
for(auto& filename : fileset->m_files) {
// find the set of files to migrate from the fileset
std::set<std::string> files;
remi_fileset_walkthrough(fileset, list_existing_files,
static_cast<void*>(&files));
for(auto& filename : files) {
// compose full name
auto theFilename = fileset->m_root + filename;
// open file
......@@ -167,11 +179,22 @@ extern "C" int remi_fileset_migrate(
if(theData.size() != 0)
localBulk = ph->m_client->m_engine->expose(theData, tl::bulk_mode::read_only);
// send the RPC
auto localRoot = fileset->m_root;
// create a copy of the fileset where m_directory is empty
// and the filenames in directories have been resolved
auto tmp_files = std::move(fileset->m_files);
auto tmp_dirs = std::move(fileset->m_directories);
auto tmp_root = std::move(fileset->m_root);
fileset->m_files = files;
fileset->m_directories = decltype(fileset->m_directories)();
fileset->m_root = theRemoteRoot;
// send the RPC
int32_t result = ph->m_client->m_migrate_rpc.on(*ph)(*fileset, theSizes, localBulk);
fileset->m_root = localRoot;
// put back the fileset's original members
fileset->m_root = std::move(tmp_root);
fileset->m_files = std::move(tmp_files);
fileset->m_directories = std::move(tmp_dirs);
cleanup();
......@@ -180,10 +203,14 @@ extern "C" int remi_fileset_migrate(
}
if(flag == REMI_REMOVE_SOURCE) {
for(auto& filename : fileset->m_files) {
for(auto& filename : files) {
auto theFilename = fileset->m_root + filename;
remove(theFilename.c_str());
}
for(auto& dirname : fileset->m_directories) {
auto theDirname = fileset->m_root + dirname;
removeRec(theDirname);
}
}
return REMI_SUCCESS;
......
......@@ -5,6 +5,7 @@
*/
#include <cstring>
#include "remi/remi-common.h"
#include "fs-util.hpp"
#include "remi-fileset.hpp"
extern "C" int remi_fileset_create(
......@@ -114,6 +115,69 @@ extern "C" int remi_fileset_foreach_file(
return REMI_SUCCESS;
}
extern "C" int remi_fileset_register_directory(
remi_fileset_t fileset,
const char* dirname)
{
if(fileset == REMI_FILESET_NULL || dirname == NULL)
return REMI_ERR_INVALID_ARG;
unsigned i = 0;
while(dirname[i] == '/') i += 1;
fileset->m_directories.insert(dirname+i);
return REMI_SUCCESS;
}
extern "C" int remi_fileset_deregister_directory(
remi_fileset_t fileset,
const char* dirname)
{
if(fileset == REMI_FILESET_NULL || dirname == NULL)
return REMI_ERR_INVALID_ARG;
unsigned i = 0;
while(dirname[i] == '/') i += 1;
auto it = fileset->m_directories.find(dirname+i);
if(it == fileset->m_directories.end())
return REMI_ERR_UNKNOWN_FILE;
else
fileset->m_directories.erase(dirname+i);
return REMI_SUCCESS;
}
extern "C" int remi_fileset_foreach_directory(
remi_fileset_t fileset,
remi_fileset_callback_t callback,
void* uargs)
{
if(fileset == REMI_FILESET_NULL || callback == NULL)
return REMI_ERR_INVALID_ARG;
for(auto& dirname : fileset->m_directories) {
callback(dirname.c_str(), uargs);
}
return REMI_SUCCESS;
}
extern "C" int remi_fileset_walkthrough(
remi_fileset_t fileset,
remi_fileset_callback_t callback,
void* uargs)
{
if(fileset == REMI_FILESET_NULL || callback == NULL)
return REMI_ERR_INVALID_ARG;
std::set<std::string> files;
for(const auto& filename : fileset->m_files) {
files.insert(filename);
}
for(const auto& dirname : fileset->m_directories) {
listFiles(fileset->m_root, dirname, [&files](const std::string& filename) {
files.insert(filename);
});
}
for(const auto& filename : files) {
callback(filename.c_str(), uargs);
}
return REMI_SUCCESS;
}
extern "C" int remi_fileset_register_metadata(
remi_fileset_t fileset,
const char* key,
......
......@@ -19,6 +19,7 @@ struct remi_fileset {
std::string m_root;
std::map<std::string,std::string> m_metadata;
std::set<std::string> m_files;
std::set<std::string> m_directories;
template<typename A>
void serialize(A& ar) {
......@@ -26,6 +27,7 @@ struct remi_fileset {
ar & m_root;
ar & m_metadata;
ar & m_files;
ar & m_directories;
}
};
......
......@@ -17,7 +17,7 @@
#include <thallium.hpp>
#include "remi/remi-server.h"
#include "remi-fileset.hpp"
#include "fs-util.h"
#include "fs-util.hpp"
namespace tl = thallium;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment