Commit f71d04a2 authored by Matthieu Dorier's avatar Matthieu Dorier

big refactoring and adding serialization package

parent a17bcd7c
#ifndef __THALLIUM_HPP
#define __THALLIUM_HPP
#include <thallium/margo_engine.hpp>
#include <thallium/engine.hpp>
#include <thallium/endpoint.hpp>
#include <thallium/remote_procedure.hpp>
#include <thallium/callable_remote_procedure.hpp>
#include <thallium/serialization.hpp>
#endif
......@@ -8,7 +8,7 @@
namespace thallium {
class margo_engine;
class engine;
class remote_procedure;
class endpoint;
......@@ -18,8 +18,9 @@ class callable_remote_procedure {
private:
hg_handle_t m_handle;
bool m_ignore_response;
callable_remote_procedure(hg_id_t id, const endpoint& ep);
callable_remote_procedure(hg_id_t id, const endpoint& ep, bool ignore_resp);
public:
......@@ -74,8 +75,8 @@ public:
// BufferOutputArchive arch(input);
// serialize_many(arch, std::forward<T>(t)...);
// serialize(std::forward<T>(t)
//auto input = std::tie(t...);
margo_forward(m_handle, nullptr);//&input);
// auto input = std::tie(t...);
margo_forward(m_handle, nullptr);
// Buffer output;
......@@ -87,7 +88,11 @@ public:
auto operator()(const buffer& buf) const {
margo_forward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)));
return true;
buffer output;
if(m_ignore_response) return output;
margo_get_output(m_handle, &output);
margo_free_output(m_handle, &output); // won't do anything on a buffer type
return output;
}
};
......
......@@ -7,27 +7,27 @@
namespace thallium {
class margo_engine;
class engine;
class endpoint {
friend class margo_engine;
friend class engine;
friend class callable_remote_procedure;
private:
margo_engine& m_margo;
hg_addr_t m_addr;
engine& m_engine;
hg_addr_t m_addr;
endpoint(margo_engine& m, hg_addr_t addr)
: m_margo(m), m_addr(addr) {}
endpoint(engine& e, hg_addr_t addr)
: m_engine(e), m_addr(addr) {}
public:
endpoint(const endpoint& other);
endpoint(endpoint&& other)
: m_margo(other.m_margo), m_addr(other.m_addr) {
: m_engine(other.m_engine), m_addr(other.m_addr) {
other.m_addr = HG_ADDR_NULL;
}
......
#ifndef __THALLIUM_MARGO_ENGINE_HPP
#define __THALLIUM_MARGO_ENGINE_HPP
#ifndef __THALLIUM_ENGINE_HPP
#define __THALLIUM_ENGINE_HPP
#include <iostream>
#include <string>
#include <functional>
#include <unordered_map>
#include <margo.h>
#include <thallium/function_cast.hpp>
#include <thallium/buffer.hpp>
......@@ -13,31 +15,32 @@ namespace thallium {
class endpoint;
class remote_procedure;
class margo_engine {
class engine {
friend class endpoint;
friend class remote_procedure;
friend class callable_remote_procedure;
private:
margo_instance_id m_mid;
bool m_is_server;
margo_instance_id m_mid;
bool m_is_server;
template<typename F>
template<typename F, bool disable_response>
static void rpc_handler_ult(hg_handle_t handle) {
using G = std::remove_reference_t<F>;
const struct hg_info* info = margo_get_info(handle);
margo_instance_id mid = margo_hg_handle_get_instance(handle);
void* data = margo_registered_data(mid, info->id);
auto f = function_cast<G>(data);
request req(handle);
request req(handle, disable_response);
buffer input;
margo_get_input(handle, &input);
(*f)(req, input);
margo_free_input(handle, &input);
}
template<typename F>
template<typename F, bool disable_response>
static hg_return_t rpc_callback(hg_handle_t handle) {
int ret;
ABT_pool pool;
......@@ -51,7 +54,8 @@ private:
if(ret != HG_SUCCESS) {
return HG_INVALID_PARAM;
}
ret = ABT_thread_create(pool, (void (*)(void *)) rpc_handler_ult<F>, handle, ABT_THREAD_ATTR_NULL, NULL);
ret = ABT_thread_create(pool, (void (*)(void *)) rpc_handler_ult<F,disable_response>,
handle, ABT_THREAD_ATTR_NULL, NULL);
if(ret != 0) {
return HG_NOMEM_ERROR;
}
......@@ -60,7 +64,7 @@ private:
public:
margo_engine(const std::string& addr, int mode,
engine(const std::string& addr, int mode,
bool use_progress_thread = false,
std::int32_t rpc_thread_count = 0) {
......@@ -72,12 +76,12 @@ public:
// TODO throw exception if m_mid is null
}
margo_engine(const margo_engine& other) = delete;
margo_engine(margo_engine&& other) = delete;
margo_engine& operator=(margo_engine&& other) = delete;
margo_engine& operator=(const margo_engine& other) = delete;
engine(const engine& other) = delete;
engine(engine&& other) = delete;
engine& operator=(engine&& other) = delete;
engine& operator=(const engine& other) = delete;
~margo_engine() {
~engine() {
if(m_is_server) {
// TODO throw an exception if following call fails
margo_wait_for_finalize(m_mid);
......@@ -91,7 +95,6 @@ public:
endpoint self() const;
template<typename F>
remote_procedure define(const std::string& name);
template<typename F>
......@@ -105,31 +108,20 @@ public:
} // namespace thallium
#include <thallium/remote_procedure.hpp>
#include <thallium/serialization.hpp>
#include <thallium/proc_buffer.hpp>
namespace thallium {
template<typename F>
remote_procedure margo_engine::define(const std::string& name) {
// TODO throw an exception if the following call fails
hg_id_t id = margo_register_name(m_mid, name.c_str(),
serialize_buffer,
serialize_buffer,
nullptr);
margo_registered_disable_response(m_mid, id, HG_TRUE);
return remote_procedure(id);
}
template<typename F>
remote_procedure margo_engine::define(const std::string& name, F&& fun) {
remote_procedure engine::define(const std::string& name, F&& fun) {
// TODO throw an exception if the following call fails
hg_id_t id = margo_register_name(m_mid, name.c_str(),
serialize_buffer,
serialize_buffer,
rpc_callback<decltype(fun)>);
process_buffer,
process_buffer,
rpc_callback<decltype(fun), false>);
margo_register_data(m_mid, id, void_cast(&fun), nullptr);
margo_registered_disable_response(m_mid, id, HG_TRUE);
return remote_procedure(id);
return remote_procedure(*this, id);
}
}
......
#ifndef __THALLIUM_PROC_BASIC_HPP
#define __THALLIUM_PROC_BASIC_HPP
#include <type_traits>
#include <mercury_proc.h>
namespace thallium {
namespace proc {
template<typename T,
typename std::enable_if_t<
std::is_arithmetic<
std::remove_reference_t<T>
>::value> = 0>
hg_return_t process_type(hg_proc_t proc, void *data, std::size_t& size) {
T* t = static_cast<T*>(data);
size = sizeof(*t);
return hg_proc_memcpy(proc, data, sizeof(*t));
}
} // namespace proc
} // namespace thallium
#endif
......@@ -7,11 +7,7 @@
namespace thallium {
namespace proc {
hg_return_t process_buffer(hg_proc_t proc, void* data, std::size_t& size);
} // namespace proc
hg_return_t process_buffer(hg_proc_t proc, void* data);
} // namespace thallium
......
#ifndef __THALLIUM_PROC_TUPLE_HPP
#define __THALLIUM_PROC_TUPLE_HPP
#include <type_traits>
#include <tuple>
#include <mercury_proc.h>
namespace thallium {
namespace proc {
template<std::size_t N, typename ... T>
struct tuple_proc {
static hg_return_t apply(hg_proc_t proc, std::tuple<T...>& t, std::size_t& size) {
size = 0;
std::size_t s;
hg_return_t hret = tuple_proc<N-1>::apply(proc, t, s);
if(hret != HG_SUCCESS) return hret;
size += s;
void* data = &(std::get<N-1>(t));
hret = process_type<std::remove_reference_t<decltype(std::get<N-1>(t))>>(proc, data, s);
if(hret != HG_SUCCESS) return hret;
size += s;
return HG_SUCCESS;
}
};
template<typename ... T>
struct tuple_proc<0, T...> {
static hg_return_t apply(hg_proc_t proc, std::tuple<T...>& t, std::size_t& size) {
size = 0;
return HG_SUCCESS;
}
};
using namespace std;
template<template <typename ... T> class tuple, typename ... T>
hg_return_t process_type(hg_proc_t proc, void* data, std::size_t& size) {
return tuple_proc<sizeof...(T),T...>::apply(proc, data, size);
}
} // namespace proc
} // namespace thallium
#endif
#ifndef __THALLIUM_PROC_VECTOR_HPP
#define __THALLIUM_PROC_VECTOR_HPP
#include <vector>
#include <mercury_proc.h>
namespace thallium {
namespace proc {
using namespace std;
template<template <typename T> class vector, class T,
typename std::enable_if_t<
std::is_arithmetic<
std::remove_reference_t<T>
>::value
> = 0>
hg_return_t process_type(hg_proc_t proc, void* data, std::size_t& size) {
std::vector<T> *vec = static_cast<std::vector<T>*>(data);
std::size_t num_T = 0;
hg_return_t hret;
size = 0;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
num_T = vec->size();
hret = hg_proc_memcpy(proc, &num_T, sizeof(num_T));
if (hret != HG_SUCCESS) return hret;
size += sizeof(num_T);
if (num_T > 0)
hret = hg_proc_memcpy(proc, vec->data(), num_T*sizeof(T));
if (hret != HG_SUCCESS) return hret;
size += num_T*sizeof(T);
break;
case HG_DECODE:
hret = hg_proc_memcpy(proc, &num_T, sizeof(num_T));
if (hret != HG_SUCCESS) return hret;
size += sizeof(num_T);
if (num_T > 0) {
vec->resize(num_T);
hret = hg_proc_memcpy(proc, vec->data(), num_T*sizeof(T));
if (hret != HG_SUCCESS) return hret;
size += num_T*sizeof(T);
}
break;
case HG_FREE:
return HG_SUCCESS;
}
return HG_SUCCESS;
}
template<template <typename T> class vector, class T>
hg_return_t process_type(hg_proc_t proc, void* data, std::size_t& size) {
std::vector<T> *vec = static_cast<std::vector<T>*>(data);
std::size_t num_T = 0;
hg_return_t hret;
size = 0;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
num_T = vec->size();
hret = hg_proc_memcpy(proc, &num_T, sizeof(num_T));
if (hret != HG_SUCCESS) return hret;
size += sizeof(num_T);
char* ptr = vec->data();
for(auto i=0; i < num_T; i++) {
std::size_t s;
hret = process_type<T>(proc, static_cast<void*>(ptr), s);
if (hret != HG_SUCCESS) return hret;
ptr += s;
size += s;
}
break;
case HG_DECODE:
hret = hg_proc_memcpy(proc, &num_T, sizeof(num_T));
if (hret != HG_SUCCESS) return hret;
size += sizeof(num_T);
if (num_T > 0) {
vec->resize(num_T);
char* ptr = vec->data();
for(auto i=0; i < num_T; i++) {
std::size_t s;
hret = process_type<T>(proc, static_cast<void*>(ptr), s);
if (hret != HG_SUCCESS) return hret;
ptr += s;
size += s;
}
}
break;
case HG_FREE:
return HG_SUCCESS;
}
return HG_SUCCESS;
}
} // namespace proc
} // namespace thallium
#endif
......@@ -5,19 +5,20 @@
namespace thallium {
class margo_engine;
class engine;
class endpoint;
class callable_remote_procedure;
class remote_procedure {
friend class margo_engine;
friend class engine;
private:
engine& m_engine;
hg_id_t m_id;
bool m_ignore_response;
remote_procedure(hg_id_t id)
: m_id(id) {}
remote_procedure(engine& e, hg_id_t id);
public:
......@@ -28,8 +29,9 @@ public:
~remote_procedure() = default;
callable_remote_procedure on(const endpoint& ep) const;
remote_procedure& ignore_response();
callable_remote_procedure operator>>(const endpoint& ep) const;
};
}
......
......@@ -5,28 +5,29 @@
namespace thallium {
class margo_engine;
class engine;
class request {
friend class margo_engine;
friend class engine;
private:
hg_handle_t m_handle;
bool m_disable_response;
request(hg_handle_t h)
: m_handle(h) {}
request(hg_handle_t h, bool disable_resp)
: m_handle(h), m_disable_response(disable_resp) {}
public:
request(const request& other)
: m_handle(other.m_handle) {
: m_handle(other.m_handle), m_disable_response(other.m_disable_response) {
margo_ref_incr(m_handle);
}
request(request&& other)
: m_handle(other.m_handle) {
: m_handle(other.m_handle), m_disable_response(other.m_disable_response) {
other.m_handle = HG_HANDLE_NULL;
}
......@@ -34,6 +35,7 @@ public:
if(m_handle == other.m_handle) return *this;
margo_destroy(m_handle);
m_handle = other.m_handle;
m_disable_response = other.m_disable_response;
margo_ref_incr(m_handle);
return *this;
}
......@@ -42,6 +44,7 @@ public:
if(m_handle == other.m_handle) return *this;
margo_destroy(m_handle);
m_handle = other.m_handle;
m_disable_response = other.m_disable_response;
other.m_handle = HG_HANDLE_NULL;
return *this;
}
......@@ -51,12 +54,28 @@ public:
}
template<typename T>
void respond(T&& t) {
void respond(T&& t) const {
if(m_disable_response) return; // XXX throwing an exception?
// TODO serialize
if(m_handle != HG_HANDLE_NULL) {
margo_respond(m_handle, nullptr);
}
}
void respond(const buffer& output) const {
if(m_disable_response) return; // XXX throwing an exception?
if(m_handle != HG_HANDLE_NULL) {
margo_respond(m_handle, const_cast<void*>(static_cast<const void*>(&output)));
}
}
void respond(buffer& output) const {
respond((const buffer&)output);
}
void respond(buffer&& output) const {
respond((const buffer&)output);
}
};
}
......
#ifndef __THALLIUM_SERIALIZATION_HPP
#define __THALLIUM_SERIALIZATION_HPP
#include <cstdint>
#include <mercury_proc.h>
namespace thallium {
namespace proc {
template<typename T> hg_return_t process_type(hg_proc_t proc, void* data, std::size_t& size);
}
}
#include <thallium/buffer.hpp>
#include <thallium/proc_buffer.hpp>
//#include <thallium/proc_basic.hpp>
//#include <thallium/proc_tuple.hpp>
//#include <thallium/proc_vector.hpp>
namespace thallium {
/*
template<typename T>
hg_return_t serialize(hg_proc_t proc, void* data) {
std::size_t size;
return proc::process_type<T>(proc, data, size);
}
*/
template<typename ... T>
hg_return_t serialize(hg_proc_t proc, void* data) {
std::size_t size;
return proc::process_type<std::tuple<T...>>(proc, data, size);
}
hg_return_t serialize_buffer(hg_proc_t proc, void* data);
} // namespace thallium
#endif
#ifndef __THALLIUM_BUFFER_INPUT_ARCHIVE_HPP
#define __THALLIUM_BUFFER_INPUT_ARCHIVE_HPP
#include <type_traits>
#include <stdexcept>
#include <cstring>
#include <thallium/serialization/serialize.hpp>
#include <thallium/buffer.hpp>
namespace thallium {
/**
* buffer_input_archive wraps a buffer object and
* offers the functionalities to deserialize its content
* into C++ objects. It inherits from the input_archive
* trait so that serialization methods know they have to
* take data out of the buffer and into C++ objects.
*/
class buffer_input_archive : public input_archive {
private:
const buffer& buffer_;
std::size_t pos;
template<typename T, bool b>
inline void read_impl(T& t, const std::integral_constant<bool, b>&) {
load(*this,t);
}
template<typename T>
inline void read_impl(T& t, const std::true_type&) {
read(&t);
}
public:
/**
* Constructor.
*
* \param b : reference to a buffer from which to read.
* \warning The buffer is held by reference so the life span of
* the buffer_input_archive instance should be shorter than that
* of the buffer.
*/
buffer_input_archive(const buffer& b) : buffer_(b), pos(0) {}
/**
* Operator to get C++ objects of type T from the archive.
* The object should either be a basic type, or an STL container
* (in which case the appropriate hgcxx/hg_stl/stl_* header should
* be included for this function to be properly instanciated), or
* any object for which either a serialize member function or
* a load member function has been provided.
*/
template<typename T>
buffer_input_archive& operator&(T& obj) {
read_impl(obj, std::is_arithmetic<T>());
return *this;
}
/**
* Operator >> is equivalent to operator &.
* \see operator&
*/
template<typename T>
buffer_input_archive& operator>>(T& obj) {
return (*this) & obj;
}
/**
* Basic function to read count objects of type T from the buffer.
* A memcopy is performed from the buffer to the object, so the
* object should either be a basic type or an object that can be
* memcopied instead of calling a more elaborate serialize function.
*/
template<typename T>
inline void read(T* t, std::size_t count=1) {
if(pos + count*sizeof(T) > buffer_.size()) {
throw std::runtime_error("Reading beyond buffer size");
}
std::memcpy((void*)t,(const void*)(&buffer_[pos]),count*sizeof(T));
pos += count*sizeof(T);
}
};
}
#endif
#ifndef __THALLIUM_BUFFER_OUTPUT_ARCHIVE_HPP
#define __THALLIUM_BUFFER_OUTPUT_ARCHIVE_HPP
#include <type_traits>
#include <thallium/serialization/serialize.hpp>
#include <thallium/buffer.hpp>
namespace thallium {
/**
* buffer_output_archive wraps and hg::buffer object and
* offers the functionalities to serialize C++ objects into
* the buffer. The buffer is resized down to 0 when creating
* the archive and will be extended back to an appropriate size
* as C++ objects are serialized into it.
*/
class buffer_output_archive : public output_archive {
private:
buffer& buffer_;
std::size_t pos;
template<typename T, bool b>
inline void write_impl(T& t, const std::integral_constant<bool, b>&) {
save(*this,t);
}
template<typename T>
inline void write_impl(T& t, const std::true_type&) {
write((char*)&t,sizeof(T));
}
public:
/**
* Constructor.
*
* \param b : reference to a buffer into which to write.
* \warning The buffer is held by reference so the life span
* of the buffer_output_archive instance should be shorter than
* that of the buffer itself.
*/
buffer_output_archive(buffer& b) : buffer_(b), pos(0) {
buffer_.resize(0);
}
/**
* Operator to add a C++ object of type T into the archive.
* The object should either be a basic type, or an STL container