engine.hpp 14.3 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7
#ifndef __THALLIUM_ENGINE_HPP
#define __THALLIUM_ENGINE_HPP
Matthieu Dorier's avatar
Matthieu Dorier committed
8 9 10

#include <iostream>
#include <string>
11
#include <functional>
Matthieu Dorier's avatar
Matthieu Dorier committed
12
#include <stack>
13
#include <unordered_map>
14
#include <vector>
15
#include <atomic>
Matthieu Dorier's avatar
Matthieu Dorier committed
16
#include <margo.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
17
#include <thallium/config.hpp>
18
#include <thallium/pool.hpp>
19
#include <thallium/tuple_util.hpp>
Matthieu Dorier's avatar
Matthieu Dorier committed
20 21 22
#include <thallium/function_cast.hpp>
#include <thallium/buffer.hpp>
#include <thallium/request.hpp>
Matthieu Dorier's avatar
Matthieu Dorier committed
23
#include <thallium/bulk_mode.hpp>
Matthieu Dorier's avatar
Matthieu Dorier committed
24

Matthieu Dorier's avatar
Matthieu Dorier committed
25 26 27
#define THALLIUM_SERVER_MODE MARGO_SERVER_MODE
#define THALLIUM_CLIENT_MODE MARGO_CLIENT_MODE

Matthieu Dorier's avatar
Matthieu Dorier committed
28 29
namespace thallium {

Matthieu Dorier's avatar
Matthieu Dorier committed
30
class bulk;
Matthieu Dorier's avatar
Matthieu Dorier committed
31
class endpoint;
Matthieu Dorier's avatar
Matthieu Dorier committed
32
class remote_bulk;
Matthieu Dorier's avatar
Matthieu Dorier committed
33
class remote_procedure;
34
template<typename T> class provider;
Matthieu Dorier's avatar
Matthieu Dorier committed
35

Matthieu Dorier's avatar
Matthieu Dorier committed
36 37 38 39 40 41
/**
 * @brief The engine class is at the core of Thallium,
 * it is the first object to instanciate to start using the
 * Thallium runtime. It initializes Margo and other libraries,
 * and allow users to declare RPCs and bulk objects.
 */
42
class engine {
Matthieu Dorier's avatar
Matthieu Dorier committed
43

Matthieu Dorier's avatar
Matthieu Dorier committed
44 45
    friend class request;
    friend class bulk;
46
    friend class endpoint;
Matthieu Dorier's avatar
Matthieu Dorier committed
47
    friend class remote_bulk;
48
    friend class remote_procedure;
49
    friend class callable_remote_procedure;
50 51
    template<typename T>
    friend class provider;
Matthieu Dorier's avatar
Matthieu Dorier committed
52 53 54

private:

55 56
    using rpc_t = std::function<void(const request&, const buffer&)>;

57
    margo_instance_id                     m_mid;
Matthieu Dorier's avatar
Matthieu Dorier committed
58 59 60 61 62 63 64
    std::unordered_map<hg_id_t, rpc_t>    m_rpcs;
    bool                                  m_is_server;
    bool                                  m_owns_mid;
    std::atomic<bool>                     m_finalize_called;
    hg_context_t*                         m_hg_context = nullptr;
    hg_class_t*                           m_hg_class = nullptr;
    std::stack<std::function<void(void)>> m_finalize_callbacks;
Matthieu Dorier's avatar
Matthieu Dorier committed
65

Matthieu Dorier's avatar
Matthieu Dorier committed
66 67 68 69
    /**
     * @brief Encapsulation of some data needed by RPC callbacks
     * (namely, the initiating thallium engine and the function to call)
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
70 71 72 73 74
    struct rpc_callback_data {
        engine* m_engine;
        void*   m_function;
    };

Matthieu Dorier's avatar
Matthieu Dorier committed
75 76 77 78 79
    /**
     * @brief Function to call to free the data registered with an RPC.
     *
     * @param data pointer to the data to free (instance of rpc_callback_data).
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
80 81 82 83 84
    static void free_rpc_callback_data(void* data) {
        rpc_callback_data* cb_data = (rpc_callback_data*)data;
        delete cb_data;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
85 86 87 88 89 90 91
    /**
     * @brief Function run as a ULT when receiving an RPC.
     *
     * @tparam F type of the function to call.
     * @tparam disable_response whether the caller expects a response.
     * @param handle handle of the RPC.
     */
92 93 94 95
    template<typename F, bool disable_response>
    static void rpc_handler_ult(hg_handle_t handle) {
        using G = typename std::remove_reference<F>::type;
        const struct hg_info* info = margo_get_info(handle);
96
        THALLIUM_ASSERT_CONDITION(info != nullptr, "margo_get_info returned null");
97
        margo_instance_id mid = margo_hg_handle_get_instance(handle);
98
        THALLIUM_ASSERT_CONDITION(mid != 0, "margo_hg_handle_get_instance returned null");
99
        void* data = margo_registered_data(mid, info->id);
100
        THALLIUM_ASSERT_CONDITION(data != nullptr, "margo_registered_data returned null");
Matthieu Dorier's avatar
Matthieu Dorier committed
101
        auto cb_data  = static_cast<rpc_callback_data*>(data);
102 103 104 105
        auto f = function_cast<G>(cb_data->m_function);
        request req(*(cb_data->m_engine), handle, disable_response);
        buffer input;
        hg_return_t ret;
106 107
        ret = margo_get_input(handle, &input);
        MARGO_ASSERT(ret, margo_get_input);
108
        (*f)(req, input);
109 110
        ret = margo_free_input(handle, &input);
        MARGO_ASSERT(ret, margo_free_input);
111
        margo_destroy(handle); // because of margo_ref_incr in rpc_callback
112
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
113

Matthieu Dorier's avatar
Matthieu Dorier committed
114 115 116 117 118 119 120 121 122
    /**
     * @brief Callback called when an RPC is received.
     *
     * @tparam F type of the function exposed by the user for this RPC.
     * @tparam disable_response whether the caller expects a response.
     * @param handle handle of the RPC.
     *
     * @return HG_SUCCESS or a Mercury error code.
     */
123
    template<typename F, bool disable_response>
Matthieu Dorier's avatar
Matthieu Dorier committed
124 125 126 127 128 129 130 131
    static hg_return_t rpc_callback(hg_handle_t handle) {
        int ret;
        ABT_pool pool;
        margo_instance_id mid;
        mid = margo_hg_handle_get_instance(handle);
        if(mid == MARGO_INSTANCE_NULL) {
            return HG_OTHER_ERROR;
        }
132
        pool = margo_hg_handle_get_handler_pool(handle);
133
        margo_ref_incr(handle);
134 135
        ret = ABT_thread_create(pool, (void (*)(void *)) rpc_handler_ult<F,disable_response>, 
                handle, ABT_THREAD_ATTR_NULL, NULL);
Matthieu Dorier's avatar
Matthieu Dorier committed
136
        if(ret != 0) {
137
            margo_destroy(handle);
Matthieu Dorier's avatar
Matthieu Dorier committed
138 139 140 141 142
            return HG_NOMEM_ERROR;
        }
        return HG_SUCCESS;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
143
    static void on_finalize_cb(void* arg) {
144 145
        engine* e = static_cast<engine*>(arg);
        e->m_finalize_called = true;
Matthieu Dorier's avatar
Matthieu Dorier committed
146 147 148 149 150
        while(!(e->m_finalize_callbacks.empty())) {
            auto& cb = e->m_finalize_callbacks.top();
            cb();
            e->m_finalize_callbacks.pop();
        }
151 152
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
153 154
public:

Matthieu Dorier's avatar
Matthieu Dorier committed
155 156 157 158 159 160 161 162 163
    /**
     * @brief Constructor.
     *
     * @param addr address of this instance.
     * @param mode THALLIUM_SERVER_MODE or THALLIUM_CLIENT_MODE.
     * @param use_progress_thread whether to use a dedicated ES to drive progress.
     * @param rpc_thread_count number of threads to use for servicing RPCs.
     * Use -1 to indicate that RPCs should be serviced in the progress ES.
     */
164 165 166
    engine(const std::string& addr, int mode, 
            bool use_progress_thread = false,
            std::int32_t rpc_thread_count = 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
167

Matthieu Dorier's avatar
Matthieu Dorier committed
168
        m_is_server = (mode == THALLIUM_SERVER_MODE);
169
        m_finalize_called = false;
170 171 172
        m_mid = margo_init(addr.c_str(), mode,
                use_progress_thread ? 1 : 0,
                rpc_thread_count);
173
        // XXX throw an exception if m_mid not initialized
174
        m_owns_mid = true;
175
        margo_push_finalize_callback(m_mid,
Matthieu Dorier's avatar
Matthieu Dorier committed
176
                &engine::on_finalize_cb, static_cast<void*>(this));
177
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
178

179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
    engine(const std::string& addr, int mode,
            const pool& progress_pool,
            const pool& default_handler_pool) {
        m_is_server = (mode == THALLIUM_SERVER_MODE);
        m_finalize_called = false;
        m_owns_mid = true;

        m_hg_class = HG_Init(addr.c_str(), mode);
        //if(!hg_class); // XXX throw exception

        m_hg_context = HG_Context_create(m_hg_class);
        //if(!hg_context); // XXX throw exception

        m_mid = margo_init_pool(progress_pool.native_handle(), 
                default_handler_pool.native_handle(), m_hg_context);
        // XXX throw an exception if m_mid not initialized
        margo_push_finalize_callback(m_mid,
Matthieu Dorier's avatar
Matthieu Dorier committed
196
                &engine::on_finalize_cb, static_cast<void*>(this));
197 198
    }

199 200 201 202 203 204 205
    /**
     * @brief Builds an engine around an existing margo instance.
     *
     * @param mid Margo instance.
     * @param mode THALLIUM_SERVER_MODE or THALLIUM_CLIENT_MODE.
     */
    [[deprecated]]
206 207 208 209
    engine(margo_instance_id mid, int mode) {
        m_mid = mid;
        m_is_server = (mode == THALLIUM_SERVER_MODE);
        m_owns_mid = false;
210 211
        margo_push_finalize_callback(m_mid,
                &engine::on_finalize_cb, static_cast<void*>(this));
212 213
    }

214 215 216 217 218 219 220 221 222
    /**
     * @brief Builds an engine around an existing margo instance.
     *
     * @param mid Margo instance.
     */
    engine(margo_instance_id mid) {
        m_mid = mid;
        m_owns_mid = false;
        m_is_server = margo_is_listening(mid);
223 224
        margo_push_finalize_callback(m_mid,
                &engine::on_finalize_cb, static_cast<void*>(this));
225 226
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
227 228 229
    /**
     * @brief Copy-constructor is deleted.
     */
230
    engine(const engine& other)            = delete;
Matthieu Dorier's avatar
Matthieu Dorier committed
231 232 233 234

    /**
     * @brief Move-constructor is deleted.
     */
235
    engine(engine&& other)                 = delete;
Matthieu Dorier's avatar
Matthieu Dorier committed
236 237 238 239
    
    /**
     * @brief Move-assignment operator is deleted.
     */
240
    engine& operator=(engine&& other)      = delete;
Matthieu Dorier's avatar
Matthieu Dorier committed
241 242 243 244

    /**
     * @brief Copy-assignment operator is deleted.
     */
245
    engine& operator=(const engine& other) = delete;
Matthieu Dorier's avatar
Matthieu Dorier committed
246

Matthieu Dorier's avatar
Matthieu Dorier committed
247 248 249 250

    /**
     * @brief Destructor.
     */
251
    ~engine() {
252 253
        if(m_owns_mid) {
            if(m_is_server) {
254 255
                if(!m_finalize_called) 
                    margo_wait_for_finalize(m_mid);
256
            } else {
257 258
                if(!m_finalize_called)
                    margo_finalize(m_mid);
259
            }
Matthieu Dorier's avatar
Matthieu Dorier committed
260
        }
261
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
262

263 264 265 266 267 268 269 270 271 272
    /**
     * @brief Get the underlying margo instance. Useful
     * when working in conjunction with C code that need
     * to be initialized with the margo instance.
     *
     * @return The margo instance id.
     */
    margo_instance_id get_margo_instance() const {
        return m_mid;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
273 274 275 276

    /**
     * @brief Finalize the engine. Can be called by any thread.
     */
277 278
    void finalize() {
        margo_finalize(m_mid);
279 280 281 282
        if(m_hg_context)
            HG_Context_destroy(m_hg_context);
        if(m_hg_class)
            HG_Finalize(m_hg_class);
283
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
284

285 286 287 288 289 290 291 292 293 294
    /**
     * @brief Makes the calling thread block until someone calls
     * finalize on this engine. This function will not do anything
     * if finalize was already called.
     */
    void wait_for_finalize() {
        if(!m_finalize_called)
            margo_wait_for_finalize(m_mid);
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
295 296 297 298 299
    /**
     * @brief Creates an endpoint from this engine.
     *
     * @return An endpoint corresponding to this engine.
     */
300
    endpoint self() const;
Matthieu Dorier's avatar
Matthieu Dorier committed
301

Matthieu Dorier's avatar
Matthieu Dorier committed
302 303 304 305 306 307 308 309
    /**
     * @brief Defines an RPC with a name, without providing a
     * function pointer (used on clients).
     *
     * @param name Name of the RPC.
     *
     * @return a remote_procedure object.
     */
310
    remote_procedure define(const std::string& name);
Matthieu Dorier's avatar
Matthieu Dorier committed
311

Matthieu Dorier's avatar
Matthieu Dorier committed
312 313 314 315 316 317 318
    /**
     * @brief Defines an RPC with a name and an std::function 
     * representing the RPC.
     *
     * @tparam Args Types of arguments accepted by the RPC.
     * @param name Name of the RPC.
     * @param fun Function to associate with the RPC.
319 320
     * @param provider_id ID of the provider registering this RPC.
     * @param pool Argobots pool to use when receiving this type of RPC
Matthieu Dorier's avatar
Matthieu Dorier committed
321 322 323
     *
     * @return a remote_procedure object.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
324
    template<typename A1, typename ... Args>
325
    remote_procedure define(const std::string& name, 
Matthieu Dorier's avatar
Matthieu Dorier committed
326 327 328 329 330
        const std::function<void(const request&, A1, Args...)>& fun,
        uint16_t provider_id=0, const pool& p = pool());

    remote_procedure define(const std::string& name, 
        const std::function<void(const request&)>& fun,
331
        uint16_t provider_id=0, const pool& p = pool());
332

Matthieu Dorier's avatar
Matthieu Dorier committed
333 334 335 336 337 338 339
    /**
     * @brief Defines an RPC with a name and a function pointer
     * to call when the RPC is received.
     *
     * @tparam Args Types of arguments accepted by the RPC.
     * @param name Name of the RPC.
     * @param f Function to associate with the RPC.
340 341
     * @param provider_id ID of the provider registering this RPC.
     * @param pool Argobots pool to use when receiving this type of RPC.
Matthieu Dorier's avatar
Matthieu Dorier committed
342 343 344
     *
     * @return a remote_procedure object.
     */
345
    template<typename ... Args>
346
    remote_procedure define(const std::string& name, void (*f)(const request&, Args...),
347
            uint16_t provider_id=0, const pool& p = pool());
Matthieu Dorier's avatar
Matthieu Dorier committed
348

Matthieu Dorier's avatar
Matthieu Dorier committed
349 350 351 352 353 354 355 356
    /**
     * @brief Lookup an address and returns an endpoint object
     * to communicate with this address.
     *
     * @param address String representation of the address.
     *
     * @return an endpoint object associated with the given address.
     */
357
    endpoint lookup(const std::string& address) const;
Matthieu Dorier's avatar
Matthieu Dorier committed
358

Matthieu Dorier's avatar
Matthieu Dorier committed
359 360 361 362 363 364 365 366
    /**
     * @brief Exposes a series of memory segments for bulk operations.
     *
     * @param segments vector of <pointer,size> pairs of memory segments.
     * @param flag indicates whether the bulk is read-write, read-only or write-only.
     *
     * @return a bulk object representing the memory exposed for RDMA.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
367 368
    bulk expose(const std::vector<std::pair<void*,size_t>>& segments, bulk_mode flag);

Matthieu Dorier's avatar
Matthieu Dorier committed
369 370 371 372 373 374 375 376 377 378 379 380
    /**
     * @brief Pushes a finalization callback into the engine. This callback will be
     * called when margo_finalize is called (e.g. through engine::finalize()).
     *
     * @tparam F type of callback. Must have a operator() defined.
     * @param f callback.
     */
    template<typename F>
    void on_finalize(F&& f) {
        m_finalize_callbacks.emplace(std::forward<F>(f));
    }

381 382 383 384 385 386 387 388 389 390 391 392
    /**
     * @brief Shuts down a remote thallium engine. The remote engine
     * should have enabled remote shutdown by calling enable_remote_shutdown().
     *
     * @param ep endpoint of the remote engine.
     */
    void shutdown_remote_engine(const endpoint& ep) const;

    /**
     * @brief Enables this engine to be shutdown remotely.
     */
    void enable_remote_shutdown();
Matthieu Dorier's avatar
Matthieu Dorier committed
393 394 395 396 397
};

} // namespace thallium

#include <thallium/remote_procedure.hpp>
398
#include <thallium/proc_buffer.hpp>
Matthieu Dorier's avatar
Matthieu Dorier committed
399 400 401
#include <thallium/serialization/buffer_input_archive.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#include <thallium/serialization/stl/tuple.hpp>
Matthieu Dorier's avatar
Matthieu Dorier committed
402 403 404

namespace thallium {

Matthieu Dorier's avatar
Matthieu Dorier committed
405
template<typename A1, typename ... Args>
406
remote_procedure engine::define(const std::string& name, 
Matthieu Dorier's avatar
Matthieu Dorier committed
407
        const std::function<void(const request&, A1, Args...)>& fun,
408
        uint16_t provider_id, const pool& p) {
409

410 411 412 413 414
    hg_id_t id = margo_provider_register_name(m_mid, name.c_str(),
                process_buffer,
                process_buffer,
                rpc_callback<rpc_t, false>,
                provider_id,
415
                p.native_handle());
416

Matthieu Dorier's avatar
Matthieu Dorier committed
417
    m_rpcs[id] = [fun,this](const request& r, const buffer& b) {
Matthieu Dorier's avatar
Matthieu Dorier committed
418 419
        std::function<void(A1, Args...)> call_function = [&fun, &r](A1&& a1, Args&&... args) {
            fun(r, std::forward<A1>(a1), std::forward<Args>(args)...);
420
        };
Matthieu Dorier's avatar
Matthieu Dorier committed
421 422 423
        std::tuple<typename std::decay<A1>::type, typename std::decay<Args>::type...> iargs;
        buffer_input_archive iarch(b, *this);
        iarch(iargs);
424
        apply_function_to_tuple(call_function, iargs);
425 426
    };

Matthieu Dorier's avatar
Matthieu Dorier committed
427 428 429 430
    rpc_callback_data* cb_data = new rpc_callback_data;
    cb_data->m_engine   = this;
    cb_data->m_function = void_cast(&m_rpcs[id]);

431 432 433
    hg_return_t ret = margo_register_data(m_mid, id, (void*)cb_data, free_rpc_callback_data);
    MARGO_ASSERT(ret, margo_register_data);

434
    return remote_procedure(*this, id);
Matthieu Dorier's avatar
Matthieu Dorier committed
435 436
}

437
template<typename ... Args>
438 439 440
remote_procedure engine::define(
        const std::string& name,
        void (*f)(const request&, Args...),
441
        uint16_t provider_id, const pool& p) {
442

443
    return define(name, std::function<void(const request&,Args...)>(f), provider_id, p);
444 445
}

Matthieu Dorier's avatar
Matthieu Dorier committed
446 447 448
}

#endif