swim-fd.c 19.1 KB
Newer Older
1
/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

7 8
#include <ssg-config.h>

9 10 11 12 13
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>

14 15 16 17
#include <abt.h>
#include <margo.h>

#include "ssg.h"
18
#include "ssg-internal.h"
19
#include "swim-fd.h"
20
#include "swim-fd-internal.h"
21

22
typedef struct swim_suspect_member_link
23
{
24
    ssg_member_id_t member_id;
25 26
    double susp_start;
    struct swim_suspect_member_link *next;
27 28
} swim_suspect_member_link_t;

29
typedef struct swim_member_update_link
30
{
Shane Snyder's avatar
Shane Snyder committed
31
    swim_member_update_t update;
32
    int tx_count;
33
    struct swim_member_update_link *next;
34
} swim_member_update_link_t;
35 36 37 38

/* SWIM ABT ULT prototypes */
static void swim_prot_ult(
    void *t_arg);
39 40
static void swim_tick_ult(
    void *t_arg);
41

42
/* SWIM group membership utility function prototypes */
43
static void swim_suspect_member(
44
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
45
static void swim_unsuspect_member(
46
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
47
static void swim_kill_member(
48
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
49
static void swim_update_suspected_members(
Shane Snyder's avatar
Shane Snyder committed
50
    ssg_group_t *g, double susp_timeout);
Shane Snyder's avatar
Shane Snyder committed
51
static void swim_add_recent_member_update(
Shane Snyder's avatar
Shane Snyder committed
52
    ssg_group_t *g, swim_member_update_t update);
53
static int swim_get_rand_group_member(
54
    ssg_group_t *g, ssg_member_id_t *member_id);
55
static int swim_get_rand_group_member_set(
56 57
    ssg_group_t *g, ssg_member_id_t *member_ids, int num_members,
    ssg_member_id_t excluded_id);
58 59 60 61 62

/******************************************************
 * SWIM protocol init/finalize functions and ABT ULTs *
 ******************************************************/

63 64
/*
 * swim_init: create the SWIM failure-detector context for group g.
 *
 * The function allocates a zeroed context and a zeroed per-member
 * incarnation-number array sized to the current view. It sets the default
 * protocol parameters and registers the SWIM ping RPCs. If active is
 * nonzero, it also spawns the protocol ULT (swim_prot_ult) on margo's
 * handler pool.
 *
 * Parameters:
 *   g      - group to run the failure detector over (NULL -> returns NULL)
 *   active - nonzero to actively drive the protocol
 *
 * Returns the new context, or NULL on failure.
 *
 * g->swim_ctx is set to the new context BEFORE the protocol ULT is created.
 * This is required because swim_prot_ult reads and asserts g->swim_ctx as
 * soon as it runs, and the handler pool may schedule it before this
 * function returns. On failure, g->swim_ctx is reset to NULL.
 */
swim_context_t * swim_init(
    ssg_group_t * g,
    int active)
{
    swim_context_t *swim_ctx;
    int i, ret;

    if (g == NULL) return NULL;

    /* allocate a zero-initialized structure for storing swim context */
    swim_ctx = calloc(1, sizeof(*swim_ctx));
    if (!swim_ctx) return NULL;

    /* initialize SWIM context */
    ret = margo_get_handler_pool(ssg_inst->mid, &swim_ctx->prot_pool);
    if (ret != 0)
    {
        fprintf(stderr, "Error: unable to get margo handler pool for SWIM.\n");
        free(swim_ctx);
        return NULL;
    }
    swim_ctx->ping_target = SSG_MEMBER_ID_INVALID;
    for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++)
        swim_ctx->subgroup_members[i] = SSG_MEMBER_ID_INVALID;

    /* per-member incarnation numbers, all starting at 0 */
    swim_ctx->member_inc_nrs = calloc(g->view.size,
        sizeof(*(swim_ctx->member_inc_nrs)));
    if (!swim_ctx->member_inc_nrs)
    {
        free(swim_ctx);
        return NULL;
    }

    /* set protocol parameters */
    swim_ctx->prot_period_len = SWIM_DEF_PROTOCOL_PERIOD_LEN;
    swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT;
    swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE;

    /* publish the context before anything that may run SWIM code for g */
    g->swim_ctx = swim_ctx;

    swim_register_ping_rpcs(g);

    if(active)
    {
        ret = ABT_thread_create(swim_ctx->prot_pool, swim_prot_ult, g,
            ABT_THREAD_ATTR_NULL, &(swim_ctx->prot_thread));
        if(ret != ABT_SUCCESS)
        {
            fprintf(stderr, "Error: unable to create SWIM protocol ULT.\n");
            /* NOTE(review): the ping RPCs registered above stay registered */
            g->swim_ctx = NULL;
            free(swim_ctx->member_inc_nrs);
            free(swim_ctx);
            return(NULL);
        }
    }

    return(swim_ctx);
}

116
static void swim_prot_ult(
117
    void * t_arg)
118
{
119
    int ret;
120
    ssg_group_t *g = (ssg_group_t *)t_arg;
121
    swim_context_t *swim_ctx;
122

123
    assert(g != NULL);
Shane Snyder's avatar
Shane Snyder committed
124
    swim_ctx = g->swim_ctx;
125
    assert(swim_ctx != NULL);
126

127 128 129 130
    SSG_DEBUG(g, "SWIM: protocol start (period_len=%.4f, susp_timeout=%d, subgroup_size=%d)\n",
        swim_ctx->prot_period_len, swim_ctx->prot_susp_timeout,
        swim_ctx->prot_subgroup_sz);
    while(!(swim_ctx->shutdown_flag))
131 132
    {
        /* spawn a ULT to run this tick */
133
        ret = ABT_thread_create(swim_ctx->prot_pool, swim_tick_ult, g,
134 135 136 137 138 139 140
            ABT_THREAD_ATTR_NULL, NULL);
        if(ret != ABT_SUCCESS)
        {
            fprintf(stderr, "Error: unable to create ULT for SWIM protocol tick\n");
        }

        /* sleep for a protocol period length */
141
        margo_thread_sleep(ssg_inst->mid, swim_ctx->prot_period_len);
142
    }
143
    SSG_DEBUG(g, "SWIM: protocol shutdown\n");
144 145 146 147

    return;
}

148
static void swim_tick_ult(
149
    void * t_arg)
150
{
151 152
    ssg_group_t *g = (ssg_group_t *)t_arg;
    swim_context_t *swim_ctx;
153 154 155
    int i;
    int ret;

156
    assert(g != NULL);
Shane Snyder's avatar
Shane Snyder committed
157
    swim_ctx = g->swim_ctx;
158 159 160
    assert(swim_ctx != NULL);

    /* update status of any suspected members */
Shane Snyder's avatar
Shane Snyder committed
161
    swim_update_suspected_members(g, swim_ctx->prot_susp_timeout *
162 163 164 165 166
        swim_ctx->prot_period_len);

    /* check whether the ping target from the previous protocol tick
     * ever successfully acked a (direct/indirect) ping request
     */
167
    if((swim_ctx->ping_target != SSG_MEMBER_ID_INVALID) &&
168 169 170
        !(swim_ctx->ping_target_acked))
    {
        /* no response from direct/indirect pings, suspect this member */
Shane Snyder's avatar
Shane Snyder committed
171
        swim_suspect_member(g, swim_ctx->ping_target, swim_ctx->ping_target_inc_nr);
172 173 174
    }

    /* pick a random member from view and ping */
175
    if(swim_get_rand_group_member(g, &(swim_ctx->ping_target)) == 0)
176 177
    {
        /* no available members, back out */
178
        SSG_DEBUG(g, "SWIM: no group members available to dping\n");
179 180
        return;
    }
181 182 183 184 185

    /* TODO: calculate estimated RTT using sliding window of past RTTs */
    swim_ctx->dping_timeout = 250.0;

    /* kick off dping request ULT */
186
    swim_ctx->ping_target_inc_nr = swim_ctx->member_inc_nrs[swim_ctx->ping_target];
187
    swim_ctx->ping_target_acked = 0;
188
    ret = ABT_thread_create(swim_ctx->prot_pool, swim_dping_send_ult, g,
189 190 191 192 193 194 195 196
        ABT_THREAD_ATTR_NULL, NULL);
    if(ret != ABT_SUCCESS)
    {
        fprintf(stderr, "Error: unable to create ULT for SWIM dping send\n");
        return;
    }

    /* sleep for an RTT and wait for an ack for this dping req */
197
    margo_thread_sleep(ssg_inst->mid, swim_ctx->dping_timeout);
198 199 200 201

    /* if we don't hear back from the target after an RTT, kick off
     * a set of indirect pings to a subgroup of group members
     */
Shane Snyder's avatar
Shane Snyder committed
202
    if(!(swim_ctx->ping_target_acked) && (swim_ctx->prot_subgroup_sz > 0))
203 204
    {
        /* get a random subgroup of members to send indirect pings to */
205
        int this_subgroup_sz = swim_get_rand_group_member_set(g,
206 207 208 209 210
            swim_ctx->subgroup_members, swim_ctx->prot_subgroup_sz,
            swim_ctx->ping_target);
        if(this_subgroup_sz == 0)
        {
            /* no available subgroup members, back out */
211
            SSG_DEBUG(g, "SWIM: no subgroup members available to iping\n");
212 213 214 215 216
            return;
        }

        for(i = 0; i < this_subgroup_sz; i++)
        {
217
            ret = ABT_thread_create(swim_ctx->prot_pool, swim_iping_send_ult, g,
218 219 220 221 222 223 224 225 226
                ABT_THREAD_ATTR_NULL, NULL);
            if(ret != ABT_SUCCESS)
            {
                fprintf(stderr, "Error: unable to create ULT for SWIM iping send\n");
                return;
            }
        }
    }

227 228 229
    return;
}

230 231
/*
 * swim_finalize: stop the SWIM protocol and release its context.
 *
 * The function raises shutdown_flag and joins and frees the protocol ULT
 * if one was started. It then frees every node still on the suspect list
 * and the recent-update list (these were previously leaked), the
 * incarnation array, and the context itself. A NULL swim_ctx is a no-op.
 *
 * NOTE(review): tick ULTs are created detached (see swim_prot_ult) and are
 * not joined here. A tick still sleeping on its dping timeout may touch
 * swim_ctx after it is freed. Confirm the shutdown ordering with callers.
 */
void swim_finalize(swim_context_t *swim_ctx)
{
    swim_suspect_member_link_t *susp, *susp_next;
    swim_member_update_link_t *upd, *upd_next;

    if(swim_ctx == NULL)
        return;

    /* set shutdown flag so ULTs know to start wrapping up */
    swim_ctx->shutdown_flag = 1;

    if(swim_ctx->prot_thread)
    {
        /* wait for the protocol ULT to terminate */
        ABT_thread_join(swim_ctx->prot_thread);
        ABT_thread_free(&(swim_ctx->prot_thread));
    }

    /* release any members still under suspicion */
    for(susp = (swim_suspect_member_link_t *)swim_ctx->suspect_list;
        susp != NULL; susp = susp_next)
    {
        susp_next = susp->next;
        free(susp);
    }

    /* release any updates still waiting to be piggybacked */
    for(upd = (swim_member_update_link_t *)swim_ctx->recent_update_list;
        upd != NULL; upd = upd_next)
    {
        upd_next = upd->next;
        free(upd);
    }

    free(swim_ctx->member_inc_nrs);
    free(swim_ctx);

    return;
}

Shane Snyder's avatar
Shane Snyder committed
248 249 250 251 252
/************************************
 * SWIM membership update functions *
 ************************************/

void swim_retrieve_membership_updates(
253 254
    ssg_group_t * g,
    swim_member_update_t * updates,
Shane Snyder's avatar
Shane Snyder committed
255 256
    int update_count)
{
Shane Snyder's avatar
Shane Snyder committed
257
    swim_context_t *swim_ctx = g->swim_ctx;
Shane Snyder's avatar
Shane Snyder committed
258
    swim_member_update_link_t *iter, *tmp;
Shane Snyder's avatar
Shane Snyder committed
259 260
    swim_member_update_link_t *recent_update_list_p =
        (swim_member_update_link_t *)swim_ctx->recent_update_list;
Shane Snyder's avatar
Shane Snyder committed
261 262
    int i = 0;

Shane Snyder's avatar
Shane Snyder committed
263
    LL_FOREACH_SAFE(recent_update_list_p, iter, tmp)
Shane Snyder's avatar
Shane Snyder committed
264 265 266 267 268 269 270 271 272 273
    {
        if(i == update_count)
            break;

        memcpy(&updates[i], &iter->update, sizeof(iter->update));

        /* remove this update if it has been piggybacked enough */
        iter->tx_count++;
        if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT)
        {
Shane Snyder's avatar
Shane Snyder committed
274
            LL_DELETE(recent_update_list_p, iter);
Shane Snyder's avatar
Shane Snyder committed
275 276 277 278 279 280 281 282
            free(iter);
        }
        i++;
    }

    /* invalidate remaining updates */
    for(; i < update_count; i++)
    {
283
        updates[i].id = SSG_MEMBER_ID_INVALID;
Shane Snyder's avatar
Shane Snyder committed
284 285 286 287 288 289
    }

    return;
}

void swim_apply_membership_updates(
Shane Snyder's avatar
Shane Snyder committed
290
    ssg_group_t *g,
Shane Snyder's avatar
Shane Snyder committed
291 292 293
    swim_member_update_t *updates,
    int update_count)
{
Shane Snyder's avatar
Shane Snyder committed
294
    swim_context_t *swim_ctx = g->swim_ctx;
295
    ssg_member_id_t self_id = g->self_id;
Shane Snyder's avatar
Shane Snyder committed
296 297 298 299
    int i;

    for(i = 0; i < update_count; i++)
    {
300
        if(updates[i].id == SSG_MEMBER_ID_INVALID)
Shane Snyder's avatar
Shane Snyder committed
301 302 303 304 305 306 307 308 309 310 311
            break;

        switch(updates[i].status)
        {
            case SWIM_MEMBER_ALIVE:
                if(updates[i].id == self_id)
                {
                    assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
                }
                else
                {
Shane Snyder's avatar
Shane Snyder committed
312
                    swim_unsuspect_member(g, updates[i].id, updates[i].inc_nr);
Shane Snyder's avatar
Shane Snyder committed
313 314 315 316 317 318 319 320 321 322
                }
                break;
            case SWIM_MEMBER_SUSPECT:
                if(updates[i].id == self_id)
                {
                    assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
                    /* increment our incarnation number if we are suspected
                     * in the current incarnation
                     */
                    if(updates[i].inc_nr == swim_ctx->member_inc_nrs[self_id])
323
                    {
Shane Snyder's avatar
Shane Snyder committed
324
                        swim_ctx->member_inc_nrs[self_id]++;
Shane Snyder's avatar
Shane Snyder committed
325
                        SSG_DEBUG(g, "SWIM: self SUSPECT received (new inc_nr=%d)\n",
326 327
                            swim_ctx->member_inc_nrs[self_id]);
                    }
Shane Snyder's avatar
Shane Snyder committed
328 329 330
                }
                else
                {
Shane Snyder's avatar
Shane Snyder committed
331
                    swim_suspect_member(g, updates[i].id, updates[i].inc_nr);
Shane Snyder's avatar
Shane Snyder committed
332 333 334 335 336
                }
                break;
            case SWIM_MEMBER_DEAD:
                if(updates[i].id == self_id)
                {
337
                    /* if we get an update that we are dead, just shut down */
Shane Snyder's avatar
Shane Snyder committed
338 339
                    assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
                    swim_ctx->member_inc_nrs[self_id] = updates[i].inc_nr;
340

Shane Snyder's avatar
Shane Snyder committed
341
                    SSG_DEBUG(g, "SWIM: self confirmed DEAD (inc_nr=%d)\n",
342 343
                        swim_ctx->member_inc_nrs[self_id]);

Shane Snyder's avatar
Shane Snyder committed
344 345 346 347
                    swim_finalize(swim_ctx);
                }
                else
                {
Shane Snyder's avatar
Shane Snyder committed
348
                    swim_kill_member(g, updates[i].id, updates[i].inc_nr);
Shane Snyder's avatar
Shane Snyder committed
349 350 351
                }
                break;
            default:
Shane Snyder's avatar
Shane Snyder committed
352
                SSG_DEBUG(g, "SWIM: invalid membership status update\n");
Shane Snyder's avatar
Shane Snyder committed
353 354 355 356 357 358
        }
    }

    return;
}

359 360 361 362
/*******************************************
 * SWIM group membership utility functions *
 *******************************************/

Shane Snyder's avatar
Shane Snyder committed
363
static void swim_suspect_member(
364
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
365
{
Shane Snyder's avatar
Shane Snyder committed
366
    swim_context_t *swim_ctx = g->swim_ctx;
367 368
    swim_suspect_member_link_t *iter, *tmp;
    swim_suspect_member_link_t *suspect_link = NULL;
Shane Snyder's avatar
Shane Snyder committed
369 370
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
Shane Snyder's avatar
Shane Snyder committed
371
    swim_member_update_t update;
372

Shane Snyder's avatar
Shane Snyder committed
373
    /* ignore updates for dead members */
Shane Snyder's avatar
Shane Snyder committed
374
#if 0
Shane Snyder's avatar
Shane Snyder committed
375
    if(!(g->view.member_states[member_id].is_member))
376
        return;
Shane Snyder's avatar
Shane Snyder committed
377
#endif
378

379
    /* determine if this member is already suspected */
Shane Snyder's avatar
Shane Snyder committed
380
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
381
    {
Shane Snyder's avatar
Shane Snyder committed
382
        if(iter->member_id == member_id)
383
        {
Shane Snyder's avatar
Shane Snyder committed
384
            if(inc_nr <= swim_ctx->member_inc_nrs[member_id])
385 386 387 388 389 390 391 392 393
            {
                /* ignore a suspicion in an incarnation number less than
                 * or equal to the current suspicion's incarnation
                 */
                return;
            }

            /* otherwise, we have a suspicion in a more recent incarnation --
             * remove the current suspicion so we can update it
394
             */
Shane Snyder's avatar
Shane Snyder committed
395
            LL_DELETE(suspect_list_p, iter);
396 397 398 399
            suspect_link = iter;
        }
    }

Shane Snyder's avatar
Shane Snyder committed
400
    /* ignore suspicions for a member that is alive in a newer incarnation */
Shane Snyder's avatar
Shane Snyder committed
401
    if((suspect_link == NULL) && (inc_nr < swim_ctx->member_inc_nrs[member_id]))
Shane Snyder's avatar
Shane Snyder committed
402 403 404 405 406
        return;

    /* if there is no suspicion timeout, just kill the member */
    if(swim_ctx->prot_susp_timeout == 0)
    {
Shane Snyder's avatar
Shane Snyder committed
407
        swim_kill_member(g, member_id, inc_nr);
Shane Snyder's avatar
Shane Snyder committed
408 409 410
        return;
    }

Shane Snyder's avatar
Shane Snyder committed
411
    SSG_DEBUG(g, "SWIM: member %d SUSPECT (inc_nr=%d)\n",
Shane Snyder's avatar
Shane Snyder committed
412
        (int)member_id, (int)inc_nr);
413 414 415 416 417 418 419

    if(suspect_link == NULL)
    {
        /* if this member is not already on the suspect list,
         * allocate a link for it
         */
        suspect_link = malloc(sizeof(*suspect_link));
420 421
        if (!suspect_link)
            return;
422
        memset(suspect_link, 0, sizeof(*suspect_link));
Shane Snyder's avatar
Shane Snyder committed
423
        suspect_link->member_id = member_id;
424 425 426 427
    }
    suspect_link->susp_start = ABT_get_wtime();

    /* add to end of suspect list */
Shane Snyder's avatar
Shane Snyder committed
428
    LL_APPEND(suspect_list_p, suspect_link);
429 430

    /* update swim membership state */
Shane Snyder's avatar
Shane Snyder committed
431
    swim_ctx->member_inc_nrs[member_id] = inc_nr;
432 433 434 435

    /* add this update to recent update list so it will be piggybacked
     * on future protocol messages
     */
Shane Snyder's avatar
Shane Snyder committed
436 437 438
    update.id = member_id;
    update.status = SWIM_MEMBER_SUSPECT;
    update.inc_nr = inc_nr;
Shane Snyder's avatar
Shane Snyder committed
439
    swim_add_recent_member_update(g, update);
440 441 442 443

    return;
}

Shane Snyder's avatar
Shane Snyder committed
444
static void swim_unsuspect_member(
445
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
446
{
Shane Snyder's avatar
Shane Snyder committed
447
    swim_context_t *swim_ctx = g->swim_ctx;
448
    swim_suspect_member_link_t *iter, *tmp;
Shane Snyder's avatar
Shane Snyder committed
449 450
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
Shane Snyder's avatar
Shane Snyder committed
451
    swim_member_update_t update;
452

Shane Snyder's avatar
Shane Snyder committed
453
    /* ignore updates for dead members */
Shane Snyder's avatar
Shane Snyder committed
454
#if 0
Shane Snyder's avatar
Shane Snyder committed
455
    if(!(g->view.member_states[member_id].is_member))
456
        return;
Shane Snyder's avatar
Shane Snyder committed
457
#endif
458

Shane Snyder's avatar
Shane Snyder committed
459
    /* ignore alive updates for incarnation numbers that aren't new */
Shane Snyder's avatar
Shane Snyder committed
460
    if(inc_nr <= swim_ctx->member_inc_nrs[member_id])
Shane Snyder's avatar
Shane Snyder committed
461 462
        return;

Shane Snyder's avatar
Shane Snyder committed
463
    SSG_DEBUG(g, "SWIM: member %d ALIVE (inc_nr=%d)\n",
Shane Snyder's avatar
Shane Snyder committed
464
        (int)member_id, (int)inc_nr);
465

Shane Snyder's avatar
Shane Snyder committed
466
    /* if member is suspected, remove from suspect list */
Shane Snyder's avatar
Shane Snyder committed
467
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
468
    {
Shane Snyder's avatar
Shane Snyder committed
469
        if(iter->member_id == member_id)
470
        {
Shane Snyder's avatar
Shane Snyder committed
471
            LL_DELETE(suspect_list_p, iter);
472 473 474
            free(iter);
            break;
        }
475 476
    }

477
    /* update swim membership state */
Shane Snyder's avatar
Shane Snyder committed
478
    swim_ctx->member_inc_nrs[member_id] = inc_nr;
479 480 481 482

    /* add this update to recent update list so it will be piggybacked
     * on future protocol messages
     */
Shane Snyder's avatar
Shane Snyder committed
483 484 485
    update.id = member_id;
    update.status = SWIM_MEMBER_ALIVE;
    update.inc_nr = inc_nr;
Shane Snyder's avatar
Shane Snyder committed
486
    swim_add_recent_member_update(g, update);
487 488 489 490

    return;
}

Shane Snyder's avatar
Shane Snyder committed
491
static void swim_kill_member(
492
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
493
{
Shane Snyder's avatar
Shane Snyder committed
494 495
    swim_context_t *swim_ctx = g->swim_ctx;
    ssg_member_state_t *member_state;
496
    swim_suspect_member_link_t *iter, *tmp;
Shane Snyder's avatar
Shane Snyder committed
497 498
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
499 500
    swim_member_update_t swim_update;
    ssg_membership_update_t ssg_update;
501

Shane Snyder's avatar
Shane Snyder committed
502 503 504 505 506 507 508 509
    HASH_FIND(hh, g->view.member_map, &member_id, sizeof(ssg_member_id_t),
        member_state);
    if(!member_state)
    {
        fprintf(stderr, "Error: unable to kill member %lu, not in view\n", member_id);
        return;
    }

Shane Snyder's avatar
Shane Snyder committed
510
    /* ignore updates for dead members */
Shane Snyder's avatar
Shane Snyder committed
511
#if 0
Shane Snyder's avatar
Shane Snyder committed
512
    if(!(g->view.member_states[member_id].is_member))
513
        return;
Shane Snyder's avatar
Shane Snyder committed
514
#endif
515

Shane Snyder's avatar
Shane Snyder committed
516
    SSG_DEBUG(g, "SWIM: member %lu DEAD (inc_nr=%u)\n", member_id, inc_nr);
517

Shane Snyder's avatar
Shane Snyder committed
518
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
519
    {
Shane Snyder's avatar
Shane Snyder committed
520
        if(iter->member_id == member_id)
521 522
        {
            /* remove member from suspect list */
Shane Snyder's avatar
Shane Snyder committed
523
            LL_DELETE(suspect_list_p, iter);
524 525 526 527 528
            free(iter);
            break;
        }
    }

529
    /* update swim membership state */
Shane Snyder's avatar
Shane Snyder committed
530
    swim_ctx->member_inc_nrs[member_id] = inc_nr;
531

532 533 534
    /* add this update to recent update list so it will be piggybacked
     * on future protocol messages
     */
535 536 537 538 539 540 541 542
    swim_update.id = member_id;
    swim_update.status = SWIM_MEMBER_DEAD;
    swim_update.inc_nr = inc_nr;
    swim_add_recent_member_update(g, swim_update);
    /* have SSG apply the membership update */
    ssg_update.member = member_id;
    ssg_update.type = SSG_MEMBER_REMOVE;
    ssg_apply_membership_update(g, ssg_update);
543 544 545 546

    return;
}

Shane Snyder's avatar
Shane Snyder committed
547
static void swim_update_suspected_members(
Shane Snyder's avatar
Shane Snyder committed
548
    ssg_group_t *g, double susp_timeout)
549
{
Shane Snyder's avatar
Shane Snyder committed
550
    swim_context_t *swim_ctx = g->swim_ctx;
551 552 553
    double now = ABT_get_wtime();
    double susp_dur;
    swim_suspect_member_link_t *iter, *tmp;
Shane Snyder's avatar
Shane Snyder committed
554 555
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
556

Shane Snyder's avatar
Shane Snyder committed
557
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
558 559
    {
        susp_dur = now - iter->susp_start;
Shane Snyder's avatar
Shane Snyder committed
560
        if(susp_dur >= (susp_timeout / 1000.0))
561 562 563 564
        {
            /* if this member has exceeded its allowable suspicion timeout,
             * we mark it as dead
             */
Shane Snyder's avatar
Shane Snyder committed
565
            swim_kill_member(g, iter->member_id,
Shane Snyder's avatar
Shane Snyder committed
566 567 568 569 570 571 572 573
                swim_ctx->member_inc_nrs[iter->member_id]);
        }
    }

    return;
}

static void swim_add_recent_member_update(
Shane Snyder's avatar
Shane Snyder committed
574
    ssg_group_t *g, swim_member_update_t update)
Shane Snyder's avatar
Shane Snyder committed
575
{
Shane Snyder's avatar
Shane Snyder committed
576
    swim_context_t *swim_ctx = g->swim_ctx;
Shane Snyder's avatar
Shane Snyder committed
577 578 579 580 581 582 583 584 585 586 587 588
    swim_member_update_link_t *iter, *tmp;
    swim_member_update_link_t *update_link = NULL;
    swim_member_update_link_t **recent_update_list_p =
        (swim_member_update_link_t **)&(swim_ctx->recent_update_list);

    /* search and remove any recent updates corresponding to this member */
    LL_FOREACH_SAFE(*recent_update_list_p, iter, tmp)
    {
        if(iter->update.id == update.id)
        {
            LL_DELETE(*recent_update_list_p, iter);
            update_link = iter;
589 590
        }
    }
591

Shane Snyder's avatar
Shane Snyder committed
592 593 594 595 596 597 598 599 600 601 602 603 604
    if(update_link == NULL)
    {
        update_link = malloc(sizeof(*update_link));
        assert(update_link);
        memset(update_link, 0, sizeof(*update_link));
    }

    memcpy(&update_link->update, &update, sizeof(update));

    /* add to recent update list */
    update_link->tx_count = 0;
    LL_APPEND(*recent_update_list_p, update_link);

605 606
    return;
}
607

Shane Snyder's avatar
Shane Snyder committed
608
static int swim_get_rand_group_member(
609
    ssg_group_t *g, ssg_member_id_t *member_id)
610
{
611
    int ret = swim_get_rand_group_member_set(g, member_id, 1,
612
        SSG_MEMBER_ID_INVALID);
613 614 615 616

    return(ret);
}

Shane Snyder's avatar
Shane Snyder committed
617
static int swim_get_rand_group_member_set(
618 619
    ssg_group_t *g, ssg_member_id_t *member_ids, int num_members,
    ssg_member_id_t excluded_id)
620
{
621 622
    unsigned int i, j, rand_ndx = 0;
    ssg_member_id_t r, rand_member;
623

624 625 626
    if(num_members == 0)
        return(0);

627 628 629 630 631
    /* TODO: what data structure could we use to avoid looping to look
     * for a set of random ranks
     */
    do
    {
632 633
        r = rand() % g->view.size;
        for(i = 0; i < g->view.size; i++)
634
        {
635 636 637 638 639 640 641 642 643 644
            rand_member = (r + i) % g->view.size;
            if(rand_member == g->self_id || rand_member == excluded_id ||
                !(g->view.member_states[rand_member].is_member))
                continue;
            for(j = 0; j < rand_ndx; j++)
            {
                if(rand_member == member_ids[j])
                    break;
            }
            if(j == rand_ndx)
645 646
                break;
        }
647 648
        if(i == g->view.size)
            break;
649

Shane Snyder's avatar
Shane Snyder committed
650
        member_ids[rand_ndx++] = rand_member;
651
    } while((int)rand_ndx < num_members);
652

653 654
    return(rand_ndx);
}