Commit c6a98ee3 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

implemented read part of the pipeline

parent 3a2fcd0e
......@@ -34,6 +34,8 @@ set (BUILD_SHARED_LIBS "OFF" CACHE BOOL "Build a shared library")
include (xpkg-import)
find_package (thallium REQUIRED)
xpkg_import_module (margo REQUIRED margo)
xpkg_import_module (abt-io REQUIRED abt-io)
xpkg_import_module (uuid REQUIRED uuid)
add_subdirectory (src)
add_subdirectory (test)
......
......@@ -16,6 +16,7 @@ int main(int argc, char** argv)
char* local_root = argv[2];
char* remote_root = argv[3];
margo_instance_id mid = MARGO_INSTANCE_NULL;
abt_io_instance_id abtio = ABT_IO_INSTANCE_NULL;
remi_client_t remi_clt = REMI_CLIENT_NULL;
remi_provider_handle_t remi_ph = REMI_PROVIDER_HANDLE_NULL;
hg_addr_t svr_addr = HG_ADDR_NULL;
......@@ -34,8 +35,11 @@ int main(int argc, char** argv)
goto error;
}
// initialize ABT-IO
// TODO
// initialize REMI client
ret = remi_client_init(mid, &remi_clt);
ret = remi_client_init(mid, abtio, &remi_clt);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_client_init() returned %d\n", ret);
ret = -1;
......@@ -87,7 +91,7 @@ int main(int argc, char** argv)
// migrate the fileset
int status = 0;
ret = remi_fileset_migrate(remi_ph, fileset, remote_root, REMI_REMOVE_SOURCE, &status);
ret = remi_fileset_migrate(remi_ph, fileset, remote_root, REMI_REMOVE_SOURCE, REMI_USE_MMAP, &status);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_fileset_migrate() returned %d (status = %d)\n", ret, status);
ret = -1;
......
......@@ -16,6 +16,7 @@ int main(int argc, char** argv)
char* local_root = argv[2];
char* remote_root = argv[3];
margo_instance_id mid = MARGO_INSTANCE_NULL;
abt_io_instance_id abtio = ABT_IO_INSTANCE_NULL;
remi_client_t remi_clt = REMI_CLIENT_NULL;
remi_provider_handle_t remi_ph = REMI_PROVIDER_HANDLE_NULL;
hg_addr_t svr_addr = HG_ADDR_NULL;
......@@ -34,8 +35,11 @@ int main(int argc, char** argv)
goto error;
}
// initialize ABT-IO
abtio = abt_io_init(1);
// initialize REMI client
ret = remi_client_init(mid, &remi_clt);
ret = remi_client_init(mid, abtio, &remi_clt);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_client_init() returned %d\n", ret);
ret = -1;
......@@ -87,7 +91,7 @@ int main(int argc, char** argv)
// migrate the fileset
int status = 0;
ret = remi_fileset_migrate(remi_ph, fileset, remote_root, REMI_KEEP_SOURCE, &status);
ret = remi_fileset_migrate(remi_ph, fileset, remote_root, REMI_KEEP_SOURCE, REMI_USE_ABTIO, &status);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_fileset_migrate() returned %d\n", ret);
if(ret == REMI_ERR_USER) {
......@@ -106,6 +110,7 @@ finish:
remi_provider_handle_release(remi_ph);
margo_addr_free(mid, svr_addr);
remi_client_finalize(remi_clt);
abt_io_finalize(abtio);
margo_finalize(mid);
return ret;
error:
......
......@@ -38,6 +38,7 @@ int main(int argc, char** argv)
char* listen_addr_str = argv[1];
uint16_t provider_id = 1;
margo_instance_id mid = MARGO_INSTANCE_NULL;
abt_io_instance_id abtio = ABT_IO_INSTANCE_NULL;
remi_provider_t remi_prov = REMI_PROVIDER_NULL;
hg_addr_t my_addr = HG_ADDR_NULL;
......@@ -67,8 +68,11 @@ int main(int argc, char** argv)
margo_addr_free(mid, my_addr);
fprintf(stdout,"Server running at address %s\n", my_addr_str);
// initialize ABT-IO
// TODO
// create the REMI provider
ret = remi_provider_register(mid, 1, REMI_ABT_POOL_DEFAULT, &remi_prov);
ret = remi_provider_register(mid, abtio, 1, REMI_ABT_POOL_DEFAULT, &remi_prov);
if(ret != REMI_SUCCESS) {
fprintf(stderr, "ERROR: remi_provider_register() returned %d\n", ret);
ret = -1;
......
......@@ -7,6 +7,7 @@
#define __REMI_CLIENT_H
#include <remi/remi-common.h>
#include <abt-io.h>
#include <margo.h>
#if defined(__cplusplus)
......@@ -29,11 +30,15 @@ typedef struct remi_provider_handle* remi_provider_handle_t;
* @brief Initializes a REMI client.
*
* @param[in] mid Margo instance.
* @param[in] abtio ABT-IO instance.
* @param[out] client Resulting client.
*
* @return REMI_SUCCESS or error code defined in remi-common.h.
*/
int remi_client_init(margo_instance_id mid, remi_client_t* client);
int remi_client_init(
margo_instance_id mid,
abt_io_instance_id abtio,
remi_client_t* client);
/**
* @brief Finalizes a REMI client.
......@@ -100,7 +105,8 @@ int remi_shutdown_service(remi_client_t client, hg_addr_t addr);
* @param handle Provider handle of the target provider.
* @param fileset Fileset to migrate.
* @param remote_root Root of the fileset when migrated.
* @param flag REMI_REMOVE_SOURCE or REMI_KEEP_SOURCE.
* @param remove_source REMI_REMOVE_SOURCE or REMI_KEEP_SOURCE.
* @param mode REMI_USE_MMAP or REMI_USE_ABTIO.
* @param status Value returned by the user-defined migration callbacks.
*
* @return REMI_SUCCESS or error code defined in remi-common.h.
......@@ -109,7 +115,8 @@ int remi_fileset_migrate(
remi_provider_handle_t handle,
remi_fileset_t fileset,
const char* remote_root,
int flag,
int remove_source,
int mode,
int* status);
#if defined(__cplusplus)
......
......@@ -12,9 +12,12 @@
extern "C" {
#endif
#define REMI_KEEP_SOURCE 0 /* Keep the source files/directories */
#define REMI_KEEP_SOURCE 0 /* Keep the source files/directories */
#define REMI_REMOVE_SOURCE 1 /* Remove the source files/directories */
#define REMI_USE_MMAP 2 /* Use mmap-ed files to issue transfers (good for memory-based storage) */
#define REMI_USE_ABTIO 4 /* Use ABT-IO to pipeline read/write with data transfers (good for disks) */
#define REMI_SUCCESS 0 /* Success */
#define REMI_ERR_ALLOCATION -1 /* Error allocating something */
#define REMI_ERR_INVALID_ARG -2 /* An argument is invalid */
......@@ -29,6 +32,7 @@ extern "C" {
#define REMI_ERR_FILE_EXISTS -11 /* File already exists */
#define REMI_ERR_IO -12 /* Error in I/O (stat, open, etc.) call */
#define REMI_ERR_USER -13 /* User-defined error reported in "status" argument */
#define REMI_ERR_INVALID_OPID -14 /* Invalid UUID operation identifier received */
/**
* @brief Fileset type.
......
......@@ -7,6 +7,7 @@
#define __REMI_SERVER_H
#include <remi/remi-common.h>
#include <abt-io.h>
#if defined(__cplusplus)
extern "C" {
......@@ -34,7 +35,7 @@ typedef struct remi_provider* remi_provider_t;
* not erase its original files. In all cases, the return value of this
* function is propagated back to the source.
*/
typedef int (*remi_migration_callback_t)(remi_fileset_t, void*);
typedef int32_t (*remi_migration_callback_t)(remi_fileset_t, void*);
#define REMI_MIGRATION_CALLBACK_NULL ((remi_migration_callback_t)0)
......@@ -49,6 +50,7 @@ typedef void (*remi_uarg_free_t)(void*);
* automatically destroyed upon finalizing the margo instance.
*
* @param[in] mid Margo instance.
* @param[in] abtio ABT-IO instance. May be ABT_IO_INSTANCE_NULL.
* @param[in] provider_id Provider id.
* @param[in] pool Argobots pool.
* @param[out] provider Resulting provider.
......@@ -57,6 +59,7 @@ typedef void (*remi_uarg_free_t)(void*);
*/
int remi_provider_register(
margo_instance_id mid,
abt_io_instance_id abtio,
uint16_t provider_id,
ABT_pool pool,
remi_provider_t* provider);
......@@ -72,6 +75,7 @@ int remi_provider_register(
* @param[in] mid Margo instance.
* @param[in] provider_id Provider id.
* @param[out] flag 1 if provider is registered, 0 otherwise.
* @param[out] abtio ABT-IO instance used by the provider.
* @param[out] pool Pool used to register the provider.
* @param[out] provider Registered provider (if it exists).
*
......@@ -81,6 +85,7 @@ int remi_provider_registered(
margo_instance_id mid,
uint16_t provider_id,
int* flag,
abt_io_instance_id* abtio,
ABT_pool* pool,
remi_provider_t* provider);
......
......@@ -17,7 +17,7 @@ set (remi-vers "${REMI_VERSION_MAJOR}.${REMI_VERSION_MINOR}")
set (REMI_VERSION "${remi-vers}.${REMI_VERSION_PATCH}")
add_library(remi ${remi-src})
target_link_libraries (remi thallium margo)
target_link_libraries (remi thallium margo abt-io uuid)
target_include_directories (remi PUBLIC $<INSTALL_INTERFACE:include>)
# local include's BEFORE, in case old incompatable .h files in prefix/include
......
......@@ -11,8 +11,11 @@
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <abt-io.h>
#include <uuid/uuid.h>
#include <thallium.hpp>
#include <thallium/serialization/stl/pair.hpp>
#include "uuid-util.hpp"
#include "fs-util.hpp"
#include "remi/remi-client.h"
#include "remi-fileset.hpp"
......@@ -24,11 +27,19 @@ struct remi_client {
margo_instance_id m_mid = MARGO_INSTANCE_NULL;
tl::engine* m_engine = nullptr;
uint64_t m_num_providers = 0;
tl::remote_procedure m_migrate_rpc;
tl::remote_procedure m_migrate_start_rpc;
tl::remote_procedure m_migrate_mmap_rpc;
tl::remote_procedure m_migrate_write_rpc;
tl::remote_procedure m_migrate_end_rpc;
abt_io_instance_id m_abtio = ABT_IO_INSTANCE_NULL;
remi_client(tl::engine* e)
remi_client(tl::engine* e, abt_io_instance_id abtio)
: m_engine(e)
, m_migrate_rpc(m_engine->define("remi_migrate")) {}
, m_migrate_start_rpc(m_engine->define("remi_migrate_start"))
, m_migrate_mmap_rpc(m_engine->define("remi_migrate_mmap"))
, m_migrate_write_rpc(m_engine->define("remi_migrate_write"))
, m_migrate_end_rpc(m_engine->define("remi_migrate_end"))
, m_abtio(abtio) {}
};
......@@ -42,10 +53,13 @@ struct remi_provider_handle : public tl::provider_handle {
: tl::provider_handle(std::forward<Args>(args)...) {}
};
extern "C" int remi_client_init(margo_instance_id mid, remi_client_t* client)
extern "C" int remi_client_init(
margo_instance_id mid,
abt_io_instance_id abtio,
remi_client_t* client)
{
auto theEngine = new tl::engine(mid, THALLIUM_CLIENT_MODE);
remi_client_t theClient = new remi_client(theEngine);
remi_client_t theClient = new remi_client(theEngine, abtio);
theClient->m_mid = mid;
*client = theClient;
return REMI_SUCCESS;
......@@ -107,13 +121,29 @@ static void list_existing_files(const char* filename, void* uargs) {
files->emplace(filename);
}
static int migrate_using_mmap(
remi_provider_handle_t ph,
remi_fileset_t fileset,
const std::set<std::string>& files,
const std::string& remote_root,
int* status);
static int migrate_using_abtio(
remi_provider_handle_t ph,
remi_fileset_t fileset,
const std::set<std::string>& files,
const std::string& remote_root,
int* status);
extern "C" int remi_fileset_migrate(
remi_provider_handle_t ph,
remi_fileset_t fileset,
const char* remote_root,
int flag,
int remove_source,
int mode,
int* status)
{
int ret;
if(ph == REMI_PROVIDER_HANDLE_NULL
|| fileset == REMI_FILESET_NULL
......@@ -126,11 +156,52 @@ extern "C" int remi_fileset_migrate(
if(theRemoteRoot[theRemoteRoot.size()-1] != '/')
theRemoteRoot += "/";
// find the set of files to migrate from the fileset
std::set<std::string> files;
remi_fileset_walkthrough(fileset, list_existing_files,
static_cast<void*>(&files));
if(mode == REMI_USE_MMAP) {
ret = migrate_using_mmap(ph, fileset, files, theRemoteRoot.c_str(), status);
} else {
ret = migrate_using_abtio(ph, fileset, files, theRemoteRoot.c_str(), status);
}
if(ret != REMI_SUCCESS) {
return ret;
}
if(*status != 0) {
return REMI_ERR_USER;
}
if(remove_source == REMI_REMOVE_SOURCE) {
for(auto& filename : files) {
auto theFilename = fileset->m_root + filename;
remove(theFilename.c_str());
}
for(auto& dirname : fileset->m_directories) {
auto theDirname = fileset->m_root + dirname;
removeRec(theDirname);
}
}
return REMI_SUCCESS;
}
int migrate_using_mmap(
remi_provider_handle_t ph,
remi_fileset_t fileset,
const std::set<std::string>& files,
const std::string& remote_root,
int* status)
{
// expose the data
std::vector<std::pair<void*,std::size_t>> theData;
std::vector<std::size_t> theSizes;
std::vector<mode_t> theModes;
// prepare lambda for cleaning up mapped files
auto cleanup = [&theData]() {
for(auto& seg : theData) {
......@@ -138,12 +209,6 @@ extern "C" int remi_fileset_migrate(
}
};
// find the set of files to migrate from the fileset
std::set<std::string> files;
remi_fileset_walkthrough(fileset, list_existing_files,
static_cast<void*>(&files));
for(auto& filename : files) {
// compose full name
auto theFilename = fileset->m_root + filename;
......@@ -194,38 +259,215 @@ extern "C" int remi_fileset_migrate(
auto tmp_root = std::move(fileset->m_root);
fileset->m_files = files;
fileset->m_directories = decltype(fileset->m_directories)();
fileset->m_root = theRemoteRoot;
fileset->m_root = remote_root;
// call migrate_start RPC
// the response is in the form <errorcode, userstatus, uuid>
std::tuple<int32_t, int32_t, uuid> start_call_result
= ph->m_client->m_migrate_start_rpc.on(*ph)(*fileset, theSizes, theModes);
int ret = std::get<0>(start_call_result);
if(ret != REMI_SUCCESS) {
cleanup();
if(ret == REMI_ERR_USER)
*status = std::get<1>(start_call_result);
return ret;
}
auto& operation_id = std::get<2>(start_call_result);
// send the RPC
std::pair<int32_t, int32_t> result = ph->m_client->m_migrate_rpc.on(*ph)(*fileset, theSizes, theModes, localBulk);
int ret = result.first;
*status = result.second;
// send the migrate_mmap RPC
ret = ph->m_client->m_migrate_mmap_rpc.on(*ph)(operation_id, localBulk);
// put back the fileset's original members
fileset->m_root = std::move(tmp_root);
fileset->m_files = std::move(tmp_files);
fileset->m_directories = std::move(tmp_dirs);
if(ret != REMI_SUCCESS) {
cleanup();
return ret;
}
// xfer went ok, now send migrate_end rpc.
// the response is in the form <errorcode, userstatus>
std::pair<int32_t, int32_t> end_call_result =
ph->m_client->m_migrate_end_rpc.on(*ph)(operation_id);
cleanup();
return ret;
}
int migrate_using_abtio(
remi_provider_handle_t ph,
remi_fileset_t fileset,
const std::set<std::string>& files,
const std::string& remote_root,
int* status)
{
// expose the data
std::vector<int> openedFileDescriptors;
std::vector<std::pair<void*,std::size_t>> theData;
std::vector<std::size_t> theSizes;
std::vector<mode_t> theModes;
auto cleanup = [&openedFileDescriptors]() {
for(auto& fd : openedFileDescriptors) {
close(fd);
}
};
for(auto& filename : files) {
// compose full name
auto theFilename = fileset->m_root + filename;
// open file
int fd = open(theFilename.c_str(), O_RDONLY, 0);
if(fd == -1) {
cleanup();
return REMI_ERR_UNKNOWN_FILE;
}
openedFileDescriptors.push_back(fd);
// get file size
struct stat st;
if(0 != fstat(fd, &st)) {
close(fd);
cleanup();
return REMI_ERR_IO;
}
auto size = st.st_size;
theSizes.push_back(size);
auto mode = st.st_mode;
theModes.push_back(mode);
}
// create a copy of the fileset where m_directory is empty
// and the filenames in directories have been resolved
auto tmp_files = std::move(fileset->m_files);
auto tmp_dirs = std::move(fileset->m_directories);
auto tmp_root = std::move(fileset->m_root);
fileset->m_files = files;
fileset->m_directories = decltype(fileset->m_directories)();
fileset->m_root = remote_root;
// call migrate_start RPC
// the response is in the form <errorcode, userstatus, uuid>
std::tuple<int32_t, int32_t, uuid> start_call_result
= ph->m_client->m_migrate_start_rpc.on(*ph)(*fileset, theSizes, theModes);
int ret = std::get<0>(start_call_result);
if(ret != REMI_SUCCESS) {
cleanup();
if(ret == REMI_ERR_USER)
*status = std::get<1>(start_call_result);
return ret;
}
// put back the fileset's original members
fileset->m_root = std::move(tmp_root);
fileset->m_files = std::move(tmp_files);
fileset->m_directories = std::move(tmp_dirs);
auto& operation_id = std::get<2>(start_call_result);
auto abtio = ph->m_client->m_abtio;
// send a series of migrate_write RPC, pipelined with abti-io pread calls
size_t max_chunk_size = 1048576; // XXX make this configurable
if(abtio == ABT_IO_INSTANCE_NULL) {
std::vector<char> buffer(max_chunk_size);
for(uint32_t i = 0; i < files.size(); i++) {
size_t remaining_size = theSizes[i];
int fd = openedFileDescriptors[i];
size_t current_offset = 0;
while(remaining_size != 0) {
size_t chunk_size = remaining_size < max_chunk_size ? remaining_size : max_chunk_size;
buffer.resize(chunk_size);
auto sizeRead = read(fd, &buffer[0], chunk_size);
ret = ph->m_client->m_migrate_write_rpc.on(*ph)(operation_id, i, current_offset, buffer);
current_offset += chunk_size;
remaining_size -= chunk_size;
}
}
} else {
size_t current_chunk_offset = 0; // offset of the chunk that ABT-IO has to read in this iteration
size_t previous_chunk_offset = 0; // offset of the chunk that should be send in this iteration
size_t current_chunk_size = 0; // size of the chunk that ABT-IO has to read in this iteration
size_t previous_chunk_size = 0; // size of the chunk that should be send in this iteration
std::vector<char> current_buffer(max_chunk_size); // buffer for ABT-IO to place data
std::vector<char> previous_buffer(max_chunk_size); // buffer to be sent
for(uint32_t i = 0; i < files.size(); i++) {
// reset variables
current_chunk_offset = 0;
previous_chunk_offset = 0;
current_chunk_size = 0;
previous_chunk_size = 0;
size_t remaining_size = theSizes[i];
int fd = openedFileDescriptors[i];
// read first chunk
current_chunk_size = remaining_size < max_chunk_size ? remaining_size : max_chunk_size;
current_buffer.resize(current_chunk_size);
auto sizeRead = abt_io_pread(abtio, fd, &current_buffer[0], current_chunk_size, current_chunk_offset);
// swap variables
previous_chunk_offset = current_chunk_offset;
previous_chunk_size = current_chunk_size;
std::swap(current_buffer, previous_buffer);
current_chunk_offset += current_chunk_size;
remaining_size -= current_chunk_size;
// read remaining chunks and send in parallel
bool can_stop = false;
while(!can_stop) {
// issue RPC for the previous chunk
auto async_req = ph->m_client->m_migrate_write_rpc.on(*ph).async(operation_id, i, previous_chunk_offset, previous_buffer);
// read the current chunk
if(remaining_size == 0) {
can_stop = true;
} else {
current_chunk_size = remaining_size < max_chunk_size ? remaining_size : max_chunk_size;
current_buffer.resize(current_chunk_size);
auto sizeRead = abt_io_pread(abtio, fd, &current_buffer[0], current_chunk_size, current_chunk_offset);
current_chunk_offset += current_chunk_size;
remaining_size -= current_chunk_size;
}
// wait for the RPC to finish
ret = async_req.wait();
if(ret != REMI_SUCCESS)
can_stop = true;
if(!can_stop) {
previous_chunk_offset = current_chunk_offset;
previous_chunk_size = current_chunk_size;
std::swap(current_buffer, previous_buffer);
}
}
if(ret != REMI_SUCCESS) {
break;
}
}
}
if(*status != 0) {
return REMI_ERR_USER;
if(ret != REMI_SUCCESS) {
cleanup();
return ret;
}
// xfer went ok, now send migrate_end rpc.
// the response is in the form <errorcode, userstatus>
std::pair<int32_t, int32_t> end_call_result =
ph->m_client->m_migrate_end_rpc.on(*ph)(operation_id);
if(flag == REMI_REMOVE_SOURCE) {
for(auto& filename : files) {
auto theFilename = fileset->m_root + filename;
remove(theFilename.c_str());
}
for(auto& dirname : fileset->m_directories) {
auto theDirname = fileset->m_root + dirname;
removeRec(theDirname);
}
cleanup();
ret = end_call_result.first;
if(ret == REMI_ERR_USER) {
*status = end_call_result.second;
} else {
*status = 0;
}
return REMI_SUCCESS;
return ret;
}
......@@ -15,11 +15,14 @@
#include <string.h>
#include <iostream>
#include <unordered_map>
#include <abt-io.h>
#include <thallium.hpp>
#include <thallium/serialization/stl/pair.hpp>
#include <thallium/serialization/stl/tuple.hpp>
#include "remi/remi-server.h"
#include "remi-fileset.hpp"
#include "fs-util.hpp"
#include "uuid-util.hpp"
namespace tl = thallium;
......@@ -30,36 +33,48 @@ struct migration_class {
void* m_uargs;
};
struct operation {
remi_fileset m_fileset;
std::vector<std::size_t> m_filesizes;
std::vector<mode_t> m_modes;
std::vector<int> m_fds;