Commit 61b39fc4 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added example with dedicated pool

parent 12499ce2
add_executable(15_sum_server server.cpp)
target_link_libraries(15_sum_server thallium)
add_executable(15_sum_client client.cpp)
target_link_libraries(15_sum_client thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
int main(int argc, char** argv) {
if(argc != 2) {
std::cerr << "Usage: " << argv[0] << " <address>" << std::endl;
exit(0);
}
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
tl::remote_procedure sum = myEngine.define("sum");
tl::endpoint server = myEngine.lookup(argv[1]);
tl::provider_handle ph(server, 1);
int ret = sum.on(ph)(42,63);
std::cout << "Server answered " << ret << std::endl;
return 0;
}
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void sum(const tl::request& req, int x, int y) {
std::cout << "Computing " << x << "+" << y << std::endl;
req.respond(x+y);
}
int main(int argc, char** argv) {
tl::abt scope;
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::vector<tl::managed<tl::xstream>> ess;
tl::managed<tl::pool> myPool = tl::pool::create(tl::pool::access::spmc);
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es
= tl::xstream::create(tl::scheduler::predef::deflt, *myPool);
ess.push_back(std::move(es));
}
std::cout << "Server running at address " << myEngine.self() << std::endl;
myEngine.define("sum", sum, 1, *myPool);
myEngine.wait_for_finalize();
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
......@@ -13,3 +13,4 @@ add_subdirectory(11_tasks)
add_subdirectory(12_shared_pool)
add_subdirectory(13_mutex)
add_subdirectory(14_custom_sched)
add_subdirectory(15_rpc_pool)
......@@ -12,6 +12,7 @@
#include <unordered_map>
#include <atomic>
#include <margo.h>
#include <thallium/pool.hpp>
#include <thallium/tuple_util.hpp>
#include <thallium/function_cast.hpp>
#include <thallium/buffer.hpp>
......@@ -55,6 +56,8 @@ private:
bool m_is_server;
bool m_owns_mid;
std::atomic<bool> m_finalize_called;
hg_context_t* m_hg_context = nullptr;
hg_class_t* m_hg_class = nullptr;
/**
* @brief Encapsulation of some data needed by RPC callbacks
......@@ -158,6 +161,26 @@ public:
&engine::on_finalize, static_cast<void*>(this));
}
engine(const std::string& addr, int mode,
const pool& progress_pool,
const pool& default_handler_pool) {
m_is_server = (mode == THALLIUM_SERVER_MODE);
m_finalize_called = false;
m_owns_mid = true;
m_hg_class = HG_Init(addr.c_str(), mode);
//if(!hg_class); // XXX throw exception
m_hg_context = HG_Context_create(m_hg_class);
//if(!hg_context); // XXX throw exception
m_mid = margo_init_pool(progress_pool.native_handle(),
default_handler_pool.native_handle(), m_hg_context);
// XXX throw an exception if m_mid not initialized
margo_push_finalize_callback(m_mid,
&engine::on_finalize, static_cast<void*>(this));
}
engine(margo_instance_id mid, int mode) {
m_mid = mid;
m_is_server = (mode == THALLIUM_SERVER_MODE);
......@@ -205,6 +228,10 @@ public:
*/
void finalize() {
margo_finalize(m_mid);
if(m_hg_context)
HG_Context_destroy(m_hg_context);
if(m_hg_class)
HG_Finalize(m_hg_class);
}
/**
......@@ -249,7 +276,7 @@ public:
template<typename ... Args>
remote_procedure define(const std::string& name,
const std::function<void(const request&, Args...)>& fun,
uint16_t provider_id=0, ABT_pool pool=ABT_POOL_NULL);
uint16_t provider_id=0, const pool& p = pool());
/**
* @brief Defines an RPC with a name and a function pointer
......@@ -265,7 +292,7 @@ public:
*/
template<typename ... Args>
remote_procedure define(const std::string& name, void (*f)(const request&, Args...),
uint16_t provider_id=0, ABT_pool pool=ABT_POOL_NULL);
uint16_t provider_id=0, const pool& p = pool());
/**
* @brief Lookup an address and returns an endpoint object
......@@ -302,14 +329,14 @@ namespace thallium {
template<typename ... Args>
remote_procedure engine::define(const std::string& name,
const std::function<void(const request&, Args...)>& fun,
uint16_t provider_id, ABT_pool pool) {
uint16_t provider_id, const pool& p) {
hg_id_t id = margo_provider_register_name(m_mid, name.c_str(),
process_buffer,
process_buffer,
rpc_callback<rpc_t, false>,
provider_id,
pool);
p.native_handle());
m_rpcs[id] = [fun,this](const request& r, const buffer& b) {
std::function<void(Args...)> l = [&fun, &r](Args&&... args) {
......@@ -337,9 +364,9 @@ template<typename ... Args>
remote_procedure engine::define(
const std::string& name,
void (*f)(const request&, Args...),
uint16_t provider_id, ABT_pool pool) {
uint16_t provider_id, const pool& p) {
return define(name, std::function<void(const request&,Args...)>(f), provider_id, pool);
return define(name, std::function<void(const request&,Args...)>(f), provider_id, p);
}
}
......
......@@ -205,6 +205,12 @@ class pool {
public:
/**
* @brief Default constructor handles a null pool.
*/
pool()
: m_pool(ABT_POOL_NULL) {}
/**
* @brief Type of the underlying native handle.
*/
......
......@@ -90,33 +90,33 @@ public:
* @param T::*func member function
*/
template<typename S, typename R, typename ... Args>
remote_procedure define(S&& name, R(T::*func)(const request&, Args...)) {
remote_procedure define(S&& name, R(T::*func)(const request&, Args...), const pool& p = pool()) {
T* self = dynamic_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& r, Args... args) {
(self->*func)(r, args...);
};
return m_engine.define(std::forward<S>(name), fun, m_provider_id);
return m_engine.define(std::forward<S>(name), fun, m_provider_id, p);
}
private:
template<typename S, typename R, typename ... Args>
remote_procedure define_member(S&& name, R(T::*func)(Args...), const std::integral_constant<bool, false>&) {
remote_procedure define_member(S&& name, R(T::*func)(Args...), const std::integral_constant<bool, false>&, const pool& p = pool()) {
T* self = dynamic_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
R r = (self->*func)(args...);
req.respond(r);
};
return m_engine.define(std::forward<S>(name), fun, m_provider_id);
return m_engine.define(std::forward<S>(name), fun, m_provider_id, p);
}
template<typename S, typename R, typename ... Args>
remote_procedure define_member(S&& name, R(T::*func)(Args...), const std::integral_constant<bool, true>&) {
remote_procedure define_member(S&& name, R(T::*func)(Args...), const std::integral_constant<bool, true>&, const pool& p = pool()) {
T* self = dynamic_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(args...);
};
return m_engine.define(std::forward<S>(name), fun, m_provider_id).disable_response();
return m_engine.define(std::forward<S>(name), fun, m_provider_id, p).disable_response();
}
protected:
......@@ -136,10 +136,19 @@ protected:
* @tparam X Dispatcher type.
* @param name Name of the RPC.
* @param T::*func Member function.
* @param p Argobots pool to use to execute the RPC.
*/
template<typename S, typename R, typename ... Args, typename X = typename std::is_void<R>::type>
inline remote_procedure define(S&& name, R(T::*func)(Args...), X x = X()) {
return define_member(std::forward<S>(name), func, x);
inline remote_procedure define(S&& name, R(T::*func)(Args...), const pool& p = pool(), X x = X()) {
return define_member(std::forward<S>(name), func, x, p);
}
/**
* @brief Same as the previous function but does not take a pool.
*/
template<typename S, typename R, typename ... Args, typename X = typename std::is_void<R>::type>
inline remote_procedure define(S&& name, R(T::*func)(Args...), X x) {
return define_member(std::forward<S>(name), func, x, pool());
}
public:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment