/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>

#include <abt.h>
#include <margo.h>

#include "swim-fd.h"
#include "swim-fd-internal.h"

#if 0
/* Disabled: the types below (and the functions that use them) still target
 * the older ssg_group_t-based interface rather than the group_data /
 * swim_callbacks interface that swim_init() takes.
 */

/* Singly-linked list node tracking one suspected member. */
typedef struct swim_suspect_member_link
{
    ssg_member_id_t member_id; /* read/written by swim_suspect_member() etc. */
    double susp_start;         /* ABT_get_wtime() when suspicion started */
    struct swim_suspect_member_link *next;
} swim_suspect_member_link_t;

/* Singly-linked list node holding a membership update to piggyback. */
typedef struct swim_member_update_link
{
    swim_member_update_t update;
    int tx_count;              /* number of times this update was sent */
    struct swim_member_update_link *next;
} swim_member_update_link_t;
#endif
/* SWIM ABT ULT prototypes */
static void swim_prot_ult(
    void *t_arg);
static void swim_tick_ult(
    void *t_arg);

#if 0
/* SWIM group membership utility function prototypes */
static void swim_suspect_member(
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_unsuspect_member(
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_kill_member(
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_update_suspected_members(
    ssg_group_t *g, double susp_timeout);
static void swim_add_recent_member_update(
    ssg_group_t *g, swim_member_update_t update);
#endif

51 52 53 54 55

/******************************************************
 * SWIM protocol init/finalize functions and ABT ULTs *
 ******************************************************/

56
swim_context_t * swim_init(
57
    margo_instance_id mid,
Shane Snyder's avatar
Shane Snyder committed
58
    void * group_data,
59
    swim_member_id_t self_id,
60
    swim_group_mgmt_callbacks_t swim_callbacks,
61 62 63
    int active)
{
    swim_context_t *swim_ctx;
64 65
    int i, ret;

66 67
    /* allocate structure for storing swim context */
    swim_ctx = malloc(sizeof(*swim_ctx));
68
    if (!swim_ctx) return NULL;
69
    memset(swim_ctx, 0, sizeof(*swim_ctx));
70 71
    swim_ctx->mid = mid;
    swim_ctx->group_data = group_data;
72 73
    swim_ctx->self_id = self_id;
    swim_ctx->self_inc_nr = 0;
74
    swim_ctx->swim_callbacks = swim_callbacks;
75

Shane Snyder's avatar
Shane Snyder committed
76
    /* initialize SWIM context */
77
    margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool);
78
    for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++)
79
        swim_ctx->iping_subgroup_addrs[i] = HG_ADDR_NULL;
80

81 82 83 84 85
    /* set protocol parameters */
    swim_ctx->prot_period_len = SWIM_DEF_PROTOCOL_PERIOD_LEN;
    swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT;
    swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE;

86
    swim_register_ping_rpcs(swim_ctx);
87 88 89

    if(active)
    {
90
        ret = ABT_thread_create(swim_ctx->swim_pool, swim_prot_ult, swim_ctx,
91 92 93 94
            ABT_THREAD_ATTR_NULL, &(swim_ctx->prot_thread));
        if(ret != ABT_SUCCESS)
        {
            fprintf(stderr, "Error: unable to create SWIM protocol ULT.\n");
95
            free(swim_ctx);
96 97 98 99 100 101 102
            return(NULL);
        }
    }

    return(swim_ctx);
}

103
static void swim_prot_ult(
104
    void * t_arg)
105
{
106
    int ret;
107
    swim_context_t *swim_ctx = (swim_context_t *)t_arg;
108

109
    assert(swim_ctx != NULL);
110

111 112
    SWIM_DEBUG(swim_ctx,
        "protocol start (period_len=%.4f, susp_timeout=%d, subgroup_size=%d)\n",
113 114
        swim_ctx->prot_period_len, swim_ctx->prot_susp_timeout,
        swim_ctx->prot_subgroup_sz);
115

116
    while(!(swim_ctx->shutdown_flag))
117 118
    {
        /* spawn a ULT to run this tick */
119
        ret = ABT_thread_create(swim_ctx->swim_pool, swim_tick_ult, swim_ctx,
120 121 122 123 124 125 126
            ABT_THREAD_ATTR_NULL, NULL);
        if(ret != ABT_SUCCESS)
        {
            fprintf(stderr, "Error: unable to create ULT for SWIM protocol tick\n");
        }

        /* sleep for a protocol period length */
127
        margo_thread_sleep(swim_ctx->mid, swim_ctx->prot_period_len);
128
    }
129 130 131

    SWIM_DEBUG(swim_ctx, "protocol shutdown\n");

132 133 134
    return;
}

135
static void swim_tick_ult(
136
    void * t_arg)
137
{
138
    swim_context_t *swim_ctx = (swim_context_t *)t_arg;
139 140 141 142 143
    int i;
    int ret;

    assert(swim_ctx != NULL);

144
#if 0
145
    /* update status of any suspected members */
Shane Snyder's avatar
Shane Snyder committed
146
    swim_update_suspected_members(g, swim_ctx->prot_susp_timeout *
147 148 149 150 151
        swim_ctx->prot_period_len);

    /* check whether the ping target from the previous protocol tick
     * ever successfully acked a (direct/indirect) ping request
     */
152
    if((swim_ctx->ping_target != SSG_MEMBER_ID_INVALID) &&
153 154 155
        !(swim_ctx->ping_target_acked))
    {
        /* no response from direct/indirect pings, suspect this member */
Shane Snyder's avatar
Shane Snyder committed
156
        swim_suspect_member(g, swim_ctx->ping_target, swim_ctx->ping_target_inc_nr);
157
    }
158
#endif
159 160

    /* pick a random member from view and ping */
161 162 163
    ret = swim_ctx->swim_callbacks.get_dping_target(
            swim_ctx->group_data,
            &swim_ctx->dping_target_info);
164
    if(ret != 0)
165 166
    {
        /* no available members, back out */
167
        SWIM_DEBUG(swim_ctx, "no group members available to dping\n");
168 169
        return;
    }
170 171 172 173 174

    /* TODO: calculate estimated RTT using sliding window of past RTTs */
    swim_ctx->dping_timeout = 250.0;

    /* kick off dping request ULT */
175
    swim_ctx->dping_target_acked = 0;
176
    ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_send_ult, swim_ctx,
177 178 179 180 181 182 183 184
        ABT_THREAD_ATTR_NULL, NULL);
    if(ret != ABT_SUCCESS)
    {
        fprintf(stderr, "Error: unable to create ULT for SWIM dping send\n");
        return;
    }

    /* sleep for an RTT and wait for an ack for this dping req */
185
    margo_thread_sleep(swim_ctx->mid, swim_ctx->dping_timeout);
186

187
#if 0
188 189 190
    /* if we don't hear back from the target after an RTT, kick off
     * a set of indirect pings to a subgroup of group members
     */
Shane Snyder's avatar
Shane Snyder committed
191
    if(!(swim_ctx->ping_target_acked) && (swim_ctx->prot_subgroup_sz > 0))
192 193
    {
        /* get a random subgroup of members to send indirect pings to */
194
        int this_subgroup_sz = swim_get_rand_group_member_set(g,
195 196 197 198 199
            swim_ctx->subgroup_members, swim_ctx->prot_subgroup_sz,
            swim_ctx->ping_target);
        if(this_subgroup_sz == 0)
        {
            /* no available subgroup members, back out */
200
            SSG_DEBUG(g, "SWIM: no subgroup members available to iping\n");
201 202 203 204 205
            return;
        }

        for(i = 0; i < this_subgroup_sz; i++)
        {
206
            ret = ABT_thread_create(swim_ctx->prot_pool, swim_iping_send_ult, g,
207 208 209 210 211 212 213 214
                ABT_THREAD_ATTR_NULL, NULL);
            if(ret != ABT_SUCCESS)
            {
                fprintf(stderr, "Error: unable to create ULT for SWIM iping send\n");
                return;
            }
        }
    }
215
#endif
216

217 218 219
    return;
}

220 221
void swim_finalize(swim_context_t *swim_ctx)
{
222 223 224 225 226 227 228 229 230 231
    /* set shutdown flag so ULTs know to start wrapping up */
    swim_ctx->shutdown_flag = 1;

    if(swim_ctx->prot_thread)
    {
        /* wait for the protocol ULT to terminate */
        ABT_thread_join(swim_ctx->prot_thread);
        ABT_thread_free(&(swim_ctx->prot_thread));
    }

232 233 234 235 236
    free(swim_ctx);

    return;
}

/************************************
 * SWIM membership update functions *
 ************************************/

#if 0
/*
 * Copy up to update_count recent membership updates into updates[] for
 * piggybacking on an outgoing message.
 *
 * Each copied update has its tx_count bumped. Updates reaching
 * SWIM_MAX_PIGGYBACK_TX_COUNT are removed and freed. Unused slots are
 * marked with SSG_MEMBER_ID_INVALID.
 */
void swim_retrieve_membership_updates(
    ssg_group_t * g,
    swim_member_update_t * updates,
    int update_count)
{
    swim_context_t *swim_ctx = g->swim_ctx;
    swim_member_update_link_t *iter, *tmp;
    /* operate on the context's list head itself (not a local copy) so that
     * deleting the first node is reflected in swim_ctx
     */
    swim_member_update_link_t **recent_update_list_p =
        (swim_member_update_link_t **)&(swim_ctx->recent_update_list);
    int i = 0;

    LL_FOREACH_SAFE(*recent_update_list_p, iter, tmp)
    {
        if(i == update_count)
            break;

        memcpy(&updates[i], &iter->update, sizeof(iter->update));

        /* remove this update if it has been piggybacked enough */
        iter->tx_count++;
        if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT)
        {
            LL_DELETE(*recent_update_list_p, iter);
            free(iter);
        }
        i++;
    }

    /* invalidate remaining updates */
    for(; i < update_count; i++)
    {
        updates[i].id = SSG_MEMBER_ID_INVALID;
    }

    return;
}

void swim_apply_membership_updates(
Shane Snyder's avatar
Shane Snyder committed
280
    ssg_group_t *g,
Shane Snyder's avatar
Shane Snyder committed
281 282 283
    swim_member_update_t *updates,
    int update_count)
{
Shane Snyder's avatar
Shane Snyder committed
284
    swim_context_t *swim_ctx = g->swim_ctx;
285
    ssg_member_id_t self_id = g->self_id;
Shane Snyder's avatar
Shane Snyder committed
286 287 288 289
    int i;

    for(i = 0; i < update_count; i++)
    {
290
        if(updates[i].id == SSG_MEMBER_ID_INVALID)
Shane Snyder's avatar
Shane Snyder committed
291 292 293 294 295 296 297 298 299 300 301
            break;

        switch(updates[i].status)
        {
            case SWIM_MEMBER_ALIVE:
                if(updates[i].id == self_id)
                {
                    assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
                }
                else
                {
Shane Snyder's avatar
Shane Snyder committed
302
                    swim_unsuspect_member(g, updates[i].id, updates[i].inc_nr);
Shane Snyder's avatar
Shane Snyder committed
303 304 305 306 307 308 309 310 311 312
                }
                break;
            case SWIM_MEMBER_SUSPECT:
                if(updates[i].id == self_id)
                {
                    assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
                    /* increment our incarnation number if we are suspected
                     * in the current incarnation
                     */
                    if(updates[i].inc_nr == swim_ctx->member_inc_nrs[self_id])
313
                    {
Shane Snyder's avatar
Shane Snyder committed
314
                        swim_ctx->member_inc_nrs[self_id]++;
Shane Snyder's avatar
Shane Snyder committed
315
                        SSG_DEBUG(g, "SWIM: self SUSPECT received (new inc_nr=%d)\n",
316 317
                            swim_ctx->member_inc_nrs[self_id]);
                    }
Shane Snyder's avatar
Shane Snyder committed
318 319 320
                }
                else
                {
Shane Snyder's avatar
Shane Snyder committed
321
                    swim_suspect_member(g, updates[i].id, updates[i].inc_nr);
Shane Snyder's avatar
Shane Snyder committed
322 323 324 325 326
                }
                break;
            case SWIM_MEMBER_DEAD:
                if(updates[i].id == self_id)
                {
327
                    /* if we get an update that we are dead, just shut down */
Shane Snyder's avatar
Shane Snyder committed
328 329
                    assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
                    swim_ctx->member_inc_nrs[self_id] = updates[i].inc_nr;
330

Shane Snyder's avatar
Shane Snyder committed
331
                    SSG_DEBUG(g, "SWIM: self confirmed DEAD (inc_nr=%d)\n",
332 333
                        swim_ctx->member_inc_nrs[self_id]);

Shane Snyder's avatar
Shane Snyder committed
334 335 336 337
                    swim_finalize(swim_ctx);
                }
                else
                {
Shane Snyder's avatar
Shane Snyder committed
338
                    swim_kill_member(g, updates[i].id, updates[i].inc_nr);
Shane Snyder's avatar
Shane Snyder committed
339 340 341
                }
                break;
            default:
Shane Snyder's avatar
Shane Snyder committed
342
                SSG_DEBUG(g, "SWIM: invalid membership status update\n");
Shane Snyder's avatar
Shane Snyder committed
343 344 345 346 347 348
        }
    }

    return;
}

349 350 351 352
/*******************************************
 * SWIM group membership utility functions *
 *******************************************/

Shane Snyder's avatar
Shane Snyder committed
353
static void swim_suspect_member(
354
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
355
{
Shane Snyder's avatar
Shane Snyder committed
356
    swim_context_t *swim_ctx = g->swim_ctx;
357 358
    swim_suspect_member_link_t *iter, *tmp;
    swim_suspect_member_link_t *suspect_link = NULL;
Shane Snyder's avatar
Shane Snyder committed
359 360
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
Shane Snyder's avatar
Shane Snyder committed
361
    swim_member_update_t update;
362

Shane Snyder's avatar
Shane Snyder committed
363
    /* ignore updates for dead members */
Shane Snyder's avatar
Shane Snyder committed
364
#if 0
Shane Snyder's avatar
Shane Snyder committed
365
    if(!(g->view.member_states[member_id].is_member))
366
        return;
Shane Snyder's avatar
Shane Snyder committed
367
#endif
368

369
    /* determine if this member is already suspected */
Shane Snyder's avatar
Shane Snyder committed
370
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
371
    {
Shane Snyder's avatar
Shane Snyder committed
372
        if(iter->member_id == member_id)
373
        {
Shane Snyder's avatar
Shane Snyder committed
374
            if(inc_nr <= swim_ctx->member_inc_nrs[member_id])
375 376 377 378 379 380 381 382 383
            {
                /* ignore a suspicion in an incarnation number less than
                 * or equal to the current suspicion's incarnation
                 */
                return;
            }

            /* otherwise, we have a suspicion in a more recent incarnation --
             * remove the current suspicion so we can update it
384
             */
Shane Snyder's avatar
Shane Snyder committed
385
            LL_DELETE(suspect_list_p, iter);
386 387 388 389
            suspect_link = iter;
        }
    }

Shane Snyder's avatar
Shane Snyder committed
390
    /* ignore suspicions for a member that is alive in a newer incarnation */
Shane Snyder's avatar
Shane Snyder committed
391
    if((suspect_link == NULL) && (inc_nr < swim_ctx->member_inc_nrs[member_id]))
Shane Snyder's avatar
Shane Snyder committed
392 393 394 395 396
        return;

    /* if there is no suspicion timeout, just kill the member */
    if(swim_ctx->prot_susp_timeout == 0)
    {
Shane Snyder's avatar
Shane Snyder committed
397
        swim_kill_member(g, member_id, inc_nr);
Shane Snyder's avatar
Shane Snyder committed
398 399 400
        return;
    }

Shane Snyder's avatar
Shane Snyder committed
401
    SSG_DEBUG(g, "SWIM: member %d SUSPECT (inc_nr=%d)\n",
Shane Snyder's avatar
Shane Snyder committed
402
        (int)member_id, (int)inc_nr);
403 404 405 406 407 408 409

    if(suspect_link == NULL)
    {
        /* if this member is not already on the suspect list,
         * allocate a link for it
         */
        suspect_link = malloc(sizeof(*suspect_link));
410 411
        if (!suspect_link)
            return;
412
        memset(suspect_link, 0, sizeof(*suspect_link));
Shane Snyder's avatar
Shane Snyder committed
413
        suspect_link->member_id = member_id;
414 415 416 417
    }
    suspect_link->susp_start = ABT_get_wtime();

    /* add to end of suspect list */
Shane Snyder's avatar
Shane Snyder committed
418
    LL_APPEND(suspect_list_p, suspect_link);
419 420

    /* update swim membership state */
Shane Snyder's avatar
Shane Snyder committed
421
    swim_ctx->member_inc_nrs[member_id] = inc_nr;
422 423 424 425

    /* add this update to recent update list so it will be piggybacked
     * on future protocol messages
     */
Shane Snyder's avatar
Shane Snyder committed
426 427 428
    update.id = member_id;
    update.status = SWIM_MEMBER_SUSPECT;
    update.inc_nr = inc_nr;
Shane Snyder's avatar
Shane Snyder committed
429
    swim_add_recent_member_update(g, update);
430 431 432 433

    return;
}

Shane Snyder's avatar
Shane Snyder committed
434
static void swim_unsuspect_member(
435
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
436
{
Shane Snyder's avatar
Shane Snyder committed
437
    swim_context_t *swim_ctx = g->swim_ctx;
438
    swim_suspect_member_link_t *iter, *tmp;
Shane Snyder's avatar
Shane Snyder committed
439 440
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
Shane Snyder's avatar
Shane Snyder committed
441
    swim_member_update_t update;
442

Shane Snyder's avatar
Shane Snyder committed
443
    /* ignore updates for dead members */
Shane Snyder's avatar
Shane Snyder committed
444
#if 0
Shane Snyder's avatar
Shane Snyder committed
445
    if(!(g->view.member_states[member_id].is_member))
446
        return;
Shane Snyder's avatar
Shane Snyder committed
447
#endif
448

Shane Snyder's avatar
Shane Snyder committed
449
    /* ignore alive updates for incarnation numbers that aren't new */
Shane Snyder's avatar
Shane Snyder committed
450
    if(inc_nr <= swim_ctx->member_inc_nrs[member_id])
Shane Snyder's avatar
Shane Snyder committed
451 452
        return;

Shane Snyder's avatar
Shane Snyder committed
453
    SSG_DEBUG(g, "SWIM: member %d ALIVE (inc_nr=%d)\n",
Shane Snyder's avatar
Shane Snyder committed
454
        (int)member_id, (int)inc_nr);
455

Shane Snyder's avatar
Shane Snyder committed
456
    /* if member is suspected, remove from suspect list */
Shane Snyder's avatar
Shane Snyder committed
457
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
458
    {
Shane Snyder's avatar
Shane Snyder committed
459
        if(iter->member_id == member_id)
460
        {
Shane Snyder's avatar
Shane Snyder committed
461
            LL_DELETE(suspect_list_p, iter);
462 463 464
            free(iter);
            break;
        }
465 466
    }

467
    /* update swim membership state */
Shane Snyder's avatar
Shane Snyder committed
468
    swim_ctx->member_inc_nrs[member_id] = inc_nr;
469 470 471 472

    /* add this update to recent update list so it will be piggybacked
     * on future protocol messages
     */
Shane Snyder's avatar
Shane Snyder committed
473 474 475
    update.id = member_id;
    update.status = SWIM_MEMBER_ALIVE;
    update.inc_nr = inc_nr;
Shane Snyder's avatar
Shane Snyder committed
476
    swim_add_recent_member_update(g, update);
477 478 479 480

    return;
}

Shane Snyder's avatar
Shane Snyder committed
481
static void swim_kill_member(
482
    ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
483
{
Shane Snyder's avatar
Shane Snyder committed
484 485
    swim_context_t *swim_ctx = g->swim_ctx;
    ssg_member_state_t *member_state;
486
    swim_suspect_member_link_t *iter, *tmp;
Shane Snyder's avatar
Shane Snyder committed
487 488
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
489 490
    swim_member_update_t swim_update;
    ssg_membership_update_t ssg_update;
491

Shane Snyder's avatar
Shane Snyder committed
492 493 494 495 496 497 498 499
    HASH_FIND(hh, g->view.member_map, &member_id, sizeof(ssg_member_id_t),
        member_state);
    if(!member_state)
    {
        fprintf(stderr, "Error: unable to kill member %lu, not in view\n", member_id);
        return;
    }

Shane Snyder's avatar
Shane Snyder committed
500
    /* ignore updates for dead members */
Shane Snyder's avatar
Shane Snyder committed
501
#if 0
Shane Snyder's avatar
Shane Snyder committed
502
    if(!(g->view.member_states[member_id].is_member))
503
        return;
Shane Snyder's avatar
Shane Snyder committed
504
#endif
505

Shane Snyder's avatar
Shane Snyder committed
506
    SSG_DEBUG(g, "SWIM: member %lu DEAD (inc_nr=%u)\n", member_id, inc_nr);
507

Shane Snyder's avatar
Shane Snyder committed
508
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
509
    {
Shane Snyder's avatar
Shane Snyder committed
510
        if(iter->member_id == member_id)
511 512
        {
            /* remove member from suspect list */
Shane Snyder's avatar
Shane Snyder committed
513
            LL_DELETE(suspect_list_p, iter);
514 515 516 517 518
            free(iter);
            break;
        }
    }

519
    /* update swim membership state */
Shane Snyder's avatar
Shane Snyder committed
520
    swim_ctx->member_inc_nrs[member_id] = inc_nr;
521

522 523 524
    /* add this update to recent update list so it will be piggybacked
     * on future protocol messages
     */
525 526 527 528 529 530 531 532
    swim_update.id = member_id;
    swim_update.status = SWIM_MEMBER_DEAD;
    swim_update.inc_nr = inc_nr;
    swim_add_recent_member_update(g, swim_update);
    /* have SSG apply the membership update */
    ssg_update.member = member_id;
    ssg_update.type = SSG_MEMBER_REMOVE;
    ssg_apply_membership_update(g, ssg_update);
533 534 535 536

    return;
}

Shane Snyder's avatar
Shane Snyder committed
537
static void swim_update_suspected_members(
Shane Snyder's avatar
Shane Snyder committed
538
    ssg_group_t *g, double susp_timeout)
539
{
Shane Snyder's avatar
Shane Snyder committed
540
    swim_context_t *swim_ctx = g->swim_ctx;
541 542 543
    double now = ABT_get_wtime();
    double susp_dur;
    swim_suspect_member_link_t *iter, *tmp;
Shane Snyder's avatar
Shane Snyder committed
544 545
    swim_suspect_member_link_t *suspect_list_p =
        (swim_suspect_member_link_t *)swim_ctx->suspect_list;
546

Shane Snyder's avatar
Shane Snyder committed
547
    LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
548 549
    {
        susp_dur = now - iter->susp_start;
Shane Snyder's avatar
Shane Snyder committed
550
        if(susp_dur >= (susp_timeout / 1000.0))
551 552 553 554
        {
            /* if this member has exceeded its allowable suspicion timeout,
             * we mark it as dead
             */
Shane Snyder's avatar
Shane Snyder committed
555
            swim_kill_member(g, iter->member_id,
Shane Snyder's avatar
Shane Snyder committed
556 557 558 559 560 561 562 563
                swim_ctx->member_inc_nrs[iter->member_id]);
        }
    }

    return;
}

static void swim_add_recent_member_update(
Shane Snyder's avatar
Shane Snyder committed
564
    ssg_group_t *g, swim_member_update_t update)
Shane Snyder's avatar
Shane Snyder committed
565
{
Shane Snyder's avatar
Shane Snyder committed
566
    swim_context_t *swim_ctx = g->swim_ctx;
Shane Snyder's avatar
Shane Snyder committed
567 568 569 570 571 572 573 574 575 576 577 578
    swim_member_update_link_t *iter, *tmp;
    swim_member_update_link_t *update_link = NULL;
    swim_member_update_link_t **recent_update_list_p =
        (swim_member_update_link_t **)&(swim_ctx->recent_update_list);

    /* search and remove any recent updates corresponding to this member */
    LL_FOREACH_SAFE(*recent_update_list_p, iter, tmp)
    {
        if(iter->update.id == update.id)
        {
            LL_DELETE(*recent_update_list_p, iter);
            update_link = iter;
579 580
        }
    }
581

Shane Snyder's avatar
Shane Snyder committed
582 583 584 585 586 587 588 589 590 591 592 593 594
    if(update_link == NULL)
    {
        update_link = malloc(sizeof(*update_link));
        assert(update_link);
        memset(update_link, 0, sizeof(*update_link));
    }

    memcpy(&update_link->update, &update, sizeof(update));

    /* add to recent update list */
    update_link->tx_count = 0;
    LL_APPEND(*recent_update_list_p, update_link);

595 596
    return;
}
597
#endif