remi-client.cpp 6.38 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7 8 9
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
10
#include <sys/mman.h>
11 12 13
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
14
#include <thallium.hpp>
15
#include <thallium/serialization/stl/pair.hpp>
16
#include "fs-util.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
17
#include "remi/remi-client.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
18
#include "remi-fileset.hpp"
19

Matthieu Dorier's avatar
Matthieu Dorier committed
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
namespace tl = thallium;

/**
 * @brief Client-side state behind the opaque remi_client_t handle.
 *
 * Bundles the thallium engine used to issue RPCs, the margo instance the
 * client was initialized with, and the "remi_migrate" remote procedure
 * defined on that engine.
 */
struct remi_client {

    // Margo instance; filled in by remi_client_init after construction.
    margo_instance_id    m_mid = MARGO_INSTANCE_NULL;
    // Engine used to define and send RPCs (deleted by remi_client_finalize).
    tl::engine*          m_engine = nullptr;
    // Number of provider handles currently alive for this client.
    uint64_t             m_num_providers = 0;
    // Remote procedure registered under the name "remi_migrate".
    tl::remote_procedure m_migrate_rpc;

    /**
     * @brief Stores the engine and defines the "remi_migrate" RPC on it.
     *
     * Declared explicit so that a tl::engine* is never silently converted
     * into a remi_client.
     *
     * @param e Engine to use; it is dereferenced here, so it must not be null.
     */
    explicit remi_client(tl::engine* e)
    : m_engine(e)
    // m_engine is declared before m_migrate_rpc, so it is already
    // initialized when it is dereferenced here.
    , m_migrate_rpc(m_engine->define("remi_migrate")) {}

};

struct remi_provider_handle : public tl::provider_handle {

    remi_client_t m_client    = nullptr;
    uint64_t      m_ref_count = 0;
    
    template<typename ... Args>
    remi_provider_handle(Args&&... args)
    : tl::provider_handle(std::forward<Args>(args)...) {}
};

45 46
/**
 * @brief Creates a REMI client bound to the given margo instance.
 *
 * A thallium engine is created in client mode on top of mid, and a
 * remi_client owning that engine is allocated and returned through client.
 *
 * @param[in]  mid    Margo instance to build the engine from.
 * @param[out] client Receives the newly created client on success; left
 *                    untouched on failure.
 *
 * @return REMI_SUCCESS on success,
 *         REMI_ERR_INVALID_ARG if client is null,
 *         REMI_ERR_ALLOCATION if creating the engine or the client threw.
 */
extern "C" int remi_client_init(margo_instance_id mid, remi_client_t* client)
{
    if(client == nullptr)
        return REMI_ERR_INVALID_ARG;

    tl::engine* theEngine = nullptr;
    try {
        theEngine                = new tl::engine(mid, THALLIUM_CLIENT_MODE);
        remi_client_t theClient  = new remi_client(theEngine);
        theClient->m_mid         = mid;
        *client = theClient;
    } catch(...) {
        // This is a C entry point: exceptions must not escape it, and the
        // engine must not leak if building the client failed after the
        // engine was created (deleting a null pointer is a no-op).
        delete theEngine;
        return REMI_ERR_ALLOCATION;
    }
    return REMI_SUCCESS;
}

/**
 * @brief Destroys a client created by remi_client_init.
 *
 * Deletes the client's engine, then the client itself. Passing
 * REMI_CLIENT_NULL is a no-op.
 *
 * @param client Client to destroy.
 *
 * @return Always REMI_SUCCESS.
 */
extern "C" int remi_client_finalize(remi_client_t client)
{
    if(client != REMI_CLIENT_NULL) {
        delete client->m_engine;
        delete client;
    }
    return REMI_SUCCESS;
}

/**
 * @brief Creates a provider handle targeting (addr, provider_id).
 *
 * The new handle starts with a reference count of 1 and is counted in the
 * client's m_num_providers until it is released.
 *
 * @param[in]  client      Client the handle is created from.
 * @param[in]  addr        Address of the process hosting the provider.
 * @param[in]  provider_id Id of the target provider.
 * @param[out] handle      Receives the new handle on success.
 *
 * @return REMI_SUCCESS on success, REMI_ERR_INVALID_ARG if client is
 *         REMI_CLIENT_NULL or handle is null.
 */
extern "C" int remi_provider_handle_create(
        remi_client_t client,
        hg_addr_t addr,
        uint16_t provider_id,
        remi_provider_handle_t* handle)
{
    // Reject a null output pointer as well: it is written to below.
    if(client == REMI_CLIENT_NULL || handle == nullptr)
        return REMI_ERR_INVALID_ARG;
    // NOTE(review): the trailing `false` presumably tells the endpoint not to
    // take ownership of addr — confirm against the thallium version in use.
    auto theHandle = new remi_provider_handle(
            tl::endpoint(*(client->m_engine), addr, false), provider_id);
    theHandle->m_client = client;
    theHandle->m_ref_count = 1;
    *handle = theHandle;
    client->m_num_providers += 1;
    return REMI_SUCCESS;
}

/**
 * @brief Adds one reference to a provider handle.
 *
 * @param handle Handle whose reference count is incremented.
 *
 * @return REMI_SUCCESS on success, REMI_ERR_INVALID_ARG if handle is
 *         REMI_PROVIDER_HANDLE_NULL.
 */
extern "C" int remi_provider_handle_ref_incr(remi_provider_handle_t handle)
{
    if(handle != REMI_PROVIDER_HANDLE_NULL) {
        ++handle->m_ref_count;
        return REMI_SUCCESS;
    }
    return REMI_ERR_INVALID_ARG;
}

/**
 * @brief Drops one reference from a provider handle.
 *
 * When the last reference goes away, the owning client's provider count is
 * decremented and the handle is deleted. Passing REMI_PROVIDER_HANDLE_NULL
 * is a no-op.
 *
 * @param handle Handle to release.
 *
 * @return Always REMI_SUCCESS.
 */
extern "C" int remi_provider_handle_release(remi_provider_handle_t handle)
{
    if(handle != REMI_PROVIDER_HANDLE_NULL) {
        const bool lastReference = (--handle->m_ref_count == 0);
        if(lastReference) {
            --handle->m_client->m_num_providers;
            delete handle;
        }
    }
    return REMI_SUCCESS;
}

/**
 * @brief Asks the margo instance running at addr to shut down.
 *
 * @param client Client whose margo instance is used to send the request.
 * @param addr   Address of the remote instance to shut down.
 *
 * @return REMI_ERR_INVALID_ARG if client is REMI_CLIENT_NULL, otherwise the
 *         value returned by margo_shutdown_remote_instance.
 */
extern "C" int remi_shutdown_service(remi_client_t client, hg_addr_t addr)
{
    // Same guard as the other entry points: client is dereferenced below.
    if(client == REMI_CLIENT_NULL)
        return REMI_ERR_INVALID_ARG;
    return margo_shutdown_remote_instance(client->m_mid, addr);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
104

105 106 107 108 109
/**
 * @brief Walkthrough callback that records a file name in a set.
 *
 * @param filename Name to record.
 * @param uargs    Pointer to the std::set<std::string> receiving the name.
 */
static void list_existing_files(const char* filename, void* uargs) {
    std::set<std::string>& collected = *static_cast<std::set<std::string>*>(uargs);
    collected.emplace(filename);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
110
extern "C" int remi_fileset_migrate(
111
        remi_provider_handle_t ph,
Matthieu Dorier's avatar
Matthieu Dorier committed
112
        remi_fileset_t fileset,
113
        const char* remote_root,
114 115
        int flag,
        int* status)
Matthieu Dorier's avatar
Matthieu Dorier committed
116
{
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131

    if(ph == REMI_PROVIDER_HANDLE_NULL
    || fileset == REMI_FILESET_NULL
    || remote_root == NULL)
        return REMI_ERR_INVALID_ARG;
    if(remote_root[0] != '/')
        return REMI_ERR_INVALID_ARG;

    std::string theRemoteRoot(remote_root);
    if(theRemoteRoot[theRemoteRoot.size()-1] != '/')
        theRemoteRoot += "/";

    // expose the data
    std::vector<std::pair<void*,std::size_t>> theData;
    std::vector<std::size_t> theSizes;
Matthieu Dorier's avatar
Matthieu Dorier committed
132
    std::vector<mode_t> theModes;
133

134
    // prepare lambda for cleaning up mapped files
135 136 137 138 139 140
    auto cleanup = [&theData]() {
        for(auto& seg : theData) {
            munmap(seg.first, seg.second);
        }
    };

141 142 143 144 145 146 147
    // find the set of files to migrate from the fileset
    std::set<std::string> files;

    remi_fileset_walkthrough(fileset, list_existing_files,
            static_cast<void*>(&files));

    for(auto& filename : files) {
148 149 150 151 152 153 154 155 156 157 158 159 160
        // compose full name
        auto theFilename = fileset->m_root + filename;
        // open file
        int fd = open(theFilename.c_str(), O_RDONLY, 0);
        if(fd == -1) {
            cleanup();
            return REMI_ERR_UNKNOWN_FILE;
        }
        // get file size
        struct stat st;
        if(0 != fstat(fd, &st)) {
            close(fd);
            cleanup();
Matthieu Dorier's avatar
Matthieu Dorier committed
161
            return REMI_ERR_IO;
162 163 164
        }
        auto size = st.st_size;
        theSizes.push_back(size);
Matthieu Dorier's avatar
Matthieu Dorier committed
165 166
        auto mode = st.st_mode;
        theModes.push_back(mode);
167 168 169 170 171 172 173 174 175 176
        if(size == 0) {
            close(fd);
            continue;
        }
        // map the file
        void* segment = mmap(0, size, PROT_READ, MAP_PRIVATE, fd, 0);
        if(segment == NULL) {
            cleanup();
            return REMI_ERR_ALLOCATION;
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
177 178
        // indicate sequential access
        madvise(segment, size, MADV_SEQUENTIAL);
179 180 181 182 183 184 185
        // close file descriptor
        close(fd);
        // insert the segment
        theData.emplace_back(segment, size);
    }

    // expose the segments for bulk operations
186 187 188
    tl::bulk localBulk;
    if(theData.size() != 0) 
        localBulk = ph->m_client->m_engine->expose(theData, tl::bulk_mode::read_only);
189

190 191 192 193 194 195 196
    // create a copy of the fileset where m_directory is empty
    // and the filenames in directories have been resolved
    auto tmp_files = std::move(fileset->m_files);
    auto tmp_dirs  = std::move(fileset->m_directories);
    auto tmp_root  = std::move(fileset->m_root);
    fileset->m_files = files;
    fileset->m_directories = decltype(fileset->m_directories)();
197
    fileset->m_root = theRemoteRoot;
198 199

    // send the RPC
Matthieu Dorier's avatar
Matthieu Dorier committed
200
    std::pair<int32_t, int32_t> result = ph->m_client->m_migrate_rpc.on(*ph)(*fileset, theSizes, theModes, localBulk);
201 202
    int ret = result.first;
    *status = result.second;
203 204 205 206 207

    // put back the fileset's original members
    fileset->m_root        = std::move(tmp_root);
    fileset->m_files       = std::move(tmp_files);
    fileset->m_directories = std::move(tmp_dirs);
208 209 210

    cleanup();

211 212 213 214 215 216
    if(ret != REMI_SUCCESS) {
        return ret;
    }

    if(*status != 0) {
        return REMI_ERR_USER;
217 218 219
    }

    if(flag == REMI_REMOVE_SOURCE) {
220
        for(auto& filename : files) {
221 222 223
            auto theFilename = fileset->m_root + filename;
            remove(theFilename.c_str());
        }
224 225 226 227
        for(auto& dirname : fileset->m_directories) {
            auto theDirname = fileset->m_root + dirname;
            removeRec(theDirname);
        }
228 229 230
    }

    return REMI_SUCCESS;
Matthieu Dorier's avatar
Matthieu Dorier committed
231
}