Commit daae9466 authored by Matthieu Dorier's avatar Matthieu Dorier

added timeouts

parent da3cb122
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <thallium/anonymous.hpp> #include <thallium/anonymous.hpp>
#include <thallium/bulk_mode.hpp> #include <thallium/bulk_mode.hpp>
#include <thallium/bulk.hpp> #include <thallium/bulk.hpp>
#include <thallium/timeout.hpp>
#include <thallium/engine.hpp> #include <thallium/engine.hpp>
#include <thallium/endpoint.hpp> #include <thallium/endpoint.hpp>
#include <thallium/remote_procedure.hpp> #include <thallium/remote_procedure.hpp>
......
...@@ -9,8 +9,10 @@ ...@@ -9,8 +9,10 @@
#include <tuple> #include <tuple>
#include <cstdint> #include <cstdint>
#include <utility> #include <utility>
#include <chrono>
#include <margo.h> #include <margo.h>
#include <thallium/buffer.hpp> #include <thallium/buffer.hpp>
#include <thallium/timeout.hpp>
#include <thallium/packed_response.hpp> #include <thallium/packed_response.hpp>
#include <thallium/async_response.hpp> #include <thallium/async_response.hpp>
#include <thallium/serialization/serialize.hpp> #include <thallium/serialization/serialize.hpp>
...@@ -58,13 +60,26 @@ private: ...@@ -58,13 +60,26 @@ private:
* in which the arguments have been serialized. * in which the arguments have been serialized.
* *
* @param buf Buffer containing a serialized version of the arguments. * @param buf Buffer containing a serialized version of the arguments.
* @param timeout_ms Timeout in milliseconds. After this timeout, a timeout exception is thrown.
* *
* @return a packed_response object from which the returned value can be deserialized. * @return a packed_response object from which the returned value can be deserialized.
*/ */
packed_response forward(const buffer& buf) const { packed_response forward(const buffer& buf, double timeout_ms=-1.0) const {
hg_return_t ret; hg_return_t ret;
ret = margo_provider_forward(m_provider_id, if(timeout_ms > 0.0) {
m_handle, const_cast<void*>(static_cast<const void*>(&buf))); ret = margo_provider_forward_timed(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&buf)),
timeout_ms);
if(ret == HG_TIMEOUT)
throw timeout();
} else {
ret = margo_provider_forward(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&buf)));
}
MARGO_ASSERT(ret, margo_forward); MARGO_ASSERT(ret, margo_forward);
buffer output; buffer output;
if(m_ignore_response) return packed_response(std::move(output), *m_engine); if(m_ignore_response) return packed_response(std::move(output), *m_engine);
...@@ -80,14 +95,30 @@ private: ...@@ -80,14 +95,30 @@ private:
* in which the arguments have been serialized. The RPC is sent in a non-blocking manner. * in which the arguments have been serialized. The RPC is sent in a non-blocking manner.
* *
* @param buf Buffer containing a serialized version of the arguments. * @param buf Buffer containing a serialized version of the arguments.
* @param timeout_ms Optional timeout after which to throw a timeout exception.
* *
* @return an async_response object that can be waited on. * @return an async_response object that can be waited on.
*
* Note: If the request times out, the timeout exception will occure when calling wait()
* on the async_response.
*/ */
async_response iforward(const buffer& buf) { async_response iforward(const buffer& buf, double timeout_ms=-1.0) const {
hg_return_t ret; hg_return_t ret;
margo_request req; margo_request req;
ret = margo_provider_iforward(m_provider_id, if(timeout_ms > 0.0) {
m_handle, const_cast<void*>(static_cast<const void*>(&buf)), &req); ret = margo_provider_iforward_timed(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&buf)),
timeout_ms,
&req);
} else {
ret = margo_provider_iforward(
m_provider_id,
m_handle,
const_cast<void*>(static_cast<const void*>(&buf)),
&req);
}
MARGO_ASSERT(ret, margo_iforward); MARGO_ASSERT(ret, margo_iforward);
return async_response(req, *m_engine, m_handle, m_ignore_response); return async_response(req, *m_engine, m_handle, m_ignore_response);
} }
...@@ -165,13 +196,37 @@ public: ...@@ -165,13 +196,37 @@ public:
* @return a packed_response object containing the returned value. * @return a packed_response object containing the returned value.
*/ */
template<typename ... T> template<typename ... T>
packed_response operator()(T&& ... t) const { packed_response operator()(T&& ... args) const {
buffer b; buffer b;
buffer_output_archive arch(b, *m_engine); buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(t)...); serialize_many(arch, std::forward<T>(args)...);
return forward(b); return forward(b);
} }
/**
* @brief Same as operator() but takes a first parameter representing
* a timeout (std::duration object). If no response is received from
* the server before this timeout, the request is cancelled and tl::timeout
* is thrown.
*
* @tparam R
* @tparam P
* @tparam T
* @param t Timeout.
* @param args Parameters of the RPC.
*
* @return a packed_response object containing the returned value.
*/
template<typename R, typename P, typename ... T>
packed_response timed(const std::chrono::duration<R,P>& t, T&& ... args) const {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return forward(b, timeout_ms);
}
/** /**
* @brief Operator to call the RPC without any argument. * @brief Operator to call the RPC without any argument.
* *
...@@ -182,6 +237,23 @@ public: ...@@ -182,6 +237,23 @@ public:
return forward(b); return forward(b);
} }
/**
* @brief Same as operator() with only a timeout value.
*
* @tparam R
* @tparam P
* @param t Timeout.
*
* @return a packed_response object containing the returned value.
*/
template<typename R, typename P>
packed_response timed(const std::chrono::duration<R,P>& t) const {
buffer b;
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return forward(b, timeout_ms);
}
/** /**
* @brief Issues an RPC in a non-blocking way. Will serialize the arguments * @brief Issues an RPC in a non-blocking way. Will serialize the arguments
* in a buffer and send the RPC to the endpoint. * in a buffer and send the RPC to the endpoint.
...@@ -199,6 +271,29 @@ public: ...@@ -199,6 +271,29 @@ public:
return iforward(b); return iforward(b);
} }
/**
* @brief Asynchronous RPC call with a timeout. If the operation times out,
* the wait() call on the returned async_response object will throw a tl::timeout
* exception.
*
* @tparam R
* @tparam P
* @tparam T
* @param t Timeout.
* @param args Parameters of the RPC.
*
* @return an async_response object that the caller can wait on.
*/
template<typename R, typename P, typename ... T>
async_response timed_async(const std::chrono::duration<R,P>& t, T&& ... args) {
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(args)...);
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return iforward(b, timeout_ms);
}
/** /**
* @brief Non-blocking call to the RPC without any argument. * @brief Non-blocking call to the RPC without any argument.
* *
...@@ -208,6 +303,23 @@ public: ...@@ -208,6 +303,23 @@ public:
buffer b; buffer b;
return iforward(b); return iforward(b);
} }
/**
* @brief Same as async() but with a specified timeout.
*
* @tparam R
* @tparam P
* @param t Timeout.
*
* @return an async_response object that the caller can wait on.
*/
template<typename R, typename P>
async_response timed_async(const std::chrono::duration<R,P>& t) {
buffer b;
std::chrono::duration<double, std::milli> fp_ms = t;
double timeout_ms = fp_ms.count();
return iforward(b, timeout_ms);
}
}; };
} }
......
...@@ -24,11 +24,11 @@ class callable_remote_procedure; ...@@ -24,11 +24,11 @@ class callable_remote_procedure;
*/ */
class remote_procedure { class remote_procedure {
friend class engine; friend class engine;
private: private:
engine* m_engine; engine* m_engine;
hg_id_t m_id; hg_id_t m_id;
bool m_ignore_response; bool m_ignore_response;
/** /**
...@@ -38,34 +38,34 @@ private: ...@@ -38,34 +38,34 @@ private:
* @param e Engine object that created the remote_procedure. * @param e Engine object that created the remote_procedure.
* @param id Mercury RPC id. * @param id Mercury RPC id.
*/ */
remote_procedure(engine& e, hg_id_t id); remote_procedure(engine& e, hg_id_t id);
public: public:
/** /**
* @brief Copy-constructor is default. * @brief Copy-constructor is default.
*/ */
remote_procedure(const remote_procedure& other) = default; remote_procedure(const remote_procedure& other) = default;
/** /**
* @brief Move-constructor is default. * @brief Move-constructor is default.
*/ */
remote_procedure(remote_procedure&& other) = default; remote_procedure(remote_procedure&& other) = default;
/** /**
* @brief Copy-assignment operator is default. * @brief Copy-assignment operator is default.
*/ */
remote_procedure& operator=(const remote_procedure& other) = default; remote_procedure& operator=(const remote_procedure& other) = default;
/** /**
* @brief Move-assignment operator is default. * @brief Move-assignment operator is default.
*/ */
remote_procedure& operator=(remote_procedure&& other) = default; remote_procedure& operator=(remote_procedure&& other) = default;
/** /**
* @brief Destructor is default. * @brief Destructor is default.
*/ */
~remote_procedure() = default; ~remote_procedure() = default;
/** /**
* @brief Creates a callable_remote_procedure by associating the * @brief Creates a callable_remote_procedure by associating the
...@@ -75,7 +75,7 @@ public: ...@@ -75,7 +75,7 @@ public:
* *
* @return a callable_remote_procedure. * @return a callable_remote_procedure.
*/ */
callable_remote_procedure on(const endpoint& ep) const; callable_remote_procedure on(const endpoint& ep) const;
/** /**
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_TIMEOUT_HPP
#define __THALLIUM_TIMEOUT_HPP
#include <exception>
#include <stdexcept>
namespace thallium {
/**
* @brief This exception is thrown when an RPC invoked with
* timed() or timed_async times out.
*/
class timeout : public std::exception {
public:
virtual const char* what() const throw() {
return "Request timed out";
}
};
}
#endif
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
* *
* See COPYRIGHT in top-level directory. * See COPYRIGHT in top-level directory.
*/ */
#include <thallium/timeout.hpp>
#include <thallium/async_response.hpp> #include <thallium/async_response.hpp>
#include <thallium/callable_remote_procedure.hpp> #include <thallium/callable_remote_procedure.hpp>
...@@ -11,6 +12,9 @@ namespace thallium { ...@@ -11,6 +12,9 @@ namespace thallium {
packed_response async_response::wait() { packed_response async_response::wait() {
hg_return_t ret; hg_return_t ret;
ret = margo_wait(m_request); ret = margo_wait(m_request);
if(ret == HG_TIMEOUT) {
throw timeout();
}
MARGO_ASSERT(ret, margo_wait); MARGO_ASSERT(ret, margo_wait);
buffer output; buffer output;
if(m_ignore_response) return packed_response(std::move(output), *m_engine); if(m_ignore_response) return packed_response(std::move(output), *m_engine);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment