Commit 25f0fdf2 authored by Matthieu Dorier's avatar Matthieu Dorier

done with implementation

parent 91532401
......@@ -21,7 +21,8 @@ extern "C" {
#define REMI_ERR_SIZE -8 /* Client did not allocate enough for the requested data */
#define REMI_ERR_MIGRATION -9 /* Error during data migration */
#define REMI_ERR_CLASS_EXISTS -10 /* Migration class already registered */
#define REMI_ERR_STAT -11 /* Error in stat call */
#define REMI_ERR_FILE_EXISTS -11 /* File already exists */
#define REMI_ERR_IO -12 /* Error in I/O (stat, open, etc.) call */
typedef struct remi_fileset* remi_fileset_t;
#define REMI_FILESET_NULL ((remi_fileset_t)0)
......
......@@ -136,7 +136,7 @@ extern "C" int remi_fileset_migrate(
if(0 != fstat(fd, &st)) {
close(fd);
cleanup();
return REMI_ERR_STAT;
return REMI_ERR_IO;
}
auto size = st.st_size;
theSizes.push_back(size);
......
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <string.h>
#include <thallium.hpp>
#include "remi/remi-server.h"
#include "remi-fileset.hpp"
#include "fs-util.h"
namespace tl = thallium;
......@@ -20,8 +29,89 @@ struct remi_provider : public tl::provider<remi_provider> {
const std::vector<std::size_t>& filesizes,
tl::bulk& remote_bulk)
{
// TODO
// check that the class of the fileset exists
if(m_migration_classes.count(fileset.m_class) == 0) {
return REMI_ERR_UNKNOWN_CLASS;
}
// check if any of the target files already exist
// (we don't want to overwrite)
for(const auto& filename : fileset.m_files) {
auto theFilename = fileset.m_root + filename;
if(access(theFilename.c_str(), F_OK) != -1)
return REMI_ERR_FILE_EXISTS;
}
// alright, none of the files already exist
std::vector<std::pair<void*,std::size_t>> theData;
// function to cleanup the segments
auto cleanup = [&theData]() {
for(auto& seg : theData) {
munmap(seg.first, seg.second);
}
};
// create files, truncate them, and expose them with mmap
unsigned i=0;
size_t totalSize = 0;
for(const auto& filename : fileset.m_files) {
auto theFilename = fileset.m_root + filename;
auto p = theFilename.find_last_of('/');
auto theDir = theFilename.substr(0, p);
mkdirs(theDir.c_str());
totalSize += filesizes[i];
int fd = open(theFilename.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0600);
if(fd == -1) {
cleanup();
return REMI_ERR_IO;
}
if(filesizes[i] == 0) {
i += 1;
close(fd);
continue;
}
if(ftruncate(fd, filesizes[i]) == -1) {
cleanup();
return REMI_ERR_IO;
}
void *segment = mmap(0, filesizes[i], PROT_WRITE, MAP_PRIVATE, fd, 0);
if(segment == NULL) {
close(fd);
cleanup();
return REMI_ERR_IO;
}
theData.emplace_back(segment, filesizes[i]);
i += 1;
}
return REMI_SUCCESS;
// create a local bulk handle to expose the segments
auto localBulk = m_engine->expose(theData, tl::bulk_mode::write_only);
// issue bulk transfer
size_t transferred = remote_bulk.on(req.get_endpoint()) >> localBulk;
cleanup();
if(transferred != totalSize) {
// XXX we should cleanup the files that were created
return REMI_ERR_MIGRATION;
}
for(auto& seg : theData) {
if(msync(seg.first, seg.second, MS_SYNC) == -1) {
// XXX we should cleanup the files that were created
cleanup();
return REMI_ERR_IO;
}
}
// call the migration callback associated with the class of fileset
auto& klass = m_migration_classes[fileset.m_class];
if(klass.m_callback != nullptr) {
klass.m_callback(this, &fileset, klass.m_uargs);
}
}
remi_provider(tl::engine* e, uint16_t provider_id, tl::pool& pool)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment