GitLab maintenance scheduled for Today, 2019-12-05, from 17:00 to 18:00 CT - Services will be unavailable during this time.

Commit 92ade40d authored by Matthieu Dorier's avatar Matthieu Dorier

Added provider and provider_handle classes as well as examples

parent 0deea5b8
add_executable(09_sum_provider server.cpp)
target_link_libraries(09_sum_provider thallium)
add_executable(09_sum_client client.cpp)
target_link_libraries(09_sum_client thallium)
#include <iostream>
#include <thallium/serialization/stl/string.hpp>
#include <thallium.hpp>
namespace tl = thallium;
int main(int argc, char** argv) {
if(argc != 3) {
std::cerr << "Usage: " << argv[0] << " <address> <provider_id>" << std::endl;
exit(0);
}
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
tl::remote_procedure sum = myEngine.define("sum");
tl::remote_procedure prod = myEngine.define("prod");
tl::remote_procedure hello = myEngine.define("hello").ignore_response();
tl::remote_procedure print = myEngine.define("print").ignore_response();
tl::endpoint server = myEngine.lookup(argv[1]);
uint16_t provider_id = atoi(argv[2]);
tl::provider_handle ph(server, provider_id);
int ret = sum.on(ph)(42,63);
std::cout << "(sum) Server answered " << ret << std::endl;
ret = prod.on(ph)(42,63);
std::cout << "(prod) Server answered " << ret << std::endl;
std::string name("Matthieu");
hello.on(ph)(name);
print.on(ph)(name);
return 0;
}
#include <iostream>
#include <thallium.hpp>
#include <thallium/serialization/stl/string.hpp>
namespace tl = thallium;
class my_sum_provider : public tl::provider<my_sum_provider> {
private:
void prod(const tl::request& req, int x, int y) {
std::cout << "Computing " << x << "*" << y << std::endl;
req.respond(x+y);
}
int sum(int x, int y) {
std::cout << "Computing " << x << "+" << y << std::endl;
return x+y;
}
void hello(const std::string& name) {
std::cout << "Hello, " << name << std::endl;
}
int print(const std::string& word) {
std::cout << "Printing " << word << std::endl;
return word.size();
}
public:
my_sum_provider(tl::engine& e, uint16_t provider_id=1)
: tl::provider<my_sum_provider>(e, provider_id) {
define("prod", &my_sum_provider::prod);
define("sum", &my_sum_provider::sum);
define("hello", &my_sum_provider::hello);
define("print", &my_sum_provider::print, tl::ignore_return_value());
}
~my_sum_provider() {
wait_for_finalize();
}
};
int main(int argc, char** argv) {
uint16_t provider_id = 22;
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << myEngine.self()
<< " with provider id " << provider_id << std::endl;
my_sum_provider myProvider(myEngine, provider_id);
return 0;
}
......@@ -7,3 +7,4 @@ add_subdirectory(05_stl)
add_subdirectory(06_custom)
add_subdirectory(07_rdma)
add_subdirectory(08_async)
add_subdirectory(09_provider)
......@@ -9,5 +9,7 @@
#include <thallium/remote_procedure.hpp>
#include <thallium/callable_remote_procedure.hpp>
#include <thallium/remote_bulk.hpp>
#include <thallium/provider.hpp>
#include <thallium/provider_handle.hpp>
#endif
......@@ -39,6 +39,7 @@ private:
engine* m_engine;
hg_handle_t m_handle;
bool m_ignore_response;
uint16_t m_provider_id;
/**
* @brief Constructor. Made private since callable_remote_procedure can only
......@@ -49,7 +50,8 @@ private:
* @param ep endpoint on which to call the RPC.
* @param ignore_resp whether the response should be ignored.
*/
callable_remote_procedure(engine& e, hg_id_t id, const endpoint& ep, bool ignore_resp);
callable_remote_procedure(engine& e, hg_id_t id, const endpoint& ep,
bool ignore_resp, uint16_t provider_id=0);
/**
* @brief Sends the RPC to the endpoint (calls margo_forward), passing a buffer
......@@ -61,7 +63,8 @@ private:
*/
packed_response forward(const buffer& buf) const {
hg_return_t ret;
ret = margo_forward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)));
ret = margo_provider_forward(m_provider_id,
m_handle, const_cast<void*>(static_cast<const void*>(&buf)));
MARGO_ASSERT(ret, margo_forward);
buffer output;
if(m_ignore_response) return packed_response(std::move(output), *m_engine);
......@@ -83,7 +86,8 @@ private:
async_response iforward(const buffer& buf) {
hg_return_t ret;
margo_request req;
ret = margo_iforward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)), &req);
ret = margo_provider_iforward(m_provider_id,
m_handle, const_cast<void*>(static_cast<const void*>(&buf)), &req);
MARGO_ASSERT(ret, margo_iforward);
return async_response(req, *m_engine, *this, m_ignore_response);
}
......
......@@ -11,6 +11,13 @@
#include <margo.h>
#include <thallium/margo_exception.hpp>
namespace thallium {
class endpoint;
}
template<typename S>
S& operator<<(S& s, const thallium::endpoint& e);
namespace thallium {
class engine;
......@@ -95,8 +102,30 @@ public:
bool is_null() const {
return m_addr == HG_ADDR_NULL;
}
template<typename S>
friend S& ::operator<<(S& s, const endpoint& e);
};
}
/**
* @brief Streaming operator for endpoint, converts the endpoint
* into a string before feeding it to the string operator. This
* enables, for instance, streaming the endpoint into std::cout
* for logging without having to explicitely convert it into a
* string.
*
* @tparam S Type of stream.
* @param s Stream.
* @param e Endpoint.
*
* @return Reference to the provided stream.
*/
template<typename S>
S& operator<<(S& s, const thallium::endpoint& e) {
s << (std::string)e;
return s;
}
#endif
......@@ -27,6 +27,7 @@ class bulk;
class endpoint;
class remote_bulk;
class remote_procedure;
template<typename T> class provider;
/**
* @brief The engine class is at the core of Thallium,
......@@ -42,6 +43,8 @@ class engine {
friend class remote_bulk;
friend class remote_procedure;
friend class callable_remote_procedure;
template<typename T>
friend class provider;
private:
......@@ -204,6 +207,16 @@ public:
margo_finalize(m_mid);
}
/**
* @brief Makes the calling thread block until someone calls
* finalize on this engine. This function will not do anything
* if finalize was already called.
*/
void wait_for_finalize() {
if(!m_finalize_called)
margo_wait_for_finalize(m_mid);
}
/**
* @brief Creates an endpoint from this engine.
*
......@@ -228,12 +241,15 @@ public:
* @tparam Args Types of arguments accepted by the RPC.
* @param name Name of the RPC.
* @param fun Function to associate with the RPC.
* @param provider_id ID of the provider registering this RPC.
* @param pool Argobots pool to use when receiving this type of RPC
*
* @return a remote_procedure object.
*/
template<typename ... Args>
remote_procedure define(const std::string& name,
const std::function<void(const request&, Args...)>& fun);
const std::function<void(const request&, Args...)>& fun,
uint16_t provider_id=0, ABT_pool pool=ABT_POOL_NULL);
/**
* @brief Defines an RPC with a name and a function pointer
......@@ -242,11 +258,14 @@ public:
* @tparam Args Types of arguments accepted by the RPC.
* @param name Name of the RPC.
* @param f Function to associate with the RPC.
* @param provider_id ID of the provider registering this RPC.
* @param pool Argobots pool to use when receiving this type of RPC.
*
* @return a remote_procedure object.
*/
template<typename ... Args>
remote_procedure define(const std::string& name, void (*f)(const request&, Args...));
remote_procedure define(const std::string& name, void (*f)(const request&, Args...),
uint16_t provider_id=0, ABT_pool pool=ABT_POOL_NULL);
/**
* @brief Lookup an address and returns an endpoint object
......@@ -268,12 +287,6 @@ public:
*/
bulk expose(const std::vector<std::pair<void*,size_t>>& segments, bulk_mode flag);
/**
* @brief String representation of the engine's address.
*
* @return String representation of the engine's address.
*/
operator std::string() const;
};
} // namespace thallium
......@@ -288,12 +301,15 @@ namespace thallium {
template<typename ... Args>
remote_procedure engine::define(const std::string& name,
const std::function<void(const request&, Args...)>& fun) {
const std::function<void(const request&, Args...)>& fun,
uint16_t provider_id, ABT_pool pool) {
hg_id_t id = margo_register_name(m_mid, name.c_str(),
process_buffer,
process_buffer,
rpc_callback<rpc_t, false>);
hg_id_t id = margo_provider_register_name(m_mid, name.c_str(),
process_buffer,
process_buffer,
rpc_callback<rpc_t, false>,
provider_id,
pool);
m_rpcs[id] = [fun,this](const request& r, const buffer& b) {
std::function<void(Args...)> l = [&fun, &r](Args&&... args) {
......@@ -318,8 +334,12 @@ remote_procedure engine::define(const std::string& name,
}
template<typename ... Args>
remote_procedure engine::define(const std::string& name, void (*f)(const request&, Args...)) {
return define(name, std::function<void(const request&,Args...)>(f));
remote_procedure engine::define(
const std::string& name,
void (*f)(const request&, Args...),
uint16_t provider_id, ABT_pool pool) {
return define(name, std::function<void(const request&,Args...)>(f), provider_id, pool);
}
}
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_PROVIDER_HPP
#define __THALLIUM_PROVIDER_HPP
#include <memory>
#include <iostream>
#include <string>
#include <functional>
#include <unordered_map>
#include <atomic>
#include <margo.h>
#include <thallium/engine.hpp>
namespace thallium {
typedef std::integral_constant<bool,true> ignore_return_value;
/**
* @brief The provider class represents an object that
* exposes its member functions as RPCs. It is a template
* class meant to be used as base class for other objects.
* For example:
* class MyProvider : public thallium::provider<MyProvider> { ... };
*
* @tparam T
*/
template<typename T>
class provider {
private:
engine& m_engine;
uint16_t m_provider_id;
public:
provider(engine& e, uint16_t provider_id)
: m_engine(e), m_provider_id(provider_id) {}
virtual ~provider() {}
/**
* @brief Copy-constructor is deleted.
*/
provider(const provider& other) = delete;
/**
* @brief Move-constructor is deleted.
*/
provider(provider&& other) = delete;
/**
* @brief Move-assignment operator is deleted.
*/
provider& operator=(provider&& other) = delete;
/**
* @brief Copy-assignment operator is deleted.
*/
provider& operator=(const provider& other) = delete;
/**
* @brief Waits for the engine to finalize.
*/
inline void wait_for_finalize() {
m_engine.wait_for_finalize();
}
/**
* @brief Finalize the engine.
*/
inline void finalize() {
m_engine.finalize();
}
/**
* @brief Defines an RPC using a member function of the child class.
* The member function should have a const request& as first parameter
* and use this request parameter to respond to the client.
*
* @tparam S type of the name (e.g. C-like string or std::string)
* @tparam R return value of the member function
* @tparam Args Arguments of the member function
* @param name name of the RPC
* @param T::*func member function
*/
template<typename S, typename R, typename ... Args>
void define(S&& name, R(T::*func)(const request&, Args...)) {
T* self = dynamic_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& r, Args... args) {
(self->*func)(r, args...);
};
m_engine.define(std::forward<S>(name), fun, m_provider_id);
}
private:
template<typename S, typename R, typename ... Args>
void define_member(S&& name, R(T::*func)(Args...), const std::integral_constant<bool, false>&) {
T* self = dynamic_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
R r = (self->*func)(args...);
req.respond(r);
};
m_engine.define(std::forward<S>(name), fun, m_provider_id);
}
template<typename S, typename R, typename ... Args>
void define_member(S&& name, R(T::*func)(Args...), const std::integral_constant<bool, true>&) {
T* self = dynamic_cast<T*>(this);
std::function<void(const request&, Args...)> fun = [self, func](const request& req, Args... args) {
(self->*func)(args...);
};
m_engine.define(std::forward<S>(name), fun, m_provider_id).ignore_response();
}
public:
/**
* @brief Defines an RPC from a member function of the child class.
* This member function doesn't have a const request& paramater, so
* the RPC will be formed by assuming the return value of the function
* is what is sent back to the client (nothing is sent back if the
* return type is void). If the member function returns something but
* this return value should not be sent back to the client, the caller
* can pass ignore_return_value() as last argument of define().
*
* @tparam S Type of the RPC name (e.g. std::string)
* @tparam R Return value of the member function.
* @tparam Args Argument types of the member function.
* @tparam X Dispatcher type.
* @param name Name of the RPC.
* @param T::*func Member function.
*/
template<typename S, typename R, typename ... Args, typename X = typename std::is_void<R>::type>
inline void define(S&& name, R(T::*func)(Args...), X x = X()) {
define_member(std::forward<S>(name), func, x);
}
/**
* @brief Get the engine associated with this provider.
*
* @return The engine associated with this provider.
*/
const engine& get_engine() const {
return m_engine;
}
/**
* @brief Get the engine associated with this provider.
*
* @return The engine associated with this provider.
*/
engine& get_engine() {
return m_engine;
}
/**
* @brief Get the provider id.
*
* @return The provider id.
*/
uint16_t get_provider_id() const {
return m_provider_id;
}
};
} // namespace thallium
#endif
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_PROVIDER_HANDLE_HPP
#define __THALLIUM_PROVIDER_HANDLE_HPP
#include <cstdint>
#include <string>
#include <margo.h>
#include <thallium/endpoint.hpp>
#include <thallium/margo_exception.hpp>
namespace thallium {
/**
* The provider_handle class encapsulates an enpoint
* with a provider id to reference a particular provider object
* at a given address. provider_handle inherites from endpoint
* so it can be used wherever endpoint is needed.
*/
class provider_handle : public endpoint {
private:
uint16_t m_provider_id;
public:
/**
* @brief Constructor.
*
* @param e enpoint to encapsulate.
* @param provider_id provider id.
*/
provider_handle(endpoint e, uint16_t provider_id=0)
: endpoint(std::move(e)), m_provider_id(provider_id) {}
/**
* @brief Default constructor defined so that provider_handlers can
* be member of other objects and assigned later.
*/
provider_handle() = default;
/**
* @brief Copy constructor.
*/
provider_handle(const provider_handle& other) = default;
/**
* @brief Move constructor.
*/
provider_handle(provider_handle&& other) = default;
/**
* @brief Copy-assignment operator.
*/
provider_handle& operator=(const provider_handle& other) = default;
/**
* @brief Move-assignment operator.
*/
provider_handle& operator=(provider_handle&& other) = default;
/**
* @brief Destructor.
*/
~provider_handle() = default;
/**
* @brief Get the provider id.
*
* @return the provider id.
*/
uint16_t provider_id() const {
return m_provider_id;
}
};
}
#endif
......@@ -12,6 +12,7 @@ namespace thallium {
class engine;
class endpoint;
class provider_handle;
class callable_remote_procedure;
/**
......@@ -26,9 +27,9 @@ class remote_procedure {
friend class engine;
private:
engine* m_engine;
hg_id_t m_id;
bool m_ignore_response;
engine* m_engine;
hg_id_t m_id;
bool m_ignore_response;
/**
* @brief Constructor. Made private because remote_procedure
......@@ -76,6 +77,17 @@ public:
*/
callable_remote_procedure on(const endpoint& ep) const;
/**
* @brief Creates a callable remote_procedure by associating the
* remote_procedure with a particular provider_handle.
*
* @param ph provider_handle with which to associate the procedure.
*
* @return a callable_remote_procedure.
*/
callable_remote_procedure on(const provider_handle& ph) const;
/**
* @brief Tell the remote_procedure that it should not expect responses.
*
......
......@@ -11,8 +11,9 @@
namespace thallium {
callable_remote_procedure::callable_remote_procedure(engine& e, hg_id_t id, const endpoint& ep, bool ignore_resp)
: m_engine(&e) {
callable_remote_procedure::callable_remote_procedure(
engine& e, hg_id_t id, const endpoint& ep, bool ignore_resp, uint16_t provider_id)
: m_engine(&e), m_ignore_response(ignore_resp), m_provider_id(provider_id) {
m_ignore_response = ignore_resp;
hg_return_t ret = margo_create(ep.m_engine->m_mid, ep.m_addr, id, &m_handle);
MARGO_ASSERT(ret, margo_create);
......
......@@ -6,6 +6,7 @@
#include <thallium/remote_procedure.hpp>
#include <thallium/callable_remote_procedure.hpp>
#include <thallium/engine.hpp>
#include <thallium/provider_handle.hpp>
namespace thallium {
......@@ -16,6 +17,10 @@ callable_remote_procedure remote_procedure::on(const endpoint& ep) const {
return callable_remote_procedure(*m_engine, m_id, ep, m_ignore_response);
}
callable_remote_procedure remote_procedure::on(const provider_handle& ph) const {
return callable_remote_procedure(*m_engine, m_id, ph, m_ignore_response, ph.provider_id());
}
remote_procedure& remote_procedure::ignore_response() {
m_ignore_response = true;
margo_registered_disable_response(m_engine->m_mid, m_id, HG_TRUE);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment