Commit f9d825d9 authored by Matthieu Dorier's avatar Matthieu Dorier

corrected some small bugs

parent a995481e
......@@ -93,6 +93,9 @@ int main(int argc, char** argv)
goto error;
}
// shut down the server
remi_shutdown_service(remi_clt, svr_addr);
finish:
// cleanup
remi_fileset_free(fileset);
......
......@@ -63,6 +63,7 @@ extern "C" int remi_provider_handle_create(
auto theHandle = new remi_provider_handle(
tl::endpoint(*(client->m_engine), addr, false), provider_id);
theHandle->m_client = client;
theHandle->m_ref_count = 1;
*handle = theHandle;
client->m_num_providers += 1;
return REMI_SUCCESS;
......
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
......@@ -6,6 +7,7 @@
#include <fcntl.h>
#include <sys/mman.h>
#include <string.h>
#include <iostream>
#include <thallium.hpp>
#include "remi/remi-server.h"
#include "remi-fileset.hpp"
......@@ -23,23 +25,29 @@ struct remi_provider : public tl::provider<remi_provider> {
std::unordered_map<std::string, migration_class> m_migration_classes;
tl::engine* m_engine;
int32_t migrate(
void migrate(
const tl::request& req,
remi_fileset& fileset,
const std::vector<std::size_t>& filesizes,
tl::bulk& remote_bulk)
{
int32_t result = 0;
// check that the class of the fileset exists
if(m_migration_classes.count(fileset.m_class) == 0) {
return REMI_ERR_UNKNOWN_CLASS;
result = REMI_ERR_UNKNOWN_CLASS;
req.respond(result);
return;
}
// check if any of the target files already exist
// (we don't want to overwrite)
for(const auto& filename : fileset.m_files) {
auto theFilename = fileset.m_root + filename;
if(access(theFilename.c_str(), F_OK) != -1)
return REMI_ERR_FILE_EXISTS;
if(access(theFilename.c_str(), F_OK) != -1) {
result = REMI_ERR_FILE_EXISTS;
req.respond(result);
return;
}
}
// alright, none of the files already exist
......@@ -64,7 +72,9 @@ struct remi_provider : public tl::provider<remi_provider> {
int fd = open(theFilename.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0600);
if(fd == -1) {
cleanup();
return REMI_ERR_IO;
result = REMI_ERR_IO;
req.respond(result);
return;
}
if(filesizes[i] == 0) {
i += 1;
......@@ -73,18 +83,21 @@ struct remi_provider : public tl::provider<remi_provider> {
}
if(ftruncate(fd, filesizes[i]) == -1) {
cleanup();
return REMI_ERR_IO;
result = REMI_ERR_IO;
req.respond(result);
return;
}
void *segment = mmap(0, filesizes[i], PROT_WRITE, MAP_PRIVATE, fd, 0);
void *segment = mmap(0, filesizes[i], PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
if(segment == NULL) {
close(fd);
cleanup();
return REMI_ERR_IO;
result = REMI_ERR_IO;
req.respond(result);
return;
}
theData.emplace_back(segment, filesizes[i]);
i += 1;
}
return REMI_SUCCESS;
// create a local bulk handle to expose the segments
auto localBulk = m_engine->expose(theData, tl::bulk_mode::write_only);
......@@ -92,30 +105,37 @@ struct remi_provider : public tl::provider<remi_provider> {
// issue bulk transfer
size_t transferred = remote_bulk.on(req.get_endpoint()) >> localBulk;
cleanup();
if(transferred != totalSize) {
// XXX we should cleanup the files that were created
return REMI_ERR_MIGRATION;
result = REMI_ERR_MIGRATION;
req.respond(result);
return;
}
for(auto& seg : theData) {
if(msync(seg.first, seg.second, MS_SYNC) == -1) {
// XXX we should cleanup the files that were created
cleanup();
return REMI_ERR_IO;
result = REMI_ERR_IO;
req.respond(result);
return;
}
}
cleanup();
// call the migration callback associated with the class of fileset
auto& klass = m_migration_classes[fileset.m_class];
if(klass.m_callback != nullptr) {
klass.m_callback(&fileset, klass.m_uargs);
}
result = REMI_SUCCESS;
req.respond(result);
return;
}
remi_provider(tl::engine* e, uint16_t provider_id, tl::pool& pool)
: tl::provider<remi_provider>(*e, provider_id) {
: tl::provider<remi_provider>(*e, provider_id), m_engine(e) {
define("remi_migrate", &remi_provider::migrate);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment