/*
 * (C) 2018 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>

#include <exception>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include <thallium.hpp>
#include <thallium/serialization/stl/pair.hpp>

#include "remi/remi-server.h"
#include "remi-fileset.hpp"
#include "fs-util.hpp"

// Shorthand for the thallium namespace, used throughout this file.
namespace tl = thallium;
struct migration_class {
27 28
    remi_migration_callback_t m_before_callback;
    remi_migration_callback_t m_after_callback;
29
    remi_uarg_free_t          m_free;
Matthieu Dorier's avatar
Matthieu Dorier committed
30 31
    void*                     m_uargs;
};
struct remi_provider : public tl::provider<remi_provider> {

35 36 37 38
    std::unordered_map<std::string, migration_class>    m_migration_classes;
    tl::engine*                                         m_engine;
    tl::pool&                                           m_pool;
    static std::unordered_map<uint16_t, remi_provider*> m_registered_providers;
Matthieu Dorier's avatar
Matthieu Dorier committed
39

Matthieu Dorier's avatar
Matthieu Dorier committed
40
    void migrate(
Matthieu Dorier's avatar
Matthieu Dorier committed
41 42 43
            const tl::request& req,
            remi_fileset& fileset,
            const std::vector<std::size_t>& filesizes,
Matthieu Dorier's avatar
Matthieu Dorier committed
44
            const std::vector<mode_t>& theModes,
Matthieu Dorier's avatar
Matthieu Dorier committed
45 46
            tl::bulk& remote_bulk) 
    {
47 48 49
        // pair of <returnvalue, status>
        std::pair<int32_t,int32_t> result = {0, 0};

Matthieu Dorier's avatar
Matthieu Dorier committed
50 51
        // check that the class of the fileset exists
        if(m_migration_classes.count(fileset.m_class) == 0) {
52
            result.first = REMI_ERR_UNKNOWN_CLASS;
Matthieu Dorier's avatar
Matthieu Dorier committed
53 54
            req.respond(result);
            return;
Matthieu Dorier's avatar
Matthieu Dorier committed
55 56 57 58 59 60
        }

        // check if any of the target files already exist
        // (we don't want to overwrite)
        for(const auto& filename : fileset.m_files) {
            auto theFilename = fileset.m_root + filename;
Matthieu Dorier's avatar
Matthieu Dorier committed
61
            if(access(theFilename.c_str(), F_OK) != -1) {
62
                result.first = REMI_ERR_FILE_EXISTS;
Matthieu Dorier's avatar
Matthieu Dorier committed
63 64 65
                req.respond(result);
                return;
            }
Matthieu Dorier's avatar
Matthieu Dorier committed
66 67 68
        }
        // alright, none of the files already exist

69 70 71 72 73 74 75 76 77 78 79
        // call the "before migration" callback
        auto& klass = m_migration_classes[fileset.m_class];
        if(klass.m_before_callback != nullptr) {
            result.second = klass.m_before_callback(&fileset, klass.m_uargs);
        }
        if(result.second != 0) {
            result.first = REMI_ERR_USER;
            req.respond(result);
            return;
        }

Matthieu Dorier's avatar
Matthieu Dorier committed
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
        std::vector<std::pair<void*,std::size_t>> theData;

        // function to cleanup the segments
        auto cleanup = [&theData]() {
            for(auto& seg : theData) {
                munmap(seg.first, seg.second);
            }
        };

        // create files, truncate them, and expose them with mmap
        unsigned i=0;
        size_t totalSize = 0;
        for(const auto& filename : fileset.m_files) {
            auto theFilename = fileset.m_root + filename;
            auto p = theFilename.find_last_of('/');
            auto theDir = theFilename.substr(0, p);
            mkdirs(theDir.c_str());
            totalSize += filesizes[i];
Matthieu Dorier's avatar
Matthieu Dorier committed
98
            int fd = open(theFilename.c_str(), O_RDWR | O_CREAT | O_TRUNC, theModes[i]);
Matthieu Dorier's avatar
Matthieu Dorier committed
99 100
            if(fd == -1) {
                cleanup();
101
                result.first = REMI_ERR_IO;
Matthieu Dorier's avatar
Matthieu Dorier committed
102 103
                req.respond(result);
                return;
Matthieu Dorier's avatar
Matthieu Dorier committed
104 105 106 107 108 109 110 111
            }
            if(filesizes[i] == 0) {
                i += 1;
                close(fd);
                continue;
            }
            if(ftruncate(fd, filesizes[i]) == -1) {
                cleanup();
112
                result.first = REMI_ERR_IO;
Matthieu Dorier's avatar
Matthieu Dorier committed
113 114
                req.respond(result);
                return;
Matthieu Dorier's avatar
Matthieu Dorier committed
115
            }
Matthieu Dorier's avatar
Matthieu Dorier committed
116
            void *segment = mmap(0, filesizes[i], PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
Matthieu Dorier's avatar
Matthieu Dorier committed
117
            close(fd);
Matthieu Dorier's avatar
Matthieu Dorier committed
118 119
            if(segment == NULL) {
                cleanup();
120
                result.first = REMI_ERR_IO;
Matthieu Dorier's avatar
Matthieu Dorier committed
121 122
                req.respond(result);
                return;
Matthieu Dorier's avatar
Matthieu Dorier committed
123
            }
Matthieu Dorier's avatar
Matthieu Dorier committed
124
            madvise(segment, filesizes[i], MADV_SEQUENTIAL);
Matthieu Dorier's avatar
Matthieu Dorier committed
125 126 127 128 129 130 131 132 133 134 135 136
            theData.emplace_back(segment, filesizes[i]);
            i += 1;
        }

        // create a local bulk handle to expose the segments
        auto localBulk = m_engine->expose(theData, tl::bulk_mode::write_only);

        // issue bulk transfer
        size_t transferred = remote_bulk.on(req.get_endpoint()) >> localBulk;

        if(transferred != totalSize) {
            // XXX we should cleanup the files that were created
137
            result.first = REMI_ERR_MIGRATION;
Matthieu Dorier's avatar
Matthieu Dorier committed
138 139
            req.respond(result);
            return;
Matthieu Dorier's avatar
Matthieu Dorier committed
140 141 142 143 144 145
        }

        for(auto& seg : theData) {
            if(msync(seg.first, seg.second, MS_SYNC) == -1) {
                // XXX we should cleanup the files that were created
                cleanup();
146
                result.first = REMI_ERR_IO;
Matthieu Dorier's avatar
Matthieu Dorier committed
147 148
                req.respond(result);
                return;
Matthieu Dorier's avatar
Matthieu Dorier committed
149 150 151
            }
        }

Matthieu Dorier's avatar
Matthieu Dorier committed
152 153
        cleanup();

154 155 156
        // call the "after" migration callback associated with the class of fileset
        if(klass.m_after_callback != nullptr) {
            result.second = klass.m_after_callback(&fileset, klass.m_uargs);
Matthieu Dorier's avatar
Matthieu Dorier committed
157
        }
158
        result.first = result.second == 0 ? REMI_SUCCESS : REMI_ERR_USER;
Matthieu Dorier's avatar
Matthieu Dorier committed
159 160
        req.respond(result);
        return;
Matthieu Dorier's avatar
Matthieu Dorier committed
161 162 163
    }

    remi_provider(tl::engine* e, uint16_t provider_id, tl::pool& pool)
164 165 166 167 168 169 170 171
    : tl::provider<remi_provider>(*e, provider_id), m_engine(e), m_pool(pool) {
        define("remi_migrate", &remi_provider::migrate, pool);
        m_registered_providers[provider_id] = this;
    }

    ~remi_provider() {
        m_registered_providers.erase(get_provider_id());
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
172 173 174

};

// Definition of the static registry of live providers, keyed by provider id.
// It is filled by the remi_provider constructor and emptied by its destructor.
std::unordered_map<uint16_t, remi_provider*> remi_provider::m_registered_providers;

static void on_finalize(void* uargs) {
    auto provider = static_cast<remi_provider_t>(uargs);
179 180 181 182 183
    for(auto& klass : provider->m_migration_classes) {
        if(klass.second.m_free != nullptr) {
            klass.second.m_free(klass.second.m_uargs);
        }
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
184
    delete provider->m_engine;
185
    delete provider;
Matthieu Dorier's avatar
Matthieu Dorier committed
186
}

/**
 * Creates a REMI provider on the given margo instance.
 *
 * @param mid          margo instance the provider's engine is built on
 * @param provider_id  provider id used for the "remi_migrate" RPC
 * @param pool         Argobots pool the RPC handler runs in
 * @param provider     optional output receiving the new provider (may be NULL)
 * @return REMI_SUCCESS
 *
 * The provider and its engine are destroyed by on_finalize(), which is
 * registered with margo_push_finalize_callback.
 */
extern "C" int remi_provider_register(
        margo_instance_id mid,
        uint16_t provider_id,
        ABT_pool pool,
        remi_provider_t* provider)
{
    auto thePool = tl::pool(pool);
    // Own the engine locally until the provider exists, so it is not leaked
    // if the provider constructor throws.
    std::unique_ptr<tl::engine> theEngine(new tl::engine(mid, THALLIUM_SERVER_MODE));
    auto theProvider = new remi_provider(theEngine.get(), provider_id, thePool);
    // from here on, the engine is deleted by on_finalize()
    theEngine.release();
    margo_push_finalize_callback(mid, on_finalize, theProvider);
    // like remi_provider_registered, treat the output pointer as optional
    if(provider) *provider = theProvider;
    return REMI_SUCCESS;
}

/**
 * Looks up the provider registered under the given provider id.
 *
 * @param mid          NOTE(review): unused; the lookup is by provider_id only
 * @param provider_id  id to look up
 * @param flag         output: 1 if a provider is registered under provider_id, 0 otherwise
 * @param pool         optional output: the provider's pool, or ABT_POOL_NULL if none
 * @param provider     optional output: the provider, or REMI_PROVIDER_NULL if none
 * @return REMI_SUCCESS, or REMI_ERR_INVALID_ARG if flag is NULL
 */
extern "C" int remi_provider_registered(
        margo_instance_id mid,
        uint16_t provider_id,
        int* flag,
        ABT_pool* pool,
        remi_provider_t* provider)
{
    (void)mid;
    // flag is the one mandatory output
    if(flag == nullptr)
        return REMI_ERR_INVALID_ARG;
    auto it = remi_provider::m_registered_providers.find(provider_id);
    if(it == remi_provider::m_registered_providers.end()) {
        if(pool) *pool = ABT_POOL_NULL;
        if(provider) *provider = REMI_PROVIDER_NULL;
        *flag = 0;
    } else {
        remi_provider* p = it->second;
        if(provider) *provider = p;
        if(pool) *pool = p->m_pool.native_handle();
        *flag = 1;
    }
    return REMI_SUCCESS;
}

extern "C" int remi_provider_register_migration_class(
224 225
        remi_provider_t provider,
        const char* class_name,
226 227
        remi_migration_callback_t before_callback,
        remi_migration_callback_t after_callback,
228
        remi_uarg_free_t free_fn,
229 230
        void* uargs)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
231 232 233 234 235
    if(provider == REMI_PROVIDER_NULL || class_name == NULL)
        return REMI_ERR_INVALID_ARG;
    if(provider->m_migration_classes.count(class_name) != 0)
        return REMI_ERR_CLASS_EXISTS;
    auto& klass = provider->m_migration_classes[class_name];
236 237
    klass.m_before_callback = before_callback;
    klass.m_after_callback = after_callback;
Matthieu Dorier's avatar
Matthieu Dorier committed
238
    klass.m_uargs = uargs;
239
    klass.m_free = free_fn;
Matthieu Dorier's avatar
Matthieu Dorier committed
240
    return REMI_SUCCESS;
241
}