Commit 9a71beb7 authored by Matthieu Dorier's avatar Matthieu Dorier

added lots of comments

parent c1f0447c
#ifndef __THALLIUM_HPP
#define __THALLIUM_HPP
#include <margo.h>
#include <thallium/bulk_mode.hpp>
#include <thallium/bulk.hpp>
#include <thallium/engine.hpp>
......
......@@ -10,6 +10,9 @@
namespace thallium {
/**
* @brief buffer object defined as std::vector<char>.
*/
using buffer = std::vector<char>;
}
......
......@@ -82,7 +82,7 @@ private:
/**
* @brief Copy constructor is deleted.
*/
bulk_segment(const bulk_segment&) = delete;
bulk_segment(const bulk_segment&) = default;
/**
* @brief Move constructor is default.
......
......@@ -10,6 +10,10 @@
namespace thallium {
/**
* @brief bulk_mode enum indicates whether a bulk object is
* created for read/write, or read only, or write only.
*/
enum class bulk_mode : hg_uint32_t {
read_write = HG_BULK_READWRITE,
read_only = HG_BULK_READ_ONLY,
......
......@@ -22,6 +22,12 @@ class engine;
class remote_procedure;
class endpoint;
/**
* @brief callable_remote_procedure objects represent an RPC
* ready to be called (using the parenthesis operator).
* It is created from a remote_procedure object using
* remote_procedure::on(endpoint).
*/
class callable_remote_procedure {
friend class remote_procedure;
......@@ -31,9 +37,26 @@ private:
hg_handle_t m_handle;
bool m_ignore_response;
/**
* @brief Constructor. Made private since callable_remote_procedure can only
* be created from remote_procedure::on().
*
* @param e engine used to create the remote_procedure.
* @param id id of the RPC to call.
* @param ep endpoint on which to call the RPC.
* @param ignore_resp whether the response should be ignored.
*/
callable_remote_procedure(engine& e, hg_id_t id, const endpoint& ep, bool ignore_resp);
auto forward(const buffer& buf) const {
/**
* @brief Sends the RPC to the endpoint (calls margo_forward), passing a buffer
* in which the arguments have been serialized.
*
* @param buf Buffer containing a serialized version of the arguments.
*
* @return a packed_response object from which the returned value can be deserialized.
*/
packed_response forward(const buffer& buf) const {
margo_forward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)));
buffer output;
if(m_ignore_response) return packed_response(std::move(output), *m_engine);
......@@ -44,6 +67,9 @@ private:
public:
/**
* @brief Copy-constructor.
*/
callable_remote_procedure(const callable_remote_procedure& other) {
if(m_handle != HG_HANDLE_NULL) {
margo_destroy(m_handle);
......@@ -54,6 +80,9 @@ public:
}
}
/**
* @brief Move-constructor.
*/
callable_remote_procedure(callable_remote_procedure&& other) {
if(m_handle != HG_HANDLE_NULL) {
margo_destroy(m_handle);
......@@ -62,6 +91,9 @@ public:
other.m_handle = HG_HANDLE_NULL;
}
/**
* @brief Copy-assignment operator.
*/
callable_remote_procedure& operator=(const callable_remote_procedure& other) {
if(&other == this) return *this;
if(m_handle != HG_HANDLE_NULL) {
......@@ -72,6 +104,10 @@ public:
return *this;
}
/**
* @brief Move-assignment operator.
*/
callable_remote_procedure& operator=(callable_remote_procedure&& other) {
if(&other == this) return *this;
if(m_handle != HG_HANDLE_NULL) {
......@@ -82,21 +118,38 @@ public:
return *this;
}
/**
* @brief Destructor.
*/
~callable_remote_procedure() {
if(m_handle != HG_HANDLE_NULL) {
margo_destroy(m_handle);
}
}
/**
* @brief Operator to call the RPC. Will serialize the arguments
* in a buffer and send the RPC to the endpoint.
*
* @tparam T Types of the parameters.
* @param t Parameters of the RPC.
*
* @return a packed_response object containing the returned value.
*/
template<typename ... T>
auto operator()(T&& ... t) const {
packed_response operator()(T&& ... t) const {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(t)...);
return forward(b);
}
auto operator()() const {
/**
* @brief Operator to call the RPC without any argument.
*
* @return a packed_response object containing the returned value.
*/
packed_response operator()() const {
buffer b;
return forward(b);
}
......
......@@ -16,6 +16,10 @@ class engine;
class request;
class remote_bulk;
/**
* @brief endpoint objects represent an address to which
* RPC can be sent. They are created using engine::lookup().
*/
class endpoint {
friend class engine;
......@@ -28,29 +32,65 @@ private:
engine* m_engine;
hg_addr_t m_addr;
/**
* @brief Constructor. Made private since endpoint instances
* can only be created using engine::lookup.
*
* @param e Engine that created the endpoint.
* @param addr Mercury address.
*/
endpoint(engine& e, hg_addr_t addr)
: m_engine(&e), m_addr(addr) {}
public:
/**
* @brief Default constructor defined so that endpoints can
* be member of other objects and assigned later.
*/
endpoint()
: m_engine(nullptr), m_addr(HG_ADDR_NULL) {}
/**
* @brief Copy constructor.
*/
endpoint(const endpoint& other);
/**
* @brief Move constructor.
*/
endpoint(endpoint&& other)
: m_engine(other.m_engine), m_addr(other.m_addr) {
other.m_addr = HG_ADDR_NULL;
}
/**
* @brief Copy-assignment operator.
*/
endpoint& operator=(const endpoint& other);
/**
* @brief Move-assignment operator.
*/
endpoint& operator=(endpoint&& other);
/**
* @brief Destructor.
*/
~endpoint();
/**
* @brief Creates a string representation of the endpoint's address.
*
* @return A string representation of the endpoint's address.
*/
operator std::string() const;
/**
* @brief Indicates whether the endpoint is null or not.
*
* @return true if the endpoint is a null address.
*/
bool is_null() const {
return m_addr == HG_ADDR_NULL;
}
......
......@@ -17,6 +17,9 @@
#include <thallium/request.hpp>
#include <thallium/bulk_mode.hpp>
#define THALLIUM_SERVER_MODE MARGO_SERVER_MODE
#define THALLIUM_CLIENT_MODE MARGO_CLIENT_MODE
namespace thallium {
class bulk;
......@@ -24,6 +27,12 @@ class endpoint;
class remote_bulk;
class remote_procedure;
/**
* @brief The engine class is at the core of Thallium,
* it is the first object to instanciate to start using the
* Thallium runtime. It initializes Margo and other libraries,
* and allow users to declare RPCs and bulk objects.
*/
class engine {
friend class request;
......@@ -41,16 +50,32 @@ private:
bool m_is_server;
std::unordered_map<hg_id_t, rpc_t> m_rpcs;
/**
* @brief Encapsulation of some data needed by RPC callbacks
* (namely, the initiating thallium engine and the function to call)
*/
struct rpc_callback_data {
engine* m_engine;
void* m_function;
};
/**
* @brief Function to call to free the data registered with an RPC.
*
* @param data pointer to the data to free (instance of rpc_callback_data).
*/
static void free_rpc_callback_data(void* data) {
rpc_callback_data* cb_data = (rpc_callback_data*)data;
delete cb_data;
}
/**
* @brief Function run as a ULT when receiving an RPC.
*
* @tparam F type of the function to call.
* @tparam disable_response whether the caller expects a response.
* @param handle handle of the RPC.
*/
template<typename F, bool disable_response>
static void rpc_handler_ult(hg_handle_t handle) {
using G = std::remove_reference_t<F>;
......@@ -66,6 +91,15 @@ private:
margo_free_input(handle, &input);
}
/**
* @brief Callback called when an RPC is received.
*
* @tparam F type of the function exposed by the user for this RPC.
* @tparam disable_response whether the caller expects a response.
* @param handle handle of the RPC.
*
* @return HG_SUCCESS or a Mercury error code.
*/
template<typename F, bool disable_response>
static hg_return_t rpc_callback(hg_handle_t handle) {
int ret;
......@@ -90,11 +124,20 @@ private:
public:
/**
* @brief Constructor.
*
* @param addr address of this instance.
* @param mode THALLIUM_SERVER_MODE or THALLIUM_CLIENT_MODE.
* @param use_progress_thread whether to use a dedicated ES to drive progress.
* @param rpc_thread_count number of threads to use for servicing RPCs.
* Use -1 to indicate that RPCs should be serviced in the progress ES.
*/
engine(const std::string& addr, int mode,
bool use_progress_thread = false,
std::int32_t rpc_thread_count = 0) {
m_is_server = (mode == MARGO_SERVER_MODE);
m_is_server = (mode == THALLIUM_SERVER_MODE);
m_mid = margo_init(addr.c_str(), mode,
use_progress_thread ? 1 : 0,
......@@ -102,38 +145,115 @@ public:
// TODO throw exception if m_mid is null
}
/**
* @brief Copy-constructor is deleted.
*/
engine(const engine& other) = delete;
/**
* @brief Move-constructor is deleted.
*/
engine(engine&& other) = delete;
/**
* @brief Move-assignment operator is deleted.
*/
engine& operator=(engine&& other) = delete;
/**
* @brief Copy-assignment operator is deleted.
*/
engine& operator=(const engine& other) = delete;
/**
* @brief Destructor.
*/
~engine() {
if(m_is_server) {
// TODO throw an exception if following call fails
// TODO an exception if following call fails
margo_wait_for_finalize(m_mid);
}
}
/**
* @brief Finalize the engine. Can be called by any thread.
*/
void finalize() {
// TODO throw an exception if the following call fails
// TODO an exception if the following call fails
margo_finalize(m_mid);
}
/**
* @brief Creates an endpoint from this engine.
*
* @return An endpoint corresponding to this engine.
*/
endpoint self() const;
/**
* @brief Defines an RPC with a name, without providing a
* function pointer (used on clients).
*
* @param name Name of the RPC.
*
* @return a remote_procedure object.
*/
remote_procedure define(const std::string& name);
/**
* @brief Defines an RPC with a name and an std::function
* representing the RPC.
*
* @tparam Args Types of arguments accepted by the RPC.
* @param name Name of the RPC.
* @param fun Function to associate with the RPC.
*
* @return a remote_procedure object.
*/
template<typename ... Args>
remote_procedure define(const std::string& name,
const std::function<void(const request&, Args...)>& fun);
/**
* @brief Defines an RPC with a name and a function pointer
* to call when the RPC is received.
*
* @tparam Args Types of arguments accepted by the RPC.
* @param name Name of the RPC.
* @param f Function to associate with the RPC.
*
* @return a remote_procedure object.
*/
template<typename ... Args>
remote_procedure define(const std::string& name, void (*f)(const request&, Args...));
/**
* @brief Lookup an address and returns an endpoint object
* to communicate with this address.
*
* @param address String representation of the address.
*
* @return an endpoint object associated with the given address.
*/
endpoint lookup(const std::string& address) const;
/**
* @brief Exposes a series of memory segments for bulk operations.
*
* @param segments vector of <pointer,size> pairs of memory segments.
* @param flag indicates whether the bulk is read-write, read-only or write-only.
*
* @return a bulk object representing the memory exposed for RDMA.
*/
bulk expose(const std::vector<std::pair<void*,size_t>>& segments, bulk_mode flag);
/**
* @brief String representation of the engine's address.
*
* @return String representation of the engine's address.
*/
operator std::string() const;
};
......
......@@ -10,11 +10,27 @@
namespace thallium {
/**
* @brief Cast a void* into a function type.
*
* @tparam F Function type.
* @param f pointer to convert.
*
* @return Function pointer.
*/
template<typename F>
F* function_cast(void* f) {
return reinterpret_cast<F*>(reinterpret_cast<std::intptr_t>(f));
}
/**
* @brief Cast a function type into a void* pointer.
*
* @tparam F Function type.
* @param fun pointer to convert.
*
* @return A void* pointer to the function.
*/
template<typename F>
void* void_cast(F&& fun) {
return reinterpret_cast<void*>(reinterpret_cast<std::intptr_t>(fun));
......
......@@ -14,6 +14,11 @@ namespace thallium {
class callable_remote_procedure;
/**
* @brief packed_response objects are created as a reponse to
* an RPC. They can be used to extract the response from the
* RPC if the RPC sent one.
*/
class packed_response {
friend class callable_remote_procedure;
......@@ -23,11 +28,25 @@ private:
engine* m_engine;
buffer m_buffer;
/**
* @brief Constructor. Made private since packed_response
* objects are created by callable_remote_procedure only.
*
* @param b Buffer containing the result of an RPC.
* @param e Engine associated with the RPC.
*/
packed_response(buffer&& b, engine& e)
: m_engine(&e), m_buffer(std::move(b)) {}
public:
/**
* @brief Converts the buffer into the requested object.
*
* @tparam T Type into which to convert the content of the buffer.
*
* @return Buffer converted into the desired type.
*/
template<typename T>
T as() const {
T t;
......@@ -36,6 +55,21 @@ public:
return t;
}
/**
* @brief Converts the content of the buffer into a std::tuple
* of types T1, T2, ... Tn.
*
* This function allows to do something like the following:
* int x;
* double y;
* std::tie(x,y) = pack.as<int,double>();
*
* @tparam T1 First type of the tuple.
* @tparam T2 Second type of the tuple.
* @tparam Tn Other types of the tuple.
*
* @return buffer content converted into the desired std::tuple.
*/
template<typename T1, typename T2, typename ... Tn>
auto as() const {
std::tuple<std::decay_t<T1>, std::decay_t<T2>, std::decay_t<Tn>...> t;
......@@ -44,6 +78,15 @@ public:
return t;
}
/**
* @brief Converts the content of the packed_response into
* the desired object type. Allows to cast the packed_response
* into the desired object type.
*
* @tparam T Type into which to convert the response.
*
* @return An object of the desired type.
*/
template<typename T>
operator T() const {
return as<T>();
......
......@@ -12,6 +12,15 @@
namespace thallium {
/**
* @brief Mercury callback that serializes/deserializes
* a buffer (std::vector<char>).
*
* @param proc Mercury proc object.
* @param data pointer to a buffer object.
*
* @return HG_SUCCESS or a Mercury error code.
*/
hg_return_t process_buffer(hg_proc_t proc, void* data);
} // namespace thallium
......
......@@ -14,22 +14,55 @@
namespace thallium {
/**
* @brief A remote_bulk object represents a bulk_segment object
* that has been associated with an endpoint and is ready for
* RDMA operations.
*/
class remote_bulk {
friend class bulk;
private:
const bulk::bulk_segment& m_segment;
bulk::bulk_segment m_segment;
endpoint m_endpoint;
remote_bulk(const bulk::bulk_segment& b, const endpoint& ep)
: m_segment(b), m_endpoint(ep) {}
/**
* @brief Constructor. Made private since remote_bulk objects
* are created by the function bulk::on() or bulk_segment::on()
* functions.
*
* @param b bulk_segment that created the remote_bulk object.
* @param ep endpoint on which the bulk_segment is.
*/
remote_bulk(bulk::bulk_segment b, endpoint ep)
: m_segment(std::move(b)), m_endpoint(std::move(ep)) {}
public:
/**
* @brief Performs a pull operation from the remote_bulk
* (left operand) to the destination bulk (right operand).
* The destination must be local. If the sizes don't match,
* the smallest size is picked.
*
* @param dest Local bulk segment on which to pull the data.
*
* @return the size of data transfered.
*/
std::size_t operator>>(const bulk::bulk_segment& dest) const;
/**
* @brief Performs a push operation from the source bulk
* (right operand) to the remote_bulk (left operand).
* The source must be local. If the sizes don't match,
* the smallest size is picked.
*
* @param src Local bulk segment from which to push the data.
*
* @return the size of data transfered.
*/
std::size_t operator<<(const bulk::bulk_segment& src) const;
};
......
......@@ -14,6 +14,13 @@ class engine;
class endpoint;
class callable_remote_procedure;
/**
* @brief remote_procedure objects are produced by
* engine::define() when defining an RPC.
* Using remote_procedure::on(endpoint) creates a
* callable_remote_procedure that can be called with
* some parameters to send an RPC.
*/
class remote_procedure {
friend class engine;
......@@ -23,18 +30,57 @@ private:
hg_id_t m_id;
bool m_ignore_response;
/**
* @brief Constructor. Made private because remote_procedure
* objects are created only by engine::define().
*
* @param e Engine object that created the remote_procedure.
* @param id Mercury RPC id.
*/
remote_procedure(engine& e, hg_id_t id);
public:
/**
* @brief Copy-constructor is default.
*/
remote_procedure(const remote_procedure& other) = default;
/**
* @brief Move-constructor is default.
*/
remote_procedure(remote_procedure&& other) = default;
/**
* @brief Copy-assignment operator is default.
*/
remote_procedure& operator=(const remote_procedure& other) = default;
/**
* @brief Move-assignment operator is default.
*/
remote_procedure& operator=(remote_procedure&& other) = default;
/**
* @brief Destructor is default.
*/
~remote_procedure() = default;
/**
* @brief Creates a callable_remote_procedure by associating the
* remote_procedure with an endpoint.
*
* @param ep endpoint with which to associate the procedure.
*
* @return a callable_remote_procedure.
*/
callable_remote_procedure on(const endpoint& ep) const;
/**
* @brief Tell the remote_procedure that it should not expect responses.
*
* @return *this
*/
remote_procedure& ignore_response();
};
......
......@@ -15,6 +15,13 @@ namespace thallium {
class engine;
class endpoint;
/**
* @brief A request object is created whenever a server
* receives an RPC. The object is passed as first argument to
* the function associated with the RPC. The request allows
* one to get information from the caller and to respond to
* the RPC.
*/
class request {
friend class engine;
......@@ -25,21 +32,38 @@ private:
hg_handle_t m_handle;
bool m_disable_response;
/**
* @brief Constructor. Made private since request are only created
* by the engine within RPC callbacks.
*
* @param e engine object that created the request.
* @param h handle of the RPC that was received.
* @param disable_resp whether responses are disabled.
*/
request(engine& e, hg_handle_t h, bool disable_resp)
: m_engine(&e), m_handle(h), m_disable_response(disable_resp) {}
public:
/**
* @brief Copy constructor.
*/
request(const request& other)
: m_engine(other.m_engine), m_handle(other.m_handle), m_disable_response(other.m_disable_response) {
margo_ref_incr(m_handle);
}
/**
* @brief Move constructor.
*/
request(request&& other)
: m_engine(other.m_engine), m_handle(other.m_handle), m_disable_response(other.m_disable_response) {
other.m_handle = HG_HANDLE_NULL;
}
/**
* @brief Copy-assignment operator.
*/
request& operator=(const request& other) {
if(m_handle == other.m_handle) return *this;
margo_destroy(m_handle);
......@@ -50,6 +74,9 @@ public:
return *this;
}