/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>

#include <abt.h>
#include <margo.h>

#include "swim-fd.h"
#include "swim-fd-internal.h"
16

17
typedef struct swim_suspect_member_link
18
{
Shane Snyder's avatar
Shane Snyder committed
19
    swim_member_id_t member_id;
20
    swim_member_state_t *member_state;
21 22
    double susp_start;
    struct swim_suspect_member_link *next;
23 24
} swim_suspect_member_link_t;

25
typedef struct swim_member_update_link
26
{
Shane Snyder's avatar
Shane Snyder committed
27
    swim_member_update_t update;
28
    int tx_count;
29
    struct swim_member_update_link *next;
30
} swim_member_update_link_t;
31 32 33 34

/* SWIM ABT ULT prototypes */
static void swim_prot_ult(
    void *t_arg);
35 36
static void swim_tick_ult(
    void *t_arg);
37

38
/* SWIM group membership utility function prototypes */
39
static void swim_suspect_member(
Shane Snyder's avatar
Shane Snyder committed
40 41
    swim_context_t *swim_ctx, swim_member_id_t member_id,
    swim_member_inc_nr_t inc_nr);
42
static void swim_unsuspect_member(
43 44
    swim_context_t *swim_ctx, swim_member_id_t member_id,
    swim_member_inc_nr_t inc_nr);
45
static void swim_kill_member(
46 47
    swim_context_t *swim_ctx, swim_member_id_t member_id,
    swim_member_inc_nr_t inc_nr);
48
static void swim_update_suspected_members(
49
    swim_context_t *swim_ctx, double susp_timeout);
Shane Snyder's avatar
Shane Snyder committed
50
static void swim_add_recent_member_update(
51
    swim_context_t *swim_ctx, swim_member_update_t update);
52 53 54 55 56

/******************************************************
 * SWIM protocol init/finalize functions and ABT ULTs *
 ******************************************************/

57
swim_context_t * swim_init(
58
    margo_instance_id mid,
Shane Snyder's avatar
Shane Snyder committed
59
    void * group_data,
60
    swim_member_id_t self_id,
61
    swim_group_mgmt_callbacks_t swim_callbacks,
62 63 64
    int active)
{
    swim_context_t *swim_ctx;
Shane Snyder's avatar
Shane Snyder committed
65
    int ret;
66

67 68
    /* allocate structure for storing swim context */
    swim_ctx = malloc(sizeof(*swim_ctx));
69
    if (!swim_ctx) return NULL;
70
    memset(swim_ctx, 0, sizeof(*swim_ctx));
71 72
    swim_ctx->mid = mid;
    swim_ctx->group_data = group_data;
73 74
    swim_ctx->self_id = self_id;
    swim_ctx->self_inc_nr = 0;
75
    swim_ctx->swim_callbacks = swim_callbacks;
76

77 78 79 80 81
    /* set protocol parameters */
    swim_ctx->prot_period_len = SWIM_DEF_PROTOCOL_PERIOD_LEN;
    swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT;
    swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE;

82 83 84
    margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool);
    ABT_mutex_create(&swim_ctx->swim_mutex);

Shane Snyder's avatar
Shane Snyder committed
85 86 87 88 89
    /* NOTE: set this flag so we don't inadvertently suspect a member
     * on the first iteration of the protocol
     */
    swim_ctx->ping_target_acked = 1;

90
    swim_register_ping_rpcs(swim_ctx);
91 92 93

    if(active)
    {
94
        ret = ABT_thread_create(swim_ctx->swim_pool, swim_prot_ult, swim_ctx,
95 96 97 98
            ABT_THREAD_ATTR_NULL, &(swim_ctx->prot_thread));
        if(ret != ABT_SUCCESS)
        {
            fprintf(stderr, "Error: unable to create SWIM protocol ULT.\n");
99
            free(swim_ctx);
100 101 102 103 104 105 106
            return(NULL);
        }
    }

    return(swim_ctx);
}

107
static void swim_prot_ult(
108
    void * t_arg)
109
{
110
    int ret;
111
    swim_context_t *swim_ctx = (swim_context_t *)t_arg;
112

113
    assert(swim_ctx != NULL);
114

115 116
    SWIM_DEBUG(swim_ctx,
        "protocol start (period_len=%.4f, susp_timeout=%d, subgroup_size=%d)\n",
117 118
        swim_ctx->prot_period_len, swim_ctx->prot_susp_timeout,
        swim_ctx->prot_subgroup_sz);
119

120
    while(!(swim_ctx->shutdown_flag))
121 122
    {
        /* spawn a ULT to run this tick */
123
        ret = ABT_thread_create(swim_ctx->swim_pool, swim_tick_ult, swim_ctx,
124 125 126 127 128 129 130
            ABT_THREAD_ATTR_NULL, NULL);
        if(ret != ABT_SUCCESS)
        {
            fprintf(stderr, "Error: unable to create ULT for SWIM protocol tick\n");
        }

        /* sleep for a protocol period length */
131
        margo_thread_sleep(swim_ctx->mid, swim_ctx->prot_period_len);
132
    }
133 134 135

    SWIM_DEBUG(swim_ctx, "protocol shutdown\n");

136 137 138
    return;
}

139
static void swim_tick_ult(
140
    void * t_arg)
141
{
142
    swim_context_t *swim_ctx = (swim_context_t *)t_arg;
143 144 145 146 147 148
    int i;
    int ret;

    assert(swim_ctx != NULL);

    /* update status of any suspected members */
149
    swim_update_suspected_members(swim_ctx, swim_ctx->prot_susp_timeout *
150 151 152 153 154
        swim_ctx->prot_period_len);

    /* check whether the ping target from the previous protocol tick
     * ever successfully acked a (direct/indirect) ping request
     */
Shane Snyder's avatar
Shane Snyder committed
155
    if(!(swim_ctx->ping_target_acked))
156 157
    {
        /* no response from direct/indirect pings, suspect this member */
Shane Snyder's avatar
Shane Snyder committed
158 159
        swim_suspect_member(swim_ctx, swim_ctx->dping_target_id,
            swim_ctx->dping_target_inc_nr);
160 161
    }

Shane Snyder's avatar
Shane Snyder committed
162
    /* pick a random member from view to ping */
163
    ret = swim_ctx->swim_callbacks.get_dping_target(
Shane Snyder's avatar
Shane Snyder committed
164 165
        swim_ctx->group_data, &swim_ctx->dping_target_id,
        &swim_ctx->dping_target_inc_nr, &swim_ctx->dping_target_addr);
166
    if(ret != 0)
167 168
    {
        /* no available members, back out */
169
        SWIM_DEBUG(swim_ctx, "no group members available to dping\n");
170 171
        return;
    }
172 173 174 175 176

    /* TODO: calculate estimated RTT using sliding window of past RTTs */
    swim_ctx->dping_timeout = 250.0;

    /* kick off dping request ULT */
Shane Snyder's avatar
Shane Snyder committed
177
    swim_ctx->ping_target_acked = 0;
178
    ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_send_ult, swim_ctx,
179 180 181 182 183 184 185 186
        ABT_THREAD_ATTR_NULL, NULL);
    if(ret != ABT_SUCCESS)
    {
        fprintf(stderr, "Error: unable to create ULT for SWIM dping send\n");
        return;
    }

    /* sleep for an RTT and wait for an ack for this dping req */
187
    margo_thread_sleep(swim_ctx->mid, swim_ctx->dping_timeout);
188 189 190 191

    /* if we don't hear back from the target after an RTT, kick off
     * a set of indirect pings to a subgroup of group members
     */
Shane Snyder's avatar
Shane Snyder committed
192
    if(!(swim_ctx->ping_target_acked) && (swim_ctx->prot_subgroup_sz > 0))
193 194
    {
        /* get a random subgroup of members to send indirect pings to */
Shane Snyder's avatar
Shane Snyder committed
195 196 197
        int iping_target_count = swim_ctx->swim_callbacks.get_iping_targets(
            swim_ctx->group_data, swim_ctx->iping_target_ids, swim_ctx->iping_target_addrs);
        if(iping_target_count == 0)
198 199
        {
            /* no available subgroup members, back out */
Shane Snyder's avatar
Shane Snyder committed
200
            SWIM_DEBUG(swim_ctx, "no subgroup members available to iping\n");
201 202 203
            return;
        }

Shane Snyder's avatar
Shane Snyder committed
204 205
        swim_ctx->iping_target_ndx = 0;
        for(i = 0; i < iping_target_count; i++)
206
        {
Shane Snyder's avatar
Shane Snyder committed
207 208
            ret = ABT_thread_create(swim_ctx->swim_pool, swim_iping_send_ult,
                swim_ctx, ABT_THREAD_ATTR_NULL, NULL);
209 210 211 212 213 214 215 216
            if(ret != ABT_SUCCESS)
            {
                fprintf(stderr, "Error: unable to create ULT for SWIM iping send\n");
                return;
            }
        }
    }

217 218 219
    return;
}

220 221
/*
 * Shut down the SWIM protocol and release the context.
 *
 * Raises the shutdown flag, joins and frees the protocol ULT if one was
 * started, frees any entries left on the suspect and recent update lists,
 * frees the mutex and finally the context itself. A NULL context is ignored.
 *
 * NOTE(review): tick/ping ULTs spawned without a handle are not joined
 * here, so one that is still running could touch the context after it has
 * been freed -- confirm how callers quiesce them.
 */
void swim_finalize(swim_context_t *swim_ctx)
{
    swim_suspect_member_link_t *suspect_iter, *suspect_next;
    swim_member_update_link_t *update_iter, *update_next;

    if(!swim_ctx) return;

    /* set shutdown flag so ULTs know to start wrapping up */
    swim_ctx->shutdown_flag = 1;

    if(swim_ctx->prot_thread)
    {
        /* wait for the protocol ULT to terminate */
        ABT_thread_join(swim_ctx->prot_thread);
        ABT_thread_free(&(swim_ctx->prot_thread));
    }

    /* detach both lists under the lock, then free their entries */
    ABT_mutex_lock(swim_ctx->swim_mutex);
    suspect_iter = (swim_suspect_member_link_t *)swim_ctx->suspect_list;
    swim_ctx->suspect_list = NULL;
    update_iter = (swim_member_update_link_t *)swim_ctx->recent_update_list;
    swim_ctx->recent_update_list = NULL;
    ABT_mutex_unlock(swim_ctx->swim_mutex);

    while(suspect_iter != NULL)
    {
        suspect_next = suspect_iter->next;
        free(suspect_iter);
        suspect_iter = suspect_next;
    }

    while(update_iter != NULL)
    {
        update_next = update_iter->next;
        free(update_iter);
        update_iter = update_next;
    }

    ABT_mutex_free(&(swim_ctx->swim_mutex));
    free(swim_ctx);

    return;
}

Shane Snyder's avatar
Shane Snyder committed
237 238 239 240 241
/************************************
 * SWIM membership update functions *
 ************************************/

void swim_retrieve_membership_updates(
242 243 244
    swim_context_t *swim_ctx,
    swim_member_update_t *updates,
    hg_size_t *update_count)
Shane Snyder's avatar
Shane Snyder committed
245 246
{
    swim_member_update_link_t *iter, *tmp;
247
    swim_member_update_link_t *recent_update_list =
Shane Snyder's avatar
Shane Snyder committed
248
        (swim_member_update_link_t *)swim_ctx->recent_update_list;
249 250
    hg_size_t i = 0;
    hg_size_t max_updates = *update_count;
Shane Snyder's avatar
Shane Snyder committed
251

252
    LL_FOREACH_SAFE(recent_update_list, iter, tmp)
Shane Snyder's avatar
Shane Snyder committed
253
    {
254
        if(i == max_updates)
Shane Snyder's avatar
Shane Snyder committed
255 256 257 258 259 260 261 262
            break;

        memcpy(&updates[i], &iter->update, sizeof(iter->update));

        /* remove this update if it has been piggybacked enough */
        iter->tx_count++;
        if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT)
        {
263
            LL_DELETE(recent_update_list, iter);
Shane Snyder's avatar
Shane Snyder committed
264 265 266 267
            free(iter);
        }
        i++;
    }
268
    *update_count = i;
Shane Snyder's avatar
Shane Snyder committed
269 270 271 272 273

    return;
}

void swim_apply_membership_updates(
274
    swim_context_t *swim_ctx,
Shane Snyder's avatar
Shane Snyder committed
275
    swim_member_update_t *updates,
276
    hg_size_t update_count)
Shane Snyder's avatar
Shane Snyder committed
277
{
278
    hg_size_t i;
Shane Snyder's avatar
Shane Snyder committed
279 280 281

    for(i = 0; i < update_count; i++)
    {
282
        switch(updates[i].state.status)
Shane Snyder's avatar
Shane Snyder committed
283 284
        {
            case SWIM_MEMBER_ALIVE:
285 286
                /* ignore alive updates for self */
                if(updates[i].id != swim_ctx->self_id)
287
                    swim_unsuspect_member(swim_ctx, updates[i].id, updates[i].state.inc_nr);
Shane Snyder's avatar
Shane Snyder committed
288 289
                break;
            case SWIM_MEMBER_SUSPECT:
290
                if(updates[i].id == swim_ctx->self_id)
Shane Snyder's avatar
Shane Snyder committed
291 292 293 294
                {
                    /* increment our incarnation number if we are suspected
                     * in the current incarnation
                     */
295
                    if(updates[i].state.inc_nr == swim_ctx->self_inc_nr)
296
                    {
297 298 299
                        swim_ctx->self_inc_nr++;
                        SWIM_DEBUG(swim_ctx, "self SUSPECT received (new inc_nr=%u)\n",
                            swim_ctx->self_inc_nr);
300
                    }
Shane Snyder's avatar
Shane Snyder committed
301 302 303
                }
                else
                {
304
                    swim_suspect_member(swim_ctx, updates[i].id, updates[i].state.inc_nr);
Shane Snyder's avatar
Shane Snyder committed
305 306 307
                }
                break;
            case SWIM_MEMBER_DEAD:
308 309
                /* if we get an update that we are dead, just shut down */
                if(updates[i].id == swim_ctx->self_id)
Shane Snyder's avatar
Shane Snyder committed
310
                {
311
                    SWIM_DEBUG(swim_ctx, "self confirmed DEAD (inc_nr=%u)\n",
312
                        updates[i].state.inc_nr);
Shane Snyder's avatar
Shane Snyder committed
313
                    swim_finalize(swim_ctx);
314
                    return;
Shane Snyder's avatar
Shane Snyder committed
315 316 317
                }
                else
                {
318
                    swim_kill_member(swim_ctx, updates[i].id, updates[i].state.inc_nr);
Shane Snyder's avatar
Shane Snyder committed
319 320 321
                }
                break;
            default:
322
                fprintf(stderr, "Error: invalid SWIM member update\n");
Shane Snyder's avatar
Shane Snyder committed
323 324 325 326 327 328
        }
    }

    return;
}

329 330 331 332
/*******************************************
 * SWIM group membership utility functions *
 *******************************************/

Shane Snyder's avatar
Shane Snyder committed
333
static void swim_suspect_member(
Shane Snyder's avatar
Shane Snyder committed
334
    swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr)
335
{
Shane Snyder's avatar
Shane Snyder committed
336
    swim_member_state_t *cur_swim_state;
337 338
    swim_suspect_member_link_t *iter, *tmp;
    swim_suspect_member_link_t *suspect_link = NULL;
Shane Snyder's avatar
Shane Snyder committed
339 340
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
Shane Snyder's avatar
Shane Snyder committed
341
    swim_member_update_t update;
342

Shane Snyder's avatar
Shane Snyder committed
343 344 345
    /* if there is no suspicion timeout, just kill the member */
    if(swim_ctx->prot_susp_timeout == 0)
    {
346
        swim_kill_member(swim_ctx, member_id, inc_nr);
Shane Snyder's avatar
Shane Snyder committed
347 348 349
        return;
    }

350 351
    /* lock access to group's swim state */
    ABT_mutex_lock(swim_ctx->swim_mutex);
Shane Snyder's avatar
Shane Snyder committed
352 353 354

    /* get current swim state for member */
    swim_ctx->swim_callbacks.get_member_state(
355
        swim_ctx->group_data, member_id, &cur_swim_state);
Shane Snyder's avatar
Shane Snyder committed
356 357

    /* ignore updates for dead members */
358 359 360
    if(cur_swim_state->status == SWIM_MEMBER_DEAD)
    {
        ABT_mutex_unlock(swim_ctx->swim_mutex);
Shane Snyder's avatar
Shane Snyder committed
361
        return;
362
    }
363

364
    /* determine if this member is already suspected */
Shane Snyder's avatar
Shane Snyder committed
365
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
366
    {
Shane Snyder's avatar
Shane Snyder committed
367
        if(iter->member_id == member_id)
368
        {
369
            if(inc_nr <= cur_swim_state->inc_nr)
370 371 372 373
            {
                /* ignore a suspicion in an incarnation number less than
                 * or equal to the current suspicion's incarnation
                 */
374
                ABT_mutex_unlock(swim_ctx->swim_mutex);
375 376 377 378 379
                return;
            }

            /* otherwise, we have a suspicion in a more recent incarnation --
             * remove the current suspicion so we can update it
380
             */
Shane Snyder's avatar
Shane Snyder committed
381
            LL_DELETE(suspect_list_p, iter);
382 383 384 385
            suspect_link = iter;
        }
    }

Shane Snyder's avatar
Shane Snyder committed
386
    /* ignore suspicions for a member that is alive in a newer incarnation */
387 388 389
    if((suspect_link == NULL) && (inc_nr < cur_swim_state->inc_nr))
    {
        ABT_mutex_unlock(swim_ctx->swim_mutex);
Shane Snyder's avatar
Shane Snyder committed
390
        return;
391
    }
Shane Snyder's avatar
Shane Snyder committed
392

393
    SWIM_DEBUG(swim_ctx, "member %lu SUSPECT (inc_nr=%u)\n", member_id, inc_nr);
394 395 396 397 398 399 400

    if(suspect_link == NULL)
    {
        /* if this member is not already on the suspect list,
         * allocate a link for it
         */
        suspect_link = malloc(sizeof(*suspect_link));
401
        if (!suspect_link)
402 403
        {
            ABT_mutex_unlock(swim_ctx->swim_mutex);
404
            return;
405
        }
406
        memset(suspect_link, 0, sizeof(*suspect_link));
Shane Snyder's avatar
Shane Snyder committed
407
        suspect_link->member_id = member_id;
408
        suspect_link->member_state = cur_swim_state;
409 410 411 412
    }
    suspect_link->susp_start = ABT_get_wtime();

    /* add to end of suspect list */
Shane Snyder's avatar
Shane Snyder committed
413
    LL_APPEND(suspect_list_p, suspect_link);
414

415 416 417
    /* update swim membership state */
    cur_swim_state->inc_nr = inc_nr;
    cur_swim_state->status = SWIM_MEMBER_SUSPECT;
418 419 420 421

    /* add this update to recent update list so it will be piggybacked
     * on future protocol messages
     */
Shane Snyder's avatar
Shane Snyder committed
422
    update.id = member_id;
423 424
    update.state.status = SWIM_MEMBER_SUSPECT;
    update.state.inc_nr = inc_nr;
425 426 427
    swim_add_recent_member_update(swim_ctx, update);

    ABT_mutex_unlock(swim_ctx->swim_mutex);
428 429 430 431

    return;
}

Shane Snyder's avatar
Shane Snyder committed
432
static void swim_unsuspect_member(
433
    swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr)
434
{
435
    swim_member_state_t *cur_swim_state;
436
    swim_suspect_member_link_t *iter, *tmp;
437
    swim_suspect_member_link_t *suspect_list =
Shane Snyder's avatar
Shane Snyder committed
438
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
Shane Snyder's avatar
Shane Snyder committed
439
    swim_member_update_t update;
440

441 442 443 444 445
    /* lock access to group's swim state */
    ABT_mutex_lock(swim_ctx->swim_mutex);

    /* get current swim state for member */
    swim_ctx->swim_callbacks.get_member_state(
446
        swim_ctx->group_data, member_id, &cur_swim_state);
447

Shane Snyder's avatar
Shane Snyder committed
448
    /* ignore updates for dead members */
449 450 451
    if(cur_swim_state->status == SWIM_MEMBER_DEAD)
    {
        ABT_mutex_unlock(swim_ctx->swim_mutex);
452
        return;
453
    }
454

Shane Snyder's avatar
Shane Snyder committed
455
    /* ignore alive updates for incarnation numbers that aren't new */
456 457 458
    if(inc_nr <= cur_swim_state->inc_nr)
    {
        ABT_mutex_unlock(swim_ctx->swim_mutex);
Shane Snyder's avatar
Shane Snyder committed
459
        return;
460
    }
Shane Snyder's avatar
Shane Snyder committed
461

462
    SWIM_DEBUG(swim_ctx, "member %lu ALIVE (inc_nr=%u)\n", member_id, inc_nr);
463

Shane Snyder's avatar
Shane Snyder committed
464
    /* if member is suspected, remove from suspect list */
465
    LL_FOREACH_SAFE(suspect_list, iter, tmp)
466
    {
Shane Snyder's avatar
Shane Snyder committed
467
        if(iter->member_id == member_id)
468
        {
469
            LL_DELETE(suspect_list, iter);
470 471 472
            free(iter);
            break;
        }
473 474
    }

475
    /* update swim membership state */
476 477
    cur_swim_state->inc_nr = inc_nr;
    cur_swim_state->status = SWIM_MEMBER_ALIVE;
478 479 480 481

    /* add this update to recent update list so it will be piggybacked
     * on future protocol messages
     */
Shane Snyder's avatar
Shane Snyder committed
482
    update.id = member_id;
483 484
    update.state.status = SWIM_MEMBER_ALIVE;
    update.state.inc_nr = inc_nr;
485 486 487
    swim_add_recent_member_update(swim_ctx, update);

    ABT_mutex_unlock(swim_ctx->swim_mutex);
488 489 490 491

    return;
}

Shane Snyder's avatar
Shane Snyder committed
492
static void swim_kill_member(
493
    swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr)
494
{
495
    swim_member_state_t *cur_swim_state;
496
    swim_suspect_member_link_t *iter, *tmp;
Shane Snyder's avatar
Shane Snyder committed
497 498
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
499
    swim_member_update_t update;
500

501 502 503 504 505
    /* lock access to group's swim state */
    ABT_mutex_lock(swim_ctx->swim_mutex);

    /* get current swim state for member */
    swim_ctx->swim_callbacks.get_member_state(
506
        swim_ctx->group_data, member_id, &cur_swim_state);
Shane Snyder's avatar
Shane Snyder committed
507

Shane Snyder's avatar
Shane Snyder committed
508
    /* ignore updates for dead members */
509 510 511
    if(cur_swim_state->status == SWIM_MEMBER_DEAD)
    {
        ABT_mutex_unlock(swim_ctx->swim_mutex);
512
        return;
513
    }
514

515
    SWIM_DEBUG(swim_ctx, "member %lu DEAD (inc_nr=%u)\n", member_id, inc_nr);
516

Shane Snyder's avatar
Shane Snyder committed
517
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
518
    {
Shane Snyder's avatar
Shane Snyder committed
519
        if(iter->member_id == member_id)
520 521
        {
            /* remove member from suspect list */
Shane Snyder's avatar
Shane Snyder committed
522
            LL_DELETE(suspect_list_p, iter);
523 524 525 526 527
            free(iter);
            break;
        }
    }

528
    /* update swim membership state */
529 530
    cur_swim_state->inc_nr = inc_nr;
    cur_swim_state->status = SWIM_MEMBER_DEAD;
531

532 533 534
    /* add this update to recent update list so it will be piggybacked
     * on future protocol messages
     */
535
    update.id = member_id;
536 537
    update.state.status = SWIM_MEMBER_DEAD;
    update.state.inc_nr = inc_nr;
538 539
    swim_add_recent_member_update(swim_ctx, update);

540 541 542
    /* have group management layer apply this update */
    swim_ctx->swim_callbacks.apply_member_update(
        swim_ctx->group_data, update);
543 544

    ABT_mutex_unlock(swim_ctx->swim_mutex);
545 546 547 548

    return;
}

Shane Snyder's avatar
Shane Snyder committed
549
static void swim_update_suspected_members(
550
    swim_context_t *swim_ctx, double susp_timeout)
551 552 553 554
{
    double now = ABT_get_wtime();
    double susp_dur;
    swim_suspect_member_link_t *iter, *tmp;
Shane Snyder's avatar
Shane Snyder committed
555 556
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
557

558 559
    ABT_mutex_lock(swim_ctx->swim_mutex);

Shane Snyder's avatar
Shane Snyder committed
560
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
561 562
    {
        susp_dur = now - iter->susp_start;
Shane Snyder's avatar
Shane Snyder committed
563
        if(susp_dur >= (susp_timeout / 1000.0))
564 565 566 567
        {
            /* if this member has exceeded its allowable suspicion timeout,
             * we mark it as dead
             */
568 569
            swim_kill_member(swim_ctx, iter->member_id,
                iter->member_state->inc_nr);
Shane Snyder's avatar
Shane Snyder committed
570 571 572
        }
    }

573 574
    ABT_mutex_unlock(swim_ctx->swim_mutex);

Shane Snyder's avatar
Shane Snyder committed
575 576 577 578
    return;
}

static void swim_add_recent_member_update(
579
    swim_context_t *swim_ctx, swim_member_update_t update)
Shane Snyder's avatar
Shane Snyder committed
580 581 582
{
    swim_member_update_link_t *iter, *tmp;
    swim_member_update_link_t *update_link = NULL;
583 584
    swim_member_update_link_t *recent_update_list =
        (swim_member_update_link_t *)swim_ctx->recent_update_list;
Shane Snyder's avatar
Shane Snyder committed
585 586

    /* search and remove any recent updates corresponding to this member */
587
    LL_FOREACH_SAFE(recent_update_list, iter, tmp)
Shane Snyder's avatar
Shane Snyder committed
588 589 590
    {
        if(iter->update.id == update.id)
        {
591
            LL_DELETE(recent_update_list, iter);
Shane Snyder's avatar
Shane Snyder committed
592
            update_link = iter;
593 594
        }
    }
595

Shane Snyder's avatar
Shane Snyder committed
596 597 598 599 600 601 602 603 604 605 606
    if(update_link == NULL)
    {
        update_link = malloc(sizeof(*update_link));
        assert(update_link);
        memset(update_link, 0, sizeof(*update_link));
    }

    memcpy(&update_link->update, &update, sizeof(update));

    /* add to recent update list */
    update_link->tx_count = 0;
607
    LL_APPEND(recent_update_list, update_link);
Shane Snyder's avatar
Shane Snyder committed
608

609 610
    return;
}