Commit 4a922e0f authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added implementation of wait_any

parent dcd61c83
......@@ -11,10 +11,24 @@ int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
tl::remote_procedure sum = myEngine.define("sum");
tl::endpoint server = myEngine.lookup(argv[1]);
auto response = sum.on(server).async(42,63);
int ret = response.wait();
std::cout << "Server answered " << ret << std::endl;
std::vector<tl::async_response> reqs;
for(unsigned i=0; i < 10; i++) {
auto response = sum.on(server).async(42,63);
reqs.push_back(std::move(response));
}
for(auto i=0; i < 10; i++) {
int ret;
decltype(reqs.begin()) completed;
ret = tl::async_response::wait_any(reqs.begin(), reqs.end(), completed);
reqs.erase(completed);
std::cout << "Server answered " << ret << std::endl;
}
return 0;
}
#include <cstdlib>
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
tl::engine* theEngine;
void sum(const tl::request& req, int x, int y) {
std::cout << "Computing " << x << "+" << y << std::endl;
req.respond(x+y);
......@@ -11,6 +14,7 @@ void sum(const tl::request& req, int x, int y) {
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
theEngine = &myEngine;
std::cout << "Server running at address " << myEngine.self() << std::endl;
myEngine.define("sum", sum);
......
......@@ -9,6 +9,8 @@
#include <thallium/margo_exception.hpp>
#include <thallium/buffer.hpp>
#include <thallium/packed_response.hpp>
#include <vector>
#include <utility>
namespace thallium {
......@@ -72,9 +74,22 @@ public:
async_response& operator=(const async_response& other) = delete;
/**
* @brief Move-assignment operator is deleted.
* @brief Move-assignment operator. Will invalidate
* the moved-from object.
*/
async_response& operator=(async_response&& other) = delete;
async_response& operator=(async_response&& other) {
if(this == &other) return *this;
if(m_handle != HG_HANDLE_NULL)
margo_destroy(m_handle);
m_request = other.m_request;
m_engine = other.m_engine;
m_handle = other.m_handle;
m_ignore_response = other.m_ignore_response;
other.m_request = MARGO_REQUEST_NULL;
other.m_engine = nullptr;
other.m_handle = HG_HANDLE_NULL;
return *this;
}
/**
* @brief Destructor.
......@@ -104,6 +119,47 @@ public:
MARGO_ASSERT((hg_return_t)ret, margo_test);
return flag;
}
/**
* @brief Waits for any of the provided async_response to complete,
* and return a packed_response. The completed iterator will be set to point to the
* async_response that completed. This method may throw a timeout
* if any of the requests timed out, or other exceptions if an error
* happens. Even if an exception is thrown, the completed iterator will be
* correctly set to point to the async_response in cause.
*
* @tparam Iterator Iterator type (e.g. std::vector<async_response>::iterator)
* @param begin Begin iterator
* @param end End iterator
*
* @return a packed_response.
*/
template<typename Iterator>
static packed_response wait_any(const Iterator& begin, const Iterator& end, Iterator& completed) {
std::vector<margo_request> reqs;
size_t count = std::distance(begin,end);
reqs.reserve(count);
for(auto it = begin; it != end; it++) {
reqs.push_back(it->m_request);
}
completed = begin;
size_t index = 0;
hg_return_t ret = margo_wait_any(count, reqs.data(), &index);
std::advance(completed, index);
if(ret == HG_TIMEOUT) {
throw timeout();
}
MARGO_ASSERT(ret, margo_wait_any);
buffer output;
if(completed->m_ignore_response) {
return packed_response(std::move(output), *(completed->m_engine));
}
ret = margo_get_output(completed->m_handle, &output);
MARGO_ASSERT(ret, margo_get_output);
ret = margo_free_output(completed->m_handle, &output); // won't do anything on a buffer type
MARGO_ASSERT(ret, margo_free_output);
return packed_response(std::move(output), *(completed->m_engine));
}
};
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment