swim-fd-ping.c 13.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>

#include <mercury.h>
#include <margo.h>
13

14 15 16
#include "swim-fd.h"
#include "swim-fd-internal.h"

17 18 19
/* NOTE: keep these defines in sync with defs in swim.h */
#define hg_proc_swim_member_id_t        hg_proc_uint64_t
#define hg_proc_swim_member_status_t    hg_proc_uint8_t
Shane Snyder's avatar
Shane Snyder committed
20
#define hg_proc_swim_member_inc_nr_t    hg_proc_uint32_t
21 22 23
MERCURY_GEN_STRUCT_PROC(swim_member_state_t, \
    ((swim_member_inc_nr_t) (inc_nr)) \
    ((swim_member_status_t) (status)));
Shane Snyder's avatar
Shane Snyder committed
24
MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \
Shane Snyder's avatar
Shane Snyder committed
25
    ((swim_member_id_t) (id)) \
26
    ((swim_member_state_t) (state)));
27 28 29 30 31 32

/* a swim message is the membership information piggybacked (gossiped)
 * on the ping and ack messages generated by the protocol
 */
typedef struct swim_message_s
{
33
    swim_member_id_t source_id;
Shane Snyder's avatar
Shane Snyder committed
34
    swim_member_inc_nr_t source_inc_nr;
35
    hg_size_t pb_buf_count;
Shane Snyder's avatar
Shane Snyder committed
36
    swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: dynamic array?
37 38
} swim_message_t;

39 40 41
/* HG encode/decode routine for SWIM messages; it is declared ahead of the
 * RPC types below because their generated procs refer to it
 */
static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data);
42 43 44 45 46 47 48

/* direct ping request/response: each carries only a swim message */
MERCURY_GEN_PROC(swim_dping_req_t,
    ((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_dping_resp_t,
    ((swim_message_t) (msg)));

/* indirect ping request: names the member to be pinged on the sender's
 * behalf, plus a swim message; the response carries only a swim message
 */
MERCURY_GEN_PROC(swim_iping_req_t,
    ((swim_member_id_t) (target_id))
    ((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_resp_t,
    ((swim_message_t) (msg)));

54 55
/* SWIM message pack/unpack prototypes */
static void swim_pack_message(
56
    swim_context_t *swim_ctx, swim_message_t *msg);
57
static void swim_unpack_message(
58
    swim_context_t *swim_ctx, swim_message_t *msg);
59

60 61 62
/* handlers servicing incoming direct and indirect ping RPCs */
DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult)

/* RPC ids, assigned by swim_register_ping_rpcs() */
static hg_id_t swim_dping_rpc_id;
static hg_id_t swim_iping_rpc_id;
65 66

void swim_register_ping_rpcs(
67
    swim_context_t *swim_ctx)
68
{
69 70
    assert(swim_ctx != NULL);

71
    /* register RPC handlers for SWIM pings */
72
    swim_dping_rpc_id = MARGO_REGISTER(swim_ctx->mid, "swim_dping", swim_dping_req_t,
Shane Snyder's avatar
Shane Snyder committed
73
        swim_dping_resp_t, swim_dping_recv_ult);
Shane Snyder's avatar
Shane Snyder committed
74
    swim_iping_rpc_id = MARGO_REGISTER(swim_ctx->mid, "swim_iping", swim_iping_req_t,
Shane Snyder's avatar
Shane Snyder committed
75
        swim_iping_resp_t, swim_iping_recv_ult);
76

77
    /* register swim context data structure with each RPC type */
78
    /* XXX: this won't work for multiple groups ... */
79
    margo_register_data(swim_ctx->mid, swim_dping_rpc_id, swim_ctx, NULL);
Shane Snyder's avatar
Shane Snyder committed
80
    margo_register_data(swim_ctx->mid, swim_iping_rpc_id, swim_ctx, NULL);
81 82 83 84 85 86 87 88

    return;
}

/********************************
 *       SWIM direct pings      *
 ********************************/

Shane Snyder's avatar
Shane Snyder committed
89
static int swim_send_dping(
Shane Snyder's avatar
Shane Snyder committed
90
    swim_context_t *swim_ctx, swim_member_id_t dping_target_id, hg_addr_t dping_target_addr);
91 92 93 94

void swim_dping_send_ult(
    void *t_arg)
{
95 96
    swim_context_t *swim_ctx = (swim_context_t *)t_arg;
    swim_member_id_t dping_target_id;
97 98 99 100
    int ret;

    assert(swim_ctx != NULL);

Shane Snyder's avatar
Shane Snyder committed
101 102
    dping_target_id = swim_ctx->dping_target_id;
    ret = swim_send_dping(swim_ctx, dping_target_id, swim_ctx->dping_target_addr);
103 104 105 106 107 108
    if (ret == 0)
    {
        /* mark this dping req as acked, double checking to make
         * sure we aren't inadvertently ack'ing a ping request
         * for a more recent tick of the protocol
         */
109
        /* XXX: maybe just use a sequence number? this isn't technically right */
Shane Snyder's avatar
Shane Snyder committed
110 111
        if(swim_ctx->dping_target_id == dping_target_id)
            swim_ctx->ping_target_acked = 1;
112 113 114 115 116
    }

    return;
}

Shane Snyder's avatar
Shane Snyder committed
117
static int swim_send_dping(
Shane Snyder's avatar
Shane Snyder committed
118
    swim_context_t *swim_ctx, swim_member_id_t dping_target_id, hg_addr_t dping_target_addr)
119 120 121 122 123 124 125
{
    hg_handle_t handle;
    swim_dping_req_t dping_req;
    swim_dping_resp_t dping_resp;
    hg_return_t hret;
    int ret = -1;

126
    assert(swim_ctx != NULL);
127

Shane Snyder's avatar
Shane Snyder committed
128
    hret = margo_create(swim_ctx->mid, dping_target_addr, swim_dping_rpc_id, &handle);
129 130 131
    if(hret != HG_SUCCESS)
        return(ret);

Shane Snyder's avatar
Shane Snyder committed
132
    SWIM_DEBUG(swim_ctx, "send dping req to %lu\n", dping_target_id);
133 134

    /* fill the direct ping request with current membership state */
135
    swim_pack_message(swim_ctx, &(dping_req.msg));
136 137

    /* send a direct ping that expires at the end of the protocol period */
138
    hret = margo_forward_timed(handle, &dping_req, swim_ctx->prot_period_len);
139 140
    if (hret == HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
141
        hret = margo_get_output(handle, &dping_resp);
142
        if(hret != HG_SUCCESS) goto fini;
143

144
        SWIM_DEBUG(swim_ctx, "recv dping ack from %lu\n", dping_resp.msg.source_id);
Shane Snyder's avatar
Shane Snyder committed
145
        assert(dping_resp.msg.source_id == dping_target_id);
146 147

        /* extract target's membership state from response */
148
        swim_unpack_message(swim_ctx, &(dping_resp.msg));
149

Shane Snyder's avatar
Shane Snyder committed
150
        margo_free_output(handle, &dping_resp);
151 152 153 154
        ret = 0;
    }
    else if(hret != HG_TIMEOUT)
    {
155
        fprintf(stderr, "SWIM dping req error (err=%d)\n", hret);
156 157
    }

158
fini:
159
    margo_destroy(handle);
160 161 162
    return(ret);
}

Shane Snyder's avatar
Shane Snyder committed
163 164
/* RPC handler for an incoming direct ping: applies the sender's
 * piggybacked membership information to the local context and replies
 * with a message packed from the local context.
 */
static void swim_dping_recv_ult(
    hg_handle_t handle)
{
    const struct hg_info *info;
    margo_instance_id mid;
    swim_context_t *swim_ctx;
    swim_dping_req_t req;
    swim_dping_resp_t resp;

    /* resolve the margo instance this handle belongs to */
    info = margo_get_info(handle);
    assert(info);
    mid = margo_hg_info_get_instance(info);
    assert(mid != MARGO_INSTANCE_NULL);

    /* look up the swim context registered for the dping RPC */
    swim_ctx = (swim_context_t *)margo_registered_data(mid, swim_dping_rpc_id);
    assert(swim_ctx != NULL);

    /* nothing to process (or free) if the request cannot be decoded */
    if(margo_get_input(handle, &req) == HG_SUCCESS)
    {
        SWIM_DEBUG(swim_ctx, "recv dping req from %lu\n", req.msg.source_id);

        /* take in the sender's membership state ... */
        swim_unpack_message(swim_ctx, &(req.msg));

        /* ... and answer with our own */
        swim_pack_message(swim_ctx, &(resp.msg));

        SWIM_DEBUG(swim_ctx, "send dping ack to %lu\n", req.msg.source_id);

        margo_respond(handle, &resp);

        margo_free_input(handle, &req);
    }

    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)

/********************************
 *     SWIM indirect pings      *
 ********************************/

void swim_iping_send_ult(
    void *t_arg)
{
Shane Snyder's avatar
Shane Snyder committed
213 214 215
    swim_context_t *swim_ctx = (swim_context_t *)t_arg;
    swim_member_id_t iping_target_id;
    hg_addr_t iping_target_addr;
216 217 218 219 220 221 222
    hg_handle_t handle;
    swim_iping_req_t iping_req;
    swim_iping_resp_t iping_resp;
    hg_return_t hret;

    assert(swim_ctx != NULL);

Shane Snyder's avatar
Shane Snyder committed
223 224 225 226
    /* XXX MUTEX */
    iping_target_id = swim_ctx->iping_target_ids[swim_ctx->iping_target_ndx];
    iping_target_addr = swim_ctx->iping_target_addrs[swim_ctx->iping_target_ndx];
    swim_ctx->iping_target_ndx++;
227

Shane Snyder's avatar
Shane Snyder committed
228
    hret = margo_create(swim_ctx->mid, iping_target_addr, swim_iping_rpc_id, &handle);
229 230 231
    if(hret != HG_SUCCESS)
        return;

Shane Snyder's avatar
Shane Snyder committed
232 233
    SWIM_DEBUG(swim_ctx, "send iping req to %lu (target=%lu)\n",
        iping_target_id, swim_ctx->dping_target_id);
234 235 236 237

    /* fill the indirect ping request with target member and current
     * membership state
     */
Shane Snyder's avatar
Shane Snyder committed
238 239
    iping_req.target_id = swim_ctx->dping_target_id;
    swim_pack_message(swim_ctx, &(iping_req.msg));
240 241 242 243 244 245

    /* send this indirect ping */
    /* NOTE: the timeout is just the protocol period length minus
     * the dping timeout, which should cause this iping to timeout
     * right at the end of the current protocol period.
     */
246
    hret = margo_forward_timed(handle, &iping_req,
247 248 249
        (swim_ctx->prot_period_len - swim_ctx->dping_timeout));
    if (hret == HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
250
        hret = margo_get_output(handle, &iping_resp);
251
        if(hret != HG_SUCCESS) goto fini;
252

Shane Snyder's avatar
Shane Snyder committed
253 254
        SWIM_DEBUG(swim_ctx, "recv iping ack from %lu (target=%lu)\n",
            iping_resp.msg.source_id, swim_ctx->dping_target_id);
255 256

        /* extract target's membership state from response */
Shane Snyder's avatar
Shane Snyder committed
257
        swim_unpack_message(swim_ctx, &(iping_resp.msg));
258 259 260 261 262

        /* mark this iping req as acked, double checking to make
         * sure we aren't inadvertently ack'ing a ping request
         * for a more recent tick of the protocol
         */
Shane Snyder's avatar
Shane Snyder committed
263
        if(swim_ctx->dping_target_id == iping_req.target_id)
264
            swim_ctx->ping_target_acked = 1;
265

Shane Snyder's avatar
Shane Snyder committed
266
        margo_free_output(handle, &iping_resp);
267 268 269
    }
    else if(hret != HG_TIMEOUT)
    {
Shane Snyder's avatar
Shane Snyder committed
270
        fprintf(stderr, "SWIM iping req error (err=%d)\n", hret);
271 272
    }

273
fini:
274
    margo_destroy(handle);
275 276 277 278 279
    return;
}

/* RPC handler for an incoming indirect ping: applies the sender's
 * membership information, direct-pings the requested target on the
 * sender's behalf, and responds to the sender only if that direct ping
 * succeeded.
 */
static void swim_iping_recv_ult(hg_handle_t handle)
{
    const struct hg_info *info;
    margo_instance_id mid;
    swim_context_t *swim_ctx;
    swim_iping_req_t req;
    swim_iping_resp_t resp;
    hg_addr_t target_addr;

    /* resolve the margo instance this handle belongs to */
    info = margo_get_info(handle);
    assert(info);
    mid = margo_hg_info_get_instance(info);
    assert(mid != MARGO_INSTANCE_NULL);

    /* look up the swim context registered for the iping RPC */
    swim_ctx = (swim_context_t *)margo_registered_data(mid, swim_iping_rpc_id);
    assert(swim_ctx != NULL);

    /* nothing to process (or free) if the request cannot be decoded */
    if(margo_get_input(handle, &req) == HG_SUCCESS)
    {
        SWIM_DEBUG(swim_ctx, "recv iping req from %lu (target=%lu)\n",
            req.msg.source_id, req.target_id);

        /* take in the sender's membership state */
        swim_unpack_message(swim_ctx, &(req.msg));

        /* resolve the address of the member we were asked to ping */
        /* NOTE(review): the outcome of this callback is not checked here */
        swim_ctx->swim_callbacks.get_member_addr(
            swim_ctx, req.target_id, &target_addr);

        /* ping the target on behalf of the iping sender; without an ack
         * from the target no response is sent back
         */
        if(swim_send_dping(swim_ctx, req.target_id, target_addr) == 0)
        {
            /* answer with our current membership state */
            swim_pack_message(swim_ctx, &(resp.msg));

            SWIM_DEBUG(swim_ctx, "send iping ack to %lu (target=%lu)\n",
                req.msg.source_id, req.target_id);

            margo_respond(handle, &resp);
        }

        margo_free_input(handle, &req);
    }

    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)

/********************************
 *      SWIM ping helpers       *
 ********************************/

339
/* Build an outgoing swim message from the local context: the message is
 * zeroed, stamped with this member's id and incarnation number, and then
 * handed to swim_retrieve_membership_updates() with pb_buf_count preset
 * to the capacity of pb_buf.
 */
static void swim_pack_message(swim_context_t *swim_ctx, swim_message_t *msg)
{
    /* start from an all-zero message */
    memset(msg, 0, sizeof(*msg));

    /* identify ourselves as the source */
    msg->source_inc_nr = swim_ctx->self_inc_nr;
    msg->source_id = swim_ctx->self_id;

    /* piggyback a set of membership states; the count is passed by
     * address along with the buffer
     */
    msg->pb_buf_count = SWIM_MAX_PIGGYBACK_ENTRIES;
    swim_retrieve_membership_updates(swim_ctx, msg->pb_buf, &msg->pb_buf_count);
}

354
/* Apply a received swim message to the local context: first an implicit
 * update marking the sender alive at its reported incarnation number,
 * then the piggybacked updates the message carries.
 */
static void swim_unpack_message(swim_context_t *swim_ctx, swim_message_t *msg)
{
    swim_member_update_t from_sender;

    /* the sender is marked alive with the incarnation number it reported */
    from_sender.id = msg->source_id;
    from_sender.state.inc_nr = msg->source_inc_nr;
    from_sender.state.status = SWIM_MEMBER_ALIVE;
    swim_apply_membership_updates(swim_ctx, &from_sender, 1);

    /* then apply the piggybacked membership updates */
    swim_apply_membership_updates(swim_ctx, msg->pb_buf, msg->pb_buf_count);
}

/* Manual serialization/deserialization routine for swim messages.
 *
 * Only the first pb_buf_count entries of pb_buf are put on the wire, after
 * source_id, source_inc_nr and pb_buf_count themselves. Encoding and
 * decoding process the fields in the same order, so both operations share
 * one code path.
 *
 * proc: Mercury proc handle; its operation selects encode/decode/free
 * data: pointer to the swim_message_t being processed
 *
 * Returns HG_SUCCESS on success (free is a no-op), or HG_PROTOCOL_ERROR if
 * a field fails to process, if pb_buf_count exceeds the capacity of
 * pb_buf, or if the proc operation is not recognized.
 */
static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
{
    swim_message_t *msg = (swim_message_t *)data;
    const hg_size_t pb_buf_capacity = sizeof(msg->pb_buf) / sizeof(msg->pb_buf[0]);
    hg_size_t i;

    switch(hg_proc_get_op(proc))
    {
        case HG_ENCODE:
        case HG_DECODE:
            if(hg_proc_swim_member_id_t(proc, &(msg->source_id)) != HG_SUCCESS)
                return(HG_PROTOCOL_ERROR);
            if(hg_proc_swim_member_inc_nr_t(proc, &(msg->source_inc_nr)) != HG_SUCCESS)
                return(HG_PROTOCOL_ERROR);
            if(hg_proc_hg_size_t(proc, &(msg->pb_buf_count)) != HG_SUCCESS)
                return(HG_PROTOCOL_ERROR);

            /* On decode the count comes straight off the wire, so it must
             * be bounded before it is used to index the fixed-size
             * pb_buf; the same check keeps encode from reading past the
             * end of the array.
             */
            if(msg->pb_buf_count > pb_buf_capacity)
                return(HG_PROTOCOL_ERROR);

            for(i = 0; i < msg->pb_buf_count; i++)
            {
                if(hg_proc_swim_member_update_t(proc, &(msg->pb_buf[i])) != HG_SUCCESS)
                    return(HG_PROTOCOL_ERROR);
            }
            return(HG_SUCCESS);
        case HG_FREE:
            /* do nothing */
            return(HG_SUCCESS);
        default:
            break;
    }

    return(HG_PROTOCOL_ERROR);
}