...
 
Commits (26)
......@@ -48,8 +48,15 @@ if (NOT CMAKE_BUILD_TYPE)
endif ()
set (CMAKE_PREFIX_PATH "" CACHE STRING "External dependencies path")
set (BUILD_SHARED_LIBS "OFF" CACHE BOOL "Build a shared library")
set (ENABLE_CEREAL "OFF" CACHE BOOL "Enable cereal serialization")
include_directories (${CMAKE_BINARY_DIR}/include)
# packages we depend on
if (ENABLE_CEREAL)
find_package (cereal CONFIG REQUIRED)
endif (ENABLE_CEREAL)
include (xpkg-import)
find_package (mercury CONFIG REQUIRED)
xpkg_import_module (margo REQUIRED margo)
......@@ -62,4 +69,4 @@ if (ENABLE_EXAMPLES)
add_subdirectory (examples)
endif (ENABLE_EXAMPLES)
configure_file (include/thallium/config.hpp.in ${CMAKE_BINARY_DIR}/include/thallium/config.hpp)
......@@ -17,7 +17,7 @@ int main(int argc, char** argv) {
myEngine.define("sum", sum);
myEngine.on_finalize([]() { std::cout << "Finalization was called" << std::endl; });
myEngine.push_finalize_callback([]() { std::cout << "Finalization was called" << std::endl; });
return 0;
}
......
......@@ -2,6 +2,7 @@
#define __THALLIUM_HPP
#include <margo.h>
#include <thallium/config.hpp>
#include <thallium/abt.hpp>
#include <thallium/anonymous.hpp>
#include <thallium/bulk_mode.hpp>
......
......@@ -9,9 +9,6 @@
#include <thallium/margo_exception.hpp>
#include <thallium/buffer.hpp>
#include <thallium/packed_response.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
namespace thallium {
......
......@@ -12,6 +12,7 @@
#include <margo.h>
#include <thallium/endpoint.hpp>
#include <thallium/margo_exception.hpp>
#include <thallium/serialization/stl/vector.hpp>
namespace thallium {
......@@ -318,14 +319,27 @@ public:
*/
std::size_t operator<<(const remote_bulk& b) const;
/**
* @brief Returns the underlying hg_bulk_t handle.
* If copy == false, the returned handle is a reference to the internal
* handle managed by this bulk object, it should not be destroyed by the
* user and its lifetime will not exceed that of the bulk object.
* If copy == true, the returned handle should be destroyed by the user.
*
* @param copy whether to make a copy or not.
*
* @return The underlying hg_bulk_t handle.
*/
hg_bulk_t get_bulk(bool copy=false) const;
/**
* @brief Function that serializes a bulk object into an archive.
*
* @tparam A Archive type.
* @param ar Input archive.
* @param ar Output archive.
*/
template<typename A>
void save(A& ar) {
void save(A& ar) const {
if(m_bulk == HG_BULK_NULL) {
std::vector<char> buf;
ar & buf;
......@@ -343,7 +357,7 @@ public:
* @brief Deserializes a bulk object from an output archive.
*
* @tparam A Archive type.
* @param ar Output archive.
* @param ar Input archive.
*/
template<typename A>
void load(A& ar);
......
......@@ -11,14 +11,15 @@
#include <utility>
#include <chrono>
#include <margo.h>
#include <thallium/config.hpp>
#include <thallium/buffer.hpp>
#include <thallium/timeout.hpp>
#include <thallium/packed_response.hpp>
#include <thallium/async_response.hpp>
#include <thallium/margo_exception.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#include <thallium/margo_exception.hpp>
#include <thallium/serialization/stl/vector.hpp>
namespace thallium {
......@@ -199,7 +200,7 @@ public:
packed_response operator()(T&& ... args) const {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
arch(std::forward<T>(args)...);
return forward(b);
}
......@@ -221,7 +222,7 @@ public:
packed_response timed(const std::chrono::duration<R,P>& t, T&& ... args) const {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
arch(std::forward<T>(args)...);
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return forward(b, timeout_ms);
......@@ -267,7 +268,7 @@ public:
async_response async(T&& ... t) {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(t)...);
arch(std::forward<T>(t)...);
return iforward(b);
}
......@@ -288,7 +289,7 @@ public:
async_response timed_async(const std::chrono::duration<R,P>& t, T&& ... args) {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
arch(std::forward<T>(t)...);
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return iforward(b, timeout_ms);
......
#undef THALLIUM_USE_CEREAL
#undef ENABLE_CEREAL
#cmakedefine ENABLE_CEREAL
#ifdef ENABLE_CEREAL
#define THALLIUM_USE_CEREAL
#undef ENABLE_CEREAL
#endif
......@@ -84,7 +84,7 @@ public:
/**
* @brief Destructor.
*/
~endpoint();
virtual ~endpoint();
/**
* @brief Creates a string representation of the endpoint's address.
......
......@@ -9,11 +9,13 @@
#include <iostream>
#include <string>
#include <functional>
#include <stack>
#include <algorithm>
#include <list>
#include <unordered_map>
#include <vector>
#include <atomic>
#include <margo.h>
#include <thallium/config.hpp>
#include <thallium/pool.hpp>
#include <thallium/tuple_util.hpp>
#include <thallium/function_cast.hpp>
......@@ -60,7 +62,8 @@ private:
std::atomic<bool> m_finalize_called;
hg_context_t* m_hg_context = nullptr;
hg_class_t* m_hg_class = nullptr;
std::stack<std::function<void(void)>> m_finalize_callbacks;
std::list<std::pair<intptr_t, std::function<void(void)>>> m_prefinalize_callbacks;
std::list<std::pair<intptr_t, std::function<void(void)>>> m_finalize_callbacks;
/**
* @brief Encapsulation of some data needed by RPC callbacks
......@@ -108,6 +111,10 @@ private:
ret = margo_free_input(handle, &input);
MARGO_ASSERT(ret, margo_free_input);
margo_destroy(handle); // because of margo_ref_incr in rpc_callback
__margo_internal_decr_pending(mid);
if(__margo_internal_finalize_requested(mid)) {
margo_finalize(mid);
}
}
/**
......@@ -129,6 +136,7 @@ private:
return HG_OTHER_ERROR;
}
pool = margo_hg_handle_get_handler_pool(handle);
__margo_internal_incr_pending(mid);
margo_ref_incr(handle);
ret = ABT_thread_create(pool, (void (*)(void *)) rpc_handler_ult<F,disable_response>,
handle, ABT_THREAD_ATTR_NULL, NULL);
......@@ -143,9 +151,18 @@ private:
engine* e = static_cast<engine*>(arg);
e->m_finalize_called = true;
while(!(e->m_finalize_callbacks.empty())) {
auto& cb = e->m_finalize_callbacks.top();
cb();
e->m_finalize_callbacks.pop();
auto cb = e->m_finalize_callbacks.front();
e->m_finalize_callbacks.pop_front();
cb.second();
}
}
static void on_prefinalize_cb(void* arg) {
engine* e = static_cast<engine*>(arg);
while(!(e->m_prefinalize_callbacks.empty())) {
auto cb = e->m_prefinalize_callbacks.front();
e->m_prefinalize_callbacks.pop_front();
cb.second();
}
}
......@@ -171,6 +188,8 @@ public:
rpc_thread_count);
// XXX throw an exception if m_mid not initialized
m_owns_mid = true;
margo_push_prefinalize_callback(m_mid,
&engine::on_prefinalize_cb, static_cast<void*>(this));
margo_push_finalize_callback(m_mid,
&engine::on_finalize_cb, static_cast<void*>(this));
}
......@@ -191,6 +210,8 @@ public:
m_mid = margo_init_pool(progress_pool.native_handle(),
default_handler_pool.native_handle(), m_hg_context);
// XXX throw an exception if m_mid not initialized
margo_push_prefinalize_callback(m_mid,
&engine::on_prefinalize_cb, static_cast<void*>(this));
margo_push_finalize_callback(m_mid,
&engine::on_finalize_cb, static_cast<void*>(this));
}
......@@ -206,6 +227,8 @@ public:
m_mid = mid;
m_is_server = (mode == THALLIUM_SERVER_MODE);
m_owns_mid = false;
margo_push_prefinalize_callback(m_mid,
&engine::on_prefinalize_cb, static_cast<void*>(this));
margo_push_finalize_callback(m_mid,
&engine::on_finalize_cb, static_cast<void*>(this));
}
......@@ -219,6 +242,8 @@ public:
m_mid = mid;
m_owns_mid = false;
m_is_server = margo_is_listening(mid);
margo_push_prefinalize_callback(m_mid,
&engine::on_prefinalize_cb, static_cast<void*>(this));
margo_push_finalize_callback(m_mid,
&engine::on_finalize_cb, static_cast<void*>(this));
}
......@@ -254,9 +279,13 @@ public:
margo_wait_for_finalize(m_mid);
} else {
if(!m_finalize_called)
margo_finalize(m_mid);
finalize();
}
}
if(m_hg_context)
HG_Context_destroy(m_hg_context);
if(m_hg_class)
HG_Finalize(m_hg_class);
}
/**
......@@ -275,10 +304,6 @@ public:
*/
void finalize() {
margo_finalize(m_mid);
if(m_hg_context)
HG_Context_destroy(m_hg_context);
if(m_hg_class)
HG_Finalize(m_hg_class);
}
/**
......@@ -320,9 +345,13 @@ public:
*
* @return a remote_procedure object.
*/
template<typename ... Args>
template<typename A1, typename ... Args>
remote_procedure define(const std::string& name,
const std::function<void(const request&, A1, Args...)>& fun,
uint16_t provider_id=0, const pool& p = pool());
remote_procedure define(const std::string& name,
const std::function<void(const request&, Args...)>& fun,
const std::function<void(const request&)>& fun,
uint16_t provider_id=0, const pool& p = pool());
/**
......@@ -361,6 +390,93 @@ public:
*/
bulk expose(const std::vector<std::pair<void*,size_t>>& segments, bulk_mode flag);
/**
* @brief Creates a bulk object from an hg_bulk_t handle. The user
* is still responsible for calling margo_bulk_free or HG_Bulk_free
* on the original handle (this function will increment the hg_bulk_t's
* internal reference counter).
*
* @param blk Bulk handle.
* @param is_local Whether the bulk handle refers to memory that is local.
*
* @return a bulk object representing the memory exposed for RDMA.
*/
bulk wrap(hg_bulk_t blk, bool is_local);
/**
* @brief Pushes a pre-finalization callback into the engine. This callback will be
* called when margo_finalize is called (e.g. through engine::finalize()),
* before the Mercury progress loop is terminated.
*
* @tparam F type of callback. Must have a operator() defined.
* @param f callback.
*/
template<typename F>
void push_prefinalize_callback(F&& f) {
m_prefinalize_callbacks.emplace_back(0,std::forward<F>(f));
}
/**
* @brief Same as push_prefinalize_callback(F&& f) but takes an object whose address will
* be used to identify the callback (e.g. a provider).
*
* @tparam T Type of object used to identify the callback.
* @tparam F Callback type.
* @param owner Pointer to the object owning the callback.
* @param f Callback.
*/
template<typename T, typename F>
void push_prefinalize_callback(const T* owner, F&& f) {
m_prefinalize_callbacks.emplace_back(reinterpret_cast<intptr_t>(owner), std::forward<F>(f));
}
/**
* @brief Pops the most recently pushed pre-finalization callback and returns it.
* If no finalization callback are present, this function returns a null std::function.
*
* @return finalization callback.
*/
std::function<void(void)> pop_prefinalize_callback() {
auto it = std::find_if(m_prefinalize_callbacks.rbegin(), m_prefinalize_callbacks.rend(),
[](const auto& p) { return p.first == 0; });
if(it != m_prefinalize_callbacks.rend()) {
auto cb = std::move(it->second);
m_prefinalize_callbacks.erase(std::next(it).base());
return cb;
}
return std::function<void(void)>();
}
/**
* @brief Pops the most recently pushed pre-finalization callback pushed for a given owner.
*
* @tparam T Type of owner.
* @param owner Pointer to the owner.
*
* @return finalization callback.
*/
template<typename T>
std::function<void(void)> pop_prefinalize_callback(const T* owner) {
auto it = std::find_if(m_prefinalize_callbacks.rbegin(), m_prefinalize_callbacks.rend(),
[owner](const auto& p) { return p.first == reinterpret_cast<intptr_t>(owner); });
if(it != m_prefinalize_callbacks.rend()) {
auto cb = std::move(it->second);
m_prefinalize_callbacks.erase(std::next(it).base());
return cb;
}
return std::function<void(void)>();
}
template<typename F>
[[deprecated("Use push_finalize_callback")]] void on_finalize(F&& f) {
m_finalize_callbacks.emplace_back(0,std::forward<F>(f));
}
template<typename T, typename F>
[[deprecated("Use push_finalize_callback")]] void on_finalize(const T& owner, F&& f) {
m_finalize_callbacks.emplace_back(reinterpret_cast<intptr_t>(&owner), std::forward<F>(f));
}
/**
* @brief Pushes a finalization callback into the engine. This callback will be
* called when margo_finalize is called (e.g. through engine::finalize()).
......@@ -369,8 +485,59 @@ public:
* @param f callback.
*/
template<typename F>
void on_finalize(F&& f) {
m_finalize_callbacks.emplace(std::forward<F>(f));
void push_finalize_callback(F&& f) {
m_finalize_callbacks.emplace_back(0,std::forward<F>(f));
}
/**
* @brief Same as push_finalize_callback(F&& f) but takes an object whose address will
* be used to identify the callback (e.g. a provider).
*
* @tparam T Type of object used to identify the callback.
* @tparam F Callback type.
* @param owner Pointer to the object owning the callback.
* @param f Callback.
*/
template<typename T, typename F>
void push_finalize_callback(const T* owner, F&& f) {
m_finalize_callbacks.emplace_back(reinterpret_cast<intptr_t>(owner), std::forward<F>(f));
}
/**
* @brief Pops the most recently pushed finalization callback and returns it.
* If no finalization callback are present, this function returns a null std::function.
*
* @return finalization callback.
*/
std::function<void(void)> pop_finalize_callback() {
auto it = std::find_if(m_finalize_callbacks.rbegin(), m_finalize_callbacks.rend(),
[](const auto& p) { return p.first == 0; });
if(it != m_finalize_callbacks.rend()) {
auto cb = std::move(it->second);
m_finalize_callbacks.erase(std::next(it).base());
return cb;
}
return std::function<void(void)>();
}
/**
* @brief Pops the most recently pushed finalization callback pushed for a given owner.
*
* @tparam T Type of owner.
* @param owner Pointer to the owner.
*
* @return finalization callback.
*/
template<typename T>
std::function<void(void)> pop_finalize_callback(const T* owner) {
auto it = std::find_if(m_finalize_callbacks.rbegin(), m_finalize_callbacks.rend(),
[owner](const auto& p) { return p.first == reinterpret_cast<intptr_t>(owner); });
if(it != m_finalize_callbacks.rend()) {
auto cb = std::move(it->second);
m_finalize_callbacks.erase(std::next(it).base());
return cb;
}
return std::function<void(void)>();
}
/**
......@@ -391,15 +558,15 @@ public:
#include <thallium/remote_procedure.hpp>
#include <thallium/proc_buffer.hpp>
#include <thallium/serialization/stl/tuple.hpp>
#include <thallium/serialization/buffer_input_archive.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#include <thallium/serialization/stl/tuple.hpp>
namespace thallium {
template<typename ... Args>
template<typename A1, typename ... Args>
remote_procedure engine::define(const std::string& name,
const std::function<void(const request&, Args...)>& fun,
const std::function<void(const request&, A1, Args...)>& fun,
uint16_t provider_id, const pool& p) {
hg_id_t id = margo_provider_register_name(m_mid, name.c_str(),
......@@ -410,15 +577,13 @@ remote_procedure engine::define(const std::string& name,
p.native_handle());
m_rpcs[id] = [fun,this](const request& r, const buffer& b) {
std::function<void(Args...)> l = [&fun, &r](Args&&... args) {
fun(r, std::forward<Args>(args)...);
std::function<void(A1, Args...)> call_function = [&fun, &r](A1&& a1, Args&&... args) {
fun(r, std::forward<A1>(a1), std::forward<Args>(args)...);
};
std::tuple<typename std::decay<Args>::type...> iargs;
if(sizeof...(Args) > 0) {
buffer_input_archive iarch(b, *this);
iarch & iargs;
}
apply_function_to_tuple(l,iargs);
std::tuple<typename std::decay<A1>::type, typename std::decay<Args>::type...> iargs;
buffer_input_archive iarch(b, *this);
iarch(iargs);
apply_function_to_tuple(call_function, iargs);
};
rpc_callback_data* cb_data = new rpc_callback_data;
......
......@@ -217,7 +217,7 @@ class eventual<void> {
* the right operand.
*/
eventual& operator=(eventual&& other) {
if(this == other.m_eventual) return *this;
if(m_eventual == other.m_eventual) return *this;
if(m_eventual != ABT_EVENTUAL_NULL) {
TL_EVENTUAL_ASSERT(ABT_eventual_free(&m_eventual));
}
......
......@@ -12,17 +12,14 @@
#include <string>
#include <sstream>
#include <margo.h>
#include <thallium/exception.hpp>
namespace thallium {
/**
* @brief Exception class used when margo functions fail.
*/
class margo_exception : public std::exception {
private:
std::ostringstream m_content;
class margo_exception : public exception {
public:
......@@ -35,19 +32,9 @@ public:
* @param message Additional message.
*/
margo_exception(const std::string& function, const std::string file, unsigned line,
const std::string& message = std::string()) {
m_content << "[" << file << ":" << line << "][" << function << "] "
<< message;
}
const std::string& message = std::string())
: exception("[", file, ":", line, "][", function, "] ", message) {}
/**
* @brief Return the error message.
*
* @return error message associated with the exception.
*/
virtual const char* what() const throw() {
return m_content.str().c_str();
}
};
std::string translate_margo_error_code(hg_return_t ret);
......
......@@ -53,7 +53,7 @@ public:
T as() const {
T t;
buffer_input_archive iarch(m_buffer, *m_engine);
iarch & t;
iarch(t);
return t;
}
......@@ -77,8 +77,8 @@ public:
std::tuple<typename std::decay<T1>::type,
typename std::decay<T2>::type,
typename std::decay_t<Tn>::type...> t;
buffer_input_archive iarch(m_buffer);
iarch & t;
buffer_input_archive iarch(m_buffer, *m_engine);
arch(t);
return t;
}
......
......@@ -86,39 +86,39 @@ class pool {
public:
static ABT_unit_type u_get_type(ABT_unit u) {
auto uu = static_cast<U*>(u);
auto uu = reinterpret_cast<U*>(u);
return (ABT_unit_type)(uu->get_type());
}
static ABT_thread u_get_thread(ABT_unit u) {
auto uu = static_cast<U*>(u);
auto uu = reinterpret_cast<U*>(u);
return uu->get_thread().native_handle();
}
static ABT_task u_get_task(ABT_unit u) {
auto uu = static_cast<U*>(u);
auto uu = reinterpret_cast<U*>(u);
return uu->get_task().native_handle();
}
static ABT_bool u_is_in_pool(ABT_unit u) {
auto uu = static_cast<U*>(u);
auto uu = reinterpret_cast<U*>(u);
return (ABT_bool)(uu->is_in_pool());
}
static ABT_unit u_create_from_thread(ABT_thread t) {
auto uu = std::allocator_traits<Ualloc>::allocate(unit_allocator,1);
std::allocator_traits<Ualloc>::construct(unit_allocator, uu, thread(t));
return static_cast<ABT_unit>(uu);
return reinterpret_cast<ABT_unit>(uu);
}
static ABT_unit u_create_from_task(ABT_task t) {
auto uu = std::allocator_traits<Ualloc>::allocate(unit_allocator,1);
std::allocator_traits<Ualloc>::construct(unit_allocator, uu, task(t));
return static_cast<ABT_unit>(uu);
return reinterpret_cast<ABT_unit>(uu);
}
static void u_free(ABT_unit* u) {
auto uu = static_cast<U*>(*u);
auto uu = reinterpret_cast<U*>(*u);
std::allocator_traits<Ualloc>::destroy(unit_allocator, uu);
std::allocator_traits<Ualloc>::deallocate(unit_allocator, uu, 1);
*u = nullptr;
......@@ -127,38 +127,38 @@ class pool {
static int p_init(ABT_pool p, ABT_pool_config cfg) {
P* impl = std::allocator_traits<Palloc>::allocate(pool_allocator, 1);
std::allocator_traits<Palloc>::construct(pool_allocator, impl);
int ret = ABT_pool_set_data(p, static_cast<void*>(impl));
int ret = ABT_pool_set_data(p, reinterpret_cast<void*>(impl));
return ret;
}
static size_t p_get_size(ABT_pool p) {
void* data;
int ret = ABT_pool_get_data(p, &data);
auto impl = static_cast<P*>(data);
auto impl = reinterpret_cast<P*>(data);
return impl->get_size();
}
static void p_push(ABT_pool p, ABT_unit u) {
void* data;
int ret = ABT_pool_get_data(p, &data);
auto impl = static_cast<P*>(data);
impl->push(static_cast<U*>(u));
auto impl = reinterpret_cast<P*>(data);
impl->push(reinterpret_cast<U*>(u));
}
static int p_remove(ABT_pool p, ABT_unit u) {
void* data;
int ret = ABT_pool_get_data(p, &data);
auto impl = static_cast<P*>(data);
impl->remove(static_cast<U*>(u));
auto impl = reinterpret_cast<P*>(data);
impl->remove(reinterpret_cast<U*>(u));
return ret;
}
static ABT_unit p_pop(ABT_pool p) {
void* data;
int ret = ABT_pool_get_data(p, &data);
auto impl = static_cast<P*>(data);
auto impl = reinterpret_cast<P*>(data);
U* u = impl->pop();
return static_cast<ABT_unit>(u);
return reinterpret_cast<ABT_unit>(u);
}
static int p_free(ABT_pool p) {
......@@ -166,7 +166,7 @@ class pool {
int ret = ABT_pool_get_data(p, &data);
if(ret != ABT_SUCCESS)
return ret;
auto impl = static_cast<P*>(data);
auto impl = reinterpret_cast<P*>(data);
std::allocator_traits<Palloc>::destroy(pool_allocator, impl);
std::allocator_traits<Palloc>::deallocate(pool_allocator, impl, 1);
return ret;
......@@ -182,7 +182,7 @@ class pool {
friend class thread;
static void forward_work_unit(void* fp) {
auto f = static_cast<std::function<void(void)>*>(fp);
auto f = reinterpret_cast<std::function<void(void)>*>(fp);
(*f)();
delete f;
}
......@@ -369,7 +369,7 @@ class pool {
inline U* pop() {
ABT_unit u;
TL_POOL_ASSERT(ABT_pool_pop(m_pool, &u));
return static_cast<U*>(u);
return reinterpret_cast<U*>(u);
}
/**
......@@ -384,7 +384,7 @@ class pool {
*/
template<typename U>
inline void push(U* unit) {
TL_POOL_ASSERT(ABT_pool_push(m_pool, static_cast<ABT_unit>(unit)));
TL_POOL_ASSERT(ABT_pool_push(m_pool, reinterpret_cast<ABT_unit>(unit)));
}
/**
......@@ -396,7 +396,7 @@ class pool {
*/
template<typename U>
inline void remove(U* unit) {
TL_POOL_ASSERT(ABT_pool_remove(m_pool, static_cast<ABT_unit>(unit)));
TL_POOL_ASSERT(ABT_pool_remove(m_pool, reinterpret_cast<ABT_unit>(unit)));
}
/**
......@@ -409,7 +409,7 @@ class pool {
*/
template<typename U>
inline void run_unit(U* unit) {
TL_POOL_ASSERT(ABT_xstream_run_unit(static_cast<ABT_unit>(unit), m_pool));
TL_POOL_ASSERT(ABT_xstream_run_unit(reinterpret_cast<ABT_unit>(unit), m_pool));
}
/**
......@@ -436,13 +436,13 @@ class pool {
template<typename F>
managed<task> make_task(F&& f) {
auto fp = new std::function<void(void)>(std::forward<F>(f));
return task::create_on_pool(m_pool, forward_work_unit, static_cast<void*>(fp));
return task::create_on_pool(m_pool, forward_work_unit, reinterpret_cast<void*>(fp));
}
template<typename F>
void make_task(F&& f, const anonymous& a) {
auto fp = new std::function<void(void)>(std::forward<F>(f));
task::create_on_pool(m_pool, forward_work_unit, static_cast<void*>(fp), a);
task::create_on_pool(m_pool, forward_work_unit, reinterpret_cast<void*>(fp), a);
}
/**
......@@ -457,13 +457,13 @@ class pool {
template<typename F>
managed<thread> make_thread(F&& f) {
auto fp = new std::function<void(void)>(std::forward<F>(f));
return thread::create_on_pool(m_pool, forward_work_unit, static_cast<void*>(fp));
return thread::create_on_pool(m_pool, forward_work_unit, reinterpret_cast<void*>(fp));
}
template<typename F>
void make_thread(F&& f, const anonymous& a) {
auto fp = new std::function<void(void)>(std::forward<F>(f));
thread::create_on_pool(m_pool, forward_work_unit, static_cast<void*>(fp), a);
thread::create_on_pool(m_pool, forward_work_unit, reinterpret_cast<void*>(fp), a);
}
/**
......@@ -479,13 +479,13 @@ class pool {
template<typename F>
managed<thread> make_thread(F&& f, const thread::attribute& attr) {
auto fp = new std::function<void(void)>(std::forward<F>(f));
return thread::create_on_pool(m_pool, forward_work_unit, static_cast<void*>(fp), attr);
return thread::create_on_pool(m_pool, forward_work_unit, reinterpret_cast<void*>(fp), attr);
}
template<typename F>
void make_thread(F&& f, const thread::attribute& attr, const anonymous& a) {
auto fp = new std::function<void(void)>(std::forward<F>(f));
thread::create_on_pool(m_pool, forward_work_unit, static_cast<void*>(fp), attr, a);
thread::create_on_pool(m_pool, forward_work_unit, reinterpret_cast<void*>(fp), attr, a);
}
};
......
......@@ -91,6 +91,8 @@ private:
const std::integral_constant<bool, false>& r_is_void,
const std::integral_constant<bool, true>& first_arg_is_request,
const pool& p) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(req, args...);
......@@ -108,6 +110,8 @@ private:
const std::integral_constant<bool, false>& r_is_void,
const std::integral_constant<bool, false>& first_arg_is_request,
const pool& p) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
R r = (self->*func)(args...);
......@@ -126,6 +130,8 @@ private:
const std::integral_constant<bool, true>& r_is_void,
const std::integral_constant<bool, true>& first_arg_is_request,
const pool& p = pool()) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(req, args...);
......@@ -142,6 +148,8 @@ private:
const std::integral_constant<bool, true>& r_is_void,
const std::integral_constant<bool, false>& first_arg_is_request,
const pool& p = pool()) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(args...);
......@@ -163,6 +171,8 @@ private:
const std::integral_constant<bool, false>& r_is_void,
const std::integral_constant<bool, true>& first_arg_is_request,
const pool& p) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(req, args...);
......@@ -180,6 +190,8 @@ private:
const std::integral_constant<bool, false>& r_is_void,
const std::integral_constant<bool, false>& first_arg_is_request,
const pool& p) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
R r = (self->*func)(args...);
......@@ -198,6 +210,8 @@ private:
const std::integral_constant<bool, true>& r_is_void,
const std::integral_constant<bool, true>& first_arg_is_request,
const pool& p = pool()) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(req, args...);
......@@ -214,6 +228,8 @@ private:
const std::integral_constant<bool, true>& r_is_void,
const std::integral_constant<bool, false>& first_arg_is_request,
const pool& p = pool()) {
(void)r_is_void;
(void)first_arg_is_request;
T* self = static_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(args...);
......
......@@ -95,6 +95,11 @@ public:
*/
remote_procedure& disable_response();
/**
* @brief Deregisters this RPC from the engine.
*/
void deregister();
[[deprecated("use disable_response() instead")]]
inline remote_procedure& ignore_response() {
return disable_response();
......
......@@ -114,7 +114,7 @@ public:
if(m_handle != HG_HANDLE_NULL) {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(t)...);
arch(std::forward<T>(t)...);
hg_return_t ret = margo_respond(m_handle, &b);
MARGO_ASSERT(ret, margo_respond);
}
......
......@@ -77,13 +77,13 @@ class scheduler {
static int init(ABT_sched s, ABT_sched_config) {
auto ss = std::allocator_traits<Salloc>::allocate(scheduler_allocator, 1);
std::allocator_traits<Salloc>::construct(scheduler_allocator,ss,s);
return ABT_sched_set_data(s, static_cast<void*>(ss));
return ABT_sched_set_data(s, reinterpret_cast<void*>(ss));
}
static void run(ABT_sched s) {
void* data;
ABT_sched_get_data(s, &data);
S* impl = static_cast<S*>(data);
S* impl = reinterpret_cast<S*>(data);
impl->run();
}
......@@ -92,7 +92,7 @@ class scheduler {
int ret = ABT_sched_get_data(s, &data);
if(ret != ABT_SUCCESS)
return ret;
S* impl = static_cast<S*>(data);
S* impl = reinterpret_cast<S*>(data);
std::allocator_traits<Salloc>::destroy(scheduler_allocator, impl);
std::allocator_traits<Salloc>::deallocate(scheduler_allocator,impl,1);
return ret;
......@@ -101,7 +101,7 @@ class scheduler {
static ABT_pool get_migr_pool(ABT_sched s) {
void* data;
ABT_sched_get_data(s, &data);
S* impl = static_cast<S*>(data);
S* impl = reinterpret_cast<S*>(data);
return impl->get_migr_pool().native_handle();
}
};
......
......@@ -6,6 +6,21 @@
#ifndef __THALLIUM_BUFFER_INPUT_ARCHIVE_HPP
#define __THALLIUM_BUFFER_INPUT_ARCHIVE_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <thallium/serialization/cereal/archives.hpp>
namespace thallium {
using buffer_input_archive = cereal_input_archive;
}
#else
#include <type_traits>
#include <stdexcept>
#include <cstring>
......@@ -60,17 +75,35 @@ public:
/**
* Operator to get C++ objects of type T from the archive.
* The object should either be a basic type, or an STL container
* (in which case the appropriate hgcxx/hg_stl/stl_* header should
* be included for this function to be properly instanciated), or
* any object for which either a serialize member function or
* (in which case the appropriate thallium/serialization/stl/ header
* should be included for this function to be properly instanciated),
* or any object for which either a serialize member function or
* a load member function has been provided.
*/
template<typename T>
buffer_input_archive& operator&(T&& obj) {
inline buffer_input_archive& operator&(T&& obj) {
read_impl(std::forward<T>(obj), std::is_arithmetic<typename std::decay<T>::type>());
return *this;
}
/**
* @brief Parenthesis operator with one argument, equivalent to & operator.
*/
template<typename T>
inline buffer_input_archive& operator()(T&& obj) {
return (*this) & std::forward<T>(obj);
}
/**
* @brief Parenthesis operator with multiple arguments.
* ar(x,y,z) is equivalent to ar & x & y & z.
*/
template<typename T, typename ... Targs>
inline buffer_input_archive& operator()(T&& obj, Targs&&... others) {
(*this) & std::forward<T>(obj);
return (*this)(std::forward<Targs>(others)...);
}
/**
* Operator >> is equivalent to operator &.
* \see operator&
......@@ -95,6 +128,19 @@ public:
m_pos += count*sizeof(T);
}
/**
* @brief Equivalent to read().
*/
template<typename T>
inline void copy(T* t, std::size_t count=1) {
read(t, count);
}
/**
* @brief Returns the engine registered in the archive.
*
* @return The engine registered in the archive.
*/
engine& get_engine() const {
return *m_engine;
}
......@@ -102,3 +148,4 @@ public:
}
#endif
#endif
......@@ -6,6 +6,20 @@
#ifndef __THALLIUM_BUFFER_OUTPUT_ARCHIVE_HPP
#define __THALLIUM_BUFFER_OUTPUT_ARCHIVE_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <thallium/serialization/cereal/archives.hpp>
namespace thallium {
using buffer_output_archive = cereal_output_archive;
}
#else
#include <type_traits>
#include <thallium/serialization/serialize.hpp>
#include <thallium/buffer.hpp>
......@@ -68,11 +82,29 @@ public:
* a load member function has been provided.
*/
template<typename T>
buffer_output_archive& operator&(T&& obj) {
inline buffer_output_archive& operator&(T&& obj) {
write_impl(std::forward<T>(obj), std::is_arithmetic<typename std::decay<T>::type>());
return *this;
}
/**
* @brief Parenthesis operator with one argument, equivalent to & operator.
*/
template<typename T>
inline buffer_output_archive& operator()(T&& obj) {
return (*this) & std::forward<T>(obj);
}
/**
* @brief Parenthesis operator with multiple arguments.
* ar(x,y,z) is equivalent to ar & x & y & z.
*/
template<typename T, typename ... Targs>
inline buffer_output_archive& operator()(T&& obj, Targs&&... others) {
(*this) & std::forward<T>(obj);
return (*this)(std::forward<Targs>(others)...);
}
/**
* Operator << is equivalent to operator &.
* \see operator&
......@@ -100,8 +132,17 @@ public:
memcpy((void*)(m_buffer.data() + m_pos),(void*)t,s);
m_pos += s;
}
/**
* @brief Equivalent to write().
*/
template<typename T>
inline void copy(T* const t, size_t count=1) {
write(t, count);
}
};
}
#endif
#endif
/*
* Copyright (c) 2019 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#ifndef THALLIUM_CEREAL_ARCHIVES_BINARY_HPP
#define THALLIUM_CEREAL_ARCHIVES_BINARY_HPP
#include <cstring>
#include <thallium/buffer.hpp>
#include <cereal/cereal.hpp>
namespace thallium {
class engine;
class cereal_output_archive : public cereal::OutputArchive<cereal_output_archive, cereal::AllowEmptyClassElision>
{
public:
cereal_output_archive(buffer& b, engine& e)
: cereal::OutputArchive<cereal_output_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(&e)
{
m_buffer.resize(0);
}
cereal_output_archive(buffer& b)
: cereal::OutputArchive<cereal_output_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(nullptr)
{
m_buffer.resize(0);
}
~cereal_output_archive() = default;
inline void write(const void* data, size_t size) {
if(m_pos+size > m_buffer.size()) {
if(m_pos+size > m_buffer.capacity()) {
m_buffer.reserve(m_buffer.capacity()*2);
}
m_buffer.resize(m_pos+size);
}
memcpy((void*)(m_buffer.data() + m_pos),(void*)data, size);
m_pos += size;
}
engine& get_engine() const {
return *m_engine;
}
private:
buffer& m_buffer;
std::size_t m_pos;
engine* m_engine;
};
class cereal_input_archive : public cereal::InputArchive<cereal_input_archive, cereal::AllowEmptyClassElision>
{
public:
cereal_input_archive(const buffer& b, engine& e)
: cereal::InputArchive<cereal_input_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(&e)
{}
cereal_input_archive(buffer& b)
: cereal::InputArchive<cereal_input_archive, cereal::AllowEmptyClassElision>(this)
, m_buffer(b)
, m_pos(0)
, m_engine(nullptr)
{}
~cereal_input_archive() = default;
inline void read(void* data, std::size_t size) {
if(m_pos + size > m_buffer.size()) {
throw std::runtime_error("Reading beyond buffer size");
}
std::memcpy((void*)data,(const void*)(m_buffer.data() + m_pos), size);
m_pos += size;
}
engine& get_engine() const {
return *m_engine;
}
private:
const buffer& m_buffer;
std::size_t m_pos;
engine* m_engine;
};
template<class T> inline
typename std::enable_if<std::is_arithmetic<T>::value, void>::type
CEREAL_SAVE_FUNCTION_NAME(cereal_output_archive & ar, T const & t)
{
ar.write(std::addressof(t), sizeof(t));
}
template<class T> inline
typename std::enable_if<std::is_arithmetic<T>::value, void>::type
CEREAL_LOAD_FUNCTION_NAME(cereal_input_archive & ar, T & t)
{
ar.read(std::addressof(t), sizeof(t));
}
template <class Archive, class T> inline
CEREAL_ARCHIVE_RESTRICT(cereal_input_archive, cereal_output_archive)
CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, cereal::NameValuePair<T>& t)
{
ar(t.value);
}
template <class Archive, class T> inline
CEREAL_ARCHIVE_RESTRICT(cereal_input_archive, cereal_output_archive)
CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, cereal::SizeTag<T>& t)
{
ar(t.size);
}
template <class T> inline
void CEREAL_SAVE_FUNCTION_NAME(cereal_output_archive& ar, cereal::BinaryData<T> const & bd)
{
ar.write(bd.data, static_cast<std::size_t>(bd.size));
}
template <class T> inline
void CEREAL_LOAD_FUNCTION_NAME(cereal_input_archive & ar, cereal::BinaryData<T> & bd)
{
ar.read(bd.data, static_cast<std::size_t>(bd.size));
}
}
// register archives for polymorphic support
CEREAL_REGISTER_ARCHIVE(thallium::cereal_output_archive)
CEREAL_REGISTER_ARCHIVE(thallium::cereal_input_archive)
// tie input and output archives together
CEREAL_SETUP_ARCHIVE_TRAITS(thallium::cereal_input_archive, thallium::cereal_output_archive)
#endif
......@@ -6,6 +6,10 @@
#ifndef SERIALIZE_H
#define SERIALIZE_H
#include <thallium/config.hpp>
#ifndef THALLIUM_USE_CEREAL
#include <utility>
#include <type_traits>
......@@ -205,3 +209,4 @@ inline void serialize_many(A& ar, T&& t) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_ARRAY_SERIALIZATION_HPP
#define __THALLIUM_ARRAY_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/array.hpp>
#else
#include <type_traits>
#include <array>
......@@ -52,3 +58,4 @@ inline void load(A& ar, std::array<T,N>& v) {
} // namespace thallium
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_COMPLEX_SERIALIZATION_HPP
#define __THALLIUM_COMPLEX_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/complex.hpp>
#else
#include <complex>
namespace thallium {
......@@ -25,3 +31,4 @@ inline void load(A& ar, std::complex<T>& t) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_DEQUE_SERIALIZATION_HPP
#define __THALLIUM_DEQUE_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/deque.hpp>
#else
#include <deque>
namespace thallium {
......@@ -33,3 +39,4 @@ inline void load(A& ar, std::deque<T,Alloc>& l) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_FORWARD_LIST_SERIALIZATION_HPP
#define __THALLIUM_FORWARD_LIST_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/forward_list.hpp>
#else
#include <forward_list>
namespace thallium {
......@@ -33,3 +39,4 @@ inline void load(A& ar, std::forward_list<T,Alloc>& l) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_LIST_SERIALIZATION_HPP
#define __THALLIUM_LIST_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/list.hpp>
#else
#include <list>
namespace thallium {
......@@ -33,3 +39,4 @@ inline void load(A& ar, std::list<T,Alloc>& l) {
}
#endif
#endif
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_MAP_SERIALIZATION_HPP
#define __THALLIUM_MAP_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/map.hpp>
#else
#include <map>
namespace thallium {
......@@ -30,3 +41,4 @@ inline void load(A& ar, std::map<K,V,Compare,Alloc>& m) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_MULTIMAP_SERIALIZATION_HPP
#define __THALLIUM_MULTIMAP_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/map.hpp>
#else
#include <map>
namespace thallium {
......@@ -35,3 +41,4 @@ inline void load(A& ar, std::multimap<K,V,Compare,Alloc>& m) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_MULTISET_SERIALIZATION_HPP
#define __THALLIUM_MULTISET_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/set.hpp>
#else
#include <utility>
#include <set>
......@@ -35,3 +41,4 @@ inline void load(A& ar, std::multiset<T,Compare,Alloc>& s) {
} // namespace
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_PAIR_SERIALIZE_HPP
#define __THALLIUM_PAIR_SERIALIZE_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/utility.hpp>
#else
#include <utility>
namespace thallium {
......@@ -19,3 +25,4 @@ inline void serialize(A& a, std::pair<T1,T2>& p) {
}
#endif
#endif
......@@ -6,6 +6,12 @@
#ifndef __THALLIUM_SET_SERIALIZATION_HPP
#define __THALLIUM_SET_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/set.hpp>
#else
#include <utility>
#include <set>
......@@ -35,3 +41,4 @@ inline void load(A& ar, std::set<T,Compare,Alloc>& s) {
} // namespace
#endif
#endif
#ifndef __THALLIUM_STRING_SERIALIZATION_HPP
#define __THALLIUM_STRING_SERIALIZATION_HPP
#include <thallium/config.hpp>
#ifdef THALLIUM_USE_CEREAL
#include <cereal/types/string.hpp>
#else
#include <string>
#include <iostream>
......@@ -23,5 +29,5 @@ inline void load(A& ar, std::string& s) {