remi-client.cpp 4.98 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7 8 9 10 11 12
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
13
#include <thallium.hpp>
Matthieu Dorier's avatar
Matthieu Dorier committed
14
#include "remi/remi-client.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
15
#include "remi-fileset.hpp"
16

Matthieu Dorier's avatar
Matthieu Dorier committed
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
namespace tl = thallium;

struct remi_client {

    margo_instance_id    m_mid = MARGO_INSTANCE_NULL;
    tl::engine*          m_engine = nullptr;
    uint64_t             m_num_providers = 0;
    tl::remote_procedure m_migrate_rpc;

    remi_client(tl::engine* e)
    : m_engine(e)
    , m_migrate_rpc(m_engine->define("remi_migrate")) {}

};

struct remi_provider_handle : public tl::provider_handle {

    remi_client_t m_client    = nullptr;
    uint64_t      m_ref_count = 0;
    
    template<typename ... Args>
    remi_provider_handle(Args&&... args)
    : tl::provider_handle(std::forward<Args>(args)...) {}
};

42 43
extern "C" int remi_client_init(margo_instance_id mid, remi_client_t* client)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
44 45 46 47 48
    auto theEngine           = new tl::engine(mid, THALLIUM_CLIENT_MODE);
    remi_client_t theClient  = new remi_client(theEngine);
    theClient->m_mid         = mid;
    *client = theClient;
    return REMI_SUCCESS;
49 50 51 52
}

extern "C" int remi_client_finalize(remi_client_t client)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
53 54 55 56 57
    if(client == REMI_CLIENT_NULL)
        return REMI_SUCCESS;
    delete client->m_engine;
    delete client;
    return REMI_SUCCESS;
58 59 60 61 62 63 64 65
}

extern "C" int remi_provider_handle_create(
        remi_client_t client,
        hg_addr_t addr,
        uint16_t provider_id,
        remi_provider_handle_t* handle)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
66 67 68 69 70
    if(client == REMI_CLIENT_NULL)
        return REMI_ERR_INVALID_ARG;
    auto theHandle = new remi_provider_handle(
            tl::endpoint(*(client->m_engine), addr, false), provider_id);
    theHandle->m_client = client;
Matthieu Dorier's avatar
Matthieu Dorier committed
71
    theHandle->m_ref_count = 1;
Matthieu Dorier's avatar
Matthieu Dorier committed
72 73 74
    *handle = theHandle;
    client->m_num_providers += 1;
    return REMI_SUCCESS;
75 76 77 78
}

extern "C" int remi_provider_handle_ref_incr(remi_provider_handle_t handle)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
79 80 81 82
    if(handle == REMI_PROVIDER_HANDLE_NULL)
        return REMI_ERR_INVALID_ARG;
    handle->m_ref_count += 1;
    return REMI_SUCCESS;
83 84 85 86
}

extern "C" int remi_provider_handle_release(remi_provider_handle_t handle)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
87 88 89 90 91 92 93 94
    if(handle == REMI_PROVIDER_HANDLE_NULL)
        return REMI_SUCCESS;
    handle->m_ref_count -= 1;
    if(handle->m_ref_count == 0) {
        handle->m_client->m_num_providers -= 1;
        delete handle;
    }
    return REMI_SUCCESS;
95 96 97 98
}

extern "C" int remi_shutdown_service(remi_client_t client, hg_addr_t addr)
{
Matthieu Dorier's avatar
Matthieu Dorier committed
99
    return margo_shutdown_remote_instance(client->m_mid, addr);
100
}
Matthieu Dorier's avatar
Matthieu Dorier committed
101 102

extern "C" int remi_fileset_migrate(
103
        remi_provider_handle_t ph,
Matthieu Dorier's avatar
Matthieu Dorier committed
104
        remi_fileset_t fileset,
105
        const char* remote_root,
Matthieu Dorier's avatar
Matthieu Dorier committed
106 107
        int flag)
{
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144

    if(ph == REMI_PROVIDER_HANDLE_NULL
    || fileset == REMI_FILESET_NULL
    || remote_root == NULL)
        return REMI_ERR_INVALID_ARG;
    if(remote_root[0] != '/')
        return REMI_ERR_INVALID_ARG;

    std::string theRemoteRoot(remote_root);
    if(theRemoteRoot[theRemoteRoot.size()-1] != '/')
        theRemoteRoot += "/";

    // expose the data
    std::vector<std::pair<void*,std::size_t>> theData;
    std::vector<std::size_t> theSizes;

    // prepare lambda for cleanin up mapped files
    auto cleanup = [&theData]() {
        for(auto& seg : theData) {
            munmap(seg.first, seg.second);
        }
    };

    for(auto& filename : fileset->m_files) {
        // compose full name
        auto theFilename = fileset->m_root + filename;
        // open file
        int fd = open(theFilename.c_str(), O_RDONLY, 0);
        if(fd == -1) {
            cleanup();
            return REMI_ERR_UNKNOWN_FILE;
        }
        // get file size
        struct stat st;
        if(0 != fstat(fd, &st)) {
            close(fd);
            cleanup();
Matthieu Dorier's avatar
Matthieu Dorier committed
145
            return REMI_ERR_IO;
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
        }
        auto size = st.st_size;
        theSizes.push_back(size);
        if(size == 0) {
            close(fd);
            continue;
        }
        // map the file
        void* segment = mmap(0, size, PROT_READ, MAP_PRIVATE, fd, 0);
        if(segment == NULL) {
            cleanup();
            return REMI_ERR_ALLOCATION;
        }
        // close file descriptor
        close(fd);
        // insert the segment
        theData.emplace_back(segment, size);
    }

    // expose the segments for bulk operations
166 167 168
    tl::bulk localBulk;
    if(theData.size() != 0) 
        localBulk = ph->m_client->m_engine->expose(theData, tl::bulk_mode::read_only);
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189

    // send the RPC
    auto localRoot = fileset->m_root;
    fileset->m_root = theRemoteRoot;
    int32_t result = ph->m_client->m_migrate_rpc.on(*ph)(*fileset, theSizes, localBulk);
    fileset->m_root = localRoot;

    cleanup();

    if(result != REMI_SUCCESS) {
        return result;
    }

    if(flag == REMI_REMOVE_SOURCE) {
        for(auto& filename : fileset->m_files) {
            auto theFilename = fileset->m_root + filename;
            remove(theFilename.c_str());
        }
    }

    return REMI_SUCCESS;
Matthieu Dorier's avatar
Matthieu Dorier committed
190
}