callable_remote_procedure.hpp 6.09 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
6 7 8 9 10
#ifndef __THALLIUM_CALLABLE_REMOTE_PROCEDURE_HPP
#define __THALLIUM_CALLABLE_REMOTE_PROCEDURE_HPP

#include <tuple>
#include <cstdint>
11
#include <utility>
Matthieu Dorier's avatar
Matthieu Dorier committed
12 13
#include <margo.h>
#include <thallium/buffer.hpp>
14
#include <thallium/packed_response.hpp>
15
#include <thallium/async_response.hpp>
16
#include <thallium/serialization/serialize.hpp>
17 18
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
19
#include <thallium/margo_exception.hpp>
Matthieu Dorier's avatar
Matthieu Dorier committed
20 21 22

namespace thallium {

23
class engine;
Matthieu Dorier's avatar
Matthieu Dorier committed
24 25 26
class remote_procedure;
class endpoint;

Matthieu Dorier's avatar
Matthieu Dorier committed
27 28 29 30 31 32
/**
 * @brief callable_remote_procedure objects represent an RPC
 * ready to be called (using the parenthesis operator).
 * It is created from a remote_procedure object using
 * remote_procedure::on(endpoint).
 */
Matthieu Dorier's avatar
Matthieu Dorier committed
33 34 35
class callable_remote_procedure {

	friend class remote_procedure;
36
    friend class async_response;
Matthieu Dorier's avatar
Matthieu Dorier committed
37 38

private:
Matthieu Dorier's avatar
Matthieu Dorier committed
39
    engine*     m_engine;
Matthieu Dorier's avatar
Matthieu Dorier committed
40
	hg_handle_t m_handle;
41
    bool        m_ignore_response;
Matthieu Dorier's avatar
Matthieu Dorier committed
42

Matthieu Dorier's avatar
Matthieu Dorier committed
43 44 45 46 47 48 49 50 51
    /**
     * @brief Constructor. Made private since callable_remote_procedure can only
     * be created from remote_procedure::on().
     *
     * @param e engine used to create the remote_procedure.
     * @param id id of the RPC to call.
     * @param ep endpoint on which to call the RPC.
     * @param ignore_resp whether the response should be ignored.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
52
	callable_remote_procedure(engine& e, hg_id_t id, const endpoint& ep, bool ignore_resp);
Matthieu Dorier's avatar
Matthieu Dorier committed
53

Matthieu Dorier's avatar
Matthieu Dorier committed
54 55 56 57 58 59 60 61 62
    /**
     * @brief Sends the RPC to the endpoint (calls margo_forward), passing a buffer
     * in which the arguments have been serialized.
     *
     * @param buf Buffer containing a serialized version of the arguments.
     *
     * @return a packed_response object from which the returned value can be deserialized.
     */
     packed_response forward(const buffer& buf) const {
63 64 65
        hg_return_t ret;
        ret = margo_forward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)));
        MARGO_ASSERT(ret, margo_forward);
66
        buffer output;
Matthieu Dorier's avatar
Matthieu Dorier committed
67
        if(m_ignore_response) return packed_response(std::move(output), *m_engine);
68 69 70 71
        ret = margo_get_output(m_handle, &output);
        MARGO_ASSERT(ret, margo_get_output);
        ret = margo_free_output(m_handle, &output); // won't do anything on a buffer type
        MARGO_ASSERT(ret, margo_free_output);
Matthieu Dorier's avatar
Matthieu Dorier committed
72
        return packed_response(std::move(output), *m_engine);
73 74
	}

75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
    /**
     * @brief Sends the RPC to the endpoint (calls margo_iforward), passing a buffer
     * in which the arguments have been serialized. The RPC is sent in a non-blocking manner.
     *
     * @param buf Buffer containing a serialized version of the arguments.
     *
     * @return an async_response object that can be waited on.
     */
    async_response iforward(const buffer& buf) {
        hg_return_t ret;
        margo_request req;
        ret = margo_iforward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)), &req);
        MARGO_ASSERT(ret, margo_iforward);
        return async_response(req, *m_engine, *this, m_ignore_response);
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
91 92
public:

93 94 95
    /**
     * @brief Copy-constructor.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
96
	callable_remote_procedure(const callable_remote_procedure& other) {
97
        hg_return_t ret;
Matthieu Dorier's avatar
Matthieu Dorier committed
98 99
		m_handle = other.m_handle;
		if(m_handle != HG_HANDLE_NULL) {
100 101
            ret = margo_ref_incr(m_handle);
            MARGO_ASSERT(ret, margo_ref_incr);
Matthieu Dorier's avatar
Matthieu Dorier committed
102 103 104
		}
	}

Matthieu Dorier's avatar
Matthieu Dorier committed
105 106 107
    /**
     * @brief Move-constructor.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
108 109 110 111 112
	callable_remote_procedure(callable_remote_procedure&& other) {
		m_handle = other.m_handle;
		other.m_handle = HG_HANDLE_NULL;
	}

Matthieu Dorier's avatar
Matthieu Dorier committed
113 114 115
    /**
     * @brief Copy-assignment operator.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
116
	callable_remote_procedure& operator=(const callable_remote_procedure& other) {
117
        hg_return_t ret;
Matthieu Dorier's avatar
Matthieu Dorier committed
118 119
		if(&other == this) return *this;
		if(m_handle != HG_HANDLE_NULL) {
120 121
            ret = margo_destroy(m_handle);
            MARGO_ASSERT(ret, margo_destroy);
Matthieu Dorier's avatar
Matthieu Dorier committed
122 123
		}
		m_handle = other.m_handle;
124 125
		ret = margo_ref_incr(m_handle);
        MARGO_ASSERT(ret, margo_ref_incr);
Matthieu Dorier's avatar
Matthieu Dorier committed
126 127 128
		return *this;
	}

Matthieu Dorier's avatar
Matthieu Dorier committed
129 130 131 132
    
    /**
     * @brief Move-assignment operator.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
133 134 135
	callable_remote_procedure& operator=(callable_remote_procedure&& other) {
		if(&other == this) return *this;
		if(m_handle != HG_HANDLE_NULL) {
136 137
			hg_return_t ret = margo_destroy(m_handle);
            MARGO_ASSERT(ret, margo_destroy);
Matthieu Dorier's avatar
Matthieu Dorier committed
138 139 140 141 142 143
		}
		m_handle = other.m_handle;
		other.m_handle = HG_HANDLE_NULL;
		return *this;
	}

Matthieu Dorier's avatar
Matthieu Dorier committed
144 145 146
    /**
     * @brief Destructor.
     */
147
	~callable_remote_procedure()  {
Matthieu Dorier's avatar
Matthieu Dorier committed
148
		if(m_handle != HG_HANDLE_NULL) {
149 150
            hg_return_t ret = margo_destroy(m_handle);
            MARGO_ASSERT(ret, margo_destroy);
Matthieu Dorier's avatar
Matthieu Dorier committed
151 152 153
		}
	}

Matthieu Dorier's avatar
Matthieu Dorier committed
154 155 156 157 158 159 160 161 162
    /**
     * @brief Operator to call the RPC. Will serialize the arguments
     * in a buffer and send the RPC to the endpoint.
     *
     * @tparam T Types of the parameters.
     * @param t Parameters of the RPC.
     *
     * @return a packed_response object containing the returned value.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
163
	template<typename ... T>
Matthieu Dorier's avatar
Matthieu Dorier committed
164
	packed_response operator()(T&& ... t) const {
165
		buffer b;
Matthieu Dorier's avatar
Matthieu Dorier committed
166
        buffer_output_archive arch(b, *m_engine);
167
        serialize_many(arch, std::forward<T>(t)...);
168
		return forward(b);
Matthieu Dorier's avatar
Matthieu Dorier committed
169
	}
170

Matthieu Dorier's avatar
Matthieu Dorier committed
171 172 173 174 175 176
    /**
     * @brief Operator to call the RPC without any argument.
     *
     * @return a packed_response object containing the returned value.
     */
    packed_response operator()() const {
177 178 179
        buffer b;
        return forward(b);
    }
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206

    /**
     * @brief Issues an RPC in a non-blocking way. Will serialize the arguments
     * in a buffer and send the RPC to the endpoint.
     *
     * @tparam T Types of the parameters.
     * @param t Parameters of the RPC.
     *
     * @return an async_response object that the caller can wait on.
     */
    template<typename ... T>
    async_response async(T&& ... t) {
        buffer b;
        buffer_output_archive arch(b, *m_engine);
        serialize_many(arch, std::forward<T>(t)...);
        return iforward(b);
    }

    /**
     * @brief Non-blocking call to the RPC without any argument.
     *
     * @return an async_response object that the caller can wait on.
     */
    async_response async() {
        buffer b;
        return iforward(b);
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
207 208 209 210 211
};

}

#endif