Commit 4fc2bd6c authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

Merge branch 'dev-better-serialization' into 'master'

Developped better serialization mechanism

See merge request !7
parents 62a82393 fa2ec4e8
......@@ -6,8 +6,8 @@
#ifndef __THALLIUM_ASYNC_RESPONSE_HPP
#define __THALLIUM_ASYNC_RESPONSE_HPP
#include <thallium/proc_object.hpp>
#include <thallium/margo_exception.hpp>
#include <thallium/buffer.hpp>
#include <thallium/packed_response.hpp>
#include <vector>
#include <utility>
......@@ -41,8 +41,8 @@ private:
* @param c callable_remote_procedure that created the async_response.
* @param ignore_resp whether response should be ignored.
*/
async_response(margo_request req, engine& e, hg_handle_t handle, bool ignore_resp)
: m_request(req), m_engine(&e), m_handle(handle), m_ignore_response(ignore_resp) {
async_response(margo_request req, engine* e, hg_handle_t handle, bool ignore_resp)
: m_request(req), m_engine(e), m_handle(handle), m_ignore_response(ignore_resp) {
margo_ref_incr(handle);
}
......@@ -150,15 +150,10 @@ public:
throw timeout();
}
MARGO_ASSERT(ret, margo_wait_any);
buffer output;
if(completed->m_ignore_response) {
return packed_response(std::move(output), *(completed->m_engine));
return packed_response();
}
ret = margo_get_output(completed->m_handle, &output);
MARGO_ASSERT(ret, margo_get_output);
ret = margo_free_output(completed->m_handle, &output); // won't do anything on a buffer type
MARGO_ASSERT(ret, margo_free_output);
return packed_response(std::move(output), *(completed->m_engine));
return packed_response(completed->m_handle, completed->m_engine);
}
};
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_BUFFER_HPP
#define __THALLIUM_BUFFER_HPP
#include <stdlib.h>
#include <string.h>
namespace thallium {
class buffer {
char* m_data = nullptr;
size_t m_size = 0;
size_t m_capacity = 0;
public:
buffer() = default;
buffer(size_t initialSize) {
resize(initialSize);
}
~buffer() {
if(m_data != nullptr)
free(m_data);
}
buffer(const buffer& other) {
if(other.m_data != nullptr) {
m_data = static_cast<char*>(malloc(other.m_size));
m_size = other.m_size;
m_capacity = other.m_size;
memcpy(m_data, other.m_data, m_size);
} else {
m_data = nullptr;
m_size = 0;
m_capacity = 0;
}
}
buffer(buffer&& other) {
m_data = other.m_data;
m_size = other.m_size;
m_capacity = other.m_capacity;
other.m_data = nullptr;
other.m_size = 0;
other.m_capacity = 0;
}
buffer& operator=(const buffer& other) {
if(&other == this) return *this;
if(m_data != nullptr)
free(m_data);
if(other.m_data != nullptr) {
m_data = static_cast<char*>(malloc(other.m_size));
m_size = other.m_size;
m_capacity = other.m_size;
memcpy(m_data, other.m_data, m_size);
} else {
m_data = nullptr;
m_size = 0;
m_capacity = 0;
}
return *this;
}
buffer& operator=(buffer&& other) {
if(&other == this) return *this;
if(m_data != nullptr)
free(m_data);
m_data = other.m_data;
m_size = other.m_size;
m_capacity = other.m_capacity;
other.m_data = nullptr;
other.m_size = 0;
other.m_capacity = 0;
return *this;
}
const char* data() const {
return m_data;
}
char* data() {
return m_data;
}
size_t size() const {
return m_size;
}
size_t capacity() const {
return m_capacity;
}
void resize(size_t newSize) {
if(m_capacity == 0) {
m_data = static_cast<char*>(malloc(newSize));
m_size = newSize;
m_capacity = newSize;
} else if(m_capacity >= newSize) {
m_size = newSize;
} else { // capacity not 0 but too small
while(m_capacity < newSize) m_capacity *= 2;
m_data = static_cast<char*>(realloc(m_data, m_capacity));
m_size = newSize;
}
}
void reserve(size_t newCapacity) {
if(newCapacity <= m_capacity)
return;
m_capacity = newCapacity;
m_data = static_cast<char*>(realloc(m_data, m_capacity));
}
};
}
#endif
......@@ -12,13 +12,12 @@
#include <chrono>
#include <margo.h>
#include <thallium/config.hpp>
#include <thallium/buffer.hpp>
#include <thallium/timeout.hpp>
#include <thallium/packed_response.hpp>
#include <thallium/async_response.hpp>
#include <thallium/margo_exception.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#include <thallium/serialization/proc_output_archive.hpp>
#include <thallium/serialization/stl/vector.hpp>
namespace thallium {
......@@ -65,30 +64,53 @@ private:
*
* @return a packed_response object from which the returned value can be deserialized.
*/
packed_response forward(const buffer& buf, double timeout_ms=-1.0) const {
template<typename ... T>
packed_response forward(const std::tuple<T...>& args, double timeout_ms=-1.0) {
hg_return_t ret;
meta_proc_fn mproc = [this, &args](hg_proc_t proc) {
return proc_object(proc, const_cast<std::tuple<T...>&>(args), m_engine);
};
if(timeout_ms > 0.0) {
ret = margo_provider_forward_timed(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&buf)),
const_cast<void*>(static_cast<const void*>(&mproc)),
timeout_ms);
if(ret == HG_TIMEOUT)
throw timeout();
MARGO_ASSERT(ret, margo_provider_iforward);
} else {
ret = margo_provider_forward(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&buf)));
const_cast<void*>(static_cast<const void*>(&mproc)));
MARGO_ASSERT(ret, margo_provider_forward);
}
MARGO_ASSERT(ret, margo_forward);
buffer output;
if(m_ignore_response) return packed_response(std::move(output), *m_engine);
ret = margo_get_output(m_handle, &output);
MARGO_ASSERT(ret, margo_get_output);
ret = margo_free_output(m_handle, &output); // won't do anything on a buffer type
MARGO_ASSERT(ret, margo_free_output);
return packed_response(std::move(output), *m_engine);
if(m_ignore_response) return packed_response();
return packed_response(m_handle, m_engine);
}
packed_response forward(double timeout_ms=-1.0) const {
hg_return_t ret;
meta_proc_fn mproc = proc_void_object;
if(timeout_ms > 0.0) {
ret = margo_provider_forward_timed(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&mproc)),
timeout_ms);
if(ret == HG_TIMEOUT)
throw timeout();
MARGO_ASSERT(ret, margo_provider_forward_timed);
} else {
ret = margo_provider_forward(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&mproc)));
MARGO_ASSERT(ret, margo_provider_forward);
}
if(m_ignore_response) return packed_response();
return packed_response(m_handle, m_engine);
}
/**
......@@ -103,25 +125,53 @@ private:
* Note: If the request times out, the timeout exception will occure when calling wait()
* on the async_response.
*/
async_response iforward(const buffer& buf, double timeout_ms=-1.0) const {
template<typename ... T>
async_response iforward(const std::tuple<T...>& args, double timeout_ms=-1.0) {
hg_return_t ret;
margo_request req;
meta_proc_fn mproc = [this, &args](hg_proc_t proc) {
return proc_object(proc, const_cast<std::tuple<T...>&>(args), m_engine);
};
if(timeout_ms > 0.0) {
ret = margo_provider_iforward_timed(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&mproc)),
timeout_ms,
&req);
MARGO_ASSERT(ret, margo_provider_iforward_timed);
} else {
ret = margo_provider_iforward(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&mproc)),
&req);
MARGO_ASSERT(ret, margo_provider_iforward);
}
return async_response(req, m_engine, m_handle, m_ignore_response);
}
async_response iforward(double timeout_ms=-1.0) const {
hg_return_t ret;
margo_request req;
meta_proc_fn mproc = proc_void_object;
if(timeout_ms > 0.0) {
ret = margo_provider_iforward_timed(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&buf)),
const_cast<void*>(static_cast<const void*>(&mproc)),
timeout_ms,
&req);
MARGO_ASSERT(ret, margo_provider_iforward_timed);
} else {
ret = margo_provider_iforward(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&buf)),
const_cast<void*>(static_cast<const void*>(&mproc)),
&req);
MARGO_ASSERT(ret, margo_provider_iforward);
}
MARGO_ASSERT(ret, margo_iforward);
return async_response(req, *m_engine, m_handle, m_ignore_response);
return async_response(req, m_engine, m_handle, m_ignore_response);
}
public:
......@@ -197,11 +247,8 @@ public:
* @return a packed_response object containing the returned value.
*/
template<typename ... T>
packed_response operator()(T&& ... args) const {
buffer b;
buffer_output_archive arch(b, *m_engine);
arch(std::forward<T>(args)...);
return forward(b);
packed_response operator()(const T& ... args) {
return forward(std::make_tuple<const T&...>(args...));
}
/**
......@@ -219,13 +266,10 @@ public:
* @return a packed_response object containing the returned value.
*/
template<typename R, typename P, typename ... T>
packed_response timed(const std::chrono::duration<R,P>& t, T&& ... args) const {
buffer b;
buffer_output_archive arch(b, *m_engine);
arch(std::forward<T>(args)...);
packed_response timed(const std::chrono::duration<R,P>& t, const T& ... args) {
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return forward(b, timeout_ms);
return forward(std::make_tuple<const T&...>(args...), timeout_ms);
}
/**
......@@ -234,8 +278,7 @@ public:
* @return a packed_response object containing the returned value.
*/
packed_response operator()() const {
buffer b;
return forward(b);
return forward();
}
/**
......@@ -248,11 +291,10 @@ public:
* @return a packed_response object containing the returned value.
*/
template<typename R, typename P>
packed_response timed(const std::chrono::duration<R,P>& t) const {
buffer b;
packed_response timed(const std::chrono::duration<R,P>& t) {
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return forward(b, timeout_ms);
return forward(timeout_ms);
}
/**
......@@ -265,11 +307,8 @@ public:
* @return an async_response object that the caller can wait on.
*/
template<typename ... T>
async_response async(T&& ... t) {
buffer b;
buffer_output_archive arch(b, *m_engine);
arch(std::forward<T>(t)...);
return iforward(b);
async_response async(const T& ... args) {
return iforward(std::make_tuple<const T&...>(args...));
}
/**
......@@ -286,13 +325,10 @@ public:
* @return an async_response object that the caller can wait on.
*/
template<typename R, typename P, typename ... T>
async_response timed_async(const std::chrono::duration<R,P>& t, T&& ... args) {
buffer b;
buffer_output_archive arch(b, *m_engine);
arch(std::forward<T>(t)...);
async_response timed_async(const std::chrono::duration<R,P>& t, const T& ... args) {
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return iforward(b, timeout_ms);
return iforward(std::make_tuple<const T&...>(args...), timeout_ms);
}
/**
......@@ -301,8 +337,7 @@ public:
* @return an async_response object that the caller can wait on.
*/
async_response async() {
buffer b;
return iforward(b);
return iforward();
}
/**
......@@ -316,10 +351,9 @@ public:
*/
template<typename R, typename P>
async_response timed_async(const std::chrono::duration<R,P>& t) {
buffer b;
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return iforward(b, timeout_ms);
return iforward(timeout_ms);
}
};
......
......@@ -19,7 +19,7 @@
#include <thallium/pool.hpp>
#include <thallium/tuple_util.hpp>
#include <thallium/function_cast.hpp>
#include <thallium/buffer.hpp>
#include <thallium/proc_object.hpp>
#include <thallium/request.hpp>
#include <thallium/bulk_mode.hpp>
......@@ -53,7 +53,7 @@ class engine {
private:
using rpc_t = std::function<void(const request&, const buffer&)>;
using rpc_t = std::function<void(const request&)>;
margo_instance_id m_mid;
std::unordered_map<hg_id_t, rpc_t> m_rpcs;
......@@ -103,14 +103,8 @@ private:
THALLIUM_ASSERT_CONDITION(data != nullptr, "margo_registered_data returned null");
auto cb_data = static_cast<rpc_callback_data*>(data);
auto f = function_cast<G>(cb_data->m_function);
request req(*(cb_data->m_engine), handle, disable_response);
buffer input;
hg_return_t ret;
ret = margo_get_input(handle, &input);
MARGO_ASSERT(ret, margo_get_input);
(*f)(req, input);
ret = margo_free_input(handle, &input);
MARGO_ASSERT(ret, margo_free_input);
request req(cb_data->m_engine, handle, disable_response);
(*f)(req);
margo_destroy(handle); // because of margo_ref_incr in rpc_callback
__margo_internal_post_wrapper_hooks(mid);
}
......@@ -555,33 +549,38 @@ public:
} // namespace thallium
#include <thallium/remote_procedure.hpp>
#include <thallium/proc_buffer.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#include <thallium/proc_object.hpp>
#include <thallium/serialization/proc_input_archive.hpp>
#include <thallium/serialization/proc_output_archive.hpp>
#include <thallium/serialization/stl/tuple.hpp>
namespace thallium {
template<typename A1, typename ... Args>
template<typename T1, typename ... Tn>
remote_procedure engine::define(const std::string& name,
const std::function<void(const request&, A1, Args...)>& fun,
const std::function<void(const request&, T1, Tn...)>& fun,
uint16_t provider_id, const pool& p) {
hg_id_t id = margo_provider_register_name(m_mid, name.c_str(),
process_buffer,
process_buffer,
meta_serialization,
meta_serialization,
rpc_callback<rpc_t, false>,
provider_id,
p.native_handle());
m_rpcs[id] = [fun,this](const request& r, const buffer& b) {
std::function<void(A1, Args...)> call_function = [&fun, &r](A1&& a1, Args&&... args) {
fun(r, std::forward<A1>(a1), std::forward<Args>(args)...);
m_rpcs[id] = [fun,this](const request& r) {
std::function<void(T1, Tn...)> call_function = [&fun, &r](const T1& a1, const Tn&... args) {
fun(r, a1, args...);
};
std::tuple<typename std::decay<A1>::type, typename std::decay<Args>::type...> iargs;
buffer_input_archive iarch(b, *this);
iarch(iargs);
std::tuple<typename std::decay<T1>::type, typename std::decay<Tn>::type...> iargs;
meta_proc_fn mproc = [this, &iargs](hg_proc_t proc) {
return proc_object(proc, iargs, this);
};
hg_return_t ret = margo_get_input(r.m_handle, &mproc);
if(ret != HG_SUCCESS) return ret;
ret = margo_free_input(r.m_handle, &mproc);
if(ret != HG_SUCCESS) return ret;
apply_function_to_tuple(call_function, iargs);
return HG_SUCCESS;
};
rpc_callback_data* cb_data = new rpc_callback_data;
......
......@@ -6,9 +6,10 @@
#ifndef __THALLIUM_PACKED_RESPONSE_HPP
#define __THALLIUM_PACKED_RESPONSE_HPP
#include <thallium/buffer.hpp>
#include <thallium/margo_exception.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#include <thallium/serialization/proc_input_archive.hpp>
#include <thallium/proc_object.hpp>
namespace thallium {
......@@ -27,23 +28,34 @@ class packed_response {
private:
engine* m_engine;
buffer m_buffer;
engine* m_engine = nullptr;
hg_handle_t m_handle = HG_HANDLE_NULL;
/**
* @brief Constructor. Made private since packed_response
* objects are created by callable_remote_procedure only.
*
* @param b Buffer containing the result of an RPC.
* @param h Handle containing the result of an RPC.
* @param e Engine associated with the RPC.
*/
packed_response(buffer&& b, engine& e)
: m_engine(&e), m_buffer(std::move(b)) {}
packed_response(hg_handle_t h, engine* e)
: m_engine(e), m_handle(h) {
hg_return_t ret = margo_ref_incr(h);
MARGO_ASSERT(ret, margo_ref_incr);
}
packed_response() = default;
public:
~packed_response() {
if(m_handle != HG_HANDLE_NULL) {
margo_destroy(m_handle);
}
}
/**
* @brief Converts the buffer into the requested object.
* @brief Converts the handle's content into the requested object.
*
* @tparam T Type into which to convert the content of the buffer.
*
......@@ -51,10 +63,19 @@ public:
*/
template<typename T>
T as() const {
T t;
buffer_input_archive iarch(m_buffer, *m_engine);
iarch(t);
return t;
if(m_handle == HG_HANDLE_NULL) {
throw exception("Cannot unpack data from handle. Are you trying to "
"unpack data from an RPC that does not return any?");
}
std::tuple<T> t;
meta_proc_fn mproc = [this, &t](hg_proc_t proc) {
return proc_object(proc, t, m_engine);
};
hg_return_t ret = margo_get_output(m_handle, &mproc);
MARGO_ASSERT(ret, margo_get_output);
ret = margo_free_output(m_handle, &mproc);
MARGO_ASSERT(ret, margo_free_output);
return std::get<0>(t);
}
/**
......@@ -74,11 +95,20 @@ public:
*/
template<typename T1, typename T2, typename ... Tn>
auto as() const {
if(m_handle == HG_HANDLE_NULL) {
throw exception("Cannot unpack data from handle. Are you trying to "
"unpack data from an RPC that does not return any?");
}
std::tuple<typename std::decay<T1>::type,
typename std::decay<T2>::type,
typename std::decay_t<Tn>::type...> t;
buffer_input_archive iarch(m_buffer, *m_engine);
arch(t);
meta_proc_fn mproc = [this, &t](hg_proc_t proc) {
return proc_object(proc, t, &m_engine);
};
hg_return_t ret = margo_get_output(m_handle, &mproc);
MARGO_ASSERT(ret, margo_get_output);
ret = margo_free_output(m_handle, &mproc);
MARGO_ASSERT(ret, margo_free_output);
return t;
}
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/