Commit 0deea5b8 authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-async-api' into 'master'

added support for asynchronous forwards

See merge request !3
parents 49261340 ea480ba9
add_executable(08_async_sum_server server.cpp)
target_link_libraries(08_async_sum_server thallium)
add_executable(08_async_sum_client client.cpp)
target_link_libraries(08_async_sum_client thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
int main(int argc, char** argv) {
if(argc != 2) {
std::cerr << "Usage: " << argv[0] << " <address>" << std::endl;
exit(0);
}
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
tl::remote_procedure sum = myEngine.define("sum");
tl::endpoint server = myEngine.lookup(argv[1]);
auto response = sum.on(server).async(42,63);
int ret = response.wait();
std::cout << "Server answered " << ret << std::endl;
return 0;
}
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void sum(const tl::request& req, int x, int y) {
std::cout << "Computing " << x << "+" << y << std::endl;
req.respond(x+y);
}
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << (std::string)myEngine.self() << std::endl;
myEngine.define("sum", sum);
return 0;
}
......@@ -6,3 +6,4 @@ add_subdirectory(04_stop)
add_subdirectory(05_stl)
add_subdirectory(06_custom)
add_subdirectory(07_rdma)
add_subdirectory(08_async)
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_ASYNC_RESPONSE_HPP
#define __THALLIUM_ASYNC_RESPONSE_HPP
#include <thallium/margo_exception.hpp>
#include <thallium/buffer.hpp>
#include <thallium/packed_response.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
namespace thallium {
class callable_remote_procedure;
/**
* @brief async_response objects are created by sending an
* RPC in a non-blocking way. They can be used to wait for
* the actual response.
*/
class async_response {
friend class callable_remote_procedure;
private:
margo_request m_request;
engine* m_engine;
callable_remote_procedure& m_rpc;
buffer m_buffer;
bool m_ignore_response;
/**
* @brief Constructor. Made private since async_response
* objects are created by callable_remote_procedure only.
*
* @param req Margo request to wait on.
* @param e Engine associated with the RPC.
* @param c callable_remote_procedure that created the async_response.
* @param ignore_resp whether response should be ignored.
*/
async_response(margo_request req, engine& e, callable_remote_procedure& c, bool ignore_resp)
: m_request(req), m_engine(&e), m_rpc(c), m_ignore_response(ignore_resp) {}
public:
/**
* @brief Waits for the async_response to be ready and returns
* a packed_response when the response has been received.
*
* @return a packed_response containing the response.
*/
packed_response wait();
/**
* @brief Tests without blocking if the response has been received.
*
* @return true if the response has been received, false otherwise.
*/
bool received() const {
int ret;
int flag;
ret = margo_test(m_request, &flag);
MARGO_ASSERT((hg_return_t)ret, margo_test);
return flag;
}
};
}
#endif
......@@ -12,6 +12,7 @@
#include <margo.h>
#include <thallium/buffer.hpp>
#include <thallium/packed_response.hpp>
#include <thallium/async_response.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
......@@ -32,6 +33,7 @@ class endpoint;
class callable_remote_procedure {
friend class remote_procedure;
friend class async_response;
private:
engine* m_engine;
......@@ -70,17 +72,29 @@ private:
return packed_response(std::move(output), *m_engine);
}
/**
* @brief Sends the RPC to the endpoint (calls margo_iforward), passing a buffer
* in which the arguments have been serialized. The RPC is sent in a non-blocking manner.
*
* @param buf Buffer containing a serialized version of the arguments.
*
* @return an async_response object that can be waited on.
*/
async_response iforward(const buffer& buf) {
hg_return_t ret;
margo_request req;
ret = margo_iforward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)), &req);
MARGO_ASSERT(ret, margo_iforward);
return async_response(req, *m_engine, *this, m_ignore_response);
}
public:
/**
* @brief Copy-constructor.
*/
/**
* @brief Copy-constructor.
*/
callable_remote_procedure(const callable_remote_procedure& other) {
hg_return_t ret;
if(m_handle != HG_HANDLE_NULL) {
ret = margo_destroy(m_handle);
MARGO_ASSERT(ret, margo_destroy);
}
m_handle = other.m_handle;
if(m_handle != HG_HANDLE_NULL) {
ret = margo_ref_incr(m_handle);
......@@ -92,10 +106,6 @@ public:
* @brief Move-constructor.
*/
callable_remote_procedure(callable_remote_procedure&& other) {
if(m_handle != HG_HANDLE_NULL) {
hg_return_t ret = margo_destroy(m_handle);
MARGO_ASSERT(ret, margo_destroy);
}
m_handle = other.m_handle;
other.m_handle = HG_HANDLE_NULL;
}
......@@ -167,6 +177,33 @@ public:
buffer b;
return forward(b);
}
/**
* @brief Issues an RPC in a non-blocking way. Will serialize the arguments
* in a buffer and send the RPC to the endpoint.
*
* @tparam T Types of the parameters.
* @param t Parameters of the RPC.
*
* @return an async_response object that the caller can wait on.
*/
template<typename ... T>
async_response async(T&& ... t) {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(t)...);
return iforward(b);
}
/**
* @brief Non-blocking call to the RPC without any argument.
*
* @return an async_response object that the caller can wait on.
*/
async_response async() {
buffer b;
return iforward(b);
}
};
}
......
......@@ -13,6 +13,7 @@
namespace thallium {
class callable_remote_procedure;
class async_response;
/**
* @brief packed_response objects are created as a reponse to
......@@ -22,6 +23,7 @@ class callable_remote_procedure;
class packed_response {
friend class callable_remote_procedure;
friend class async_response;
private:
......
......@@ -4,7 +4,8 @@
#
# list of source files
set(thallium-src bulk.cpp
set(thallium-src async_response.cpp
bulk.cpp
callable_remote_procedure.cpp
endpoint.cpp
engine.cpp
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <thallium/async_response.hpp>
#include <thallium/callable_remote_procedure.hpp>
namespace thallium {
packed_response async_response::wait() {
hg_return_t ret;
ret = margo_wait(m_request);
MARGO_ASSERT(ret, margo_wait);
buffer output;
if(m_ignore_response) return packed_response(std::move(output), *m_engine);
ret = margo_get_output(m_rpc.m_handle, &output);
MARGO_ASSERT(ret, margo_get_output);
ret = margo_free_output(m_rpc.m_handle, &output); // won't do anything on a buffer type
MARGO_ASSERT(ret, margo_free_output);
return packed_response(std::move(output), *m_engine);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment