remi-client.cpp 6.27 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7 8 9 10 11 12
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
13
#include <thallium.hpp>
14
#include <thallium/serialization/stl/pair.hpp>
15
#include "fs-util.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
16
#include "remi/remi-client.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
17
#include "remi-fileset.hpp"
18

Matthieu Dorier's avatar
Matthieu Dorier committed
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
namespace tl = thallium;

struct remi_client {

    margo_instance_id    m_mid = MARGO_INSTANCE_NULL;
    tl::engine*          m_engine = nullptr;
    uint64_t             m_num_providers = 0;
    tl::remote_procedure m_migrate_rpc;

    remi_client(tl::engine* e)
    : m_engine(e)
    , m_migrate_rpc(m_engine->define("remi_migrate")) {}

};

struct remi_provider_handle : public tl::provider_handle {

    remi_client_t m_client    = nullptr;
    uint64_t      m_ref_count = 0;
    
    template<typename ... Args>
    remi_provider_handle(Args&&... args)
    : tl::provider_handle(std::forward<Args>(args)...) {}
};

44 45
extern "C" int remi_client_init(margo_instance_id mid, remi_client_t* client)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
46 47 48 49 50
    auto theEngine           = new tl::engine(mid, THALLIUM_CLIENT_MODE);
    remi_client_t theClient  = new remi_client(theEngine);
    theClient->m_mid         = mid;
    *client = theClient;
    return REMI_SUCCESS;
51 52 53 54
}

extern "C" int remi_client_finalize(remi_client_t client)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
55 56 57 58 59
    if(client == REMI_CLIENT_NULL)
        return REMI_SUCCESS;
    delete client->m_engine;
    delete client;
    return REMI_SUCCESS;
60 61 62 63 64 65 66 67
}

extern "C" int remi_provider_handle_create(
        remi_client_t client,
        hg_addr_t addr,
        uint16_t provider_id,
        remi_provider_handle_t* handle)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
68 69 70 71 72
    if(client == REMI_CLIENT_NULL)
        return REMI_ERR_INVALID_ARG;
    auto theHandle = new remi_provider_handle(
            tl::endpoint(*(client->m_engine), addr, false), provider_id);
    theHandle->m_client = client;
Matthieu Dorier's avatar
Matthieu Dorier committed
73
    theHandle->m_ref_count = 1;
Matthieu Dorier's avatar
Matthieu Dorier committed
74 75 76
    *handle = theHandle;
    client->m_num_providers += 1;
    return REMI_SUCCESS;
77 78 79 80
}

extern "C" int remi_provider_handle_ref_incr(remi_provider_handle_t handle)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
81 82 83 84
    if(handle == REMI_PROVIDER_HANDLE_NULL)
        return REMI_ERR_INVALID_ARG;
    handle->m_ref_count += 1;
    return REMI_SUCCESS;
85 86 87 88
}

extern "C" int remi_provider_handle_release(remi_provider_handle_t handle)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
89 90 91 92 93 94 95 96
    if(handle == REMI_PROVIDER_HANDLE_NULL)
        return REMI_SUCCESS;
    handle->m_ref_count -= 1;
    if(handle->m_ref_count == 0) {
        handle->m_client->m_num_providers -= 1;
        delete handle;
    }
    return REMI_SUCCESS;
97 98 99 100
}

extern "C" int remi_shutdown_service(remi_client_t client, hg_addr_t addr)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
101
    return margo_shutdown_remote_instance(client->m_mid, addr);
102
}
Matthieu Dorier's avatar
Matthieu Dorier committed
103

104 105 106 107 108
static void list_existing_files(const char* filename, void* uargs) {
    auto files = static_cast<std::set<std::string>*>(uargs);
    files->emplace(filename);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
109
extern "C" int remi_fileset_migrate(
110
        remi_provider_handle_t ph,
Matthieu Dorier's avatar
Matthieu Dorier committed
111
        remi_fileset_t fileset,
112
        const char* remote_root,
113 114
        int flag,
        int* status)
Matthieu Dorier's avatar
Matthieu Dorier committed
115
{
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130

    if(ph == REMI_PROVIDER_HANDLE_NULL
    || fileset == REMI_FILESET_NULL
    || remote_root == NULL)
        return REMI_ERR_INVALID_ARG;
    if(remote_root[0] != '/')
        return REMI_ERR_INVALID_ARG;

    std::string theRemoteRoot(remote_root);
    if(theRemoteRoot[theRemoteRoot.size()-1] != '/')
        theRemoteRoot += "/";

    // expose the data
    std::vector<std::pair<void*,std::size_t>> theData;
    std::vector<std::size_t> theSizes;
Matthieu Dorier's avatar
Matthieu Dorier committed
131
    std::vector<mode_t> theModes;
132

133
    // prepare lambda for cleaning up mapped files
134 135 136 137 138 139
    auto cleanup = [&theData]() {
        for(auto& seg : theData) {
            munmap(seg.first, seg.second);
        }
    };

140 141 142 143 144 145 146
    // find the set of files to migrate from the fileset
    std::set<std::string> files;

    remi_fileset_walkthrough(fileset, list_existing_files,
            static_cast<void*>(&files));

    for(auto& filename : files) {
147 148 149 150 151 152 153 154 155 156 157 158 159
        // compose full name
        auto theFilename = fileset->m_root + filename;
        // open file
        int fd = open(theFilename.c_str(), O_RDONLY, 0);
        if(fd == -1) {
            cleanup();
            return REMI_ERR_UNKNOWN_FILE;
        }
        // get file size
        struct stat st;
        if(0 != fstat(fd, &st)) {
            close(fd);
            cleanup();
Matthieu Dorier's avatar
Matthieu Dorier committed
160
            return REMI_ERR_IO;
161 162 163
        }
        auto size = st.st_size;
        theSizes.push_back(size);
Matthieu Dorier's avatar
Matthieu Dorier committed
164 165
        auto mode = st.st_mode;
        theModes.push_back(mode);
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
        if(size == 0) {
            close(fd);
            continue;
        }
        // map the file
        void* segment = mmap(0, size, PROT_READ, MAP_PRIVATE, fd, 0);
        if(segment == NULL) {
            cleanup();
            return REMI_ERR_ALLOCATION;
        }
        // close file descriptor
        close(fd);
        // insert the segment
        theData.emplace_back(segment, size);
    }

    // expose the segments for bulk operations
183 184 185
    tl::bulk localBulk;
    if(theData.size() != 0) 
        localBulk = ph->m_client->m_engine->expose(theData, tl::bulk_mode::read_only);
186

187 188 189 190 191 192 193
    // create a copy of the fileset where m_directory is empty
    // and the filenames in directories have been resolved
    auto tmp_files = std::move(fileset->m_files);
    auto tmp_dirs  = std::move(fileset->m_directories);
    auto tmp_root  = std::move(fileset->m_root);
    fileset->m_files = files;
    fileset->m_directories = decltype(fileset->m_directories)();
194
    fileset->m_root = theRemoteRoot;
195 196

    // send the RPC
Matthieu Dorier's avatar
Matthieu Dorier committed
197
    std::pair<int32_t, int32_t> result = ph->m_client->m_migrate_rpc.on(*ph)(*fileset, theSizes, theModes, localBulk);
198 199
    int ret = result.first;
    *status = result.second;
200 201 202 203 204

    // put back the fileset's original members
    fileset->m_root        = std::move(tmp_root);
    fileset->m_files       = std::move(tmp_files);
    fileset->m_directories = std::move(tmp_dirs);
205 206 207

    cleanup();

208 209 210 211 212 213
    if(ret != REMI_SUCCESS) {
        return ret;
    }

    if(*status != 0) {
        return REMI_ERR_USER;
214 215 216
    }

    if(flag == REMI_REMOVE_SOURCE) {
217
        for(auto& filename : files) {
218 219 220
            auto theFilename = fileset->m_root + filename;
            remove(theFilename.c_str());
        }
221 222 223 224
        for(auto& dirname : fileset->m_directories) {
            auto theDirname = fileset->m_root + dirname;
            removeRec(theDirname);
        }
225 226 227
    }

    return REMI_SUCCESS;
Matthieu Dorier's avatar
Matthieu Dorier committed
228
}