Commit 042e0d27 authored by Matthieu Dorier's avatar Matthieu Dorier

added parallelism at server side

parent c6a98ee3
......@@ -26,6 +26,11 @@ int my_post_migration_callback(remi_fileset_t fileset, void* uargs) {
return 0;
}
static void finalize_abtio(void* data) {
abt_io_instance_id abtio = (abt_io_instance_id)data;
abt_io_finalize(abtio);
}
int main(int argc, char** argv)
{
if(argc != 2) {
......@@ -69,7 +74,8 @@ int main(int argc, char** argv)
fprintf(stdout,"Server running at address %s\n", my_addr_str);
// initialize ABT-IO
// TODO
abtio = abt_io_init(1);
margo_push_finalize_callback(mid, finalize_abtio, (void*)abtio);
// create the REMI provider
ret = remi_provider_register(mid, abtio, 1, REMI_ABT_POOL_DEFAULT, &remi_prov);
......@@ -79,6 +85,11 @@ int main(int argc, char** argv)
goto error;
}
// set /tmp and /home as HDDs, /dev/shm as memory
remi_set_device("/tmp", REMI_DEVICE_HDD);
remi_set_device("/home", REMI_DEVICE_HDD);
remi_set_device("/dev/shm", REMI_DEVICE_MEM);
// create migration class
ret = remi_provider_register_migration_class(remi_prov,
"my_migration_class",
......
......@@ -120,6 +120,33 @@ int remi_fileset_get_root(
char* buf,
size_t* size);
/**
* @brief Sets the transfer size for this fileset. This attribute
* has an effect only if the fileset is migrated with the REMI_USE_ABTIO
* option. It determines the maximum size of data an RPC is allowed to
* transfer at once. A newly created fileset uses 1048576.
*
* @param[in] fileset Fileset for which to set the xfer size
* (must not be REMI_FILESET_NULL).
* @param[in] size New size (must be non-zero).
*
* @return REMI_SUCCESS, REMI_ERR_INVALID_ARG if the fileset is
* REMI_FILESET_NULL or the size is 0, or another error code defined
* in remi-common.h.
*/
int remi_fileset_set_xfer_size(
remi_fileset_t fileset,
size_t size);
/**
* @brief Gets the transfer size for this fileset, i.e. the value
* last set with remi_fileset_set_xfer_size (or the default if it
* was never set).
*
* @param[in] fileset Fileset for which to get the xfer size
* (must not be REMI_FILESET_NULL).
* @param[out] size Resulting size (must not be NULL).
*
* @return REMI_SUCCESS, REMI_ERR_INVALID_ARG if the fileset is
* REMI_FILESET_NULL or size is NULL, or another error code defined
* in remi-common.h.
*/
int remi_fileset_get_xfer_size(
remi_fileset_t fileset,
size_t* size);
/**
* @brief Registers a file in the fileset. The provided path
* should be relative to the fileset's root. The file does not need
......
......@@ -16,6 +16,9 @@ extern "C" {
#define REMI_ABT_POOL_DEFAULT ABT_POOL_NULL /* Default Argobots pool for REMI */
#define REMI_PROVIDER_ID_DEFAULT 0 /* Default provider id for REMI */
#define REMI_DEVICE_MEM 1 /* Device type: memory (e.g. /dev/shm) */
#define REMI_DEVICE_HDD 2 /* Device type: hard disk drive */
#define REMI_DEVICE_SSD 3 /* Device type: solid-state drive */
/**
* @brief REMI provider type.
......@@ -110,6 +113,21 @@ int remi_provider_register_migration_class(
remi_uarg_free_t free_fn,
void* uargs);
/**
* @brief Set the type of device for a given mount point. Calling this function
* gives an opportunity for REMI to optimize transfers to files in this device,
* e.g. by using locks that are specific to this device (not shared with other
* devices), by restraining concurrency, etc.
*
* A mount point cannot be registered if it is a prefix of an already
* registered mount point, or if an already registered mount point is a
* prefix of it (this includes registering the same mount point twice).
*
* @param mount_point Mount point of the device.
* @param type Type of device (REMI_DEVICE_MEM, REMI_DEVICE_HDD, REMI_DEVICE_SSD).
*
* @return REMI_SUCCESS, REMI_ERR_INVALID_ARG if the mount point overlaps
* with an already registered one, or another error code defined in
* remi-common.h.
*/
int remi_set_device(
const char* mount_point,
int type);
#if defined(__cplusplus)
}
#endif
......
......@@ -369,7 +369,7 @@ int migrate_using_abtio(
auto abtio = ph->m_client->m_abtio;
// send a series of migrate_write RPCs, pipelined with abt-io pread calls
size_t max_chunk_size = 1048576; // XXX make this configurable
size_t max_chunk_size = fileset->m_xfer_size;
if(abtio == ABT_IO_INSTANCE_NULL) {
......
......@@ -35,6 +35,28 @@ extern "C" int remi_fileset_free(remi_fileset_t fileset)
return REMI_SUCCESS;
}
/**
 * Stores a new transfer size in the fileset.
 * Rejects a null fileset handle or a zero size with REMI_ERR_INVALID_ARG.
 */
extern "C" int remi_fileset_set_xfer_size(
        remi_fileset_t fileset,
        size_t size)
{
    const bool arguments_ok = (fileset != REMI_FILESET_NULL) && (size != 0);
    if(!arguments_ok) {
        return REMI_ERR_INVALID_ARG;
    }
    fileset->m_xfer_size = size;
    return REMI_SUCCESS;
}
/**
 * Copies the fileset's current transfer size into *size.
 * Rejects a null fileset handle or a null output pointer with
 * REMI_ERR_INVALID_ARG.
 */
extern "C" int remi_fileset_get_xfer_size(
        remi_fileset_t fileset,
        size_t* size)
{
    const bool arguments_ok = (fileset != REMI_FILESET_NULL) && (size != nullptr);
    if(!arguments_ok) {
        return REMI_ERR_INVALID_ARG;
    }
    *size = fileset->m_xfer_size;
    return REMI_SUCCESS;
}
extern "C" int remi_fileset_get_class(
remi_fileset_t fileset,
char* buf,
......
......@@ -20,6 +20,7 @@ struct remi_fileset {
std::map<std::string,std::string> m_metadata;
std::set<std::string> m_files;
std::set<std::string> m_directories;
size_t m_xfer_size = 1048576;
template<typename A>
void serialize(A& ar) {
......@@ -28,6 +29,7 @@ struct remi_fileset {
ar & m_metadata;
ar & m_files;
ar & m_directories;
ar & m_xfer_size;
}
};
......
......@@ -33,11 +33,30 @@ struct migration_class {
void* m_uargs;
};
struct device {
int m_type;
tl::mutex m_mutex;
void lock() {
if(m_type != REMI_DEVICE_HDD)
return;
m_mutex.lock();
}
void unlock() {
if(m_type != REMI_DEVICE_HDD)
return;
m_mutex.unlock();
}
};
// State kept by the provider for one migration in progress, from
// migrate_start until migrate_end erases it.
struct operation {
// fileset being migrated (passed to the "after" migration callback)
remi_fileset m_fileset;
// file sizes recorded by migrate_start
std::vector<std::size_t> m_filesizes;
// file modes recorded by migrate_start
std::vector<mode_t> m_modes;
// descriptors of the files opened by migrate_start; closed in migrate_end
std::vector<int> m_fds;
// device each file belongs to, parallel to m_fds (pointers into the
// provider's static device map)
std::vector<device*> m_devices;
// set to REMI_ERR_IO when a chunk write comes up short; reported to the
// client by migrate_end
int m_error = REMI_SUCCESS;
};
struct remi_provider : public tl::provider<remi_provider> {
......@@ -48,7 +67,8 @@ struct remi_provider : public tl::provider<remi_provider> {
abt_io_instance_id m_abtio;
std::unordered_map<uuid, operation, uuid_hash> m_op_in_progress;
static std::unordered_map<uint16_t, remi_provider*> m_registered_providers;
static std::unordered_map<uint16_t, remi_provider*> s_registered_providers;
static std::unordered_map<std::string, device> s_devices;
void migrate_start(
const tl::request& req,
......@@ -92,8 +112,9 @@ struct remi_provider : public tl::provider<remi_provider> {
return;
}
// create and open the files
// create and open the files, record the device they belong to
std::vector<int> openedFileDescriptors;
std::vector<device*> devices;
unsigned i=0;
for(const auto& filename : fileset.m_files) {
auto theFilename = fileset.m_root + filename;
......@@ -110,6 +131,14 @@ struct remi_provider : public tl::provider<remi_provider> {
}
i += 1;
openedFileDescriptors.push_back(fd);
// find the device
device* d = &s_devices["###"];
for(auto& p : s_devices) {
if(theFilename.find(p.first) == 0) {
d = &(p.second);
}
}
devices.push_back(d);
}
// store the operation into the map of pending operations
auto& op = m_op_in_progress[std::get<2>(result)];
......@@ -117,8 +146,50 @@ struct remi_provider : public tl::provider<remi_provider> {
op.m_filesizes = std::move(filesizes);
op.m_modes = std::move(theModes);
op.m_fds = std::move(openedFileDescriptors);
op.m_devices = std::move(devices);
req.respond(result);
}
/**
 * Terminates a migration: closes the files of the operation, reports any
 * error recorded while writing, otherwise runs the "after" callback of the
 * fileset's migration class. Responds with <errorcode, userstatus>.
 */
void migrate_end(const tl::request& req, const uuid& operation_id)
{
    std::pair<int32_t, int32_t> result = {0,0};

    // unknown operation id: nothing to clean up
    auto it = m_op_in_progress.find(operation_id);
    if(it == m_op_in_progress.end()) {
        result.first = REMI_ERR_INVALID_OPID;
        req.respond(result);
        return;
    }
    auto& op = it->second;

    // the files are closed whether or not the migration succeeded
    for(int fd : op.m_fds) {
        close(fd);
    }

    if(op.m_error == REMI_SUCCESS) {
        // run the user's "after" migration callback, if any
        auto& klass = m_migration_classes[op.m_fileset.m_class];
        if(klass.m_after_callback != nullptr) {
            result.second = klass.m_after_callback(&(op.m_fileset), klass.m_uargs);
        }
        result.first = result.second == 0 ? REMI_SUCCESS : REMI_ERR_USER;
    } else {
        // a write failed earlier: forward that error, skip the callback
        result.first = op.m_error;
    }

    req.respond(result);
    m_op_in_progress.erase(it);
}
void migrate_mmap(
......@@ -208,40 +279,6 @@ struct remi_provider : public tl::provider<remi_provider> {
return;
}
/**
 * Terminates a migration: closes the files of the operation and runs the
 * "after" callback of the fileset's migration class.
 * Responds with <errorcode, userstatus>.
 */
void migrate_end(const tl::request& req, const uuid& operation_id)
{
    std::pair<int32_t, int32_t> result = {0,0};

    auto it = m_op_in_progress.find(operation_id);
    const bool known_operation = (it != m_op_in_progress.end());
    if(!known_operation) {
        result.first = REMI_ERR_INVALID_OPID;
        req.respond(result);
        return;
    }

    auto& op = it->second;
    for(int fd : op.m_fds)
        close(fd);

    // user-provided "after" callback of the fileset's migration class
    auto& klass = m_migration_classes[op.m_fileset.m_class];
    if(klass.m_after_callback != nullptr)
        result.second = klass.m_after_callback(&(op.m_fileset), klass.m_uargs);

    if(result.second == 0)
        result.first = REMI_SUCCESS;
    else
        result.first = REMI_ERR_USER;

    req.respond(result);
    m_op_in_progress.erase(it);
}
void migrate_write(
const tl::request& req,
const uuid& operation_id,
......@@ -286,19 +323,30 @@ struct remi_provider : public tl::provider<remi_provider> {
req.respond(ret);
return;
}
// write the chunk received
int fd = op.m_fds[fileNumber];
auto s = pwrite(fd, data.data(), data.size(), writeOffset);
ssize_t s;
{ // only HDDs will be locked to avoid concurrent writes
std::lock_guard<device>(*(op.m_devices[fileNumber]));
if(s != data.size()) {
ret = REMI_ERR_IO;
cleanup(true);
// send an early response so the client can start sending the next chunk
// in parallel while this chunk is being written
ret = REMI_SUCCESS;
req.respond(ret);
return;
if(m_abtio == ABT_IO_INSTANCE_NULL) {
s = pwrite(fd, data.data(), data.size(), writeOffset);
} else {
s = abt_io_pwrite(m_abtio, fd, data.data(), data.size(), writeOffset);
}
}
if(s != data.size()) {
op.m_error = REMI_ERR_IO;
}
ret = REMI_SUCCESS;
req.respond(ret);
return;
}
......@@ -308,16 +356,20 @@ struct remi_provider : public tl::provider<remi_provider> {
define("remi_migrate_mmap", &remi_provider::migrate_mmap, pool);
define("remi_migrate_write", &remi_provider::migrate_write, pool);
define("remi_migrate_end", &remi_provider::migrate_end, pool);
m_registered_providers[provider_id] = this;
s_registered_providers[provider_id] = this;
if(s_devices.count("###")) { // default device
s_devices["###"].m_type = REMI_DEVICE_MEM;
}
}
// Removes this provider's id from the static registry of providers.
// NOTE(review): both the m_- and s_-prefixed registries are erased here;
// this looks like the before/after lines of a rename -- confirm that only
// one of them is meant to remain.
~remi_provider() {
m_registered_providers.erase(get_provider_id());
s_registered_providers.erase(get_provider_id());
}
};
// Definitions of remi_provider's static members.
// NOTE(review): the m_- and s_-prefixed provider registries look like the
// before/after lines of a rename -- confirm which one is meant to remain.
std::unordered_map<uint16_t, remi_provider*> remi_provider::m_registered_providers;
// provider id -> provider instance
std::unordered_map<uint16_t, remi_provider*> remi_provider::s_registered_providers;
// mount point -> device
std::unordered_map<std::string, device> remi_provider::s_devices;
static void on_finalize(void* uargs) {
auto provider = static_cast<remi_provider_t>(uargs);
......@@ -353,8 +405,8 @@ extern "C" int remi_provider_registered(
ABT_pool* pool,
remi_provider_t* provider)
{
auto it = remi_provider::m_registered_providers.find(provider_id);
if(it == remi_provider::m_registered_providers.end()) {
auto it = remi_provider::s_registered_providers.find(provider_id);
if(it == remi_provider::s_registered_providers.end()) {
if(pool) *pool = ABT_POOL_NULL;
if(provider) *provider = REMI_PROVIDER_NULL;
*flag = 0;
......@@ -387,3 +439,17 @@ extern "C" int remi_provider_register_migration_class(
klass.m_free = free_fn;
return REMI_SUCCESS;
}
/**
 * Registers the type of the device mounted at mount_point.
 *
 * The registration is refused with REMI_ERR_INVALID_ARG when:
 *  - mount_point is NULL or empty (an empty prefix would match every path);
 *  - mount_point is a prefix of an already registered entry, or an already
 *    registered entry is a prefix of mount_point (this includes registering
 *    the same mount point twice).
 *
 * @param mount_point Mount point of the device.
 * @param type Type of device (REMI_DEVICE_MEM, REMI_DEVICE_HDD, REMI_DEVICE_SSD).
 *
 * @return REMI_SUCCESS or REMI_ERR_INVALID_ARG.
 */
extern "C" int remi_set_device(
        const char* mount_point,
        int type)
{
    // std::string(nullptr) is undefined behaviour, so check before converting
    if(mount_point == nullptr || mount_point[0] == '\0')
        return REMI_ERR_INVALID_ARG;

    const std::string mp(mount_point);
    for(const auto& p : remi_provider::s_devices) {
        const std::string& registered = p.first;
        // compare() only inspects the leading characters, whereas
        // find(...) == 0 keeps scanning the whole string on a mismatch
        const bool registered_starts_with_mp =
            registered.compare(0, mp.size(), mp) == 0;
        const bool mp_starts_with_registered =
            mp.compare(0, registered.size(), registered) == 0;
        if(registered_starts_with_mp || mp_starts_with_registered)
            return REMI_ERR_INVALID_ARG;
    }
    remi_provider::s_devices[mp].m_type = type;
    return REMI_SUCCESS;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment