Commit a23f6ad7 authored by Matthieu Dorier's avatar Matthieu Dorier

started implementing cereal support

parent bb0a5840
......@@ -48,8 +48,15 @@ if (NOT CMAKE_BUILD_TYPE)
endif ()
set (CMAKE_PREFIX_PATH "" CACHE STRING "External dependencies path")
set (BUILD_SHARED_LIBS "OFF" CACHE BOOL "Build a shared library")
set (ENABLE_CEREAL "OFF" CACHE BOOL "Enable cereal serialization")
include_directories (${CMAKE_BINARY_DIR}/include)
# packages we depend on
if (ENABLE_CEREAL)
find_package (cereal CONFIG REQUIRED)
endif (ENABLE_CEREAL)
include (xpkg-import)
find_package (mercury CONFIG REQUIRED)
xpkg_import_module (margo REQUIRED margo)
......@@ -62,4 +69,4 @@ if (ENABLE_EXAMPLES)
add_subdirectory (examples)
endif (ENABLE_EXAMPLES)
configure_file (include/thallium/config.hpp.in ${CMAKE_BINARY_DIR}/include/thallium/config.hpp)
......@@ -2,6 +2,7 @@
#define __THALLIUM_HPP
#include <margo.h>
#include <thallium/config.hpp>
#include <thallium/abt.hpp>
#include <thallium/anonymous.hpp>
#include <thallium/bulk_mode.hpp>
......
......@@ -15,10 +15,14 @@
#include <thallium/timeout.hpp>
#include <thallium/packed_response.hpp>
#include <thallium/async_response.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#include <thallium/margo_exception.hpp>
#ifdef USE_CEREAL
#include <thallium/serialization/cereal/archives.hpp>
#else
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#endif
namespace thallium {
......@@ -198,8 +202,13 @@ public:
template<typename ... T>
packed_response operator()(T&& ... args) const {
buffer b;
#ifdef USE_CEREAL
cereal_output_archive arch(b, *m_engine);
arch(std::forward<T>(args)...);
#else
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
#endif
return forward(b);
}
......@@ -220,8 +229,13 @@ public:
template<typename R, typename P, typename ... T>
packed_response timed(const std::chrono::duration<R,P>& t, T&& ... args) const {
buffer b;
#ifdef USE_CEREAL
cereal_output_archive arch(b, *m_engine);
arch(std::forward<T>(args)...);
#else
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
#endif
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return forward(b, timeout_ms);
......@@ -266,8 +280,13 @@ public:
template<typename ... T>
async_response async(T&& ... t) {
buffer b;
#ifdef USE_CEREAL
cereal_output_archive arch(b, *m_engine);
arch(std::forward<T>(t)...);
#else
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(t)...);
#endif
return iforward(b);
}
......@@ -287,8 +306,13 @@ public:
template<typename R, typename P, typename ... T>
async_response timed_async(const std::chrono::duration<R,P>& t, T&& ... args) {
buffer b;
#ifdef USE_CEREAL
cereal_output_archive arch(b, *m_engine);
arch(std::forward<T>(t)...);
#else
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
#endif
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return iforward(b, timeout_ms);
......
#cmakedefine ENABLE_CEREAL
#ifdef ENABLE_CEREAL
#define USE_CEREAL
#endif
......@@ -391,9 +391,13 @@ public:
#include <thallium/remote_procedure.hpp>
#include <thallium/proc_buffer.hpp>
#include <thallium/serialization/stl/tuple.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#ifdef USE_CEREAL
#include <thallium/serialization/cereal/archives.hpp>
#else
#include <thallium/serialization/stl/tuple.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#endif
namespace thallium {
......@@ -410,6 +414,18 @@ remote_procedure engine::define(const std::string& name,
p.native_handle());
m_rpcs[id] = [fun,this](const request& r, const buffer& b) {
#ifdef USE_CEREAL
std::function<void(Args...)> deserialize = [&b, this](Args&&... args) {
cereal_input_archive arch(b, *this);
arch(std::forward<Args>(args)...);
};
std::function<void(Args...)> call_function = [&fun, &r](Args&&... args) {
fun(r, std::forward<Args>(args)...);
};
std::tuple<typename std::decay<Args>::type...> iargs;
apply_function_to_tuple(deserialize, iargs);
apply_function_to_tuple(call_function, iargs);
#else
std::function<void(Args...)> l = [&fun, &r](Args&&... args) {
fun(r, std::forward<Args>(args)...);
};
......@@ -419,6 +435,7 @@ remote_procedure engine::define(const std::string& name,
iarch & iargs;
}
apply_function_to_tuple(l,iargs);
#endif
};
rpc_callback_data* cb_data = new rpc_callback_data;
......
......@@ -7,8 +7,12 @@
#define __THALLIUM_PACKED_RESPONSE_HPP
#include <thallium/buffer.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#ifdef USE_CEREAL
#include <thallium/serialization/cereal/archives.hpp>
#else
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#endif
namespace thallium {
......@@ -52,8 +56,13 @@ public:
template<typename T>
T as() const {
T t;
#ifdef USE_CEREAL
cereal_input_archive iarch(m_buffer, *m_engine);
iarch(t);
#else
buffer_input_archive iarch(m_buffer, *m_engine);
iarch & t;
#endif
return t;
}
......@@ -77,8 +86,18 @@ public:
std::tuple<typename std::decay<T1>::type,
typename std::decay<T2>::type,
typename std::decay_t<Tn>::type...> t;
buffer_input_archive iarch(m_buffer);
#ifdef USE_CEREAL
std::function<void(T1, T2, Tn...)> deserialize = [this](T1&& t1, T2&& t2, Tn&&... tn) {
cereal_input_archive arch(m_buffer, *m_engine);
arch(std::forward<T1>(t1),
std::forward<T2>(t2),
std::forward<Tn>(tn)...);
};
apply_function_to_tuple(deserialize, t);
#else
buffer_input_archive iarch(m_buffer, *m_engine);
iarch & t;
#endif
return t;
}
......
......@@ -8,8 +8,12 @@
#include <margo.h>
#include <thallium/margo_exception.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#ifdef USE_CEREAL
#include <thallium/serialization/cereal/archives.hpp>
#else
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#endif
namespace thallium {
......@@ -113,8 +117,13 @@ public:
if(m_disable_response) return; // XXX throwing an exception?
if(m_handle != HG_HANDLE_NULL) {
buffer b;
#ifdef USE_CEREAL
cereal_output_archive arch(b, *m_engine);
arch(std::forward<T>(t)...);
#else
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(t)...);
#endif
hg_return_t ret = margo_respond(m_handle, &b);
MARGO_ASSERT(ret, margo_respond);
}
......
/*
* Copyright (c) 2019 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#ifndef THALLIUM_CEREAL_ARCHIVES_BINARY_HPP
#define THALLIUM_CEREAL_ARCHIVES_BINARY_HPP
#include <cstring>
#include <thallium/buffer.hpp>
#include <cereal/cereal.hpp>
namespace thallium {
class engine;
class cereal_output_archive : public cereal::OutputArchive<cereal_output_archive, cereal::AllowEmptyClassElision>
{
public:
cereal_output_archive(buffer& b, engine& e)
: cereal::OutputArchive<cereal_output_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(&e)
{
m_buffer.resize(0);
}
cereal_output_archive(buffer& b)
: cereal::OutputArchive<cereal_output_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(nullptr)
{
m_buffer.resize(0);
}
~cereal_output_archive() = default;
inline void write(void* const data, size_t size) {
if(m_pos+size > m_buffer.size()) {
if(m_pos+size > m_buffer.capacity()) {
m_buffer.reserve(m_buffer.capacity()*2);
}
m_buffer.resize(m_pos+size);
}
memcpy((void*)(m_buffer.data() + m_pos),(void*)data, size);
m_pos += size;
}
engine& get_engine() const {
return *m_engine;
}
private:
buffer& m_buffer;
std::size_t m_pos;
engine* m_engine;
};
class cereal_input_archive : public cereal::InputArchive<cereal_input_archive, cereal::AllowEmptyClassElision>
{
public:
cereal_input_archive(const buffer& b, engine& e)
: cereal::InputArchive<cereal_input_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(&e)
{}
cereal_input_archive(buffer& b)
: cereal::InputArchive<cereal_input_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(nullptr)
{}
~cereal_input_archive() = default;
inline void read(void* data, std::size_t size) {
if(m_pos + size > m_buffer.size()) {
throw std::runtime_error("Reading beyond buffer size");
}
std::memcpy((void*)data,(const void*)(m_buffer.data() + m_pos), size);
m_pos += size;
}
engine& get_engine() const {
return *m_engine;
}
private:
const buffer& m_buffer;
std::size_t m_pos;
engine* m_engine;
};
template<class T> inline
typename std::enable_if<std::is_arithmetic<T>::value, void>::type
CEREAL_SAVE_FUNCTION_NAME(cereal_output_archive & ar, T const & t)
{
ar.write(std::addressof(t), sizeof(t));
}
template<class T> inline
typename std::enable_if<std::is_arithmetic<T>::value, void>::type
CEREAL_LOAD_FUNCTION_NAME(cereal_input_archive & ar, T & t)
{
ar.read(std::addressof(t), sizeof(t));
}
template <class Archive, class T> inline
CEREAL_ARCHIVE_RESTRICT(cereal_input_archive, cereal_output_archive)
CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, cereal::NameValuePair<T>& t)
{
ar(t.value);
}
template <class Archive, class T> inline
CEREAL_ARCHIVE_RESTRICT(cereal_input_archive, cereal_output_archive)
CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, cereal::SizeTag<T>& t)
{
ar(t.size);
}
template <class T> inline
void CEREAL_SAVE_FUNCTION_NAME(cereal_output_archive& ar, cereal::BinaryData<T> const & bd)
{
ar.write(bd.data, static_cast<std::size_t>(bd.size));
}
template <class T> inline
void CEREAL_LOAD_FUNCTION_NAME(cereal_input_archive & ar, cereal::BinaryData<T> & bd)
{
ar.read(bd.data, static_cast<std::size_t>(bd.size));
}
}
// register archives for polymorphic support
CEREAL_REGISTER_ARCHIVE(thallium::cereal_output_archive)
CEREAL_REGISTER_ARCHIVE(thallium::cereal_input_archive)
// tie input and output archives together
CEREAL_SETUP_ARCHIVE_TRAITS(thallium::cereal_input_archive, thallium::cereal_output_archive)
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment