async_response.hpp 5.07 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
#ifndef __THALLIUM_ASYNC_RESPONSE_HPP
#define __THALLIUM_ASYNC_RESPONSE_HPP

#include <thallium/margo_exception.hpp>
#include <thallium/buffer.hpp>
#include <thallium/packed_response.hpp>
12
13
#include <iterator>
#include <stdexcept>
#include <utility>
#include <vector>
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

namespace thallium {

class callable_remote_procedure;

/**
 * @brief async_response objects are created by sending an
 * RPC in a non-blocking way. They can be used to wait for
 * the actual response.
 */
class async_response {

    friend class callable_remote_procedure;

private:

    margo_request              m_request;
    engine*                    m_engine;
32
    hg_handle_t                m_handle;
33
34
35
36
37
38
39
40
41
42
43
    bool                       m_ignore_response;

    /**
     * @brief Constructor. Made private since async_response
     * objects are created by callable_remote_procedure only.
     *
     * @param req Margo request to wait on.
     * @param e Engine associated with the RPC.
     * @param c callable_remote_procedure that created the async_response.
     * @param ignore_resp whether response should be ignored.
     */
44
45
46
47
    async_response(margo_request req, engine& e, hg_handle_t handle, bool ignore_resp)
    : m_request(req), m_engine(&e), m_handle(handle), m_ignore_response(ignore_resp) {
        margo_ref_incr(handle);
    }
48
49
50

public:

51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
    /**
     * @brief Copy constructor is deleted.
     */
    async_response(const async_response& other) = delete;

    /**
     * @brief Move-constructor.
     *
     * @param other async_response to move from.
     */
    async_response(async_response&& other)
    : m_request(other.m_request)
    , m_engine(other.m_engine)
    , m_handle(other.m_handle)
    , m_ignore_response(other.m_ignore_response) {
        other.m_request = MARGO_REQUEST_NULL;
        other.m_engine = nullptr;
        other.m_handle = HG_HANDLE_NULL;
    }

    /**
     * @brief Copy-assignment operator is deleted.
     */
    async_response& operator=(const async_response& other) = delete;

    /**
77
78
     * @brief Move-assignment operator. Will invalidate
     * the moved-from object.
79
     */
80
81
82
83
84
85
86
87
88
89
90
91
92
    async_response& operator=(async_response&& other) {
        if(this == &other) return *this;
        if(m_handle != HG_HANDLE_NULL)
            margo_destroy(m_handle);
        m_request       = other.m_request;
        m_engine        = other.m_engine;
        m_handle        = other.m_handle;
        m_ignore_response = other.m_ignore_response;
        other.m_request = MARGO_REQUEST_NULL;
        other.m_engine  = nullptr;
        other.m_handle  = HG_HANDLE_NULL;
        return *this;
    }
93

94
95
96
97
    /**
     * @brief Destructor.
     */
    ~async_response() {
98
99
        if(m_handle != HG_HANDLE_NULL)
            margo_destroy(m_handle);
100
101
    }

102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
    /**
     * @brief Waits for the async_response to be ready and returns
     * a packed_response when the response has been received.
     *
     * @return a packed_response containing the response.
     */
    packed_response wait();

    /**
     * @brief Tests without blocking if the response has been received.
     *
     * @return true if the response has been received, false otherwise.
     */
    bool received() const {
        int ret;
        int flag;
        ret = margo_test(m_request, &flag);
        MARGO_ASSERT((hg_return_t)ret, margo_test);
        return flag;
    }
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162

    /**
     * @brief Waits for any of the provided async_response to complete,
     * and return a packed_response. The completed iterator will be set to point to the
     * async_response that completed. This method may throw a timeout
     * if any of the requests timed out, or other exceptions if an error
     * happens. Even if an exception is thrown, the completed iterator will be
     * correctly set to point to the async_response in cause.
     *
     * @tparam Iterator Iterator type (e.g. std::vector<async_response>::iterator)
     * @param begin Begin iterator
     * @param end End iterator
     *
     * @return a packed_response.
     */
    template<typename Iterator>
    static packed_response wait_any(const Iterator& begin, const Iterator& end, Iterator& completed) {
        std::vector<margo_request> reqs;
        size_t count = std::distance(begin,end);
        reqs.reserve(count);
        for(auto it = begin; it != end; it++) {
            reqs.push_back(it->m_request);
        }
        completed = begin;
        size_t index = 0;
        hg_return_t ret = margo_wait_any(count, reqs.data(), &index);
        std::advance(completed, index);
        if(ret == HG_TIMEOUT) {
            throw timeout();
        }
        MARGO_ASSERT(ret, margo_wait_any);
        buffer output;
        if(completed->m_ignore_response) {
            return packed_response(std::move(output), *(completed->m_engine));
        }
        ret = margo_get_output(completed->m_handle, &output);
        MARGO_ASSERT(ret, margo_get_output);
        ret = margo_free_output(completed->m_handle, &output); // won't do anything on a buffer type
        MARGO_ASSERT(ret, margo_free_output);
        return packed_response(std::move(output), *(completed->m_engine));
    }
163
164
165
166
167
};

}

#endif