remi-client.cpp 5.97 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7 8 9 10 11 12
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
13
#include <thallium.hpp>
14
#include "fs-util.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
15
#include "remi/remi-client.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
16
#include "remi-fileset.hpp"
17

Matthieu Dorier's avatar
Matthieu Dorier committed
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
namespace tl = thallium;

/**
 * Client-side state behind the opaque remi_client_t handle.
 *
 * The struct does not delete m_engine itself; remi_client_finalize
 * deletes the engine and then the client.
 */
struct remi_client {

    // Left at MARGO_INSTANCE_NULL by the constructor; remi_client_init
    // assigns it right after construction.
    margo_instance_id    m_mid = MARGO_INSTANCE_NULL;
    // Engine used to define the RPC below and to expose memory for bulk
    // transfers in remi_fileset_migrate.
    tl::engine*          m_engine = nullptr;
    // Incremented by remi_provider_handle_create, decremented when a
    // handle's reference count drops to zero.
    uint64_t             m_num_providers = 0;
    // The "remi_migrate" RPC. Declared after m_engine on purpose: members
    // are initialized in declaration order and this one needs m_engine.
    tl::remote_procedure m_migrate_rpc;

    /**
     * Builds a client on top of an existing engine and defines the
     * "remi_migrate" RPC on it.
     *
     * Marked explicit so that a tl::engine* never converts silently
     * into a remi_client.
     *
     * @param e engine to use; it is dereferenced here, so it must not be null.
     */
    explicit remi_client(tl::engine* e)
    : m_engine(e)
    , m_migrate_rpc(m_engine->define("remi_migrate")) {}

};

struct remi_provider_handle : public tl::provider_handle {

    remi_client_t m_client    = nullptr;
    uint64_t      m_ref_count = 0;
    
    template<typename ... Args>
    remi_provider_handle(Args&&... args)
    : tl::provider_handle(std::forward<Args>(args)...) {}
};

43 44
/**
 * Creates a REMI client on top of a margo instance.
 *
 * A new thallium engine is created in client mode from mid; the
 * resulting client keeps that engine until remi_client_finalize
 * deletes both.
 *
 * @param mid    margo instance the client is built on.
 * @param client output location receiving the new client handle.
 *
 * @return REMI_SUCCESS on success, REMI_ERR_INVALID_ARG if client is NULL.
 */
extern "C" int remi_client_init(margo_instance_id mid, remi_client_t* client)
{
    // Validate the output pointer before allocating anything, so that a
    // bad call neither crashes nor leaks.
    if(client == nullptr)
        return REMI_ERR_INVALID_ARG;
    auto theEngine           = new tl::engine(mid, THALLIUM_CLIENT_MODE);
    remi_client_t theClient  = new remi_client(theEngine);
    theClient->m_mid         = mid;
    *client = theClient;
    return REMI_SUCCESS;
}

/**
 * Destroys a client created by remi_client_init.
 *
 * Passing REMI_CLIENT_NULL is accepted and does nothing.
 *
 * @param client client to destroy.
 *
 * @return always REMI_SUCCESS.
 */
extern "C" int remi_client_finalize(remi_client_t client)
{
    if(client != REMI_CLIENT_NULL) {
        // The engine is deleted first, then the client that referenced it.
        delete client->m_engine;
        delete client;
    }
    return REMI_SUCCESS;
}

/**
 * Creates a handle designating a provider at a given address.
 *
 * The new handle starts with a reference count of 1 and is counted in
 * the client's m_num_providers.
 *
 * @param client      client the handle is created from.
 * @param addr        address of the target provider.
 * @param provider_id id of the target provider.
 * @param handle      output location receiving the new handle.
 *
 * @return REMI_SUCCESS on success, REMI_ERR_INVALID_ARG if client is
 *         REMI_CLIENT_NULL or handle is NULL.
 */
extern "C" int remi_provider_handle_create(
        remi_client_t client,
        hg_addr_t addr,
        uint16_t provider_id,
        remi_provider_handle_t* handle)
{
    // Reject a NULL output pointer up front: writing through it below
    // would crash after the handle has already been allocated.
    if(client == REMI_CLIENT_NULL || handle == nullptr)
        return REMI_ERR_INVALID_ARG;
    // NOTE(review): the trailing `false` presumably tells thallium not to
    // take ownership of addr -- confirm against tl::endpoint's constructor.
    auto theHandle = new remi_provider_handle(
            tl::endpoint(*(client->m_engine), addr, false), provider_id);
    theHandle->m_client = client;
    theHandle->m_ref_count = 1;
    *handle = theHandle;
    client->m_num_providers += 1;
    return REMI_SUCCESS;
}

/**
 * Adds one reference to a provider handle.
 *
 * @param handle handle whose reference count is incremented.
 *
 * @return REMI_SUCCESS, or REMI_ERR_INVALID_ARG if handle is
 *         REMI_PROVIDER_HANDLE_NULL.
 */
extern "C" int remi_provider_handle_ref_incr(remi_provider_handle_t handle)
{
    if(handle != REMI_PROVIDER_HANDLE_NULL) {
        ++handle->m_ref_count;
        return REMI_SUCCESS;
    }
    return REMI_ERR_INVALID_ARG;
}

/**
 * Drops one reference to a provider handle and destroys the handle
 * when no reference is left.
 *
 * Passing REMI_PROVIDER_HANDLE_NULL is accepted and does nothing.
 *
 * @param handle handle to release.
 *
 * @return always REMI_SUCCESS.
 */
extern "C" int remi_provider_handle_release(remi_provider_handle_t handle)
{
    if(handle != REMI_PROVIDER_HANDLE_NULL) {
        const bool lastReference = (--handle->m_ref_count == 0);
        if(lastReference) {
            // Un-register from the owning client before freeing the handle.
            --handle->m_client->m_num_providers;
            delete handle;
        }
    }
    return REMI_SUCCESS;
}

/**
 * Asks the margo instance at a given address to shut down.
 *
 * @param client client whose margo instance is used to send the request.
 * @param addr   address of the instance to shut down.
 *
 * @return REMI_ERR_INVALID_ARG if client is REMI_CLIENT_NULL; otherwise
 *         the value returned by margo_shutdown_remote_instance, passed
 *         through unchanged.
 */
extern "C" int remi_shutdown_service(remi_client_t client, hg_addr_t addr)
{
    // Same argument validation as the other client entry points; without
    // it a null client is dereferenced below.
    if(client == REMI_CLIENT_NULL)
        return REMI_ERR_INVALID_ARG;
    return margo_shutdown_remote_instance(client->m_mid, addr);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
102

103 104 105 106 107
/**
 * Fileset walkthrough callback: records one file name.
 *
 * @param filename name of the file being visited.
 * @param uargs    pointer to the std::set<std::string> collecting the names.
 */
static void list_existing_files(const char* filename, void* uargs) {
    std::set<std::string>& collected = *static_cast<std::set<std::string>*>(uargs);
    collected.insert(std::string(filename));
}

Matthieu Dorier's avatar
Matthieu Dorier committed
108
extern "C" int remi_fileset_migrate(
109
        remi_provider_handle_t ph,
Matthieu Dorier's avatar
Matthieu Dorier committed
110
        remi_fileset_t fileset,
111
        const char* remote_root,
Matthieu Dorier's avatar
Matthieu Dorier committed
112 113
        int flag)
{
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129

    if(ph == REMI_PROVIDER_HANDLE_NULL
    || fileset == REMI_FILESET_NULL
    || remote_root == NULL)
        return REMI_ERR_INVALID_ARG;
    if(remote_root[0] != '/')
        return REMI_ERR_INVALID_ARG;

    std::string theRemoteRoot(remote_root);
    if(theRemoteRoot[theRemoteRoot.size()-1] != '/')
        theRemoteRoot += "/";

    // expose the data
    std::vector<std::pair<void*,std::size_t>> theData;
    std::vector<std::size_t> theSizes;

130
    // prepare lambda for cleaning up mapped files
131 132 133 134 135 136
    auto cleanup = [&theData]() {
        for(auto& seg : theData) {
            munmap(seg.first, seg.second);
        }
    };

137 138 139 140 141 142 143
    // find the set of files to migrate from the fileset
    std::set<std::string> files;

    remi_fileset_walkthrough(fileset, list_existing_files,
            static_cast<void*>(&files));

    for(auto& filename : files) {
144 145 146 147 148 149 150 151 152 153 154 155 156
        // compose full name
        auto theFilename = fileset->m_root + filename;
        // open file
        int fd = open(theFilename.c_str(), O_RDONLY, 0);
        if(fd == -1) {
            cleanup();
            return REMI_ERR_UNKNOWN_FILE;
        }
        // get file size
        struct stat st;
        if(0 != fstat(fd, &st)) {
            close(fd);
            cleanup();
Matthieu Dorier's avatar
Matthieu Dorier committed
157
            return REMI_ERR_IO;
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
        }
        auto size = st.st_size;
        theSizes.push_back(size);
        if(size == 0) {
            close(fd);
            continue;
        }
        // map the file
        void* segment = mmap(0, size, PROT_READ, MAP_PRIVATE, fd, 0);
        if(segment == NULL) {
            cleanup();
            return REMI_ERR_ALLOCATION;
        }
        // close file descriptor
        close(fd);
        // insert the segment
        theData.emplace_back(segment, size);
    }

    // expose the segments for bulk operations
178 179 180
    tl::bulk localBulk;
    if(theData.size() != 0) 
        localBulk = ph->m_client->m_engine->expose(theData, tl::bulk_mode::read_only);
181

182 183 184 185 186 187 188
    // create a copy of the fileset where m_directory is empty
    // and the filenames in directories have been resolved
    auto tmp_files = std::move(fileset->m_files);
    auto tmp_dirs  = std::move(fileset->m_directories);
    auto tmp_root  = std::move(fileset->m_root);
    fileset->m_files = files;
    fileset->m_directories = decltype(fileset->m_directories)();
189
    fileset->m_root = theRemoteRoot;
190 191

    // send the RPC
192
    int32_t result = ph->m_client->m_migrate_rpc.on(*ph)(*fileset, theSizes, localBulk);
193 194 195 196 197

    // put back the fileset's original members
    fileset->m_root        = std::move(tmp_root);
    fileset->m_files       = std::move(tmp_files);
    fileset->m_directories = std::move(tmp_dirs);
198 199 200 201 202 203 204 205

    cleanup();

    if(result != REMI_SUCCESS) {
        return result;
    }

    if(flag == REMI_REMOVE_SOURCE) {
206
        for(auto& filename : files) {
207 208 209
            auto theFilename = fileset->m_root + filename;
            remove(theFilename.c_str());
        }
210 211 212 213
        for(auto& dirname : fileset->m_directories) {
            auto theDirname = fileset->m_root + dirname;
            removeRec(theDirname);
        }
214 215 216
    }

    return REMI_SUCCESS;
Matthieu Dorier's avatar
Matthieu Dorier committed
217
}