swim-fd-ping.c 13.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
#include <ssg-config.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>

#include <mercury.h>
#include <abt.h>
#include <margo.h>

#include "ssg.h"
#include "ssg-internal.h"
#include "swim-fd.h"
#include "swim-fd-internal.h"

Shane Snyder's avatar
Shane Snyder committed
22 23 24 25 26 27
/* NOTE these defines must be kept in sync with typedefs in
 * swim-internal.h
 *
 * Each define aliases the Mercury proc routine name of a SWIM scalar type
 * to the proc routine of a fixed-width integer (int64 for member ids,
 * uint8 for member status, uint32 for inc_nr values), so that these types
 * can be used as fields in the MERCURY_GEN_* invocations in this file.
 *
 * NOTE(review): this file includes "swim-fd-internal.h", not
 * "swim-internal.h" -- confirm which header actually holds the typedefs.
 */
#define hg_proc_swim_member_id_t hg_proc_int64_t
#define hg_proc_swim_member_status_t hg_proc_uint8_t
#define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t
28

Shane Snyder's avatar
Shane Snyder committed
29 30 31 32
/* Generate the Mercury proc routine (hg_proc_swim_member_update_t) for the
 * swim_member_update_t struct, serializing its id, status and inc_nr
 * fields in that order. The swim message proc routine in this file calls
 * it once per piggyback entry.
 */
MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \
    ((swim_member_id_t) (id)) \
    ((swim_member_status_t) (status)) \
    ((swim_member_inc_nr_t) (inc_nr)));
33 34 35 36 37 38

/* a swim message is the membership information piggybacked (gossiped)
 * on the ping and ack messages generated by the protocol
 */
typedef struct swim_message_s
{
Shane Snyder's avatar
Shane Snyder committed
39 40 41
    swim_member_id_t source_id;
    swim_member_inc_nr_t source_inc_nr;
    swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead?
42 43
} swim_message_t;

44 45 46
/* Hand-written Mercury proc (encode/decode/free) routine for swim_message_t.
 * Declared ahead of the MERCURY_GEN_PROC invocations that use
 * swim_message_t as a field type.
 */
static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data);
47 48 49 50 51 52 53 54

/* direct ping request: carries the sender's swim message */
MERCURY_GEN_PROC(swim_dping_req_t, ((swim_message_t)(msg)));

/* direct ping response (ack): carries the responder's swim message */
MERCURY_GEN_PROC(swim_dping_resp_t, ((swim_message_t)(msg)));

/* indirect ping request: names the member to ping on the sender's behalf,
 * plus the sender's swim message
 */
MERCURY_GEN_PROC(swim_iping_req_t,
    ((swim_member_id_t)(target_id))
    ((swim_message_t)(msg)));

/* indirect ping response (ack): carries the responder's swim message */
MERCURY_GEN_PROC(swim_iping_resp_t, ((swim_message_t)(msg)));

61 62 63 64 65 66
/* Helpers that fill a swim message from local state (pack) and process a
 * received swim message (unpack).
 */
static void swim_pack_message(ssg_group_t *g, swim_message_t *msg);
static void swim_unpack_message(ssg_group_t *g, swim_message_t *msg);

67 68 69 70 71 72 73
/* declare the margo handler wrappers (<name>_handler) for the two ping
 * receive ULTs; the wrappers are registered in swim_register_ping_rpcs()
 */
DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult)

/* RPC ids returned by MERCURY_REGISTER in swim_register_ping_rpcs();
 * file-scope, so every call to that function overwrites them
 */
static hg_id_t dping_rpc_id;
static hg_id_t iping_rpc_id;

void swim_register_ping_rpcs(
74
    ssg_group_t *g)
75
{
76
    hg_class_t *hg_cls = margo_get_class(ssg_mid);
77

78
    /* register RPC handlers for SWIM pings */
79
    dping_rpc_id = MERCURY_REGISTER(hg_cls, "swim_dping", swim_dping_req_t,
80
        swim_dping_resp_t, swim_dping_recv_ult_handler);
81
    iping_rpc_id = MERCURY_REGISTER(hg_cls, "swim_iping", swim_iping_req_t,
82 83
        swim_iping_resp_t, swim_iping_recv_ult_handler);

84
    /* register swim context data structure with each RPC type */
85 86
    HG_Register_data(hg_cls, dping_rpc_id, g, NULL);
    HG_Register_data(hg_cls, iping_rpc_id, g, NULL);
87 88 89 90 91 92 93 94

    return;
}

/********************************
 *       SWIM direct pings      *
 ********************************/

Shane Snyder's avatar
Shane Snyder committed
95
static int swim_send_dping(
96
    ssg_group_t *g, swim_member_id_t target);
97 98 99 100

void swim_dping_send_ult(
    void *t_arg)
{
101
    ssg_group_t *g = (ssg_group_t *)t_arg;
102
    swim_context_t *swim_ctx;
Shane Snyder's avatar
Shane Snyder committed
103
    swim_member_id_t target;
104 105
    int ret;

106 107
    assert(g != NULL);
    swim_ctx = (swim_context_t *)g->fd_ctx;
108 109 110
    assert(swim_ctx != NULL);

    target = swim_ctx->ping_target;
111
    ret = swim_send_dping(g, target);
112 113 114 115 116 117 118 119 120 121 122 123 124
    if (ret == 0)
    {
        /* mark this dping req as acked, double checking to make
         * sure we aren't inadvertently ack'ing a ping request
         * for a more recent tick of the protocol
         */
        if(swim_ctx->ping_target == target)
            swim_ctx->ping_target_acked = 1;
    }

    return;
}

Shane Snyder's avatar
Shane Snyder committed
125
static int swim_send_dping(
126
    ssg_group_t *g, swim_member_id_t target)
127
{
128
    swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx;
129 130 131 132 133 134 135
    hg_addr_t target_addr = HG_ADDR_NULL;
    hg_handle_t handle;
    swim_dping_req_t dping_req;
    swim_dping_resp_t dping_resp;
    hg_return_t hret;
    int ret = -1;

136
    target_addr = g->view.member_states[target].addr;
137 138 139
    if(target_addr == HG_ADDR_NULL)
        return(ret);

140
    hret = HG_Create(margo_get_context(ssg_mid), target_addr, dping_rpc_id,
141 142 143 144
        &handle);
    if(hret != HG_SUCCESS)
        return(ret);

145
    SSG_DEBUG(g, "SWIM: send dping req to %d\n", (int)target);
146 147

    /* fill the direct ping request with current membership state */
148
    swim_pack_message(g, &(dping_req.msg));
149 150

    /* send a direct ping that expires at the end of the protocol period */
151
    hret = margo_forward_timed(ssg_mid, handle, &dping_req,
152 153 154 155 156 157 158
        swim_ctx->prot_period_len);
    if (hret == HG_SUCCESS)
    {
        hret = HG_Get_output(handle, &dping_resp);
        if(hret != HG_SUCCESS)
            return(ret);

159
        SSG_DEBUG(g, "SWIM: recv dping ack from %d\n", (int)dping_resp.msg.source_id);
Shane Snyder's avatar
Shane Snyder committed
160
        assert(dping_resp.msg.source_id == target);
161 162

        /* extract target's membership state from response */
163
        swim_unpack_message(g, &(dping_resp.msg));
164 165 166 167 168

        ret = 0;
    }
    else if(hret != HG_TIMEOUT)
    {
169
        SSG_DEBUG(g, "SWIM: dping req error from %d (err=%d)\n", (int)target, hret);
170 171 172 173 174 175 176 177
    }

    HG_Destroy(handle);
    return(ret);
}

/* RPC handler ULT for incoming direct pings: unpacks the sender's swim
 * message, packs the local state into a response and sends it back.
 *
 * handle: Mercury handle of the incoming RPC; it is destroyed on every
 *         path out of this function, including the early error exits.
 */
static void swim_dping_recv_ult(hg_handle_t handle)
{
    ssg_group_t *g;
    swim_context_t *swim_ctx;
    const struct hg_info *info;
    swim_dping_req_t dping_req;
    swim_dping_resp_t dping_resp;
    hg_return_t hret;

    /* get ssg & swim state */
    info = HG_Get_info(handle);
    if(info == NULL)
        goto done;
    g = (ssg_group_t *)HG_Registered_data(info->hg_class, dping_rpc_id);
    assert(g != NULL);
    swim_ctx = (swim_context_t *)g->fd_ctx;
    assert(swim_ctx != NULL);
    (void)swim_ctx; /* only checked by the assert above */

    hret = HG_Get_input(handle, &dping_req);
    if(hret != HG_SUCCESS)
        goto done;

    SSG_DEBUG(g, "SWIM: recv dping req from %d\n", (int)dping_req.msg.source_id);

    /* extract sender's membership state from request */
    swim_unpack_message(g, &(dping_req.msg));

    /* fill the direct ping response with current membership state */
    swim_pack_message(g, &(dping_resp.msg));

    SSG_DEBUG(g, "SWIM: send dping ack to %d\n", (int)dping_req.msg.source_id);

    /* respond to sender of the dping req */
    margo_respond(ssg_mid, handle, &dping_resp);

    /* release the decoded input now that it is no longer referenced */
    HG_Free_input(handle, &dping_req);

done:
    HG_Destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)

/********************************
 *     SWIM indirect pings      *
 ********************************/

void swim_iping_send_ult(
    void *t_arg)
{
223
    ssg_group_t *g = (ssg_group_t *)t_arg;
224 225
    swim_context_t *swim_ctx;
    int i;
226
    swim_member_id_t my_subgroup_member = SWIM_MEMBER_RANK_UNKNOWN;
227 228 229 230 231 232
    hg_addr_t target_addr = HG_ADDR_NULL;
    hg_handle_t handle;
    swim_iping_req_t iping_req;
    swim_iping_resp_t iping_resp;
    hg_return_t hret;

233 234
    assert(g != NULL);
    swim_ctx = (swim_context_t *)g->fd_ctx;
235 236 237 238
    assert(swim_ctx != NULL);

    for(i = 0; i < swim_ctx->prot_subgroup_sz; i++)
    {
239
        if(swim_ctx->subgroup_members[i] != SWIM_MEMBER_RANK_UNKNOWN)
240 241
        {
            my_subgroup_member = swim_ctx->subgroup_members[i];
242
            swim_ctx->subgroup_members[i] = SWIM_MEMBER_RANK_UNKNOWN;
243 244 245
            break;
        }
    }
246
    assert(my_subgroup_member != SWIM_MEMBER_RANK_UNKNOWN);
247

248
    target_addr = g->view.member_states[my_subgroup_member].addr;
249 250 251
    if(target_addr == HG_ADDR_NULL)
        return;

252
    hret = HG_Create(margo_get_context(ssg_mid), target_addr, iping_rpc_id,
253 254 255 256
        &handle);
    if(hret != HG_SUCCESS)
        return;

257
    SSG_DEBUG(g, "SWIM: send iping req to %d (target=%d)\n",
Shane Snyder's avatar
Shane Snyder committed
258
        (int)my_subgroup_member, (int)swim_ctx->ping_target);
259 260 261 262

    /* fill the indirect ping request with target member and current
     * membership state
     */
Shane Snyder's avatar
Shane Snyder committed
263
    iping_req.target_id = swim_ctx->ping_target;
264
    swim_pack_message(g, &(iping_req.msg));
265 266 267 268 269 270

    /* send this indirect ping */
    /* NOTE: the timeout is just the protocol period length minus
     * the dping timeout, which should cause this iping to timeout
     * right at the end of the current protocol period.
     */
271
    hret = margo_forward_timed(ssg_mid, handle, &iping_req,
272 273 274 275 276 277 278
        (swim_ctx->prot_period_len - swim_ctx->dping_timeout));
    if (hret == HG_SUCCESS)
    {
        hret = HG_Get_output(handle, &iping_resp);
        if(hret != HG_SUCCESS)
            return;

279
        SSG_DEBUG(g, "SWIM: recv iping ack from %d (target=%d)\n",
Shane Snyder's avatar
Shane Snyder committed
280
            (int)iping_resp.msg.source_id, (int)swim_ctx->ping_target);
281 282

        /* extract target's membership state from response */
283
        swim_unpack_message(g, &(iping_resp.msg));
284 285 286 287 288

        /* mark this iping req as acked, double checking to make
         * sure we aren't inadvertently ack'ing a ping request
         * for a more recent tick of the protocol
         */
Shane Snyder's avatar
Shane Snyder committed
289
        if(swim_ctx->ping_target == iping_req.target_id)
290 291 292 293
            swim_ctx->ping_target_acked = 1;
    }
    else if(hret != HG_TIMEOUT)
    {
294
        SSG_DEBUG(g, "SWIM: iping req error from %d (target=%d, err=%d)\n",
Shane Snyder's avatar
Shane Snyder committed
295
            (int)my_subgroup_member, hret, (int)swim_ctx->ping_target);
296 297 298 299 300 301 302 303
    }

    HG_Destroy(handle);
    return;
}

/* RPC handler ULT for incoming indirect pings: unpacks the sender's swim
 * message, direct-pings the requested target on the sender's behalf and,
 * only if that dping is acked, responds with the local swim message.
 * When the dping fails no response is sent at all (presumably the
 * requester's timed forward then expires -- confirm against the sender).
 *
 * handle: Mercury handle of the incoming RPC; it is destroyed on every
 *         path out of this function, including the early error exits.
 */
static void swim_iping_recv_ult(hg_handle_t handle)
{
    ssg_group_t *g;
    swim_context_t *swim_ctx;
    const struct hg_info *info;
    swim_iping_req_t iping_req;
    swim_iping_resp_t iping_resp;
    hg_return_t hret;
    int ret;

    /* get the swim state */
    info = HG_Get_info(handle);
    if(info == NULL)
        goto done;
    /* look the group up under the iping RPC id, the RPC this handler
     * serves (the same group pointer is registered under both ping ids)
     */
    g = (ssg_group_t *)HG_Registered_data(info->hg_class, iping_rpc_id);
    assert(g != NULL);
    swim_ctx = (swim_context_t *)g->fd_ctx;
    assert(swim_ctx != NULL);
    (void)swim_ctx; /* only checked by the assert above */

    hret = HG_Get_input(handle, &iping_req);
    if(hret != HG_SUCCESS)
        goto done;

    SSG_DEBUG(g, "SWIM: recv iping req from %d (target=%d)\n",
        (int)iping_req.msg.source_id, (int)iping_req.target_id);

    /* extract sender's membership state from request */
    swim_unpack_message(g, &(iping_req.msg));

    /* send direct ping to target on behalf of who sent iping req */
    ret = swim_send_dping(g, iping_req.target_id);
    if(ret == 0)
    {
        /* if the dping req succeeds, fill the indirect ping
         * response with current membership state
         */
        swim_pack_message(g, &(iping_resp.msg));

        SSG_DEBUG(g, "SWIM: send iping ack to %d (target=%d)\n",
            (int)iping_req.msg.source_id, (int)iping_req.target_id);

        /* respond to sender of the iping req */
        margo_respond(ssg_mid, handle, &iping_resp);
    }

    /* release the decoded input now that it is no longer referenced */
    HG_Free_input(handle, &iping_req);

done:
    HG_Destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)

/********************************
 *      SWIM ping helpers       *
 ********************************/

356 357
/* TODO: refactor retrieve/apply api to make this less awkward */
/* Fill `msg` with this member's own identity: the whole message is zeroed,
 * then source_id is set to the group's self rank and source_inc_nr to the
 * inc_nr stored for that rank in the SWIM context. Piggybacking of
 * membership updates is currently compiled out.
 */
static void swim_pack_message(ssg_group_t *g, swim_message_t *msg)
{
    swim_context_t *ctx = (swim_context_t *)g->fd_ctx;

    /* start from an all-zero message; pb_buf stays zeroed while the
     * piggyback code below is disabled
     */
    memset(msg, 0, sizeof *msg);

    /* fill in self information */
    msg->source_inc_nr = ctx->member_inc_nrs[g->self_rank];
    msg->source_id = g->self_rank;

#if 0
    /* piggyback a set of membership states on this message */
    swim_retrieve_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
#endif
}

375
/* Process a received swim message. An implicit "sender is alive" update is
 * built from the message's source_id and source_inc_nr; applying it, and
 * applying the piggybacked updates, is currently compiled out, so this
 * function has no effect on group state for now.
 */
static void swim_unpack_message(ssg_group_t *g, swim_message_t *msg)
{
    /* (implicit) sender update: the sender is alive at its reported inc_nr */
    swim_member_update_t sender_update = {
        .id = msg->source_id,
        .status = SWIM_MEMBER_ALIVE,
        .inc_nr = msg->source_inc_nr
    };

#if 0
    swim_apply_membership_updates(g, &sender_update, 1);

    /* update membership status using piggybacked membership updates */
    swim_apply_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
#endif
}

/* Manual Mercury proc routine for swim messages.
 *
 * Encoding and decoding walk the same field sequence (source_id,
 * source_inc_nr, then every pb_buf entry), so both operations share one
 * code path. Any failing field proc is reported as HG_PROTOCOL_ERROR.
 * HG_FREE has nothing to release and returns HG_SUCCESS; any other proc
 * operation yields HG_PROTOCOL_ERROR.
 */
static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
{
    swim_message_t *msg = (swim_message_t *)data;
    hg_return_t result = HG_PROTOCOL_ERROR;
    int entry;

    switch(hg_proc_get_op(proc))
    {
        case HG_ENCODE:
        case HG_DECODE:
            result = hg_proc_swim_member_id_t(proc, &(msg->source_id));
            if(result != HG_SUCCESS)
                return HG_PROTOCOL_ERROR;

            result = hg_proc_swim_member_inc_nr_t(proc, &(msg->source_inc_nr));
            if(result != HG_SUCCESS)
                return HG_PROTOCOL_ERROR;

            /* the full fixed-size piggyback buffer is always processed */
            for(entry = 0; entry < SWIM_MAX_PIGGYBACK_ENTRIES; entry++)
            {
                result = hg_proc_swim_member_update_t(proc, &(msg->pb_buf[entry]));
                if(result != HG_SUCCESS)
                    return HG_PROTOCOL_ERROR;
            }
            break;
        case HG_FREE:
            /* do nothing */
            result = HG_SUCCESS;
            break;
        default:
            break;
    }

    return(result);
}