Commit d0421aa8 authored by Matthieu Dorier's avatar Matthieu Dorier

corrected bug

parent eaf5e6be
...@@ -67,6 +67,7 @@ struct remi_provider : public tl::provider<remi_provider> { ...@@ -67,6 +67,7 @@ struct remi_provider : public tl::provider<remi_provider> {
tl::pool& m_pool; tl::pool& m_pool;
abt_io_instance_id m_abtio; abt_io_instance_id m_abtio;
std::unordered_map<uuid, operation, uuid_hash> m_op_in_progress; std::unordered_map<uuid, operation, uuid_hash> m_op_in_progress;
tl::mutex m_op_in_progress_mtx;
static std::unordered_map<uint16_t, remi_provider*> s_registered_providers; static std::unordered_map<uint16_t, remi_provider*> s_registered_providers;
static std::unordered_map<std::string, device> s_devices; static std::unordered_map<std::string, device> s_devices;
...@@ -142,12 +143,15 @@ struct remi_provider : public tl::provider<remi_provider> { ...@@ -142,12 +143,15 @@ struct remi_provider : public tl::provider<remi_provider> {
devices.push_back(d); devices.push_back(d);
} }
// store the operation into the map of pending operations // store the operation into the map of pending operations
auto& op = m_op_in_progress[std::get<2>(result)]; {
op.m_fileset = std::move(fileset); std::lock_guard<tl::mutex> guard(m_op_in_progress_mtx);
op.m_filesizes = std::move(filesizes); auto& op = m_op_in_progress[std::get<2>(result)];
op.m_modes = std::move(theModes); op.m_fileset = std::move(fileset);
op.m_fds = std::move(openedFileDescriptors); op.m_filesizes = std::move(filesizes);
op.m_devices = std::move(devices); op.m_modes = std::move(theModes);
op.m_fds = std::move(openedFileDescriptors);
op.m_devices = std::move(devices);
}
req.respond(result); req.respond(result);
} }
...@@ -158,42 +162,50 @@ struct remi_provider : public tl::provider<remi_provider> { ...@@ -158,42 +162,50 @@ struct remi_provider : public tl::provider<remi_provider> {
std::pair<int32_t, int32_t> result = {0,0}; std::pair<int32_t, int32_t> result = {0,0};
// get the operation associated with the operation id // get the operation associated with the operation id
auto it = m_op_in_progress.find(operation_id); operation* op = nullptr;
if(it == m_op_in_progress.end()) { {
result.first = REMI_ERR_INVALID_OPID; std::lock_guard<tl::mutex> guard(m_op_in_progress_mtx);
req.respond(result); auto it = m_op_in_progress.find(operation_id);
return; if(it == m_op_in_progress.end()) {
result.first = REMI_ERR_INVALID_OPID;
req.respond(result);
return;
}
op = &(it->second);
} }
auto& op = it->second;
{ {
std::lock_guard<tl::mutex> guard(op.m_mutex); std::lock_guard<tl::mutex> guard(op->m_mutex);
// close all the file descriptors // close all the file descriptors
for(int fd : op.m_fds) { for(int fd : op->m_fds) {
close(fd); close(fd);
} }
if(op.m_error != REMI_SUCCESS) { if(op->m_error != REMI_SUCCESS) {
result.first = op.m_error; result.first = op->m_error;
req.respond(result); req.respond(result);
m_op_in_progress.erase(it); std::lock_guard<tl::mutex> guard(m_op_in_progress_mtx);
m_op_in_progress.erase(operation_id);
return; return;
} }
// find the class of migration // find the class of migration
auto& klass = m_migration_classes[op.m_fileset.m_class]; auto& klass = m_migration_classes[op->m_fileset.m_class];
// call the "after" migration callback associated with the class of fileset // call the "after" migration callback associated with the class of fileset
if(klass.m_after_callback != nullptr) { if(klass.m_after_callback != nullptr) {
result.second = klass.m_after_callback(&(op.m_fileset), klass.m_uargs); result.second = klass.m_after_callback(&(op->m_fileset), klass.m_uargs);
} }
result.first = result.second == 0 ? REMI_SUCCESS : REMI_ERR_USER; result.first = result.second == 0 ? REMI_SUCCESS : REMI_ERR_USER;
req.respond(result); req.respond(result);
} }
m_op_in_progress.erase(it); {
std::lock_guard<tl::mutex> guard(m_op_in_progress_mtx);
m_op_in_progress.erase(operation_id);
}
return; return;
} }
...@@ -205,54 +217,59 @@ struct remi_provider : public tl::provider<remi_provider> { ...@@ -205,54 +217,59 @@ struct remi_provider : public tl::provider<remi_provider> {
{ {
int ret; int ret;
// get the operation associated with the operation id // get the operation associated with the operation id
auto it = m_op_in_progress.find(operation_id); operation* op = nullptr;
if(it == m_op_in_progress.end()) { {
ret = REMI_ERR_INVALID_OPID; std::lock_guard<tl::mutex> guard(m_op_in_progress_mtx);
req.respond(ret); auto it = m_op_in_progress.find(operation_id);
return; if(it == m_op_in_progress.end()) {
ret = REMI_ERR_INVALID_OPID;
req.respond(ret);
return;
}
op = &(it->second);
} }
auto& op = it->second;
// we found the operation, let's mmap some files! // we found the operation, let's mmap some files!
std::vector<std::pair<void*,std::size_t>> theData; std::vector<std::pair<void*,std::size_t>> theData;
// function to cleanup the segments // function to cleanup the segments
auto cleanup = [this, &theData, &it](bool error) { auto cleanup = [this, &theData, &operation_id](bool error) {
for(auto& seg : theData) { for(auto& seg : theData) {
munmap(seg.first, seg.second); munmap(seg.first, seg.second);
} }
if(error) { if(error) {
this->m_op_in_progress.erase(it); std::lock_guard<tl::mutex> guard(m_op_in_progress_mtx);
this->m_op_in_progress.erase(operation_id);
} }
}; };
// compute total file size // compute total file size
size_t totalSize = 0; size_t totalSize = 0;
for(auto& s : op.m_filesizes) for(auto& s : op->m_filesizes)
totalSize += s; totalSize += s;
// create files, truncate them, and expose them with mmap // create files, truncate them, and expose them with mmap
unsigned i=0; unsigned i=0;
for(int fd : op.m_fds) { for(int fd : op->m_fds) {
if(op.m_filesizes[i] == 0) { if(op->m_filesizes[i] == 0) {
i += 1; i += 1;
continue; continue;
} }
if(ftruncate(fd, op.m_filesizes[i]) == -1) { if(ftruncate(fd, op->m_filesizes[i]) == -1) {
cleanup(true); cleanup(true);
ret = REMI_ERR_IO; ret = REMI_ERR_IO;
req.respond(ret); req.respond(ret);
return; return;
} }
void *segment = mmap(0, op.m_filesizes[i], PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0); void *segment = mmap(0, op->m_filesizes[i], PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
if(segment == NULL) { if(segment == NULL) {
cleanup(true); cleanup(true);
ret = REMI_ERR_IO; ret = REMI_ERR_IO;
req.respond(ret); req.respond(ret);
return; return;
} }
madvise(segment, op.m_filesizes[i], MADV_SEQUENTIAL); madvise(segment, op->m_filesizes[i], MADV_SEQUENTIAL);
theData.emplace_back(segment, op.m_filesizes[i]); theData.emplace_back(segment, op->m_filesizes[i]);
i += 1; i += 1;
} }
...@@ -294,39 +311,44 @@ struct remi_provider : public tl::provider<remi_provider> { ...@@ -294,39 +311,44 @@ struct remi_provider : public tl::provider<remi_provider> {
{ {
int ret; int ret;
// get the operation associated with the operation id // get the operation associated with the operation id
auto it = m_op_in_progress.find(operation_id); operation* op = nullptr;
if(it == m_op_in_progress.end()) { {
ret = REMI_ERR_INVALID_OPID; std::lock_guard<tl::mutex> guard(m_op_in_progress_mtx);
req.respond(ret); auto it = m_op_in_progress.find(operation_id);
return; if(it == m_op_in_progress.end()) {
ret = REMI_ERR_INVALID_OPID;
req.respond(ret);
return;
}
op = &(it->second);
} }
auto& op = it->second;
std::lock_guard<tl::mutex> guard(op.m_mutex); std::lock_guard<tl::mutex> guard(op->m_mutex);
// we found the operation, let's open some files! // we found the operation, let's open some files!
std::vector<int> openedFileDescriptors; std::vector<int> openedFileDescriptors;
// function to cleanup everything in case of error // function to cleanup everything in case of error
auto cleanup = [this, &openedFileDescriptors, &it](bool error) { auto cleanup = [this, &openedFileDescriptors, &operation_id](bool error) {
for(auto& fd : openedFileDescriptors) { for(auto& fd : openedFileDescriptors) {
close(fd); close(fd);
} }
if(error) { if(error) {
this->m_op_in_progress.erase(it); std::lock_guard<tl::mutex> guard(m_op_in_progress_mtx);
this->m_op_in_progress.erase(operation_id);
} }
}; };
// check the RPC's target file index // check the RPC's target file index
// and the size of the file // and the size of the file
if(fileNumber >= op.m_fds.size()) { if(fileNumber >= op->m_fds.size()) {
ret = REMI_ERR_IO; ret = REMI_ERR_IO;
cleanup(true); cleanup(true);
req.respond(ret); req.respond(ret);
return; return;
} }
if(op.m_filesizes[fileNumber] < writeOffset + data.size()) { if(op->m_filesizes[fileNumber] < writeOffset + data.size()) {
ret = REMI_ERR_IO; ret = REMI_ERR_IO;
cleanup(true); cleanup(true);
req.respond(ret); req.respond(ret);
...@@ -335,10 +357,10 @@ struct remi_provider : public tl::provider<remi_provider> { ...@@ -335,10 +357,10 @@ struct remi_provider : public tl::provider<remi_provider> {
// write the chunk received // write the chunk received
int fd = op.m_fds[fileNumber]; int fd = op->m_fds[fileNumber];
ssize_t s; ssize_t s;
{ // only HDDs will be locked to avoid concurrent writes { // only HDDs will be locked to avoid concurrent writes
std::lock_guard<device> guard(*(op.m_devices[fileNumber])); std::lock_guard<device> guard(*(op->m_devices[fileNumber]));
// send an early response so the client can start sending the next chunk // send an early response so the client can start sending the next chunk
// in parallel while this chunk is being written // in parallel while this chunk is being written
...@@ -351,7 +373,7 @@ struct remi_provider : public tl::provider<remi_provider> { ...@@ -351,7 +373,7 @@ struct remi_provider : public tl::provider<remi_provider> {
s = abt_io_pwrite(m_abtio, fd, data.data(), data.size(), writeOffset); s = abt_io_pwrite(m_abtio, fd, data.data(), data.size(), writeOffset);
} }
if(s != data.size()) { if(s != data.size()) {
op.m_error = REMI_ERR_IO; op->m_error = REMI_ERR_IO;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment