diff --git a/examples/09_provider/CMakeLists.txt b/examples/09_provider/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..c19ef2fa4ee2ef74a9fc42791a54565ea4ccc815 --- /dev/null +++ b/examples/09_provider/CMakeLists.txt @@ -0,0 +1,4 @@ +add_executable(09_sum_provider server.cpp) +target_link_libraries(09_sum_provider thallium) +add_executable(09_sum_client client.cpp) +target_link_libraries(09_sum_client thallium) diff --git a/examples/09_provider/client.cpp b/examples/09_provider/client.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5c3dae1620560babb26213b70a71537b50d38cc3 --- /dev/null +++ b/examples/09_provider/client.cpp @@ -0,0 +1,30 @@ +#include +#include +#include + +namespace tl = thallium; + +int main(int argc, char** argv) { + if(argc != 3) { + std::cerr << "Usage: " << argv[0] << "
" << std::endl; + exit(0); + } + tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE); + tl::remote_procedure sum = myEngine.define("sum"); + tl::remote_procedure prod = myEngine.define("prod"); + tl::remote_procedure hello = myEngine.define("hello").ignore_response(); + tl::remote_procedure print = myEngine.define("print").ignore_response(); + tl::endpoint server = myEngine.lookup(argv[1]); + uint16_t provider_id = atoi(argv[2]); + tl::provider_handle ph(server, provider_id); + int ret = sum.on(ph)(42,63); + std::cout << "(sum) Server answered " << ret << std::endl; + ret = prod.on(ph)(42,63); + std::cout << "(prod) Server answered " << ret << std::endl; + std::string name("Matthieu"); + hello.on(ph)(name); + print.on(ph)(name); + + return 0; +} + diff --git a/examples/09_provider/server.cpp b/examples/09_provider/server.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2676044e50edcafa80d612a3b1d28d92fa443e09 --- /dev/null +++ b/examples/09_provider/server.cpp @@ -0,0 +1,55 @@ +#include +#include +#include + +namespace tl = thallium; + +class my_sum_provider : public tl::provider { + + private: + + void prod(const tl::request& req, int x, int y) { + std::cout << "Computing " << x << "*" << y << std::endl; + req.respond(x+y); + } + + int sum(int x, int y) { + std::cout << "Computing " << x << "+" << y << std::endl; + return x+y; + } + + void hello(const std::string& name) { + std::cout << "Hello, " << name << std::endl; + } + + int print(const std::string& word) { + std::cout << "Printing " << word << std::endl; + return word.size(); + } + + public: + + my_sum_provider(tl::engine& e, uint16_t provider_id=1) + : tl::provider(e, provider_id) { + define("prod", &my_sum_provider::prod); + define("sum", &my_sum_provider::sum); + define("hello", &my_sum_provider::hello); + define("print", &my_sum_provider::print, tl::ignore_return_value()); + } + + ~my_sum_provider() { + wait_for_finalize(); + } +}; + +int main(int argc, char** argv) { + + uint16_t provider_id = 22; + tl::engine myEngine("tcp", THALLIUM_SERVER_MODE); + std::cout << "Server running at address " << myEngine.self() + << " with provider id " << provider_id << std::endl; + my_sum_provider myProvider(myEngine, provider_id); + + return 0; +} + diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 2ddadd8c735f7f4a53870d9ae01de0415f1d037c..dac6afb9c37b09147b7ae3d2d464f8522d9f5f47 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -7,3 +7,4 @@ add_subdirectory(05_stl) add_subdirectory(06_custom) add_subdirectory(07_rdma) add_subdirectory(08_async) +add_subdirectory(09_provider) diff --git a/include/thallium.hpp b/include/thallium.hpp index 5aaec94b0d36a0d2a9bec6661c01f7f9400bc8a4..20131af958edd9a726de5af60a99bcd0a40eb824 100644 --- a/include/thallium.hpp +++ b/include/thallium.hpp @@ -9,5 +9,7 @@ #include #include #include +#include +#include #endif diff --git a/include/thallium/callable_remote_procedure.hpp b/include/thallium/callable_remote_procedure.hpp index 93b2c3f28eba502c4b7f4ca6b4721332a65e837d..a6c9dad0e9e6a05e250ed32017ecc5bf8bc3133e 100644 --- a/include/thallium/callable_remote_procedure.hpp +++ b/include/thallium/callable_remote_procedure.hpp @@ -39,6 +39,7 @@ private: engine* m_engine; hg_handle_t m_handle; bool m_ignore_response; + uint16_t m_provider_id; /** * @brief Constructor. Made private since callable_remote_procedure can only @@ -49,7 +50,8 @@ private: * @param ep endpoint on which to call the RPC. * @param ignore_resp whether the response should be ignored. */ - callable_remote_procedure(engine& e, hg_id_t id, const endpoint& ep, bool ignore_resp); + callable_remote_procedure(engine& e, hg_id_t id, const endpoint& ep, + bool ignore_resp, uint16_t provider_id=0); /** * @brief Sends the RPC to the endpoint (calls margo_forward), passing a buffer @@ -61,7 +63,8 @@ private: */ packed_response forward(const buffer& buf) const { hg_return_t ret; - ret = margo_forward(m_handle, const_cast(static_cast(&buf))); + ret = margo_provider_forward(m_provider_id, + m_handle, const_cast(static_cast(&buf))); MARGO_ASSERT(ret, margo_forward); buffer output; if(m_ignore_response) return packed_response(std::move(output), *m_engine); @@ -83,7 +86,8 @@ private: async_response iforward(const buffer& buf) { hg_return_t ret; margo_request req; - ret = margo_iforward(m_handle, const_cast(static_cast(&buf)), &req); + ret = margo_provider_iforward(m_provider_id, + m_handle, const_cast(static_cast(&buf)), &req); MARGO_ASSERT(ret, margo_iforward); return async_response(req, *m_engine, *this, m_ignore_response); } diff --git a/include/thallium/endpoint.hpp b/include/thallium/endpoint.hpp index 6ebc4b84a5b6b33c664c9063591176367b2794a2..1b5c87e5c3a9dcce858420772f38353d0f77c193 100644 --- a/include/thallium/endpoint.hpp +++ b/include/thallium/endpoint.hpp @@ -11,6 +11,13 @@ #include #include +namespace thallium { + class endpoint; +} + +template +S& operator<<(S& s, const thallium::endpoint& e); + namespace thallium { class engine; @@ -95,8 +102,30 @@ public: bool is_null() const { return m_addr == HG_ADDR_NULL; } + + template + friend S& ::operator<<(S& s, const endpoint& e); }; } +/** + * @brief Streaming operator for endpoint, converts the endpoint + * into a string before feeding it to the string operator. This + * enables, for instance, streaming the endpoint into std::cout + * for logging without having to explicitely convert it into a + * string. + * + * @tparam S Type of stream. + * @param s Stream. + * @param e Endpoint. + * + * @return Reference to the provided stream. + */ +template +S& operator<<(S& s, const thallium::endpoint& e) { + s << (std::string)e; + return s; +} + #endif diff --git a/include/thallium/engine.hpp b/include/thallium/engine.hpp index 93cb803f70cdebf0953ef2654ee20c9896fc2abd..53390660b9eb6af71bc9e7572e6dbf86276ea01a 100644 --- a/include/thallium/engine.hpp +++ b/include/thallium/engine.hpp @@ -27,6 +27,7 @@ class bulk; class endpoint; class remote_bulk; class remote_procedure; +template class provider; /** * @brief The engine class is at the core of Thallium, @@ -42,6 +43,8 @@ class engine { friend class remote_bulk; friend class remote_procedure; friend class callable_remote_procedure; + template + friend class provider; private: @@ -204,6 +207,16 @@ public: margo_finalize(m_mid); } + /** + * @brief Makes the calling thread block until someone calls + * finalize on this engine. This function will not do anything + * if finalize was already called. + */ + void wait_for_finalize() { + if(!m_finalize_called) + margo_wait_for_finalize(m_mid); + } + /** * @brief Creates an endpoint from this engine. * @@ -228,12 +241,15 @@ public: * @tparam Args Types of arguments accepted by the RPC. * @param name Name of the RPC. * @param fun Function to associate with the RPC. + * @param provider_id ID of the provider registering this RPC. + * @param pool Argobots pool to use when receiving this type of RPC * * @return a remote_procedure object. */ template remote_procedure define(const std::string& name, - const std::function& fun); + const std::function& fun, + uint16_t provider_id=0, ABT_pool pool=ABT_POOL_NULL); /** * @brief Defines an RPC with a name and a function pointer @@ -242,11 +258,14 @@ public: * @tparam Args Types of arguments accepted by the RPC. * @param name Name of the RPC. * @param f Function to associate with the RPC. + * @param provider_id ID of the provider registering this RPC. + * @param pool Argobots pool to use when receiving this type of RPC. * * @return a remote_procedure object. */ template - remote_procedure define(const std::string& name, void (*f)(const request&, Args...)); + remote_procedure define(const std::string& name, void (*f)(const request&, Args...), + uint16_t provider_id=0, ABT_pool pool=ABT_POOL_NULL); /** * @brief Lookup an address and returns an endpoint object @@ -268,12 +287,6 @@ public: */ bulk expose(const std::vector>& segments, bulk_mode flag); - /** - * @brief String representation of the engine's address. - * - * @return String representation of the engine's address. - */ - operator std::string() const; }; } // namespace thallium @@ -288,12 +301,15 @@ namespace thallium { template remote_procedure engine::define(const std::string& name, - const std::function& fun) { + const std::function& fun, + uint16_t provider_id, ABT_pool pool) { - hg_id_t id = margo_register_name(m_mid, name.c_str(), - process_buffer, - process_buffer, - rpc_callback); + hg_id_t id = margo_provider_register_name(m_mid, name.c_str(), + process_buffer, + process_buffer, + rpc_callback, + provider_id, + pool); m_rpcs[id] = [fun,this](const request& r, const buffer& b) { std::function l = [&fun, &r](Args&&... args) { @@ -318,8 +334,12 @@ remote_procedure engine::define(const std::string& name, } template -remote_procedure engine::define(const std::string& name, void (*f)(const request&, Args...)) { - return define(name, std::function(f)); +remote_procedure engine::define( + const std::string& name, + void (*f)(const request&, Args...), + uint16_t provider_id, ABT_pool pool) { + + return define(name, std::function(f), provider_id, pool); } } diff --git a/include/thallium/provider.hpp b/include/thallium/provider.hpp new file mode 100644 index 0000000000000000000000000000000000000000..f080e80f3d925c1e519ee77c983b322d57fc25ba --- /dev/null +++ b/include/thallium/provider.hpp @@ -0,0 +1,174 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __THALLIUM_PROVIDER_HPP +#define __THALLIUM_PROVIDER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace thallium { + +typedef std::integral_constant ignore_return_value; + +/** + * @brief The provider class represents an object that + * exposes its member functions as RPCs. It is a template + * class meant to be used as base class for other objects. + * For example: + * class MyProvider : public thallium::provider { ... }; + * + * @tparam T + */ +template +class provider { + +private: + + engine& m_engine; + uint16_t m_provider_id; + +public: + + provider(engine& e, uint16_t provider_id) + : m_engine(e), m_provider_id(provider_id) {} + + virtual ~provider() {} + + /** + * @brief Copy-constructor is deleted. + */ + provider(const provider& other) = delete; + + /** + * @brief Move-constructor is deleted. + */ + provider(provider&& other) = delete; + + /** + * @brief Move-assignment operator is deleted. + */ + provider& operator=(provider&& other) = delete; + + /** + * @brief Copy-assignment operator is deleted. + */ + provider& operator=(const provider& other) = delete; + + /** + * @brief Waits for the engine to finalize. + */ + inline void wait_for_finalize() { + m_engine.wait_for_finalize(); + } + + /** + * @brief Finalize the engine. + */ + inline void finalize() { + m_engine.finalize(); + } + + /** + * @brief Defines an RPC using a member function of the child class. + * The member function should have a const request& as first parameter + * and use this request parameter to respond to the client. + * + * @tparam S type of the name (e.g. C-like string or std::string) + * @tparam R return value of the member function + * @tparam Args Arguments of the member function + * @param name name of the RPC + * @param T::*func member function + */ + template + void define(S&& name, R(T::*func)(const request&, Args...)) { + T* self = dynamic_cast(this); + std::function fun = [self, func](const request& r, Args... args) { + (self->*func)(r, args...); + }; + m_engine.define(std::forward(name), fun, m_provider_id); + } + +private: + + template + void define_member(S&& name, R(T::*func)(Args...), const std::integral_constant&) { + T* self = dynamic_cast(this); + std::function fun = [self, func](const request& req, Args... args) { + R r = (self->*func)(args...); + req.respond(r); + }; + m_engine.define(std::forward(name), fun, m_provider_id); + } + + template + void define_member(S&& name, R(T::*func)(Args...), const std::integral_constant&) { + T* self = dynamic_cast(this); + std::function fun = [self, func](const request& req, Args... args) { + (self->*func)(args...); + }; + m_engine.define(std::forward(name), fun, m_provider_id).ignore_response(); + } + +public: + + /** + * @brief Defines an RPC from a member function of the child class. + * This member function doesn't have a const request& paramater, so + * the RPC will be formed by assuming the return value of the function + * is what is sent back to the client (nothing is sent back if the + * return type is void). If the member function returns something but + * this return value should not be sent back to the client, the caller + * can pass ignore_return_value() as last argument of define(). + * + * @tparam S Type of the RPC name (e.g. std::string) + * @tparam R Return value of the member function. + * @tparam Args Argument types of the member function. + * @tparam X Dispatcher type. + * @param name Name of the RPC. + * @param T::*func Member function. + */ + template::type> + inline void define(S&& name, R(T::*func)(Args...), X x = X()) { + define_member(std::forward(name), func, x); + } + + /** + * @brief Get the engine associated with this provider. + * + * @return The engine associated with this provider. + */ + const engine& get_engine() const { + return m_engine; + } + + /** + * @brief Get the engine associated with this provider. + * + * @return The engine associated with this provider. + */ + engine& get_engine() { + return m_engine; + } + + /** + * @brief Get the provider id. + * + * @return The provider id. + */ + uint16_t get_provider_id() const { + return m_provider_id; + } +}; + +} // namespace thallium + +#endif diff --git a/include/thallium/provider_handle.hpp b/include/thallium/provider_handle.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e10ef8b33166ccbf20d181be73351def9c7fac61 --- /dev/null +++ b/include/thallium/provider_handle.hpp @@ -0,0 +1,83 @@ +/* + * (C) 2017 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __THALLIUM_PROVIDER_HANDLE_HPP +#define __THALLIUM_PROVIDER_HANDLE_HPP + +#include +#include +#include +#include +#include + +namespace thallium { + +/** + * The provider_handle class encapsulates an enpoint + * with a provider id to reference a particular provider object + * at a given address. provider_handle inherites from endpoint + * so it can be used wherever endpoint is needed. + */ +class provider_handle : public endpoint { + +private: + + uint16_t m_provider_id; + +public: + + /** + * @brief Constructor. + * + * @param e enpoint to encapsulate. + * @param provider_id provider id. + */ + provider_handle(endpoint e, uint16_t provider_id=0) + : endpoint(std::move(e)), m_provider_id(provider_id) {} + + /** + * @brief Default constructor defined so that provider_handlers can + * be member of other objects and assigned later. + */ + provider_handle() = default; + + /** + * @brief Copy constructor. + */ + provider_handle(const provider_handle& other) = default; + + /** + * @brief Move constructor. + */ + provider_handle(provider_handle&& other) = default; + + /** + * @brief Copy-assignment operator. + */ + provider_handle& operator=(const provider_handle& other) = default; + + /** + * @brief Move-assignment operator. + */ + provider_handle& operator=(provider_handle&& other) = default; + + /** + * @brief Destructor. + */ + ~provider_handle() = default; + + /** + * @brief Get the provider id. + * + * @return the provider id. + */ + uint16_t provider_id() const { + return m_provider_id; + } +}; + +} + +#endif diff --git a/include/thallium/remote_procedure.hpp b/include/thallium/remote_procedure.hpp index 6336a012cd4ff3a4e28e721790710892048d1ec8..66718e494d885d9668b30da172ef25591306497b 100644 --- a/include/thallium/remote_procedure.hpp +++ b/include/thallium/remote_procedure.hpp @@ -12,6 +12,7 @@ namespace thallium { class engine; class endpoint; +class provider_handle; class callable_remote_procedure; /** @@ -26,9 +27,9 @@ class remote_procedure { friend class engine; private: - engine* m_engine; - hg_id_t m_id; - bool m_ignore_response; + engine* m_engine; + hg_id_t m_id; + bool m_ignore_response; /** * @brief Constructor. Made private because remote_procedure @@ -76,6 +77,17 @@ public: */ callable_remote_procedure on(const endpoint& ep) const; + + /** + * @brief Creates a callable remote_procedure by associating the + * remote_procedure with a particular provider_handle. + * + * @param ph provider_handle with which to associate the procedure. + * + * @return a callable_remote_procedure. + */ + callable_remote_procedure on(const provider_handle& ph) const; + /** * @brief Tell the remote_procedure that it should not expect responses. * diff --git a/src/callable_remote_procedure.cpp b/src/callable_remote_procedure.cpp index 16c4c941c5fd2f24dea9d0f4d3e06f6a6a3cf715..61cec60dcd444d14f6540442ae1f548376984389 100644 --- a/src/callable_remote_procedure.cpp +++ b/src/callable_remote_procedure.cpp @@ -11,8 +11,9 @@ namespace thallium { -callable_remote_procedure::callable_remote_procedure(engine& e, hg_id_t id, const endpoint& ep, bool ignore_resp) -: m_engine(&e) { +callable_remote_procedure::callable_remote_procedure( + engine& e, hg_id_t id, const endpoint& ep, bool ignore_resp, uint16_t provider_id) +: m_engine(&e), m_ignore_response(ignore_resp), m_provider_id(provider_id) { m_ignore_response = ignore_resp; hg_return_t ret = margo_create(ep.m_engine->m_mid, ep.m_addr, id, &m_handle); MARGO_ASSERT(ret, margo_create); diff --git a/src/remote_procedure.cpp b/src/remote_procedure.cpp index 8c9063abd337c57987f7f64a6acd0a4630da10be..cd12a99d57e71d07b86d975b56f30cb7f5b4e7da 100644 --- a/src/remote_procedure.cpp +++ b/src/remote_procedure.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace thallium { @@ -16,6 +17,10 @@ callable_remote_procedure remote_procedure::on(const endpoint& ep) const { return callable_remote_procedure(*m_engine, m_id, ep, m_ignore_response); } +callable_remote_procedure remote_procedure::on(const provider_handle& ph) const { + return callable_remote_procedure(*m_engine, m_id, ph, m_ignore_response, ph.provider_id()); +} + remote_procedure& remote_procedure::ignore_response() { m_ignore_response = true; margo_registered_disable_response(m_engine->m_mid, m_id, HG_TRUE);