swim-fd-ping.c 13.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
#include <ssg-config.h>

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>

#include <mercury.h>
14
#include <abt.h>
15
#include <margo.h>
16 17

#include "ssg.h"
18 19 20 21
#include "ssg-internal.h"
#include "swim-fd.h"
#include "swim-fd-internal.h"

Shane Snyder's avatar
Shane Snyder committed
22 23 24 25 26 27
/* Serialization aliases for the SWIM member field types: each
 * hg_proc_<type> name below is mapped onto the Mercury processor for the
 * integer type named on the same line, so the SWIM types can be
 * encoded/decoded like any other Mercury type.
 *
 * NOTE these defines must be kept in sync with typedefs in
 * swim-internal.h
 */
#define hg_proc_swim_member_id_t hg_proc_int64_t
#define hg_proc_swim_member_status_t hg_proc_uint8_t
#define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t
28

Shane Snyder's avatar
Shane Snyder committed
29 30 31 32
/* Generate hg_proc_swim_member_update_t(), the Mercury encode/decode
 * routine for a single membership update. The routine covers the id,
 * status and inc_nr fields, in that order.
 * NOTE(review): the struct itself is not declared in this file; it is
 * presumably provided by one of the swim headers included above.
 */
MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \
    ((swim_member_id_t) (id)) \
    ((swim_member_status_t) (status)) \
    ((swim_member_inc_nr_t) (inc_nr)));
33 34 35 36 37 38

/* a swim message is the membership information piggybacked (gossiped)
 * on the ping and ack messages generated by the protocol
 */
typedef struct swim_message_s
{
Shane Snyder's avatar
Shane Snyder committed
39 40 41
    swim_member_id_t source_id;
    swim_member_inc_nr_t source_inc_nr;
    swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead?
42 43
} swim_message_t;

44 45 46
/* HG encode/decode routines for SWIM RPCs */
static hg_return_t hg_proc_swim_message_t(
    hg_proc_t proc, void *data);
47 48 49 50 51 52 53 54

/* Request/response types for the SWIM ping RPCs, together with their
 * Mercury serialization routines. Every type carries a swim_message_t,
 * which is encoded by the hand-written hg_proc_swim_message_t() in this
 * file.
 */

/* direct ping request: sender's gossip message */
MERCURY_GEN_PROC(swim_dping_req_t, \
    ((swim_message_t) (msg)));

/* direct ping response (ack): target's gossip message */
MERCURY_GEN_PROC(swim_dping_resp_t, \
    ((swim_message_t) (msg)));

/* indirect ping request: member to be pinged on the sender's behalf,
 * plus the sender's gossip message
 */
MERCURY_GEN_PROC(swim_iping_req_t, \
    ((swim_member_id_t) (target_id)) \
    ((swim_message_t) (msg)));

/* indirect ping response (ack): intermediary's gossip message */
MERCURY_GEN_PROC(swim_iping_resp_t, \
    ((swim_message_t) (msg)));

61 62 63 64 65 66
/* SWIM message pack/unpack prototypes */
static void swim_pack_message(
    ssg_group_t *g, swim_message_t *msg);
static void swim_unpack_message(
    ssg_group_t *g, swim_message_t *msg);

67 68 69 70 71 72 73
/* Forward declarations for the margo wrappers around the two receive
 * ULTs; the wrappers are referenced as <name>_handler when the RPCs are
 * registered in swim_register_ping_rpcs().
 */
DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult)

/* Mercury RPC ids for the direct and indirect ping RPCs. Both are
 * assigned in swim_register_ping_rpcs() and used afterwards to create
 * handles and to look up the data registered with each RPC.
 */
static hg_id_t dping_rpc_id;
static hg_id_t iping_rpc_id;

void swim_register_ping_rpcs(
74
    ssg_group_t *g)
75
{
76
    hg_class_t *hg_cls = margo_get_class(ssg_mid);
77

78
    /* register RPC handlers for SWIM pings */
79
    dping_rpc_id = MERCURY_REGISTER(hg_cls, "swim_dping", swim_dping_req_t,
80
        swim_dping_resp_t, swim_dping_recv_ult_handler);
81
    iping_rpc_id = MERCURY_REGISTER(hg_cls, "swim_iping", swim_iping_req_t,
82 83
        swim_iping_resp_t, swim_iping_recv_ult_handler);

84
    /* register swim context data structure with each RPC type */
85
    /* TODO: this won't work */
86 87
    HG_Register_data(hg_cls, dping_rpc_id, g, NULL);
    HG_Register_data(hg_cls, iping_rpc_id, g, NULL);
88 89 90 91 92 93 94 95

    return;
}

/********************************
 *       SWIM direct pings      *
 ********************************/

Shane Snyder's avatar
Shane Snyder committed
96
static int swim_send_dping(
97
    ssg_group_t *g, swim_member_id_t target);
98 99 100 101

void swim_dping_send_ult(
    void *t_arg)
{
102
    ssg_group_t *g = (ssg_group_t *)t_arg;
103
    swim_context_t *swim_ctx;
Shane Snyder's avatar
Shane Snyder committed
104
    swim_member_id_t target;
105 106
    int ret;

107 108
    assert(g != NULL);
    swim_ctx = (swim_context_t *)g->fd_ctx;
109 110 111
    assert(swim_ctx != NULL);

    target = swim_ctx->ping_target;
112
    ret = swim_send_dping(g, target);
113 114 115 116 117 118 119 120 121 122 123 124 125
    if (ret == 0)
    {
        /* mark this dping req as acked, double checking to make
         * sure we aren't inadvertently ack'ing a ping request
         * for a more recent tick of the protocol
         */
        if(swim_ctx->ping_target == target)
            swim_ctx->ping_target_acked = 1;
    }

    return;
}

Shane Snyder's avatar
Shane Snyder committed
126
static int swim_send_dping(
127
    ssg_group_t *g, swim_member_id_t target)
128
{
129
    swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx;
130 131 132 133 134 135 136
    hg_addr_t target_addr = HG_ADDR_NULL;
    hg_handle_t handle;
    swim_dping_req_t dping_req;
    swim_dping_resp_t dping_resp;
    hg_return_t hret;
    int ret = -1;

137
    target_addr = g->view.member_states[target].addr;
138 139 140
    if(target_addr == HG_ADDR_NULL)
        return(ret);

141
    hret = HG_Create(margo_get_context(ssg_mid), target_addr, dping_rpc_id,
142 143 144 145
        &handle);
    if(hret != HG_SUCCESS)
        return(ret);

146
    SSG_DEBUG(g, "SWIM: send dping req to %d\n", (int)target);
147 148

    /* fill the direct ping request with current membership state */
149
    swim_pack_message(g, &(dping_req.msg));
150 151

    /* send a direct ping that expires at the end of the protocol period */
152
    hret = margo_forward_timed(ssg_mid, handle, &dping_req,
153 154 155 156
        swim_ctx->prot_period_len);
    if (hret == HG_SUCCESS)
    {
        hret = HG_Get_output(handle, &dping_resp);
157
        if(hret != HG_SUCCESS) goto fini;
158

159
        SSG_DEBUG(g, "SWIM: recv dping ack from %d\n", (int)dping_resp.msg.source_id);
Shane Snyder's avatar
Shane Snyder committed
160
        assert(dping_resp.msg.source_id == target);
161 162

        /* extract target's membership state from response */
163
        swim_unpack_message(g, &(dping_resp.msg));
164

165
        HG_Free_output(handle, &dping_resp);
166 167 168 169
        ret = 0;
    }
    else if(hret != HG_TIMEOUT)
    {
170
        SSG_DEBUG(g, "SWIM: dping req error from %d (err=%d)\n", (int)target, hret);
171 172
    }

173
fini:
174 175 176 177 178 179
    HG_Destroy(handle);
    return(ret);
}

/*
 * Handler ULT for an incoming direct ping.
 *
 * Applies the sender's gossip to the local view, then answers with this
 * member's own gossip message. The handle is destroyed on every path.
 * When built with SWIM_FORCE_FAIL, the member with self_rank 1 drops the
 * request without responding.
 */
static void swim_dping_recv_ult(hg_handle_t handle)
{
    const struct hg_info *hgi = HG_Get_info(handle);
    ssg_group_t *g;
    swim_context_t *ctx;
    swim_dping_req_t req;
    swim_dping_resp_t resp;

    if(hgi == NULL)
        goto done;

    /* recover the group that was registered with the dping RPC id */
    g = (ssg_group_t *)HG_Registered_data(hgi->hg_class, dping_rpc_id);
    assert(g != NULL);
    ctx = (swim_context_t *)g->fd_ctx;
    assert(ctx != NULL);

#ifdef SWIM_FORCE_FAIL
    int drop = 1;
    if (g->self_rank == 1 && drop) goto done;
#endif

    if(HG_Get_input(handle, &req) == HG_SUCCESS)
    {
        SSG_DEBUG(g, "SWIM: recv dping req from %d\n", (int)req.msg.source_id);

        /* take in the sender's membership state ... */
        swim_unpack_message(g, &(req.msg));

        /* ... and answer with ours */
        swim_pack_message(g, &(resp.msg));

        SSG_DEBUG(g, "SWIM: send dping ack to %d\n", (int)req.msg.source_id);

        margo_respond(ssg_mid, handle, &resp);

        HG_Free_input(handle, &req);
    }

done:
    HG_Destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)

/********************************
 *     SWIM indirect pings      *
 ********************************/

void swim_iping_send_ult(
    void *t_arg)
{
230
    ssg_group_t *g = (ssg_group_t *)t_arg;
231 232
    swim_context_t *swim_ctx;
    int i;
233
    swim_member_id_t my_subgroup_member = SWIM_MEMBER_RANK_UNKNOWN;
234 235 236 237 238 239
    hg_addr_t target_addr = HG_ADDR_NULL;
    hg_handle_t handle;
    swim_iping_req_t iping_req;
    swim_iping_resp_t iping_resp;
    hg_return_t hret;

240 241
    assert(g != NULL);
    swim_ctx = (swim_context_t *)g->fd_ctx;
242 243 244 245
    assert(swim_ctx != NULL);

    for(i = 0; i < swim_ctx->prot_subgroup_sz; i++)
    {
246
        if(swim_ctx->subgroup_members[i] != SWIM_MEMBER_RANK_UNKNOWN)
247 248
        {
            my_subgroup_member = swim_ctx->subgroup_members[i];
249
            swim_ctx->subgroup_members[i] = SWIM_MEMBER_RANK_UNKNOWN;
250 251 252
            break;
        }
    }
253
    assert(my_subgroup_member != SWIM_MEMBER_RANK_UNKNOWN);
254

255
    target_addr = g->view.member_states[my_subgroup_member].addr;
256 257 258
    if(target_addr == HG_ADDR_NULL)
        return;

259
    hret = HG_Create(margo_get_context(ssg_mid), target_addr, iping_rpc_id,
260 261 262 263
        &handle);
    if(hret != HG_SUCCESS)
        return;

264
    SSG_DEBUG(g, "SWIM: send iping req to %d (target=%d)\n",
Shane Snyder's avatar
Shane Snyder committed
265
        (int)my_subgroup_member, (int)swim_ctx->ping_target);
266 267 268 269

    /* fill the indirect ping request with target member and current
     * membership state
     */
Shane Snyder's avatar
Shane Snyder committed
270
    iping_req.target_id = swim_ctx->ping_target;
271
    swim_pack_message(g, &(iping_req.msg));
272 273 274 275 276 277

    /* send this indirect ping */
    /* NOTE: the timeout is just the protocol period length minus
     * the dping timeout, which should cause this iping to timeout
     * right at the end of the current protocol period.
     */
278
    hret = margo_forward_timed(ssg_mid, handle, &iping_req,
279 280 281 282
        (swim_ctx->prot_period_len - swim_ctx->dping_timeout));
    if (hret == HG_SUCCESS)
    {
        hret = HG_Get_output(handle, &iping_resp);
283
        if(hret != HG_SUCCESS) goto fini;
284

285
        SSG_DEBUG(g, "SWIM: recv iping ack from %d (target=%d)\n",
Shane Snyder's avatar
Shane Snyder committed
286
            (int)iping_resp.msg.source_id, (int)swim_ctx->ping_target);
287 288

        /* extract target's membership state from response */
289
        swim_unpack_message(g, &(iping_resp.msg));
290 291 292 293 294

        /* mark this iping req as acked, double checking to make
         * sure we aren't inadvertently ack'ing a ping request
         * for a more recent tick of the protocol
         */
Shane Snyder's avatar
Shane Snyder committed
295
        if(swim_ctx->ping_target == iping_req.target_id)
296
            swim_ctx->ping_target_acked = 1;
297 298

        HG_Free_output(handle, &iping_resp);
299 300 301
    }
    else if(hret != HG_TIMEOUT)
    {
302
        SSG_DEBUG(g, "SWIM: iping req error from %d (target=%d, err=%d)\n",
Shane Snyder's avatar
Shane Snyder committed
303
            (int)my_subgroup_member, hret, (int)swim_ctx->ping_target);
304 305
    }

306
fini:
307 308 309 310 311 312
    HG_Destroy(handle);
    return;
}

/*
 * Handler ULT for an incoming indirect ping (iping) request.
 *
 * Applies the sender's gossip to the local view, then direct-pings the
 * requested target on the sender's behalf. A response carrying this
 * member's gossip is sent only if that direct ping succeeds; otherwise no
 * response is sent at all. The handle is destroyed on every path. When
 * built with SWIM_FORCE_FAIL, the member with self_rank 1 drops the
 * request without responding.
 */
static void swim_iping_recv_ult(hg_handle_t handle)
{
    ssg_group_t *g;
    swim_context_t *swim_ctx;
    const struct hg_info *info;
    swim_iping_req_t iping_req;
    swim_iping_resp_t iping_resp;
    hg_return_t hret;
    int ret;

    /* get the swim state */
    info = HG_Get_info(handle);
    if(info == NULL) goto fini;
    /* look the group up under the iping RPC id, the id this handler is
     * registered for
     */
    g = (ssg_group_t *)HG_Registered_data(info->hg_class, iping_rpc_id);
    assert(g != NULL);
    swim_ctx = (swim_context_t *)g->fd_ctx;
    assert(swim_ctx != NULL);

#ifdef SWIM_FORCE_FAIL
    int drop = 1;
    if (g->self_rank == 1 && drop) goto fini;
#endif

    hret = HG_Get_input(handle, &iping_req);
    if(hret != HG_SUCCESS) goto fini;

    SSG_DEBUG(g, "SWIM: recv iping req from %d (target=%d)\n",
        (int)iping_req.msg.source_id, (int)iping_req.target_id);

    /* extract sender's membership state from request */
    swim_unpack_message(g, &(iping_req.msg));

    /* send direct ping to target on behalf of who sent iping req */
    ret = swim_send_dping(g, iping_req.target_id);
    if(ret == 0)
    {
        /* if the dping req succeeds, fill the indirect ping
         * response with current membership state
         */
        swim_pack_message(g, &(iping_resp.msg));

        SSG_DEBUG(g, "SWIM: send iping ack to %d (target=%d)\n",
            (int)iping_req.msg.source_id, (int)iping_req.target_id);

        /* respond to sender of the iping req */
        margo_respond(ssg_mid, handle, &iping_resp);
    }

    HG_Free_input(handle, &iping_req);
fini:
    HG_Destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)

/********************************
 *      SWIM ping helpers       *
 ********************************/

370 371
/*
 * Build an outgoing SWIM message in `msg` for group `g`: the message is
 * zeroed, stamped with this member's rank and incarnation number, and its
 * piggyback buffer is filled by swim_retrieve_membership_updates().
 *
 * TODO: refactor retrieve/apply api to make this less awkward
 */
static void swim_pack_message(ssg_group_t *g, swim_message_t *msg)
{
    const swim_context_t *ctx = (const swim_context_t *)g->fd_ctx;

    /* start from a clean slate */
    memset(msg, 0, sizeof *msg);

    /* who we are, and at which incarnation */
    msg->source_inc_nr = ctx->member_inc_nrs[g->self_rank];
    msg->source_id = g->self_rank;

    /* gossip: piggyback a set of membership states on this message */
    swim_retrieve_membership_updates(g, msg->pb_buf,
        SWIM_MAX_PIGGYBACK_ENTRIES);
}

387
/*
 * Apply a received SWIM message to group `g`'s membership state.
 *
 * Two batches are handed to swim_apply_membership_updates(): first a
 * single synthesized update reporting the sender as SWIM_MEMBER_ALIVE at
 * the incarnation number it sent, then the full piggyback buffer
 * (SWIM_MAX_PIGGYBACK_ENTRIES entries).
 */
static void swim_unpack_message(ssg_group_t *g, swim_message_t *msg)
{
    swim_member_update_t from_sender;

    /* receiving a message at all is an implicit "sender is alive" update */
    from_sender.status = SWIM_MEMBER_ALIVE;
    from_sender.id = msg->source_id;
    from_sender.inc_nr = msg->source_inc_nr;
    swim_apply_membership_updates(g, &from_sender, 1);

    /* then apply whatever the sender piggybacked */
    swim_apply_membership_updates(g, msg->pb_buf,
        SWIM_MAX_PIGGYBACK_ENTRIES);
}

/*
 * Manual serialization/deserialization routine for swim messages.
 *
 * Encoding and decoding walk the message in the same order: source_id,
 * source_inc_nr, then every one of the SWIM_MAX_PIGGYBACK_ENTRIES
 * piggyback entries. HG_FREE has nothing to release.
 *
 * Returns HG_SUCCESS on success. Any failure of a field processor, or an
 * unrecognized operation, is reported as HG_PROTOCOL_ERROR.
 */
static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
{
    swim_message_t *msg = (swim_message_t *)data;
    hg_return_t hret = HG_PROTOCOL_ERROR;
    int idx;

    switch(hg_proc_get_op(proc))
    {
        /* both directions process exactly the same field sequence */
        case HG_ENCODE:
        case HG_DECODE:
            if(hg_proc_swim_member_id_t(proc, &(msg->source_id)) != HG_SUCCESS)
                return HG_PROTOCOL_ERROR;

            if(hg_proc_swim_member_inc_nr_t(proc, &(msg->source_inc_nr)) != HG_SUCCESS)
                return HG_PROTOCOL_ERROR;

            for(idx = 0; idx < SWIM_MAX_PIGGYBACK_ENTRIES; idx++)
            {
                if(hg_proc_swim_member_update_t(proc, &(msg->pb_buf[idx])) != HG_SUCCESS)
                    return HG_PROTOCOL_ERROR;
            }

            hret = HG_SUCCESS;
            break;
        case HG_FREE:
            /* do nothing */
            hret = HG_SUCCESS;
            break;
        default:
            /* unknown operation: hret stays HG_PROTOCOL_ERROR */
            break;
    }

    return(hret);
}