...
 
Commits (23)
# master
* Added copy() function in serialization archives. This function
acts like a read() for input archive, and like a write() for
output archives. This allows writing a single serialize() function
instead of load()/store() functions in many situations.
* Added operator() to input and output archives to enable a syntax
similar to that of the cereal library.
......@@ -48,8 +48,15 @@ if (NOT CMAKE_BUILD_TYPE)
endif ()
set (CMAKE_PREFIX_PATH "" CACHE STRING "External dependencies path")
set (BUILD_SHARED_LIBS "OFF" CACHE BOOL "Build a shared library")
set (ENABLE_CEREAL "OFF" CACHE BOOL "Enable cereal serialization")
include_directories (${CMAKE_BINARY_DIR}/include)
# packages we depend on
if (ENABLE_CEREAL)
find_package (cereal CONFIG REQUIRED)
endif (ENABLE_CEREAL)
include (xpkg-import)
find_package (mercury CONFIG REQUIRED)
xpkg_import_module (margo REQUIRED margo)
......@@ -62,4 +69,4 @@ if (ENABLE_EXAMPLES)
add_subdirectory (examples)
endif (ENABLE_EXAMPLES)
configure_file (include/thallium/config.hpp.in ${CMAKE_BINARY_DIR}/include/thallium/config.hpp)
......@@ -17,7 +17,7 @@ int main(int argc, char** argv) {
myEngine.define("sum", sum);
myEngine.on_finalize([]() { std::cout << "Finalization was called" << std::endl; });
myEngine.push_finalize_callback([]() { std::cout << "Finalization was called" << std::endl; });
return 0;
}
......
......@@ -2,6 +2,7 @@
#define __THALLIUM_HPP
#include <margo.h>
#include <thallium/config.hpp>
#include <thallium/abt.hpp>
#include <thallium/anonymous.hpp>
#include <thallium/bulk_mode.hpp>
......
......@@ -9,9 +9,6 @@
#include <thallium/margo_exception.hpp>
#include <thallium/buffer.hpp>
#include <thallium/packed_response.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
namespace thallium {
......
......@@ -12,6 +12,7 @@
#include <margo.h>
#include <thallium/endpoint.hpp>
#include <thallium/margo_exception.hpp>
#include <thallium/serialization/stl/vector.hpp>
namespace thallium {
......@@ -318,14 +319,27 @@ public:
*/
std::size_t operator<<(const remote_bulk& b) const;
/**
* @brief Returns the underlying hg_bulk_t handle.
* If copy == false, the returned handle is a reference to the internal
* handle managed by this bulk object, it should not be destroyed by the
* user and its lifetime will not exceed that of the bulk object.
* If copy == true, the returned handle should be destroyed by the user.
*
* @param copy whether to make a copy or not.
*
* @return The underlying hg_bulk_t handle.
*/
hg_bulk_t get_bulk(bool copy=false) const;
/**
* @brief Function that serializes a bulk object into an archive.
*
* @tparam A Archive type.
* @param ar Input archive.
* @param ar Output archive.
*/
template<typename A>
void save(A& ar) {
void save(A& ar) const {
if(m_bulk == HG_BULK_NULL) {
std::vector<char> buf;
ar & buf;
......@@ -343,7 +357,7 @@ public:
* @brief Deserializes a bulk object from an output archive.
*
* @tparam A Archive type.
* @param ar Output archive.
* @param ar Input archive.
*/
template<typename A>
void load(A& ar);
......
......@@ -11,14 +11,15 @@
#include <utility>
#include <chrono>
#include <margo.h>
#include <thallium/config.hpp>
#include <thallium/buffer.hpp>
#include <thallium/timeout.hpp>
#include <thallium/packed_response.hpp>
#include <thallium/async_response.hpp>
#include <thallium/margo_exception.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#include <thallium/margo_exception.hpp>
#include <thallium/serialization/stl/vector.hpp>
namespace thallium {
......@@ -199,7 +200,7 @@ public:
packed_response operator()(T&& ... args) const {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
arch(std::forward<T>(args)...);
return forward(b);
}
......@@ -221,7 +222,7 @@ public:
packed_response timed(const std::chrono::duration<R,P>& t, T&& ... args) const {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
arch(std::forward<T>(args)...);
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return forward(b, timeout_ms);
......@@ -267,7 +268,7 @@ public:
async_response async(T&& ... t) {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(t)...);
arch(std::forward<T>(t)...);
return iforward(b);
}
......@@ -288,7 +289,7 @@ public:
async_response timed_async(const std::chrono::duration<R,P>& t, T&& ... args) {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
arch(std::forward<T>(t)...);
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return iforward(b, timeout_ms);
......
#undef THALLIUM_USE_CEREAL
#undef ENABLE_CEREAL
#cmakedefine ENABLE_CEREAL
#ifdef ENABLE_CEREAL
#define THALLIUM_USE_CEREAL
#undef ENABLE_CEREAL
#endif
......@@ -84,7 +84,7 @@ public:
/**
* @brief Destructor.
*/
~endpoint();
virtual ~endpoint();
/**
* @brief Creates a string representation of the endpoint's address.
......
This diff is collapsed.
......@@ -217,7 +217,7 @@ class eventual<void> {
* the right operand.
*/
eventual& operator=(eventual&& other) {
if(this == other.m_eventual) return *this;
if(m_eventual == other.m_eventual) return *this;
if(m_eventual != ABT_EVENTUAL_NULL) {
TL_EVENTUAL_ASSERT(ABT_eventual_free(&m_eventual));
}
......
......@@ -12,17 +12,14 @@
#include <string>
#include <sstream>
#include <margo.h>
#include <thallium/exception.hpp>
namespace thallium {
/**
* @brief Exception class used when margo functions fail.
*/
class margo_exception : public std::exception {
private:
std::ostringstream m_content;
class margo_exception : public exception {
public:
......@@ -35,19 +32,9 @@ public:
* @param message Additional message.
*/
margo_exception(const std::string& function, const std::string file, unsigned line,
const std::string& message = std::string()) {
m_content << "[" << file << ":" << line << "][" << function << "] "
<< message;
}
const std::string& message = std::string())
: exception("[", file, ":", line, "][", function, "] ", message) {}
/**
* @brief Return the error message.
*
* @return error message associated with the exception.
*/
virtual const char* what() const throw() {
return m_content.str().c_str();
}
};
std::string translate_margo_error_code(hg_return_t ret);
......
......@@ -53,7 +53,7 @@ public:
T as() const {
T t;
buffer_input_archive iarch(m_buffer, *m_engine);
iarch & t;
iarch(t);
return t;
}
......@@ -77,8 +77,8 @@ public:
std::tuple<typename std::decay<T1>::type,
typename std::decay<T2>::type,
typename std::decay_t<Tn>::type...> t;
buffer_input_archive iarch(m_buffer);
iarch & t;
buffer_input_archive iarch(m_buffer, *m_engine);
arch(t);
return t;
}
......
......@@ -91,6 +91,8 @@ private:
const std::integral_constant<bool, false>& r_is_void,
const std::integral_constant<bool, true>& first_arg_is_request,
const pool& p) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(req, args...);
......@@ -108,6 +110,8 @@ private:
const std::integral_constant<bool, false>& r_is_void,
const std::integral_constant<bool, false>& first_arg_is_request,
const pool& p) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
R r = (self->*func)(args...);
......@@ -126,6 +130,8 @@ private:
const std::integral_constant<bool, true>& r_is_void,
const std::integral_constant<bool, true>& first_arg_is_request,
const pool& p = pool()) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(req, args...);
......@@ -142,6 +148,8 @@ private:
const std::integral_constant<bool, true>& r_is_void,
const std::integral_constant<bool, false>& first_arg_is_request,
const pool& p = pool()) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(args...);
......@@ -163,6 +171,8 @@ private:
const std::integral_constant<bool, false>& r_is_void,
const std::integral_constant<bool, true>& first_arg_is_request,
const pool& p) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(req, args...);
......@@ -180,6 +190,8 @@ private:
const std::integral_constant<bool, false>& r_is_void,
const std::integral_constant<bool, false>& first_arg_is_request,
const pool& p) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
R r = (self->*func)(args...);
......@@ -198,6 +210,8 @@ private:
const std::integral_constant<bool, true>& r_is_void,
const std::integral_constant<bool, true>& first_arg_is_request,
const pool& p = pool()) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(req, args...);
......@@ -214,6 +228,8 @@ private:
const std::integral_constant<bool, true>& r_is_void,
const std::integral_constant<bool, false>& first_arg_is_request,
const pool& p = pool()) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(args...);
......
......@@ -95,6 +95,11 @@ public:
*/
remote_procedure& disable_response();
/**
* @brief Deregisters this RPC from the engine.
*/
void deregister();
[[deprecated("use disable_response() instead")]]
inline remote_procedure& ignore_response() {
return disable_response();
......
......@@ -114,7 +114,7 @@ public:
if(m_handle != HG_HANDLE_NULL) {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(t)...);
arch(std::forward<T>(t)...);
hg_return_t ret = margo_respond(m_handle, &b);
MARGO_ASSERT(ret, margo_respond);
}
......
......@@ -6,6 +6,21 @@
#ifndef __THALLIUM_BUFFER_INPUT_ARCHIVE_HPP
#define __THALLIUM_BUFFER_INPUT_ARCHIVE_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <thallium/serialization/cereal/archives.hpp>
namespace thallium {
using buffer_input_archive = cereal_input_archive;
}
#else
#include <type_traits>
#include <stdexcept>
#include <cstring>
......@@ -60,17 +75,35 @@ public:
/**
* Operator to get C++ objects of type T from the archive.
* The object should either be a basic type, or an STL container
* (in which case the appropriate hgcxx/hg_stl/stl_* header should
* be included for this function to be properly instanciated), or
* any object for which either a serialize member function or
* (in which case the appropriate thallium/serialization/stl/ header
* should be included for this function to be properly instanciated),
* or any object for which either a serialize member function or
* a load member function has been provided.
*/
template<typename T>
buffer_input_archive& operator&(T&& obj) {
inline buffer_input_archive& operator&(T&& obj) {
read_impl(std::forward<T>(obj), std::is_arithmetic<typename std::decay<T>::type>());
return *this;
}
/**
* @brief Parenthesis operator with one argument, equivalent to & operator.
*/
template<typename T>
inline buffer_input_archive& operator()(T&& obj) {
return (*this) & std::forward<T>(obj);
}
/**
* @brief Parenthesis operator with multiple arguments.
* ar(x,y,z) is equivalent to ar & x & y & z.
*/
template<typename T, typename ... Targs>
inline buffer_input_archive& operator()(T&& obj, Targs&&... others) {
(*this) & std::forward<T>(obj);
return (*this)(std::forward<Targs>(others)...);
}
/**
* Operator >> is equivalent to operator &.
* \see operator&
......@@ -95,6 +128,19 @@ public:
m_pos += count*sizeof(T);
}
/**
* @brief Equivalent to read().
*/
template<typename T>
inline void copy(T* t, std::size_t count=1) {
read(t, count);
}
/**
* @brief Returns the engine registered in the archive.
*
* @return The engine registered in the archive.
*/
engine& get_engine() const {
return *m_engine;
}
......@@ -102,3 +148,4 @@ public:
}
#endif
#endif
......@@ -6,6 +6,20 @@
#ifndef __THALLIUM_BUFFER_OUTPUT_ARCHIVE_HPP
#define __THALLIUM_BUFFER_OUTPUT_ARCHIVE_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <thallium/serialization/cereal/archives.hpp>
namespace thallium {
using buffer_output_archive = cereal_output_archive;
}
#else
#include <type_traits>
#include <thallium/serialization/serialize.hpp>
#include <thallium/buffer.hpp>
......@@ -68,11 +82,29 @@ public:
* a load member function has been provided.
*/
template<typename T>
buffer_output_archive& operator&(T&& obj) {
inline buffer_output_archive& operator&(T&& obj) {
write_impl(std::forward<T>(obj), std::is_arithmetic<typename std::decay<T>::type>());
return *this;
}
/**
* @brief Parenthesis operator with one argument, equivalent to & operator.
*/
template<typename T>
inline buffer_output_archive& operator()(T&& obj) {
return (*this) & std::forward<T>(obj);
}
/**
* @brief Parenthesis operator with multiple arguments.
* ar(x,y,z) is equivalent to ar & x & y & z.
*/
template<typename T, typename ... Targs>
inline buffer_output_archive& operator()(T&& obj, Targs&&... others) {
(*this) & std::forward<T>(obj);
return (*this)(std::forward<Targs>(others)...);
}
/**
* Operator << is equivalent to operator &.
* \see operator&
......@@ -100,8 +132,17 @@ public:
memcpy((void*)(m_buffer.data() + m_pos),(void*)t,s);
m_pos += s;
}
/**
* @brief Equivalent to write().
*/
template<typename T>
inline void copy(T* const t, size_t count=1) {
write(t, count);
}
};
}
#endif
#endif
/*
* Copyright (c) 2019 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#ifndef THALLIUM_CEREAL_ARCHIVES_BINARY_HPP
#define THALLIUM_CEREAL_ARCHIVES_BINARY_HPP
#include <cstring>
#include <thallium/buffer.hpp>
#include <cereal/cereal.hpp>
namespace thallium {
class engine;
class cereal_output_archive : public cereal::OutputArchive<cereal_output_archive, cereal::AllowEmptyClassElision>
{
public:
cereal_output_archive(buffer& b, engine& e)
: cereal::OutputArchive<cereal_output_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(&e)
{
m_buffer.resize(0);
}
cereal_output_archive(buffer& b)
: cereal::OutputArchive<cereal_output_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(nullptr)
{
m_buffer.resize(0);
}
~cereal_output_archive() = default;
inline void write(const void* data, size_t size) {
if(m_pos+size > m_buffer.size()) {
if(m_pos+size > m_buffer.capacity()) {
m_buffer.reserve(m_buffer.capacity()*2);
}
m_buffer.resize(m_pos+size);
}
memcpy((void*)(m_buffer.data() + m_pos),(void*)data, size);
m_pos += size;
}
engine& get_engine() const {
return *m_engine;
}
private:
buffer& m_buffer;
std::size_t m_pos;
engine* m_engine;
};
class cereal_input_archive : public cereal::InputArchive<cereal_input_archive, cereal::AllowEmptyClassElision>
{
public:
cereal_input_archive(const buffer& b, engine& e)
: cereal::InputArchive<cereal_input_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(&e)
{}
cereal_input_archive(buffer& b)
: cereal::InputArchive<cereal_input_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(nullptr)
{}
~cereal_input_archive() = default;
inline void read(void* data, std::size_t size) {
if(m_pos + size > m_buffer.size()) {
throw std::runtime_error("Reading beyond buffer size");
}
std::memcpy((void*)data,(const void*)(m_buffer.data() + m_pos), size);
m_pos += size;
}
engine& get_engine() const {
return *m_engine;
}
private:
const buffer& m_buffer;
std::size_t m_pos;
engine* m_engine;
};
template<class T> inline
typename std::enable_if<std::is_arithmetic<T>::value, void>::type
CEREAL_SAVE_FUNCTION_NAME(cereal_output_archive & ar, T const & t)
{
ar.write(std::addressof(t), sizeof(t));
}
template<class T> inline
typename std::enable_if<std::is_arithmetic<T>::value, void>::type
CEREAL_LOAD_FUNCTION_NAME(cereal_input_archive & ar, T & t)
{
ar.read(std::addressof(t), sizeof(t));
}
template <class Archive, class T> inline
CEREAL_ARCHIVE_RESTRICT(cereal_input_archive, cereal_output_archive)
CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, cereal::NameValuePair<T>& t)
{
ar(t.value);
}
template <class Archive, class T> inline
CEREAL_ARCHIVE_RESTRICT(cereal_input_archive, cereal_output_archive)
CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, cereal::SizeTag<T>& t)
{
ar(t.size);
}
template <class T> inline
void CEREAL_SAVE_FUNCTION_NAME(cereal_output_archive& ar, cereal::BinaryData<T> const & bd)
{
ar.write(bd.data, static_cast<std::size_t>(bd.size));
}
template <class T> inline
void CEREAL_LOAD_FUNCTION_NAME(cereal_input_archive & ar, cereal::BinaryData<T> & bd)
{
ar.read(bd.data, static_cast<std::size_t>(bd.size));
}
}
// register archives for polymorphic support
CEREAL_REGISTER_ARCHIVE(thallium::cereal_output_archive)
CEREAL_REGISTER_ARCHIVE(thallium::cereal_input_archive)
// tie input and output archives together
CEREAL_SETUP_ARCHIVE_TRAITS(thallium::cereal_input_archive, thallium::cereal_output_archive)
#endif
......@@ -6,6 +6,10 @@
#ifndef SERIALIZE_H
#define SERIALIZE_H
#include <thallium/config.hpp>
#ifndef THALLIUM_USE_CEREAL
#include <utility>
#include <type_traits>
......@@ -205,3 +209,4 @@ inline void serialize_many(A& ar, T&& t) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_ARRAY_SERIALIZATION_HPP
#define __THALLIUM_ARRAY_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/array.hpp>
#else
#include <type_traits>
#include <array>
......@@ -52,3 +58,4 @@ inline void load(A& ar, std::array<T,N>& v) {
} // namespace thallium
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_COMPLEX_SERIALIZATION_HPP
#define __THALLIUM_COMPLEX_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/complex.hpp>
#else
#include <complex>
namespace thallium {
......@@ -25,3 +31,4 @@ inline void load(A& ar, std::complex<T>& t) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_DEQUE_SERIALIZATION_HPP
#define __THALLIUM_DEQUE_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/deque.hpp>
#else
#include <deque>
namespace thallium {
......@@ -33,3 +39,4 @@ inline void load(A& ar, std::deque<T,Alloc>& l) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_FORWARD_LIST_SERIALIZATION_HPP
#define __THALLIUM_FORWARD_LIST_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/forward_list.hpp>
#else
#include <forward_list>
namespace thallium {
......@@ -33,3 +39,4 @@ inline void load(A& ar, std::forward_list<T,Alloc>& l) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_LIST_SERIALIZATION_HPP
#define __THALLIUM_LIST_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/list.hpp>
#else
#include <list>
namespace thallium {
......@@ -33,3 +39,4 @@ inline void load(A& ar, std::list<T,Alloc>& l) {
}
#endif
#endif
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_MAP_SERIALIZATION_HPP
#define __THALLIUM_MAP_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/map.hpp>
#else
#include <map>
namespace thallium {
......@@ -30,3 +41,4 @@ inline void load(A& ar, std::map<K,V,Compare,Alloc>& m) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_MULTIMAP_SERIALIZATION_HPP
#define __THALLIUM_MULTIMAP_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/map.hpp>
#else
#include <map>
namespace thallium {
......@@ -35,3 +41,4 @@ inline void load(A& ar, std::multimap<K,V,Compare,Alloc>& m) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_MULTISET_SERIALIZATION_HPP
#define __THALLIUM_MULTISET_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/set.hpp>
#else
#include <utility>
#include <set>
......@@ -35,3 +41,4 @@ inline void load(A& ar, std::multiset<T,Compare,Alloc>& s) {
} // namespace
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_PAIR_SERIALIZE_HPP
#define __THALLIUM_PAIR_SERIALIZE_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/utility.hpp>
#else
#include <utility>
namespace thallium {
......@@ -19,3 +25,4 @@ inline void serialize(A& a, std::pair<T1,T2>& p) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_SET_SERIALIZATION_HPP
#define __THALLIUM_SET_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/set.hpp>
#else
#include <utility>
#include <set>
......@@ -35,3 +41,4 @@ inline void load(A& ar, std::set<T,Compare,Alloc>& s) {
} // namespace
#endif
#endif
#ifndef __THALLIUM_STRING_SERIALIZATION_HPP
#define __THALLIUM_STRING_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/string.hpp>
#else
#include <string>
#include <iostream>
......@@ -23,5 +29,5 @@ inline void load(A& ar, std::string& s) {
}
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_TUPLE_SERIALIZATION_HPP
#define __THALLIUM_TUPLE_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/tuple.hpp>
#else
#include <tuple>
namespace thallium {
......@@ -26,7 +32,10 @@ template<>
struct tuple_serializer<0> {
template<class A, class... Types>
static inline void apply(A& ar, std::tuple<Types...>& t) {}
static inline void apply(A& ar, std::tuple<Types...>& t) {
(void)ar;
(void)t;
}
};
}
......@@ -44,3 +53,4 @@ inline void load(A& ar, std::tuple<Types...>& t) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_UNORDERED_MAP_SERIALIZATION_HPP
#define __THALLIUM_UNORDERED_MAP_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/unordered_map.hpp>
#else
#include <unordered_map>
namespace thallium {
......@@ -36,3 +42,4 @@ inline void load(A& ar, std::unordered_map<K,V,Hash,Pred,Alloc>& m) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_UNORDERED_MULTIMAP_SERIALIZATION_HPP
#define __THALLIUM_UNORDERED_MULTIMAP_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/unordered_map.hpp>
#else
#include <unordered_map>
namespace thallium {
......@@ -36,3 +42,4 @@ inline void load(A& ar, std::unordered_multimap<K,V,Hash,Pred,Alloc>& m) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_UNORDERED_MULTISET_SERIALIZATION_HPP
#define __THALLIUM_UNORDERED_MULTISET_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef USE_CEREAL
#include <cereal/types/unordered_multiset.hpp>
#else
#include <utility>
#include <unordered_set>
......@@ -36,3 +42,4 @@ inline void load(A& ar, std::unordered_multiset<V,Hash,Pred,Alloc>& s) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_UNORDERED_SET_SERIALIZATION_HPP
#define __THALLIUM_UNORDERED_SET_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/unordered_set.hpp>
#else
#include <utility>
#include <unordered_set>
......@@ -36,3 +42,4 @@ inline void load(A& ar, std::unordered_set<V,Hash,Pred,Alloc>& s) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_VECTOR_SERIALIZATION_HPP
#define __THALLIUM_VECTOR_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/vector.hpp>
#else
#include <type_traits>
#include <vector>
......@@ -64,3 +70,4 @@ inline void load(A& ar, std::vector<T,Alloc>& v) {
} // namespace thallium
#endif
#endif
......@@ -16,8 +16,8 @@ template<size_t N>
struct apply_f_to_t_impl {
template<typename R, typename... ArgsF, typename... ArgsT, typename... Args>
static R apply(const std::function<R(ArgsF...)>& f, std::tuple<ArgsT...> const& t, Args... args) {
return apply_f_to_t_impl<N-1>::apply(f, t, std::get<N-1>(t), args...);
static R apply(const std::function<R(ArgsF...)>& f, std::tuple<ArgsT...>& t, Args&&... args) {
return apply_f_to_t_impl<N-1>::apply(f, t, std::get<N-1>(t), std::forward<Args>(args)...);
}
};
......@@ -25,8 +25,9 @@ template<>
struct apply_f_to_t_impl<0> {
template<typename R, typename... ArgsF, typename... ArgsT, typename... Args>
static R apply(const std::function<R(ArgsF...)>& f, std::tuple<ArgsT...> const& t, Args... args) {
return f(args...);
static R apply(const std::function<R(ArgsF...)>& f, std::tuple<ArgsT...>& t, Args&&... args) {
(void)t;
return f(std::forward<Args>(args)...);
}
};
......@@ -40,7 +41,7 @@ struct apply_f_to_t_impl<0> {
* \return the value returned by f.
*/
template<typename R, typename... ArgsF, typename... ArgsT>
R apply_function_to_tuple(const std::function<R(ArgsF...)>& f, std::tuple<ArgsT...> const& t) {
R apply_function_to_tuple(const std::function<R(ArgsF...)>& f, std::tuple<ArgsT...>& t) {
return detail::apply_f_to_t_impl<sizeof...(ArgsT)>::apply(f,t);
}
......@@ -52,7 +53,7 @@ R apply_function_to_tuple(const std::function<R(ArgsF...)>& f, std::tuple<ArgsT.
* \return the value returned by f.
*/
template<typename R, typename... ArgsF, typename... ArgsT>
R apply_function_to_tuple(R (*f)(ArgsF...), std::tuple<ArgsT...> const& t) {
R apply_function_to_tuple(R (*f)(ArgsF...), std::tuple<ArgsT...>& t) {
std::function<R(ArgsF...)> fun(f);
return apply_function_to_tuple(fun,t);
}
......
......@@ -30,8 +30,8 @@ set (thallium-pkg "share/cmake/thallium")
# library version set here (e.g. for shared libs).
#
set (THALLIUM_VERSION_MAJOR 0)
set (THALLIUM_VERSION_MINOR 3)
set (THALLIUM_VERSION_PATCH 3)
set (THALLIUM_VERSION_MINOR 5)
set (THALLIUM_VERSION_PATCH 0)
set (thallium-vers "${THALLIUM_VERSION_MAJOR}.${THALLIUM_VERSION_MINOR}")
set (THALLIUM_VERSION "${thallium-vers}.${THALLIUM_VERSION_PATCH}")
......@@ -84,3 +84,5 @@ install (DIRECTORY ../include/thallium
FILES_MATCHING PATTERN "*.hpp")
install (FILES ../include/thallium.hpp
DESTINATION include)
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/../include/thallium/config.hpp
DESTINATION include/thallium)
......@@ -8,6 +8,12 @@
namespace thallium {
hg_bulk_t bulk::get_bulk(bool copy) const {
if(copy && m_bulk != HG_BULK_NULL)
margo_bulk_ref_incr(m_bulk);
return m_bulk;
}
bulk::bulk_segment bulk::select(std::size_t offset, std::size_t size) const {
return bulk_segment(*this, offset, size);
}
......
......@@ -53,6 +53,12 @@ bulk engine::expose(const std::vector<std::pair<void*,size_t>>& segments, bulk_m
return bulk(*this, handle, true);
}
bulk engine::wrap(hg_bulk_t blk, bool is_local) {
hg_return_t hret = margo_bulk_ref_incr(blk);
MARGO_ASSERT(hret, margo_bulk_ref_incr);
return bulk(*this, blk, is_local);
}
void engine::shutdown_remote_engine(const endpoint& ep) const {
int ret = margo_shutdown_remote_instance(m_mid, ep.m_addr);
hg_return_t r = ret == 0 ? HG_SUCCESS : HG_OTHER_ERROR;
......@@ -63,5 +69,30 @@ void engine::enable_remote_shutdown() {
margo_enable_remote_shutdown(m_mid);
}
remote_procedure engine::define(const std::string& name,
const std::function<void(const request&)>& fun,
uint16_t provider_id, const pool& p) {
hg_id_t id = margo_provider_register_name(m_mid, name.c_str(),
process_buffer,
process_buffer,
rpc_callback<rpc_t, false>,
provider_id,
p.native_handle());
m_rpcs[id] = [fun](const request& r, const buffer& b) {
fun(r);
};
rpc_callback_data* cb_data = new rpc_callback_data;
cb_data->m_engine = this;
cb_data->m_function = void_cast(&m_rpcs[id]);
hg_return_t ret = margo_register_data(m_mid, id, (void*)cb_data, free_rpc_callback_data);
MARGO_ASSERT(ret, margo_register_data);
return remote_procedure(*this, id);
}
}
......@@ -21,6 +21,10 @@ callable_remote_procedure remote_procedure::on(const provider_handle& ph) const
return callable_remote_procedure(*m_engine, m_id, ph, m_ignore_response, ph.provider_id());
}
void remote_procedure::deregister() {
margo_deregister(m_engine->m_mid, m_id);
}
remote_procedure& remote_procedure::disable_response() {
m_ignore_response = true;
margo_registered_disable_response(m_engine->m_mid, m_id, HG_TRUE);
......
......@@ -4,20 +4,24 @@
#include <unistd.h>
#include <iostream>
#include <thallium.hpp>
#include <thallium/serialization/stl/string.hpp>
#ifdef USE_CEREAL
#include <cereal/types/string.hpp>
#else
#include <thallium/serialization/stl/string.hpp>
#endif
namespace tl = thallium;
int client() {
tl::engine margo("bmi+tcp", MARGO_CLIENT_MODE);
auto remote_send = margo.define("send_bulk").disable_response();
tl::engine margo("tcp", MARGO_CLIENT_MODE);
auto remote_send = margo.define("send_bulk").disable_response();
auto remote_stop = margo.define("stop").disable_response();
std::string server_addr = "bmi+tcp://127.0.0.1:1234";
sleep(1);
std::string server_addr = "tcp://127.0.0.1:1234";
sleep(1);
auto server_endpoint = margo.lookup(server_addr);
std::cout << "Lookup done for endpoint " << server_endpoint << std::endl;
auto server_endpoint = margo.lookup(server_addr);
std::cout << "Lookup done for endpoint " << server_endpoint << std::endl;
std::string buf = "Matthieu";
std::vector<std::pair<void*,std::size_t>> seg(1);
......@@ -27,13 +31,14 @@ int client() {
tl::bulk b = margo.expose(seg, tl::bulk_mode::read_only);
remote_send.on(server_endpoint)(b);
sleep(1);
remote_stop.on(server_endpoint)();
return 0;
return 0;
}
int main(int argc, char** argv) {
client();
return 0;
client();
return 0;
}
......@@ -4,40 +4,44 @@
#include <unistd.h>
#include <iostream>
#include <thallium.hpp>
#ifdef USE_CEREAL
#include <cereal/types/string.hpp>
#else
#include <thallium/serialization/stl/string.hpp>
#endif
namespace tl = thallium;
int server() {
tl::engine margo("bmi+tcp://127.0.0.1:1234", MARGO_SERVER_MODE);
tl::engine engine("tcp://127.0.0.1:1234", MARGO_SERVER_MODE);
std::function<void(const tl::request&, tl::bulk& b)> f =
[&margo](const tl::request& req, tl::bulk& b) {
[&engine](const tl::request& req, tl::bulk& b) {
auto ep = req.get_endpoint();
std::vector<char> v(6,'*');
std::vector<std::pair<void*,std::size_t>> seg(1);
seg[0].first = (void*)(&v[0]);
seg[0].second = v.size();
tl::bulk local = margo.expose(seg, tl::bulk_mode::write_only);
tl::bulk local = engine.expose(seg, tl::bulk_mode::write_only);
b.on(ep) >> local;
std::cout << "Server received bulk: ";
for(auto c : v) std::cout << c;
std::cout << std::endl;
};
margo.define("send_bulk",f).disable_response();
engine.define("send_bulk",f).disable_response();
std::function<void(const tl::request&)> g =
[&margo](const tl::request& req) {
[&engine](const tl::request& req) {
std::cout << "Stopping server" << std::endl;
margo.finalize();
engine.finalize();
};
margo.define("stop", g);
engine.define("stop", g);
std::string addr = margo.self();
std::cout << "Server running at address " << addr << std::endl;
std::string addr = engine.self();
std::cout << "Server running at address " << addr << std::endl;
return 0;
return 0;
}
int main(int argc, char** argv) {
......
......@@ -4,7 +4,11 @@
#include <unistd.h>
#include <iostream>
#include <thallium.hpp>
#ifdef USE_CEREAL
#include <cereal/types/string.hpp>
#else
#include <thallium/serialization/stl/string.hpp>
#endif
namespace tl = thallium;
......
......@@ -4,17 +4,21 @@
#include <unistd.h>
#include <iostream>
#include <thallium.hpp>
#include <thallium/serialization/stl/string.hpp>
#ifdef USE_CEREAL
#include <cereal/types/string.hpp>
#else
#include <thallium/serialization/stl/string.hpp>
#endif
namespace tl = thallium;
void hello(const tl::request& req, const std::string& name) {
std::cout << "Hello " << name << std::endl;
std::cout << "Hello " << name << std::endl;
}
int server() {
tl::engine margo("tcp", MARGO_SERVER_MODE);
tl::engine margo("tcp", MARGO_SERVER_MODE);
margo.define("hello", hello).disable_response();
......@@ -33,12 +37,12 @@ int server() {
margo.define("stop", g);
std::string addr = margo.self();
std::cout << "Server running at address " << addr << std::endl;
std::cout << "Server running at address " << addr << std::endl;
return 0;
return 0;
}
int main(int argc, char** argv) {
server();
return 0;
server();
return 0;
}
#include <cassert>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/stl/tuple.hpp>
#include <thallium/config.hpp>
#ifdef USE_CEREAL
#include <thallium/serialization/cereal/archives.hpp>
#else
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/stl/tuple.hpp>
#endif
using namespace thallium;
void SerializeValues() {
buffer buf;
double a1 = 1.0;
int b1 = 42;
char c1 = 'c';
double a1 = 1.0;
int b1 = 42;
char c1 = 'c';
{
#ifdef USE_CEREAL
cereal_output_archive arch(buf);
arch(a1,b1,c1);
#else
buffer_output_archive arch(buf);
arch & a1 & b1 & c1;
#endif
}
double a2;
......@@ -24,8 +34,13 @@ void SerializeValues() {
char c2;
{
#ifdef USE_CEREAL
cereal_input_archive arch(buf);
arch(a2,b2,c2);
#else
buffer_input_archive arch(buf);
arch & a2 & b2 & c2;
#endif
}
assert(a1 == a2);
......@@ -35,5 +50,5 @@ void SerializeValues() {
int main(int argc, char** argv) {
SerializeValues();
return 0;
return 0;
}