Commit 02c51546 authored by Matthieu Dorier's avatar Matthieu Dorier

done integrating serialization

parent f1f532cf
......@@ -6,6 +6,8 @@
#include <utility>
#include <margo.h>
#include <thallium/buffer.hpp>
#include <thallium/packed_response.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
......@@ -28,10 +30,10 @@ private:
auto forward(const buffer& buf) const {
margo_forward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)));
buffer output;
if(m_ignore_response) return output;
if(m_ignore_response) return packed_response(std::move(output));
margo_get_output(m_handle, &output);
margo_free_output(m_handle, &output); // won't do anything on a buffer type
return output;
return packed_response(std::move(output));
}
public:
......@@ -83,10 +85,15 @@ public:
template<typename ... T>
auto operator()(T&& ... t) const {
buffer b;
buffer_output_archive arch(b);
serialize_many(arch, std::forward<T>(t)...);
buffer_output_archive arch(b);
serialize_many(arch, std::forward<T>(t)...);
return forward(b);
}
auto operator()() const {
buffer b;
return forward(b);
}
};
}
......
......@@ -6,6 +6,7 @@
#include <functional>
#include <unordered_map>
#include <margo.h>
#include <thallium/tuple_util.hpp>
#include <thallium/function_cast.hpp>
#include <thallium/buffer.hpp>
#include <thallium/request.hpp>
......@@ -23,8 +24,11 @@ class engine {
private:
using rpc_t = std::function<void(const request&, const buffer&)>;
margo_instance_id m_mid;
bool m_is_server;
std::unordered_map<hg_id_t, rpc_t> m_rpcs;
template<typename F, bool disable_response>
static void rpc_handler_ult(hg_handle_t handle) {
......@@ -97,8 +101,12 @@ public:
remote_procedure define(const std::string& name);
template<typename F>
remote_procedure define(const std::string& name, F&& fun);
template<typename ... Args>
remote_procedure define(const std::string& name,
const std::function<void(const request&, Args...)>& fun);
template<typename ... Args>
remote_procedure define(const std::string& name, void (*f)(const request&, Args...));
endpoint lookup(const std::string& address) const;
......@@ -109,21 +117,44 @@ public:
#include <thallium/remote_procedure.hpp>
#include <thallium/proc_buffer.hpp>
#include <thallium/serialization/stl/tuple.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
namespace thallium {
template<typename F>
remote_procedure engine::define(const std::string& name, F&& fun) {
template<typename ... Args>
remote_procedure engine::define(const std::string& name,
const std::function<void(const request&, Args...)>& fun) {
// TODO throw an exception if the following call fails
hg_id_t id = margo_register_name(m_mid, name.c_str(),
process_buffer,
process_buffer,
rpc_callback<decltype(fun), false>);
margo_register_data(m_mid, id, void_cast(&fun), nullptr);
rpc_callback<rpc_t, false>);
m_rpcs[id] = [fun](const request& r, const buffer& b) {
std::function<void(Args...)> l = [&fun, &r](Args&&... args) {
fun(r, std::forward<Args>(args)...);
};
std::tuple<std::decay_t<Args>...> iargs;
if(sizeof...(Args) > 0) {
buffer_input_archive iarch(b);
iarch & iargs;
}
apply_function_to_tuple(l,iargs);
};
margo_register_data(m_mid, id, void_cast(&m_rpcs[id]), nullptr);
return remote_procedure(*this, id);
}
template<typename ... Args>
remote_procedure engine::define(const std::string& name, void (*f)(const request&, Args...)) {
return define(name, std::function<void(const request&,Args...)>(f));
}
}
#endif
#ifndef __THALLIUM_PACKED_RESPONSE_HPP
#define __THALLIUM_PACKED_RESPONSE_HPP
#include <thallium/buffer.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
namespace thallium {
class callable_remote_procedure;
class packed_response {
friend class callable_remote_procedure;
private:
buffer m_buffer;
packed_response(buffer&& b)
: m_buffer(std::move(b)) {}
public:
template<typename T>
T as() const {
T t;
buffer_input_archive iarch(m_buffer);
iarch & t;
return t;
}
template<typename T1, typename T2, typename ... Tn>
auto as() const {
std::tuple<std::decay_t<T1>, std::decay_t<T2>, std::decay_t<Tn>...> t;
buffer_input_archive iarch(m_buffer);
iarch & t;
return t;
}
template<typename T>
operator T() const {
return as<T>();
}
};
}
#endif
......@@ -2,6 +2,8 @@
#define __THALLIUM_REQUEST_HPP
#include <margo.h>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
namespace thallium {
......@@ -53,15 +55,17 @@ public:
margo_destroy(m_handle);
}
template<typename T>
void respond(T&& t) const {
template<typename ... T>
void respond(T&&... t) const {
if(m_disable_response) return; // XXX throwing an exception?
// TODO serialize
if(m_handle != HG_HANDLE_NULL) {
margo_respond(m_handle, nullptr);
buffer b;
buffer_output_archive arch(b);
serialize_many(arch, std::forward<T>(t)...);
margo_respond(m_handle, &b);
}
}
/*
void respond(const buffer& output) const {
if(m_disable_response) return; // XXX throwing an exception?
if(m_handle != HG_HANDLE_NULL) {
......@@ -76,6 +80,7 @@ public:
void respond(buffer&& output) const {
respond((const buffer&)output);
}
*/
};
}
......
......@@ -24,12 +24,12 @@ private:
std::size_t pos;
template<typename T, bool b>
inline void read_impl(T& t, const std::integral_constant<bool, b>&) {
load(*this,t);
inline void read_impl(T&& t, const std::integral_constant<bool, b>&) {
load(*this,std::forward<T>(t));
}
template<typename T>
inline void read_impl(T& t, const std::true_type&) {
inline void read_impl(T&& t, const std::true_type&) {
read(&t);
}
......@@ -54,8 +54,8 @@ public:
* a load member function has been provided.
*/
template<typename T>
buffer_input_archive& operator&(T& obj) {
read_impl(obj, std::is_arithmetic<T>());
buffer_input_archive& operator&(T&& obj) {
read_impl(std::forward<T>(obj), std::is_arithmetic<std::decay_t<T>>());
return *this;
}
......@@ -64,8 +64,8 @@ public:
* \see operator&
*/
template<typename T>
buffer_input_archive& operator>>(T& obj) {
return (*this) & obj;
buffer_input_archive& operator>>(T&& obj) {
return (*this) & std::forward<T>(obj);
}
/**
......
......@@ -22,12 +22,12 @@ private:
std::size_t pos;
template<typename T, bool b>
inline void write_impl(T& t, const std::integral_constant<bool, b>&) {
save(*this,t);
inline void write_impl(T&& t, const std::integral_constant<bool, b>&) {
save(*this,std::forward<T>(t));
}
template<typename T>
inline void write_impl(T& t, const std::true_type&) {
inline void write_impl(T&& t, const std::true_type&) {
write((char*)&t,sizeof(T));
}
......@@ -54,8 +54,8 @@ public:
* a load member function has been provided.
*/
template<typename T>
buffer_output_archive& operator&(T& obj) {
write_impl(obj, std::is_arithmetic<T>());
buffer_output_archive& operator&(T&& obj) {
write_impl(std::forward<T>(obj), std::is_arithmetic<std::decay_t<T>>());
return *this;
}
......@@ -64,8 +64,8 @@ public:
* \see operator&
*/
template<typename T>
buffer_output_archive& operator<<(T& obj) {
return (*this) & obj;
buffer_output_archive& operator<<(T&& obj) {
return (*this) & std::forward<T>(obj);
}
/**
......
#ifndef SERIALIZE_H
#define SERIALIZE_H
#include <utility>
#include <type_traits>
namespace thallium {
......@@ -97,14 +98,14 @@ struct serializer;
template<class A, typename T>
struct serializer<A,T,true> {
static void apply(A& ar, T& t) {
static void apply(A& ar, T&& t) {
t.serialize(ar);
}
};
template<class A, typename T>
struct serializer<A,T,false> {
static void apply(A& ar, T& t) {
static void apply(A& ar, T&& t) {
static_assert(has_serialize_method<A,T>::value,
"Undefined \"serialize\" member function");
}
......@@ -114,8 +115,8 @@ struct serializer<A,T,false> {
* Generic serialize method calling apply on a serializer.
*/
template<class A, typename T>
void serialize(A& ar, T& t) {
serializer<A,T,has_serialize_method<A,T>::value>::apply(ar,t);
void serialize(A& ar, T&& t) {
serializer<A,T,has_serialize_method<A,T>::value>::apply(ar,std::forward<T>(t));
}
/**
......@@ -127,15 +128,15 @@ struct saver;
template<class A, typename T>
struct saver<A,T,true> {
static void apply(A& ar, T& t) {
static void apply(A& ar, T&& t) {
t.save(ar);
}
};
template<class A, typename T>
struct saver<A,T,false> {
static void apply(A& ar, T& t) {
serialize(ar,t);
static void apply(A& ar, T&& t) {
serialize(ar,std::forward<T>(t));
}
};
......@@ -143,8 +144,8 @@ struct saver<A,T,false> {
* Generic save method calling apply on a saver.
*/
template<class A, typename T>
inline void save(A& ar, T& t) {
saver<A,T,has_save_method<A,T>::value>::apply(ar,t);
inline void save(A& ar, T&& t) {
saver<A,T,has_save_method<A,T>::value>::apply(ar,std::forward<T>(t));
}
/**
......@@ -156,15 +157,15 @@ struct loader;
template<class A, typename T>
struct loader<A,T,true> {
static void apply(A& ar, T& t) {
static void apply(A& ar, T&& t) {
t.load(ar);
}
};
template<class A, typename T>
struct loader<A,T,false> {
static void apply(A& ar, T& t) {
serialize(ar,t);
static void apply(A& ar, T&& t) {
serialize(ar,std::forward<T>(t));
}
};
......@@ -172,8 +173,8 @@ struct loader<A,T,false> {
* Generic load method calling allpy on a loader.
*/
template<class A, typename T>
inline void load(A& ar, T& t) {
loader<A,T,has_load_method<A,T>::value>::apply(ar,t);
inline void load(A& ar, T&& t) {
loader<A,T,has_load_method<A,T>::value>::apply(ar,std::forward<T>(t));
}
/**
......@@ -181,14 +182,14 @@ inline void load(A& ar, T& t) {
* objects passed as arguments.
*/
template<class A, typename T1, typename... Tn>
void serialize_many(A& ar, T1& t1, Tn&... rest) {
ar & t1;
serialize_many(ar,rest...);
void serialize_many(A& ar, T1&& t1, Tn&&... rest) {
ar & std::forward<T1>(t1);
serialize_many(ar, std::forward<Tn>(rest)...);
}
template<class A, typename T>
void serialize_many(A& ar, T& t) {
ar & t;
void serialize_many(A& ar, T&& t) {
ar & std::forward<T>(t);
}
}
......
#ifndef __THALLIUM_TUPLE_UTIL_HPP
#define __THALLIUM_TUPLE_UTIL_HPP
#include <tuple>
namespace thallium {
namespace detail {
template<size_t N>
struct apply_f_to_t_impl {
template<typename R, typename... ArgsF, typename... ArgsT, typename... Args>
static R apply(const std::function<R(ArgsF...)>& f, std::tuple<ArgsT...> const& t, Args... args) {
return apply_f_to_t_impl<N-1>::apply(f, t, std::get<N-1>(t), args...);
}
};
template<>
struct apply_f_to_t_impl<0> {
template<typename R, typename... ArgsF, typename... ArgsT, typename... Args>
static R apply(const std::function<R(ArgsF...)>& f, std::tuple<ArgsT...> const& t, Args... args) {
return f(args...);
}
};
}
/**
* Applies a function with arbitrary arguments to a tuple holding the same types of arguments.
*
* \param f : function to call on the tuple.
* \param t : tupe of arguments.
* \return the value returned by f.
*/
template<typename R, typename... ArgsF, typename... ArgsT>
R apply_function_to_tuple(const std::function<R(ArgsF...)>& f, std::tuple<ArgsT...> const& t) {
return detail::apply_f_to_t_impl<sizeof...(ArgsT)>::apply(f,t);
}
/**
* Applies a function with arbitrary arguments to a tuple holding the same types of arguments.
*
* \param f : function to call on the tuple.
* \param t : tupe of arguments.
* \return the value returned by f.
*/
template<typename R, typename... ArgsF, typename... ArgsT>
R apply_function_to_tuple(R (*f)(ArgsF...), std::tuple<ArgsT...> const& t) {
std::function<R(ArgsF...)> fun(f);
return apply_function_to_tuple(fun,t);
}
}
#endif
......@@ -5,27 +5,33 @@
#include <unistd.h>
#include <iostream>
#include <thallium.hpp>
#include <thallium/serialization/stl/string.hpp>
namespace tl = thallium;
void hello(const tl::request& req, const tl::buffer& input) {
std::cout << "(1) Hello World ";
for(auto c : input) std::cout << c;
std::cout << std::endl;
tl::buffer ret(6,'c');
req.respond(ret);
void hello(const tl::request& req, const std::string& name) {
std::cout << "Hello " << name << std::endl;
}
int server() {
tl::engine margo("bmi+tcp://127.0.0.1:1234", MARGO_SERVER_MODE);
margo.define("hello1", hello);
margo.define("hello2", [&margo](const tl::request& req, const tl::buffer& input)
{ std::cout << "(2) Hello World ";
for(auto c : input) std::cout << c;
std::cout << std::endl;
margo.finalize(); }
).ignore_response();
margo.define("hello", hello).ignore_response();
std::function<void(const tl::request&, int, int)> f =
[](const tl::request& req, int x, int y) {
std::cout << x << "+" << y << " = " << (x+y) << std::endl;
req.respond(x+y);
};
margo.define("sum", f);
std::function<void(const tl::request&)> g =
[&margo](const tl::request& req) {
std::cout << "Stopping server" << std::endl;
margo.finalize();
};
margo.define("stop", g);
std::string addr = margo.self();
std::cout << "Server running at address " << addr << std::endl;
......@@ -35,22 +41,24 @@ int server() {
int client() {
tl::engine margo("bmi+tcp", MARGO_CLIENT_MODE);
auto remote_hello1 = margo.define("hello1");
auto remote_hello2 = margo.define("hello2").ignore_response();
auto remote_hello = margo.define("hello").ignore_response();
auto remote_sum = margo.define("sum");
auto remote_stop = margo.define("stop").ignore_response();
std::string server_addr = "bmi+tcp://127.0.0.1:1234";
sleep(1);
auto server_endpoint = margo.lookup(server_addr);
std::cout << "Lookup done for endpoint " << (std::string)server_endpoint << std::endl;
tl::buffer b(16,'a');
auto ret = remote_hello1.on(server_endpoint)(b);
std::cout << "Response from hello1: ";
for(auto c : ret) std::cout << c;
std::cout << std::endl;
std::string name("Matthieu");
remote_hello.on(server_endpoint)(name);
remote_hello2.on(server_endpoint)(b);
int ret = remote_sum.on(server_endpoint)(23,67);
std::cout << "Server returned " << ret << std::endl;
remote_stop.on(server_endpoint)();
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment