Commit 91532401 authored by Matthieu Dorier's avatar Matthieu Dorier

implemented migration function on client side

parent 9ef8e7da
......@@ -33,6 +33,7 @@ int remi_shutdown_service(remi_client_t client, hg_addr_t addr);
int remi_fileset_migrate(
remi_provider_handle_t handle,
remi_fileset_t fileset,
const char* remote_root,
int flag);
#if defined(__cplusplus)
......
......@@ -8,7 +8,7 @@ extern "C" {
#endif
#define REMI_KEEP_SOURCE 0
#define REMI_REMOBE_SOURCE 1
#define REMI_REMOVE_SOURCE 1
#define REMI_SUCCESS 0 /* Success */
#define REMI_ERR_ALLOCATION -1 /* Error allocating something */
......@@ -21,6 +21,7 @@ extern "C" {
#define REMI_ERR_SIZE -8 /* Client did not allocate enough for the requested data */
#define REMI_ERR_MIGRATION -9 /* Error during data migration */
#define REMI_ERR_CLASS_EXISTS -10 /* Migration class already registered */
#define REMI_ERR_STAT -11 /* Error in stat call */
typedef struct remi_fileset* remi_fileset_t;
#define REMI_FILESET_NULL ((remi_fileset_t)0)
......
#ifndef __FS_UTIL
#define __FS_UTIL
#include <sys/stat.h>
#include <limits.h>
inline void mkdirs(const char *dir) {
std::string tmp(dir);
char *p = NULL;
size_t len = tmp.size();
if(tmp[len - 1] == '/')
tmp[len - 1] = 0;
for(p = (char*)tmp.c_str() + 1; *p; p++)
if(*p == '/') {
*p = 0;
mkdir(tmp.c_str(), S_IRWXU);
*p = '/';
}
mkdir(tmp.c_str(), S_IRWXU);
}
#endif
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <thallium.hpp>
#include "remi/remi-client.h"
#include "remi-fileset.hpp"
......@@ -87,9 +94,89 @@ extern "C" int remi_shutdown_service(remi_client_t client, hg_addr_t addr)
}
extern "C" int remi_fileset_migrate(
remi_provider_handle_t handle,
remi_provider_handle_t ph,
remi_fileset_t fileset,
const char* remote_root,
int flag)
{
// TODO
if(ph == REMI_PROVIDER_HANDLE_NULL
|| fileset == REMI_FILESET_NULL
|| remote_root == NULL)
return REMI_ERR_INVALID_ARG;
if(remote_root[0] != '/')
return REMI_ERR_INVALID_ARG;
std::string theRemoteRoot(remote_root);
if(theRemoteRoot[theRemoteRoot.size()-1] != '/')
theRemoteRoot += "/";
// expose the data
std::vector<std::pair<void*,std::size_t>> theData;
std::vector<std::size_t> theSizes;
// prepare lambda for cleanin up mapped files
auto cleanup = [&theData]() {
for(auto& seg : theData) {
munmap(seg.first, seg.second);
}
};
for(auto& filename : fileset->m_files) {
// compose full name
auto theFilename = fileset->m_root + filename;
// open file
int fd = open(theFilename.c_str(), O_RDONLY, 0);
if(fd == -1) {
cleanup();
return REMI_ERR_UNKNOWN_FILE;
}
// get file size
struct stat st;
if(0 != fstat(fd, &st)) {
close(fd);
cleanup();
return REMI_ERR_STAT;
}
auto size = st.st_size;
theSizes.push_back(size);
if(size == 0) {
close(fd);
continue;
}
// map the file
void* segment = mmap(0, size, PROT_READ, MAP_PRIVATE, fd, 0);
if(segment == NULL) {
cleanup();
return REMI_ERR_ALLOCATION;
}
// close file descriptor
close(fd);
// insert the segment
theData.emplace_back(segment, size);
}
// expose the segments for bulk operations
auto localBulk = ph->m_client->m_engine->expose(theData, tl::bulk_mode::read_only);
// send the RPC
auto localRoot = fileset->m_root;
fileset->m_root = theRemoteRoot;
int32_t result = ph->m_client->m_migrate_rpc.on(*ph)(*fileset, theSizes, localBulk);
fileset->m_root = localRoot;
cleanup();
if(result != REMI_SUCCESS) {
return result;
}
if(flag == REMI_REMOVE_SOURCE) {
for(auto& filename : fileset->m_files) {
auto theFilename = fileset->m_root + filename;
remove(theFilename.c_str());
}
}
return REMI_SUCCESS;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment