engine.hpp 9.57 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7
#ifndef __THALLIUM_ENGINE_HPP
#define __THALLIUM_ENGINE_HPP
Matthieu Dorier's avatar
Matthieu Dorier committed
8 9 10

#include <atomic>
#include <cstdint>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

#include <margo.h>

#include <thallium/tuple_util.hpp>
#include <thallium/function_cast.hpp>
#include <thallium/buffer.hpp>
#include <thallium/request.hpp>
#include <thallium/bulk_mode.hpp>
Matthieu Dorier's avatar
Matthieu Dorier committed
20

Matthieu Dorier's avatar
Matthieu Dorier committed
21 22 23
#define THALLIUM_SERVER_MODE MARGO_SERVER_MODE
#define THALLIUM_CLIENT_MODE MARGO_CLIENT_MODE

Matthieu Dorier's avatar
Matthieu Dorier committed
24 25
namespace thallium {

Matthieu Dorier's avatar
Matthieu Dorier committed
26
class bulk;
Matthieu Dorier's avatar
Matthieu Dorier committed
27
class endpoint;
Matthieu Dorier's avatar
Matthieu Dorier committed
28
class remote_bulk;
Matthieu Dorier's avatar
Matthieu Dorier committed
29 30
class remote_procedure;

Matthieu Dorier's avatar
Matthieu Dorier committed
31 32 33 34 35 36
/**
 * @brief The engine class is at the core of Thallium;
 * it is the first object to instantiate to start using the
 * Thallium runtime. It initializes Margo and other libraries,
 * and allows users to declare RPCs and bulk objects.
 */
37
class engine {
Matthieu Dorier's avatar
Matthieu Dorier committed
38

Matthieu Dorier's avatar
Matthieu Dorier committed
39 40
    friend class request;
    friend class bulk;
Matthieu Dorier's avatar
Matthieu Dorier committed
41
	friend class endpoint;
Matthieu Dorier's avatar
Matthieu Dorier committed
42
    friend class remote_bulk;
43
    friend class remote_procedure;
Matthieu Dorier's avatar
Matthieu Dorier committed
44 45 46 47
	friend class callable_remote_procedure;

private:

48 49
    using rpc_t = std::function<void(const request&, const buffer&)>;

50
	margo_instance_id                  m_mid;
51
    std::unordered_map<hg_id_t, rpc_t> m_rpcs;
52 53
    bool                               m_is_server;
    bool                               m_owns_mid;
54
    std::atomic<bool>                  m_finalize_called;
Matthieu Dorier's avatar
Matthieu Dorier committed
55

Matthieu Dorier's avatar
Matthieu Dorier committed
56 57 58 59
    /**
     * @brief Encapsulation of some data needed by RPC callbacks
     * (namely, the initiating thallium engine and the function to call)
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
60 61 62 63 64
    struct rpc_callback_data {
        engine* m_engine;
        void*   m_function;
    };

Matthieu Dorier's avatar
Matthieu Dorier committed
65 66 67 68 69
    /**
     * @brief Function to call to free the data registered with an RPC.
     *
     * @param data pointer to the data to free (instance of rpc_callback_data).
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
70 71 72 73 74
    static void free_rpc_callback_data(void* data) {
        rpc_callback_data* cb_data = (rpc_callback_data*)data;
        delete cb_data;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
75 76 77 78 79 80 81
    /**
     * @brief Function run as a ULT when receiving an RPC.
     *
     * @tparam F type of the function to call.
     * @tparam disable_response whether the caller expects a response.
     * @param handle handle of the RPC.
     */
82
	template<typename F, bool disable_response>
Matthieu Dorier's avatar
Matthieu Dorier committed
83
	static void rpc_handler_ult(hg_handle_t handle) {
Matthieu Dorier's avatar
Matthieu Dorier committed
84 85 86 87
		using G = std::remove_reference_t<F>;
		const struct hg_info* info = margo_get_info(handle);
		margo_instance_id mid = margo_hg_handle_get_instance(handle);
		void* data = margo_registered_data(mid, info->id);
Matthieu Dorier's avatar
Matthieu Dorier committed
88 89 90
        auto cb_data  = static_cast<rpc_callback_data*>(data);
		auto f = function_cast<G>(cb_data->m_function);
		request req(*(cb_data->m_engine), handle, disable_response);
Matthieu Dorier's avatar
Matthieu Dorier committed
91
		buffer input;
92 93 94
		hg_return_t ret;
        ret = margo_get_input(handle, &input);
        MARGO_ASSERT(ret, margo_get_input);
Matthieu Dorier's avatar
Matthieu Dorier committed
95
		(*f)(req, input);
96 97
        ret = margo_free_input(handle, &input);
        MARGO_ASSERT(ret, margo_free_input);
Matthieu Dorier's avatar
Matthieu Dorier committed
98 99
	}

Matthieu Dorier's avatar
Matthieu Dorier committed
100 101 102 103 104 105 106 107 108
    /**
     * @brief Callback called when an RPC is received.
     *
     * @tparam F type of the function exposed by the user for this RPC.
     * @tparam disable_response whether the caller expects a response.
     * @param handle handle of the RPC.
     *
     * @return HG_SUCCESS or a Mercury error code.
     */
109
    template<typename F, bool disable_response>
Matthieu Dorier's avatar
Matthieu Dorier committed
110 111 112 113 114 115 116 117
    static hg_return_t rpc_callback(hg_handle_t handle) {
        int ret;
        ABT_pool pool;
        margo_instance_id mid;
        mid = margo_hg_handle_get_instance(handle);
        if(mid == MARGO_INSTANCE_NULL) {
            return HG_OTHER_ERROR;
        }
118
        pool = margo_hg_handle_get_handler_pool(handle);
119 120
        ret = ABT_thread_create(pool, (void (*)(void *)) rpc_handler_ult<F,disable_response>, 
                handle, ABT_THREAD_ATTR_NULL, NULL);
Matthieu Dorier's avatar
Matthieu Dorier committed
121 122 123 124 125 126
        if(ret != 0) {
            return HG_NOMEM_ERROR;
        }
        return HG_SUCCESS;
    }

127 128 129 130 131
    static void on_finalize(void* arg) {
        engine* e = static_cast<engine*>(arg);
        e->m_finalize_called = true;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
132 133
public:

Matthieu Dorier's avatar
Matthieu Dorier committed
134 135 136 137 138 139 140 141 142
    /**
     * @brief Constructor.
     *
     * @param addr address of this instance.
     * @param mode THALLIUM_SERVER_MODE or THALLIUM_CLIENT_MODE.
     * @param use_progress_thread whether to use a dedicated ES to drive progress.
     * @param rpc_thread_count number of threads to use for servicing RPCs.
     * Use -1 to indicate that RPCs should be serviced in the progress ES.
     */
143
	engine(const std::string& addr, int mode, 
Matthieu Dorier's avatar
Matthieu Dorier committed
144 145 146
	              bool use_progress_thread = false,
	              std::int32_t rpc_thread_count = 0) {

Matthieu Dorier's avatar
Matthieu Dorier committed
147
        m_is_server = (mode == THALLIUM_SERVER_MODE);
148
        m_finalize_called = false;
Matthieu Dorier's avatar
Matthieu Dorier committed
149
		m_mid = margo_init(addr.c_str(), mode,
Matthieu Dorier's avatar
Matthieu Dorier committed
150
				use_progress_thread ? 1 : 0,
Matthieu Dorier's avatar
Matthieu Dorier committed
151
				rpc_thread_count);
152
        // XXX throw an exception if m_mid not initialized
153
        m_owns_mid = true;
154 155
        margo_push_finalize_callback(m_mid,
                &engine::on_finalize, static_cast<void*>(this));
Matthieu Dorier's avatar
Matthieu Dorier committed
156 157
	}

158 159 160 161 162 163
    engine(margo_instance_id mid, int mode) {
        m_mid = mid;
        m_is_server = (mode == THALLIUM_SERVER_MODE);
        m_owns_mid = false;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
164 165 166
    /**
     * @brief Copy-constructor is deleted.
     */
167
	engine(const engine& other)            = delete;
Matthieu Dorier's avatar
Matthieu Dorier committed
168 169 170 171

    /**
     * @brief Move-constructor is deleted.
     */
172
	engine(engine&& other)                 = delete;
Matthieu Dorier's avatar
Matthieu Dorier committed
173 174 175 176
    
    /**
     * @brief Move-assignment operator is deleted.
     */
177
	engine& operator=(engine&& other)      = delete;
Matthieu Dorier's avatar
Matthieu Dorier committed
178 179 180 181

    /**
     * @brief Copy-assignment operator is deleted.
     */
182
	engine& operator=(const engine& other) = delete;
Matthieu Dorier's avatar
Matthieu Dorier committed
183

Matthieu Dorier's avatar
Matthieu Dorier committed
184 185 186 187

    /**
     * @brief Destructor.
     */
188
	~engine() {
189 190
        if(m_owns_mid) {
            if(m_is_server) {
191 192
                if(!m_finalize_called) 
                    margo_wait_for_finalize(m_mid);
193 194 195
            } else {
                margo_finalize(m_mid);
            }
Matthieu Dorier's avatar
Matthieu Dorier committed
196
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
197 198
	}

Matthieu Dorier's avatar
Matthieu Dorier committed
199 200 201 202

    /**
     * @brief Finalize the engine. Can be called by any thread.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
203 204 205 206
	void finalize() {
		margo_finalize(m_mid);
	}

Matthieu Dorier's avatar
Matthieu Dorier committed
207 208 209 210 211
    /**
     * @brief Creates an endpoint from this engine.
     *
     * @return An endpoint corresponding to this engine.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
212 213
	endpoint self() const;

Matthieu Dorier's avatar
Matthieu Dorier committed
214 215 216 217 218 219 220 221
    /**
     * @brief Defines an RPC with a name, without providing a
     * function pointer (used on clients).
     *
     * @param name Name of the RPC.
     *
     * @return a remote_procedure object.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
222 223
	remote_procedure define(const std::string& name);

Matthieu Dorier's avatar
Matthieu Dorier committed
224 225 226 227 228 229 230 231 232 233
    /**
     * @brief Defines an RPC with a name and an std::function 
     * representing the RPC.
     *
     * @tparam Args Types of arguments accepted by the RPC.
     * @param name Name of the RPC.
     * @param fun Function to associate with the RPC.
     *
     * @return a remote_procedure object.
     */
234 235 236 237
	template<typename ... Args>
	remote_procedure define(const std::string& name, 
        const std::function<void(const request&, Args...)>& fun);

Matthieu Dorier's avatar
Matthieu Dorier committed
238 239 240 241 242 243 244 245 246 247
    /**
     * @brief Defines an RPC with a name and a function pointer
     * to call when the RPC is received.
     *
     * @tparam Args Types of arguments accepted by the RPC.
     * @param name Name of the RPC.
     * @param f Function to associate with the RPC.
     *
     * @return a remote_procedure object.
     */
248 249
    template<typename ... Args>
    remote_procedure define(const std::string& name, void (*f)(const request&, Args...));
Matthieu Dorier's avatar
Matthieu Dorier committed
250

Matthieu Dorier's avatar
Matthieu Dorier committed
251 252 253 254 255 256 257 258
    /**
     * @brief Lookup an address and returns an endpoint object
     * to communicate with this address.
     *
     * @param address String representation of the address.
     *
     * @return an endpoint object associated with the given address.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
259 260
	endpoint lookup(const std::string& address) const;

Matthieu Dorier's avatar
Matthieu Dorier committed
261 262 263 264 265 266 267 268
    /**
     * @brief Exposes a series of memory segments for bulk operations.
     *
     * @param segments vector of <pointer,size> pairs of memory segments.
     * @param flag indicates whether the bulk is read-write, read-only or write-only.
     *
     * @return a bulk object representing the memory exposed for RDMA.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
269 270
    bulk expose(const std::vector<std::pair<void*,size_t>>& segments, bulk_mode flag);

Matthieu Dorier's avatar
Matthieu Dorier committed
271 272 273 274 275
    /**
     * @brief String representation of the engine's address.
     *
     * @return String representation of the engine's address.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
276 277 278 279 280 281
	operator std::string() const;
};

} // namespace thallium

#include <thallium/remote_procedure.hpp>
282
#include <thallium/proc_buffer.hpp>
283 284 285
#include <thallium/serialization/stl/tuple.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
Matthieu Dorier's avatar
Matthieu Dorier committed
286 287 288

namespace thallium {

289 290 291 292
template<typename ... Args>
remote_procedure engine::define(const std::string& name, 
        const std::function<void(const request&, Args...)>& fun) {

    // Register the RPC under its name; the same generic callback is used
    // for every RPC and dispatches to the function stored in m_rpcs.
    const hg_id_t rpc_id = margo_register_name(m_mid, name.c_str(),
                                process_buffer,
                                process_buffer,
                                rpc_callback<rpc_t, false>);

    // Wrap the user function so that it can be called with a raw buffer:
    // the wrapper deserializes the arguments and then forwards them.
    rpc_t& stored_rpc = m_rpcs[rpc_id];
    stored_rpc = [fun, this](const request& req, const buffer& payload) {
        std::function<void(Args...)> bound_call = [&fun, &req](Args&&... args) {
            fun(req, std::forward<Args>(args)...);
        };
        std::tuple<std::decay_t<Args>...> decoded_args;
        // Nothing to deserialize for an RPC that takes no arguments.
        if(sizeof...(Args) > 0) {
            buffer_input_archive archive(payload, *this);
            archive & decoded_args;
        }
        apply_function_to_tuple(bound_call, decoded_args);
    };

    // Attach the engine and the stored function to the RPC id so that
    // the handler can find them when the RPC is received.
    rpc_callback_data* callback_data =
        new rpc_callback_data{ this, void_cast(&stored_rpc) };

    hg_return_t ret = margo_register_data(m_mid, rpc_id,
            static_cast<void*>(callback_data), free_rpc_callback_data);
    MARGO_ASSERT(ret, margo_register_data);

    return remote_procedure(*this, rpc_id);
}

320 321 322 323 324
template<typename ... Args>
remote_procedure engine::define(const std::string& name, void (*f)(const request&, Args...)) {
    // Wrap the plain function pointer and delegate to the std::function overload.
    using function_type = std::function<void(const request&, Args...)>;
    return define(name, function_type(f));
}

Matthieu Dorier's avatar
Matthieu Dorier committed
325 326 327
}

#endif