/* swim-fd-ping.c */
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <mercury.h>
#include <margo.h>

#include "swim-fd.h"
#include "swim-fd-internal.h"

/* NOTE: keep these defines in sync with the swim_member_*_t typedefs
 * (originally documented as living in swim.h; NOTE(review): this file
 * includes swim-fd.h / swim-fd-internal.h -- confirm which header holds them)
 */
#define hg_proc_swim_member_id_t        hg_proc_uint64_t
#define hg_proc_swim_member_inc_nr_t    hg_proc_uint32_t
#define hg_proc_swim_member_status_t    hg_proc_uint8_t

/* disabled: per-member update proc for piggybacked membership updates
 * (see the disabled pb_buf field of swim_message_t); it still names the
 * SSG id type rather than swim_member_id_t
 */
#if 0
MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \
    ((ssg_member_id_t) (id)) \
    ((swim_member_status_t) (status)) \
    ((swim_member_inc_nr_t) (inc_nr)));
#endif

/* a swim message is the membership information piggybacked (gossiped)
 * on the ping and ack messages generated by the protocol
 */
typedef struct swim_message_s
{
34
    swim_member_id_t source_id;
Shane Snyder's avatar
Shane Snyder committed
35
    swim_member_inc_nr_t source_inc_nr;
36
#if 0
Shane Snyder's avatar
Shane Snyder committed
37
    swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead?
38
#endif
39 40
} swim_message_t;

41 42 43
/* HG encode/decode routines for SWIM RPCs */
static hg_return_t hg_proc_swim_message_t(
    hg_proc_t proc, void *data);

/* direct ping request and response each carry a single swim message */
MERCURY_GEN_PROC(swim_dping_req_t, \
    ((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_dping_resp_t, \
    ((swim_message_t) (msg)));

/* disabled: indirect ping request/response types */
#if 0
MERCURY_GEN_PROC(swim_iping_req_t, \
    ((ssg_member_id_t) (target_id)) \
    ((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_resp_t, \
    ((swim_message_t) (msg)));
#endif

/* SWIM message pack/unpack prototypes */
static void swim_pack_message(
60
    swim_context_t *swim_ctx, swim_message_t *msg);
61
static void swim_unpack_message(
62
    swim_context_t *swim_ctx, swim_message_t *msg);
63

64
DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
65
#if 0
66
DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
67
#endif
68

Shane Snyder's avatar
Shane Snyder committed
69
static hg_id_t swim_dping_rpc_id;
70
#if 0
Shane Snyder's avatar
Shane Snyder committed
71
static hg_id_t swim_iping_rpc_id;
72
#endif
73 74

void swim_register_ping_rpcs(
75
    swim_context_t *swim_ctx)
76
{
77 78
    assert(swim_ctx != NULL);

79
    /* register RPC handlers for SWIM pings */
80
    swim_dping_rpc_id = MARGO_REGISTER(swim_ctx->mid, "swim_dping", swim_dping_req_t,
Shane Snyder's avatar
Shane Snyder committed
81
        swim_dping_resp_t, swim_dping_recv_ult);
82
#if 0
Shane Snyder's avatar
Shane Snyder committed
83 84
    swim_iping_rpc_id = MARGO_REGISTER(ssg_inst->mid, "swim_iping", swim_iping_req_t,
        swim_iping_resp_t, swim_iping_recv_ult);
85
#endif
86

87
    /* register swim context data structure with each RPC type */
88
    /* XXX: this won't work for multiple groups ... */
89 90
    margo_register_data(swim_ctx->mid, swim_dping_rpc_id, swim_ctx, NULL);
#if 0
Shane Snyder's avatar
Shane Snyder committed
91
    margo_register_data(ssg_inst->mid, swim_iping_rpc_id, g, NULL);
92
#endif
93 94 95 96 97 98 99 100

    return;
}

/********************************
 *       SWIM direct pings      *
 ********************************/

101
/* XXX: just accept a target info? */
Shane Snyder's avatar
Shane Snyder committed
102
static int swim_send_dping(
103 104
    swim_context_t *swim_ctx, swim_member_id_t target_id,
    hg_addr_t target_addr);
105 106 107 108

void swim_dping_send_ult(
    void *t_arg)
{
109 110
    swim_context_t *swim_ctx = (swim_context_t *)t_arg;
    swim_member_id_t dping_target_id;
111 112 113 114
    int ret;

    assert(swim_ctx != NULL);

115 116
    dping_target_id = swim_ctx->dping_target_info.id;
    ret = swim_send_dping(swim_ctx, dping_target_id, swim_ctx->dping_target_info.addr);
117 118 119 120 121 122
    if (ret == 0)
    {
        /* mark this dping req as acked, double checking to make
         * sure we aren't inadvertently ack'ing a ping request
         * for a more recent tick of the protocol
         */
123 124 125
        /* XXX: maybe just use a sequence number? this isn't technically right */
        if(swim_ctx->dping_target_info.id == dping_target_id)
            swim_ctx->dping_target_acked = 1;
126 127 128 129 130
    }

    return;
}

Shane Snyder's avatar
Shane Snyder committed
131
static int swim_send_dping(
132 133
    swim_context_t *swim_ctx, swim_member_id_t target_id,
    hg_addr_t target_addr)
134 135 136 137 138 139 140
{
    hg_handle_t handle;
    swim_dping_req_t dping_req;
    swim_dping_resp_t dping_resp;
    hg_return_t hret;
    int ret = -1;

141
    assert(swim_ctx != NULL);
142

143
    hret = margo_create(swim_ctx->mid, target_addr, swim_dping_rpc_id, &handle);
144 145 146
    if(hret != HG_SUCCESS)
        return(ret);

147
    SWIM_DEBUG(swim_ctx, "send dping req to %lu\n", target_id);
148 149

    /* fill the direct ping request with current membership state */
150
    swim_pack_message(swim_ctx, &(dping_req.msg));
151 152

    /* send a direct ping that expires at the end of the protocol period */
153
    hret = margo_forward_timed(handle, &dping_req, swim_ctx->prot_period_len);
154 155
    if (hret == HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
156
        hret = margo_get_output(handle, &dping_resp);
157
        if(hret != HG_SUCCESS) goto fini;
158

159 160
        SWIM_DEBUG(swim_ctx, "recv dping ack from %lu\n", dping_resp.msg.source_id);
        assert(dping_resp.msg.source_id == target_id);
161 162

        /* extract target's membership state from response */
163
        swim_unpack_message(swim_ctx, &(dping_resp.msg));
164

Shane Snyder's avatar
Shane Snyder committed
165
        margo_free_output(handle, &dping_resp);
166 167 168 169
        ret = 0;
    }
    else if(hret != HG_TIMEOUT)
    {
170
        fprintf(stderr, "SWIM dping req error (err=%d)\n", hret);
171 172
    }

173
fini:
174
    margo_destroy(handle);
175 176 177 178 179 180 181 182
    return(ret);
}

/* Handler ULT for an incoming direct ping (dping) request.
 *
 * Recovers the swim context registered for swim_dping_rpc_id (see
 * swim_register_ping_rpcs), merges the sender's membership state from the
 * request, and responds with this member's current state. If the request
 * cannot be decoded no response is sent. The handle is destroyed on every
 * path.
 */
static void swim_dping_recv_ult(hg_handle_t handle)
{
    swim_context_t *swim_ctx;
    swim_dping_req_t dping_req;
    swim_dping_resp_t dping_resp;
    const struct hg_info *hgi;
    margo_instance_id mid;
    hg_return_t hret;

    /* get handle info and margo instance */
    hgi = margo_get_info(handle);
    assert(hgi);
    mid = margo_hg_info_get_instance(hgi);
    assert(mid != MARGO_INSTANCE_NULL);

    /* get swim state registered with the dping RPC */
    swim_ctx = (swim_context_t *)margo_registered_data(mid, swim_dping_rpc_id);
    assert(swim_ctx != NULL);

    hret = margo_get_input(handle, &dping_req);
    if(hret != HG_SUCCESS) goto fini;

    /* ids are 64-bit: PRIu64 instead of %lu */
    SWIM_DEBUG(swim_ctx, "recv dping req from %" PRIu64 "\n",
        (uint64_t)dping_req.msg.source_id);

    /* extract sender's membership state from request */
    swim_unpack_message(swim_ctx, &(dping_req.msg));

    /* fill the direct ping response with current membership state */
    swim_pack_message(swim_ctx, &(dping_resp.msg));

    SWIM_DEBUG(swim_ctx, "send dping ack to %" PRIu64 "\n",
        (uint64_t)dping_req.msg.source_id);

    /* respond to sender of the dping req; report (rather than silently
     * drop) a failure to send the ack */
    hret = margo_respond(handle, &dping_resp);
    if(hret != HG_SUCCESS)
        fprintf(stderr, "SWIM dping ack error (err=%d)\n", (int)hret);

    margo_free_input(handle, &dping_req);
fini:
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)

/********************************
 *     SWIM indirect pings      *
 ********************************/

/* NOTE(review): the indirect ping (iping) code below is disabled. It is
 * still written against an older ssg_group_t-based interface (g->fd_ctx,
 * ssg_inst, SSG_DEBUG, swim_send_dping(g, id), swim_pack_message(g, ...))
 * and must be ported to swim_context_t before it can be re-enabled.
 */
#if 0
/* ULT body: ask one member of the protocol subgroup to directly ping
 * swim_ctx->ping_target on our behalf, and mark the target as acked if
 * that member relays an ack.
 */
void swim_iping_send_ult(
    void *t_arg)
{
    ssg_group_t *g = (ssg_group_t *)t_arg;
    swim_context_t *swim_ctx;
    int i;
    ssg_member_id_t my_subgroup_member = SSG_MEMBER_ID_INVALID;
    hg_addr_t target_addr = HG_ADDR_NULL;
    hg_handle_t handle;
    swim_iping_req_t iping_req;
    swim_iping_resp_t iping_resp;
    hg_return_t hret;

    assert(g != NULL);
    swim_ctx = (swim_context_t *)g->fd_ctx;
    assert(swim_ctx != NULL);

    /* take, and mark as consumed, the first subgroup member still available */
    for(i = 0; i < swim_ctx->prot_subgroup_sz; i++)
    {
        if(swim_ctx->subgroup_members[i] != SSG_MEMBER_ID_INVALID)
        {
            my_subgroup_member = swim_ctx->subgroup_members[i];
            swim_ctx->subgroup_members[i] = SSG_MEMBER_ID_INVALID;
            break;
        }
    }
    assert(my_subgroup_member != SSG_MEMBER_ID_INVALID);

    target_addr = g->view.member_states[my_subgroup_member].addr;
    if(target_addr == HG_ADDR_NULL)
        return;

    hret = margo_create(ssg_inst->mid, target_addr, swim_iping_rpc_id, &handle);
    if(hret != HG_SUCCESS)
        return;

    SSG_DEBUG(g, "SWIM: send iping req to %d (target=%d)\n",
        (int)my_subgroup_member, (int)swim_ctx->ping_target);

    /* fill the indirect ping request with target member and current
     * membership state
     */
    iping_req.target_id = swim_ctx->ping_target;
    swim_pack_message(g, &(iping_req.msg));

    /* send this indirect ping */
    /* NOTE: the timeout is just the protocol period length minus
     * the dping timeout, which should cause this iping to timeout
     * right at the end of the current protocol period.
     */
    hret = margo_forward_timed(handle, &iping_req,
        (swim_ctx->prot_period_len - swim_ctx->dping_timeout));
    if (hret == HG_SUCCESS)
    {
        hret = margo_get_output(handle, &iping_resp);
        if(hret != HG_SUCCESS) goto fini;

        SSG_DEBUG(g, "SWIM: recv iping ack from %d (target=%d)\n",
            (int)iping_resp.msg.source_id, (int)swim_ctx->ping_target);

        /* extract target's membership state from response */
        swim_unpack_message(g, &(iping_resp.msg));

        /* mark this iping req as acked, double checking to make
         * sure we aren't inadvertently ack'ing a ping request
         * for a more recent tick of the protocol
         */
        if(swim_ctx->ping_target == iping_req.target_id)
            swim_ctx->ping_target_acked = 1;

        margo_free_output(handle, &iping_resp);
    }
    else if(hret != HG_TIMEOUT)
    {
        /* arguments follow the format string order: member, target, err */
        SSG_DEBUG(g, "SWIM: iping req error from %d (target=%d, err=%d)\n",
            (int)my_subgroup_member, (int)swim_ctx->ping_target, (int)hret);
    }

fini:
    margo_destroy(handle);
    return;
}

/* Handler ULT for an incoming indirect ping: directly ping the requested
 * target and, only if that succeeds, respond to the requester.
 */
static void swim_iping_recv_ult(hg_handle_t handle)
{
    ssg_group_t *g;
    swim_context_t *swim_ctx;
    swim_iping_req_t iping_req;
    swim_iping_resp_t iping_resp;
    hg_return_t hret;
    int ret;

    /* get the swim state; the group is registered as data for the
     * iping RPC id, not the dping one
     */
    g = (ssg_group_t *)margo_registered_data(ssg_inst->mid, swim_iping_rpc_id);
    assert(g != NULL);
    swim_ctx = (swim_context_t *)g->fd_ctx;
    assert(swim_ctx != NULL);

    hret = margo_get_input(handle, &iping_req);
    if(hret != HG_SUCCESS) goto fini;

    SSG_DEBUG(g, "SWIM: recv iping req from %d (target=%d)\n",
        (int)iping_req.msg.source_id, (int)iping_req.target_id);

    /* extract sender's membership state from request */
    swim_unpack_message(g, &(iping_req.msg));

    /* send direct ping to target on behalf of who sent iping req */
    ret = swim_send_dping(g, iping_req.target_id);
    if(ret == 0)
    {
        /* if the dping req succeeds, fill the indirect ping
         * response with current membership state
         */
        swim_pack_message(g, &(iping_resp.msg));

        SSG_DEBUG(g, "SWIM: send iping ack to %d (target=%d)\n",
            (int)iping_req.msg.source_id, (int)iping_req.target_id);

        /* respond to sender of the iping req */
        margo_respond(handle, &iping_resp);
    }

    margo_free_input(handle, &iping_req);
fini:
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
#endif

/********************************
 *      SWIM ping helpers       *
 ********************************/

360
static void swim_pack_message(swim_context_t *swim_ctx, swim_message_t *msg)
361 362 363 364
{
    memset(msg, 0, sizeof(*msg));

    /* fill in self information */
365 366
    msg->source_id = swim_ctx->self_id;
    msg->source_inc_nr = swim_ctx->self_inc_nr;
367

368
#if 0
369
    /* piggyback a set of membership states on this message */
370
    swim_retrieve_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
371
#endif
372 373 374 375

    return;
}

376
static void swim_unpack_message(swim_context_t *swim_ctx, swim_message_t *msg)
377
{
378
#if 0
379 380
    swim_member_update_t sender_update;

Shane Snyder's avatar
Shane Snyder committed
381 382 383
    /* apply (implicit) sender update */
    sender_update.id = msg->source_id;
    sender_update.status = SWIM_MEMBER_ALIVE;
384
    sender_update.inc_nr = msg->source_inc_nr;
385
    swim_apply_membership_updates(g, &sender_update, 1);
386

Shane Snyder's avatar
Shane Snyder committed
387
    /* update membership status using piggybacked membership updates */
388
    swim_apply_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
389
#endif
390 391 392 393 394 395 396 397 398 399 400 401 402 403

    return;
}

/* manual serialization/deserialization routine for swim messages
 *
 * Processes source_id (via hg_proc_uint64_t) followed by source_inc_nr
 * (via hg_proc_uint32_t). Encode and decode issue the same hg_proc_*
 * calls; the operation of `proc` decides the direction. A failure on
 * either field is reported as HG_PROTOCOL_ERROR. HG_FREE has nothing to
 * release. Any other operation returns HG_PROTOCOL_ERROR.
 */
static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
{
    swim_message_t *msg = (swim_message_t *)data;
    hg_return_t hret = HG_PROTOCOL_ERROR;

    /* compile-time enforcement of the "keep these defines in sync" note:
     * the hg_proc aliases process fixed-width integers through the field
     * addresses, so each field must be exactly as wide as its wire type.
     * A mismatch yields a negative array size, i.e. a compile error.
     */
    (void)sizeof(char[(sizeof(msg->source_id) == sizeof(uint64_t)) ? 1 : -1]);
    (void)sizeof(char[(sizeof(msg->source_inc_nr) == sizeof(uint32_t)) ? 1 : -1]);

    switch(hg_proc_get_op(proc))
    {
        case HG_ENCODE:
        case HG_DECODE:
            hret = hg_proc_swim_member_id_t(proc, &(msg->source_id));
            if(hret != HG_SUCCESS)
                return HG_PROTOCOL_ERROR;
            hret = hg_proc_swim_member_inc_nr_t(proc, &(msg->source_inc_nr));
            if(hret != HG_SUCCESS)
                return HG_PROTOCOL_ERROR;
#if 0
            {
                int i;
                for(i = 0; i < SWIM_MAX_PIGGYBACK_ENTRIES; i++)
                {
                    hret = hg_proc_swim_member_update_t(proc, &(msg->pb_buf[i]));
                    if(hret != HG_SUCCESS)
                        return HG_PROTOCOL_ERROR;
                }
            }
#endif
            break;
        case HG_FREE:
            /* nothing to free: a swim message holds only plain integers */
            hret = HG_SUCCESS;
            break;
        default:
            break;
    }

    return(hret);
}