ssg.c 52.4 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

7
#include "ssg-config.h"
Shane Snyder's avatar
Shane Snyder committed
8

Jonathan Jenkins's avatar
Jonathan Jenkins committed
9 10 11 12
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
13
#include <errno.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
14 15
#include <stdlib.h>
#include <string.h>
16
#include <time.h>
17
#include <linux/limits.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
18
#include <assert.h>
Shane Snyder's avatar
Shane Snyder committed
19
#ifdef SSG_HAVE_MPI
20 21
#include <mpi.h>
#endif
Jonathan Jenkins's avatar
Jonathan Jenkins committed
22

Shane Snyder's avatar
Shane Snyder committed
23
#include <mercury.h>
24
#include <abt.h>
Shane Snyder's avatar
Shane Snyder committed
25
#include <margo.h>
26

27
#include "ssg.h"
Shane Snyder's avatar
Shane Snyder committed
28 29 30
#ifdef SSG_HAVE_MPI
#include "ssg-mpi.h"
#endif
31
#include "ssg-internal.h"
32

33 34 35
/* arguments for group lookup ULTs */
struct ssg_group_lookup_ult_args
{
36
    const char *addr_str;
37
    ssg_group_view_t *view;
Shane Snyder's avatar
Shane Snyder committed
38
    ABT_rwlock lock;
39
    int out;
40 41 42
};
static void ssg_group_lookup_ult(void * arg);

43
/* SSG helper routine prototypes */
Shane Snyder's avatar
Shane Snyder committed
44 45 46
static ssg_group_t * ssg_group_create_internal(
    const char * group_name, const char * const group_addr_strs[],
    int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat);
47 48 49 50 51
static int ssg_group_add_member(
    ssg_group_t *g, const char * addr_str, hg_addr_t addr,
    ssg_member_id_t member_id);
static int ssg_group_remove_member(
    ssg_group_t *g, ssg_member_state_t *member_state);
Shane Snyder's avatar
Shane Snyder committed
52 53 54 55
static int ssg_group_view_create(
    const char * const group_addr_strs[], int group_size,
    const char * self_addr_str, ABT_rwlock view_lock,
    ssg_group_view_t * view, ssg_member_id_t * self_id);
56
static ssg_member_state_t * ssg_group_view_add_member(
57 58
    const char * addr_str, hg_addr_t addr, ssg_member_id_t member_id,
    ssg_group_view_t * view);
59
static ssg_group_descriptor_t * ssg_group_descriptor_create(
60
    uint64_t name_hash, const char * leader_addr_str, int owner_status);
61 62 63 64 65 66
static ssg_group_descriptor_t * ssg_group_descriptor_dup(
    ssg_group_descriptor_t * descriptor);
static void ssg_group_destroy_internal(
    ssg_group_t * g);
static void ssg_attached_group_destroy(
    ssg_attached_group_t * ag);
Shane Snyder's avatar
Shane Snyder committed
67 68 69 70 71 72
static void ssg_group_view_destroy(
    ssg_group_view_t * view);
static void ssg_group_descriptor_free(
    ssg_group_descriptor_t * descriptor);
static ssg_member_id_t ssg_gen_member_id(
    const char * addr_str);
73 74
static const char ** ssg_addr_str_buf_to_list(
    const char * buf, int num_addrs);
Shane Snyder's avatar
Shane Snyder committed
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
static void ssg_shuffle_member_list(
    ssg_group_target_list_t *list);

/* SWIM group management routine prototypes */
static int ssg_get_swim_dping_target(
    void *group_data,
    swim_member_id_t *target_id,
    swim_member_inc_nr_t *target_inc_nr,
    hg_addr_t *target_addr);
static int ssg_get_swim_iping_targets(
    void *group_data,
    swim_member_id_t dping_target_id,
    int *num_targets,
    swim_member_id_t *target_ids,
    hg_addr_t *target_addrs);
static void ssg_get_swim_member_addr(
    void *group_data,
    swim_member_id_t id,
    hg_addr_t *target_addr);
static void ssg_get_swim_member_state(
    void *group_data,
    swim_member_id_t id,
    swim_member_state_t **state);
static void ssg_apply_swim_member_update(
    void *group_data,
    swim_member_update_t update);
101

102 103
/* Process-wide SSG state: allocated by ssg_init() and released (and reset to
 * NULL) by ssg_finalize(); NULL whenever SSG is not initialized. It holds the
 * margo instance, the instance lock and the member/attached group tables.
 *
 * XXX: i think we ultimately need per-mid ssg instances rather than 1 global?
 */
ssg_instance_t *ssg_inst = NULL;
104

105 106 107
/****************************************************
 *** SSG runtime initialization/shutdown routines ***
 ****************************************************/
108

109 110
/* Initialize the SSG runtime on top of the given margo instance.
 *
 * Allocates the process-wide ssg_inst state, creates its lock, registers
 * SSG's RPCs with margo and seeds the C library RNG.
 *
 * Returns SSG_SUCCESS, or SSG_FAILURE if SSG is already initialized or a
 * resource could not be allocated (in which case ssg_inst is left NULL).
 */
int ssg_init(
    margo_instance_id mid)
{
    struct timespec ts;

    /* only one SSG instance per process is supported */
    if (ssg_inst)
        return SSG_FAILURE;

    /* initialize an SSG instance for this margo instance */
    ssg_inst = calloc(1, sizeof(*ssg_inst));
    if (!ssg_inst)
        return SSG_FAILURE;
    ssg_inst->mid = mid;

    /* the instance lock guards the group tables; without it nothing else in
     * SSG is usable, so undo the allocation if it cannot be created
     */
    if (ABT_rwlock_create(&ssg_inst->lock) != ABT_SUCCESS)
    {
        free(ssg_inst);
        ssg_inst = NULL;
        return SSG_FAILURE;
    }

    ssg_register_rpcs();

    /* seed RNG */
    clock_gettime(CLOCK_MONOTONIC, &ts);
    srand(ts.tv_nsec + getpid());

    return SSG_SUCCESS;
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
133

134
int ssg_finalize()
135
{
136 137
    ssg_group_t *g, *g_tmp;
    ssg_attached_group_t *ag, *ag_tmp;
138 139 140 141

    if (!ssg_inst)
        return SSG_FAILURE;

142 143
    ABT_rwlock_wrlock(ssg_inst->lock);

144
    /* destroy all active groups */
145
    HASH_ITER(hh, ssg_inst->group_table, g, g_tmp)
146 147
    {
        HASH_DELETE(hh, ssg_inst->group_table, g);
148
        ABT_rwlock_unlock(ssg_inst->lock);
149
        ssg_group_destroy_internal(g);
150
        ABT_rwlock_wrlock(ssg_inst->lock);
151 152
    }

153 154 155 156 157 158
    /* detach from all attached groups */
    HASH_ITER(hh, ssg_inst->attached_group_table, ag, ag_tmp)
    {
        ssg_attached_group_destroy(ag);
    }

159 160 161
    ABT_rwlock_unlock(ssg_inst->lock);
    ABT_rwlock_free(&ssg_inst->lock);

162 163 164
    free(ssg_inst);
    ssg_inst = NULL;

165
    return SSG_SUCCESS;
166
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
167

168 169 170
/*************************************
 *** SSG group management routines ***
 *************************************/
Jonathan Jenkins's avatar
Jonathan Jenkins committed
171

Shane Snyder's avatar
Shane Snyder committed
172 173 174 175 176 177
/* Create a new group named group_name whose membership is the group_size
 * address strings in group_addr_strs.
 *
 * Returns a group ID for the caller to hold on to (a duplicate of the
 * group's own descriptor), or SSG_GROUP_ID_NULL on failure. If the group is
 * created but its descriptor cannot be duplicated, the group is unlinked
 * from the instance's group table and destroyed again.
 */
ssg_group_id_t ssg_group_create(
    const char * group_name,
    const char * const group_addr_strs[],
    int group_size,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    ssg_group_t *new_group;
    ssg_group_id_t id_copy;

    new_group = ssg_group_create_internal(group_name, group_addr_strs,
            group_size, update_cb, update_cb_dat);
    if (new_group == NULL)
        return SSG_GROUP_ID_NULL;

    /* hand the caller its own copy of the descriptor */
    id_copy = (ssg_group_id_t)ssg_group_descriptor_dup(new_group->descriptor);
    if (id_copy != SSG_GROUP_ID_NULL)
        return id_copy;

    /* no ID could be produced for the caller -- undo the creation */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_DELETE(hh, ssg_inst->group_table, new_group);
    ABT_rwlock_unlock(ssg_inst->lock);
    ssg_group_destroy_internal(new_group);

    return SSG_GROUP_ID_NULL;
}
Shane Snyder's avatar
Shane Snyder committed
201

Shane Snyder's avatar
Shane Snyder committed
202 203 204 205 206
/* Create a group whose membership is listed in a config file.
 *
 * The whole file is read into memory and split on whitespace (space, tab,
 * CR, LF). Each resulting token is taken to be one member's mercury address
 * string, and the list is passed to ssg_group_create().
 *
 * Returns the new group ID, or SSG_GROUP_ID_NULL if the file cannot be
 * opened/read, lists no addresses, or group creation fails.
 */
ssg_group_id_t ssg_group_create_config(
    const char * group_name,
    const char * file_name,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    static const char addr_delims[] = "\r\n\t ";
    int fd = -1;
    struct stat st;
    char *rd_buf = NULL;
    size_t rd_buf_size = 0;
    char *addr_str_buf = NULL;
    size_t addr_str_buf_len = 0;
    int num_addrs = 0;
    const char *tok;
    const char **addr_strs = NULL;
    ssg_group_id_t group_id = SSG_GROUP_ID_NULL;

    /* open config file for reading */
    fd = open(file_name, O_RDONLY);
    if (fd == -1)
    {
        fprintf(stderr, "Error: SSG unable to open config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }

    /* get file size and allocate a buffer (plus NUL terminator) to store it */
    if (fstat(fd, &st) == -1)
    {
        fprintf(stderr, "Error: SSG unable to stat config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }
    rd_buf = malloc((size_t)st.st_size + 1);
    if (rd_buf == NULL) goto fini;

    /* load it all in; read() may return fewer bytes than requested or fail
     * with EINTR, so keep reading until the whole file has been consumed
     */
    while (rd_buf_size < (size_t)st.st_size)
    {
        ssize_t nread = read(fd, rd_buf + rd_buf_size,
            (size_t)st.st_size - rd_buf_size);
        if (nread < 0 && errno == EINTR)
            continue;
        if (nread <= 0)
        {
            fprintf(stderr, "Error: SSG unable to read config file %s for group %s\n",
                file_name, group_name);
            goto fini;
        }
        rd_buf_size += (size_t)nread;
    }
    rd_buf[rd_buf_size] = '\0';

    /* Build up the address buffer: tokens packed back to back, each one
     * NUL-terminated. Every token but the last is followed by at least one
     * delimiter byte in rd_buf, and the last can reuse rd_buf's terminating
     * NUL, so rd_buf_size + 1 bytes always suffice. A buffer of exactly
     * rd_buf_size bytes overflows when the file doesn't end in whitespace.
     * strspn/strcspn are used rather than strtok(), whose hidden static state
     * makes it unsafe to call concurrently.
     */
    addr_str_buf = malloc(rd_buf_size + 1);
    if (addr_str_buf == NULL) goto fini;
    tok = rd_buf;
    for (;;)
    {
        size_t tok_size;

        tok += strspn(tok, addr_delims);    /* skip delimiter run */
        if (*tok == '\0')
            break;
        tok_size = strcspn(tok, addr_delims);
        memcpy(addr_str_buf + addr_str_buf_len, tok, tok_size);
        addr_str_buf[addr_str_buf_len + tok_size] = '\0';
        addr_str_buf_len += tok_size + 1;
        num_addrs++;
        tok += tok_size;
    }
    if (num_addrs == 0)
    {
        fprintf(stderr, "Error: SSG found no addresses in config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }

    /* set up address string array for group members */
    addr_strs = ssg_addr_str_buf_to_list(addr_str_buf, num_addrs);
    if (!addr_strs) goto fini;

    /* invoke the generic group create routine using our list of addrs */
    group_id = ssg_group_create(group_name, addr_strs, num_addrs,
        update_cb, update_cb_dat);

fini:
    /* cleanup before returning */
    if (fd != -1) close(fd);
    free(rd_buf);
    free(addr_str_buf);
    free(addr_strs);

    return group_id;
}

Shane Snyder's avatar
Shane Snyder committed
292 293 294 295 296 297
#ifdef SSG_HAVE_MPI
/* Create a group containing every process in an MPI communicator.
 *
 * This is collective over comm. Each rank contributes its own margo address
 * string; the string lengths are exchanged with MPI_Allgather and the
 * strings themselves with MPI_Allgatherv. The combined list is then passed
 * to ssg_group_create().
 *
 * Returns the new group ID, or SSG_GROUP_ID_NULL on failure.
 * NOTE(review): an early failure on one rank skips the collectives on that
 * rank only, which would leave the other ranks blocked -- confirm intent.
 */
ssg_group_id_t ssg_group_create_mpi(
    const char * group_name,
    MPI_Comm comm,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    char *my_addr = NULL;
    int my_addr_len = 0;
    int nprocs = 0, my_rank = 0;
    int *lens = NULL;       /* per-rank address string length, incl. NUL */
    int *displs = NULL;     /* exclusive prefix sum of lens; displs[nprocs] = total */
    char *all_addrs = NULL;
    const char **addr_list = NULL;
    ssg_group_id_t group_id = SSG_GROUP_ID_NULL;
    int r;

    if (!ssg_inst) goto fini;

    /* our own address string */
    SSG_GET_SELF_ADDR_STR(ssg_inst->mid, my_addr);
    if (my_addr == NULL) goto fini;
    my_addr_len = (int)strlen(my_addr) + 1;

    /* share every rank's string length */
    MPI_Comm_size(comm, &nprocs);
    MPI_Comm_rank(comm, &my_rank);
    lens = malloc(nprocs * sizeof(*lens));
    if (lens == NULL) goto fini;
    lens[my_rank] = my_addr_len;
    MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, lens, 1, MPI_INT, comm);

    /* receive displacements, with the grand total in the extra last slot */
    displs = malloc((nprocs + 1) * sizeof(*displs));
    if (displs == NULL) goto fini;
    displs[0] = 0;
    for (r = 0; r < nprocs; r++)
        displs[r + 1] = displs[r] + lens[r];

    /* share the address strings themselves */
    all_addrs = malloc(displs[nprocs]);
    if (all_addrs == NULL) goto fini;
    MPI_Allgatherv(my_addr, my_addr_len, MPI_BYTE,
            all_addrs, lens, displs, MPI_BYTE, comm);

    addr_list = ssg_addr_str_buf_to_list(all_addrs, nprocs);
    if (!addr_list) goto fini;

    group_id = ssg_group_create(group_name, addr_list, nprocs,
        update_cb, update_cb_dat);

fini:
    free(my_addr);
    free(lens);
    free(displs);
    free(all_addrs);
    free(addr_list);

    return group_id;
}
#endif
Shane Snyder's avatar
Shane Snyder committed
358

Shane Snyder's avatar
Shane Snyder committed
359 360
/* Destroy a group this process is a member of.
 *
 * The group is unlinked from the instance's group table under the instance
 * write lock, then destroyed with the lock released. The caller's group ID
 * itself is not freed here.
 *
 * Returns SSG_SUCCESS, or SSG_FAILURE if SSG is not initialized, group_id is
 * null, the caller is not a member, or the group cannot be found.
 */
int ssg_group_destroy(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_group_t *grp = NULL;

    if (ssg_inst == NULL || group_id == SSG_GROUP_ID_NULL)
        return SSG_FAILURE;

    if (desc->owner_status != SSG_OWNER_IS_MEMBER)
    {
        fprintf(stderr, "Error: SSG unable to destroy a group it is not a member of\n");
        return SSG_FAILURE;
    }

    /* find and unlink the group while holding the write lock */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
        sizeof(uint64_t), grp);
    if (grp != NULL)
    {
        HASH_DELETE(hh, ssg_inst->group_table, grp);
    }
    ABT_rwlock_unlock(ssg_inst->lock);

    if (grp == NULL)
    {
        fprintf(stderr, "Error: SSG unable to find expected group reference\n");
        return SSG_FAILURE;
    }

    ssg_group_destroy_internal(grp);
    return SSG_SUCCESS;
}

Shane Snyder's avatar
Shane Snyder committed
391 392
ssg_group_id_t ssg_group_join(
    ssg_group_id_t in_group_id,
393 394
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
395
{
Shane Snyder's avatar
Shane Snyder committed
396 397
    ssg_group_descriptor_t *in_group_descriptor = (ssg_group_descriptor_t *)in_group_id;
    char *self_addr_str = NULL;
398
    hg_addr_t group_target_addr = HG_ADDR_NULL;
Shane Snyder's avatar
Shane Snyder committed
399 400 401 402
    char *group_name = NULL;
    int group_size;
    void *view_buf = NULL;
    const char **addr_strs = NULL;
403
    hg_return_t hret;
404
    int sret;
Shane Snyder's avatar
Shane Snyder committed
405 406
    ssg_group_t *g = NULL;
    ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
407

Shane Snyder's avatar
Shane Snyder committed
408
    if (!ssg_inst || in_group_id == SSG_GROUP_ID_NULL) goto fini;
409

Shane Snyder's avatar
Shane Snyder committed
410
    if (in_group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
411
    {
Shane Snyder's avatar
Shane Snyder committed
412
        fprintf(stderr, "Error: SSG unable to join a group it is already a member of\n");
413 414
        goto fini;
    }
Shane Snyder's avatar
Shane Snyder committed
415
    else if (in_group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
416
    {
Shane Snyder's avatar
Shane Snyder committed
417
        fprintf(stderr, "Error: SSG unable to join a group it is attached to\n");
418 419
        goto fini;
    }
420

421 422 423 424 425 426 427 428 429
    /* lookup the address of the target group member in the GID */
    hret = margo_addr_lookup(ssg_inst->mid, in_group_descriptor->addr_str,
        &group_target_addr);
    if (hret != HG_SUCCESS) goto fini;

    sret = ssg_group_join_send(in_group_descriptor, group_target_addr,
        &group_name, &group_size, &view_buf);
    if (sret != SSG_SUCCESS || !group_name || !view_buf) goto fini;

Shane Snyder's avatar
Shane Snyder committed
430
    /* get my address string */
431
    SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
432 433
    if (self_addr_str == NULL) goto fini;

Shane Snyder's avatar
Shane Snyder committed
434 435
    /* set up address string array for all group members */
    addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size);
436 437
    if (!addr_strs) goto fini;

Shane Snyder's avatar
Shane Snyder committed
438 439 440 441 442 443 444 445 446 447 448 449 450
    /* append self address string to list of group member address strings */
    addr_strs = realloc(addr_strs, (group_size+1)*sizeof(char *));
    if(!addr_strs) goto fini;
    addr_strs[group_size++] = self_addr_str;

    g = ssg_group_create_internal(group_name, addr_strs, group_size,
            update_cb, update_cb_dat);
    if (g)
    {
        /* on successful creation, dup the group descriptor and return
         * it for the caller to hold on to
         */
        g_id = (ssg_group_id_t)ssg_group_descriptor_dup(g->descriptor);
451 452 453 454 455 456 457 458 459 460 461
        if (g_id == SSG_GROUP_ID_NULL)
        {
            ABT_rwlock_wrlock(ssg_inst->lock);
            HASH_DELETE(hh, ssg_inst->group_table, g);
            ABT_rwlock_unlock(ssg_inst->lock);
            ssg_group_destroy_internal(g);
            goto fini;
        }

        /* don't free on success */
        group_name = NULL;
Shane Snyder's avatar
Shane Snyder committed
462
    }
Shane Snyder's avatar
Shane Snyder committed
463 464

fini:
465 466
    if (group_target_addr != HG_ADDR_NULL)
        margo_addr_free(ssg_inst->mid, group_target_addr);
Shane Snyder's avatar
Shane Snyder committed
467 468 469
    free(addr_strs);
    free(view_buf);
    free(group_name);
470
    free(self_addr_str);
471

Shane Snyder's avatar
Shane Snyder committed
472
    return g_id;
Shane Snyder's avatar
Shane Snyder committed
473 474
}

Shane Snyder's avatar
Shane Snyder committed
475
int ssg_group_leave(
476
    ssg_group_id_t group_id)
Shane Snyder's avatar
Shane Snyder committed
477
{
Shane Snyder's avatar
Shane Snyder committed
478
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
479 480 481 482
    ssg_group_t *g = NULL;
    hg_addr_t group_target_addr = HG_ADDR_NULL;
    hg_return_t hret;
    int sret = SSG_FAILURE;
483

484
    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) goto fini;
485

486 487
    if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER)
    {
Shane Snyder's avatar
Shane Snyder committed
488
        fprintf(stderr, "Error: SSG unable to leave group it is not a member of\n");
489
        goto fini;
490 491
    }

492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
    ABT_rwlock_rdlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), g);
    if (!g)
    {
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }

    /* send the leave req to the first member in the view */
    hret = margo_addr_dup(ssg_inst->mid, g->view.member_map->addr, &group_target_addr);
    if (hret != HG_SUCCESS)
    {
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }
    ABT_rwlock_unlock(ssg_inst->lock);

    sret = ssg_group_leave_send(group_descriptor, g->self_id, group_target_addr);
    if (sret != SSG_SUCCESS) goto fini;

    /* at least one group member knows of the leave request -- safe to
     * shutdown the group locally
     */

    /* re-lookup the group as we don't hold the lock while sending the leave req */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), g);
    if (g)
    {
        HASH_DELETE(hh, ssg_inst->group_table, g);
        ABT_rwlock_unlock(ssg_inst->lock);
        ssg_group_destroy_internal(g);
    }
    ABT_rwlock_unlock(ssg_inst->lock);

    sret = SSG_SUCCESS;

fini:
    if (group_target_addr != HG_ADDR_NULL)
        margo_addr_free(ssg_inst->mid, group_target_addr);

    return sret;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
536 537
}

538
#if 0
539 540 541
/* Attach to a group this process is not a member of, obtaining a local copy
 * of its membership view. (Currently compiled out.)
 *
 * Returns SSG_SUCCESS once the attached group is registered in the
 * instance's attached group table, or SSG_FAILURE otherwise.
 * NOTE(review): only the internal copy of the descriptor is marked
 * SSG_OWNER_IS_ATTACHER; the caller's group_id is unchanged, so
 * ssg_group_detach(group_id) would reject it -- confirm intended usage.
 */
int ssg_group_attach(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    ssg_attached_group_t *ag = NULL;
    char *group_name = NULL;
    int group_size;
    void *view_buf = NULL;
    const char **addr_strs = NULL;
    int sret = SSG_FAILURE;     /* only set to SSG_SUCCESS once fully set up */

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) goto fini;

    if (group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
    {
        fprintf(stderr, "Error: SSG unable to attach a group it is a member of\n");
        goto fini;
    }
    else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        fprintf(stderr, "Error: SSG unable to attach a group it is" \
            " already attached to\n");
        goto fini;
    }

    /* send the attach request to a group member to initiate a bulk transfer
     * of the group's membership view; its status is checked directly so a
     * later failure can't be reported as success
     */
    if (ssg_group_attach_send(group_descriptor, &group_name,
            &group_size, &view_buf) != SSG_SUCCESS || !group_name || !view_buf)
        goto fini;

    /* set up address string array for all group members */
    addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size);
    if (!addr_strs) goto fini;

    /* allocate an SSG attached group data structure and initialize some of it */
    ag = calloc(1, sizeof(*ag));
    if (!ag) goto fini;
    ag->name = strdup(group_name);
    if (!ag->name) goto fini;
    ag->descriptor = ssg_group_descriptor_dup(group_descriptor);
    if (!ag->descriptor) goto fini;
    ag->descriptor->owner_status = SSG_OWNER_IS_ATTACHER;

    /* ag->lock is handed to ssg_group_view_create() and read-locked by the
     * accessors of ag->view, so it must exist before the view is built.
     * NOTE(review): confirm ssg_attached_group_destroy() frees it.
     */
    if (ABT_rwlock_create(&ag->lock) != ABT_SUCCESS) goto fini;

    /* create the view for the group */
    if (ssg_group_view_create(addr_strs, group_size, NULL, ag->lock,
            &ag->view, NULL) != SSG_SUCCESS)
        goto fini;

    /* add this group reference to our attached group table, under the same
     * instance lock ssg_finalize() holds while walking that table
     */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_ADD(hh, ssg_inst->attached_group_table, descriptor->name_hash,
        sizeof(uint64_t), ag);
    ABT_rwlock_unlock(ssg_inst->lock);

    sret = SSG_SUCCESS;

    /* the table owns ag now; group_name is still ours (ag->name is a copy)
     * and is freed below
     */
    ag = NULL;
fini:
    if (ag) ssg_attached_group_destroy(ag);
    free(addr_strs);
    free(view_buf);
    free(group_name);

    return sret;
}

/* Detach from a previously attached group, destroying the local copy of its
 * view. (Currently compiled out.)
 *
 * Returns SSG_SUCCESS, or SSG_FAILURE if SSG is not initialized, group_id is
 * null or not an attached group, or no matching attached group is found.
 */
int ssg_group_detach(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    ssg_attached_group_t *ag;

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return SSG_FAILURE;

    if (group_descriptor->owner_status != SSG_OWNER_IS_ATTACHER)
    {
        fprintf(stderr, "Error: SSG unable to detach from group that" \
            " was never attached\n");
        return SSG_FAILURE;
    }

    /* find and unlink the attached group structure under the instance lock
     * (the lock ssg_finalize() holds while walking this table)
     */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->attached_group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), ag);
    if (ag)
    {
        HASH_DELETE(hh, ssg_inst->attached_group_table, ag);
    }
    ABT_rwlock_unlock(ssg_inst->lock);

    if (!ag)
    {
        fprintf(stderr, "Error: SSG unable to find expected group attached\n");
        return SSG_FAILURE;
    }

    ssg_attached_group_destroy(ag);

    return SSG_SUCCESS;
}
634
#endif
635

636 637 638
/*********************************
 *** SSG group access routines ***
 *********************************/
Shane Snyder's avatar
Shane Snyder committed
639

640
ssg_member_id_t ssg_get_group_self_id(
641
    ssg_group_id_t group_id)
Shane Snyder's avatar
Shane Snyder committed
642
{
Shane Snyder's avatar
Shane Snyder committed
643
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
644
    ssg_group_t *g;
645
    ssg_member_id_t self_id;
646

Shane Snyder's avatar
Shane Snyder committed
647
    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return SSG_MEMBER_ID_INVALID;
648

649 650 651 652
    if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER)
    {
        fprintf(stderr, "Error: SSG can only obtain a self ID from a group the" \
            " caller is a member of\n");
653
        return SSG_MEMBER_ID_INVALID;
654 655
    }

656
    ABT_rwlock_rdlock(ssg_inst->lock);
Shane Snyder's avatar
Shane Snyder committed
657 658
    HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), g);
659 660 661 662 663
    if (g)
        self_id = g->self_id;
    else
        self_id = SSG_MEMBER_ID_INVALID;
    ABT_rwlock_unlock(ssg_inst->lock);
664

665
    return self_id;
Shane Snyder's avatar
Shane Snyder committed
666 667
}

668 669
/* Return the number of members in a group the caller belongs to, including
 * the caller itself.
 *
 * Returns 0 if SSG is not initialized, group_id is null, the group is not
 * found, or the caller is neither a member nor (when that path is compiled
 * in) an attacher.
 */
int ssg_get_group_size(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    int count = 0;

    if (ssg_inst == NULL || group_id == SSG_GROUP_ID_NULL)
        return 0;

    if (desc->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *grp;

        ABT_rwlock_rdlock(ssg_inst->lock);
        HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
            sizeof(uint64_t), grp);
        if (grp != NULL)
        {
            ABT_rwlock_rdlock(grp->lock);
            count = grp->view.size + 1; /* view size excludes ourself */
            ABT_rwlock_unlock(grp->lock);
        }
        ABT_rwlock_unlock(ssg_inst->lock);
        return count;
    }
#if 0
    if (desc->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *ag;

        HASH_FIND(hh, ssg_inst->attached_group_table, &desc->name_hash,
            sizeof(uint64_t), ag);
        if (ag)
        {
            ABT_rwlock_rdlock(ag->lock);
            count = ag->view.size;
            ABT_rwlock_unlock(ag->lock);
        }
        return count;
    }
#endif

    fprintf(stderr, "Error: SSG can only obtain size of groups that the caller" \
        " is a member of or an attacher of\n");
    return 0;
}

716 717 718
/* Return the mercury address of member_id within a group the caller belongs
 * to, or HG_ADDR_NULL if it cannot be found.
 *
 * The address returned is the handle stored in the group's view; it is not
 * duplicated here. NOTE(review): presumably callers must not free it --
 * confirm.
 */
hg_addr_t ssg_get_addr(
    ssg_group_id_t group_id,
    ssg_member_id_t member_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_member_state_t *ms;
    hg_addr_t addr = HG_ADDR_NULL;

    if (ssg_inst == NULL || group_id == SSG_GROUP_ID_NULL ||
            member_id == SSG_MEMBER_ID_INVALID)
        return HG_ADDR_NULL;

    if (desc->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *grp;

        ABT_rwlock_rdlock(ssg_inst->lock);
        HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
            sizeof(uint64_t), grp);
        if (grp != NULL)
        {
            ABT_rwlock_rdlock(grp->lock);
            HASH_FIND(hh, grp->view.member_map, &member_id,
                sizeof(ssg_member_id_t), ms);
            if (ms != NULL)
                addr = ms->addr;
            ABT_rwlock_unlock(grp->lock);
        }
        ABT_rwlock_unlock(ssg_inst->lock);
        return addr;
    }
#if 0
    if (desc->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *ag;

        HASH_FIND(hh, ssg_inst->attached_group_table, &desc->name_hash,
            sizeof(uint64_t), ag);
        if (ag)
        {
            ABT_rwlock_rdlock(ag->lock);
            HASH_FIND(hh, ag->view.member_map, &member_id,
                sizeof(ssg_member_id_t), ms);
            if (ms)
                addr = ms->addr;
            ABT_rwlock_unlock(ag->lock);
        }
        return addr;
    }
#endif

    fprintf(stderr, "Error: SSG can only obtain member addresses of groups" \
        " that the caller is a member of or an attacher of\n");
    return HG_ADDR_NULL;
}

/* Return a newly allocated copy of a group ID (a duplicate of its underlying
 * descriptor). Release the copy with ssg_group_id_free().
 */
ssg_group_id_t ssg_group_id_dup(
    ssg_group_id_t group_id)
{
    return (ssg_group_id_t)ssg_group_descriptor_dup(
        (ssg_group_descriptor_t *)group_id);
}

/*
 * Release a group ID by passing its descriptor to ssg_group_descriptor_free().
 *
 * group_id is passed by value, so this function cannot reset the caller's
 * copy. Callers should set their own variable to SSG_GROUP_ID_NULL afterwards.
 * A NULL group ID is accepted and ignored, so callers may free unconditionally.
 */
void ssg_group_id_free(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *descriptor = (ssg_group_descriptor_t *)group_id;

    if (descriptor == NULL)
        return;

    ssg_group_descriptor_free(descriptor);
}

793 794 795 796 797 798 799 800
/*
 * Return a newly allocated copy (via strdup) of the address string stored in
 * the group ID's descriptor. The caller must free() the result.
 *
 * Returns NULL in three cases:
 *  - the group ID is NULL,
 *  - the descriptor has no address string,
 *  - the copy cannot be allocated.
 */
char *ssg_group_id_get_addr_str(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;

    if (group_descriptor == NULL || group_descriptor->addr_str == NULL)
        return NULL;

    return strdup(group_descriptor->addr_str);
}

801 802 803 804 805 806 807
/*
 * Serialize a group ID into a newly malloc()'d buffer with the layout
 *
 *     [uint64_t magic_nr][uint64_t name_hash][addr_str bytes]['\0']
 *
 * The integers are stored in host byte order. The rest of the descriptor is
 * stateful and is not serialized.
 *
 * On success, *buf_p receives the buffer (the caller must free() it) and
 * *buf_size_p receives its size in bytes. On failure (NULL group ID, missing
 * address string, or allocation failure), *buf_p is NULL and *buf_size_p is 0.
 */
void ssg_group_id_serialize(
    ssg_group_id_t group_id,
    char ** buf_p,
    size_t * buf_size_p)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    uint64_t magic_nr, name_hash;
    size_t addr_len, alloc_size;
    char *gid_buf, *p;

    *buf_p = NULL;
    *buf_size_p = 0;

    if (group_descriptor == NULL || group_descriptor->addr_str == NULL)
        return;

    /* Copy into fixed-width locals so the bytes written always match the
     * 2 x uint64_t header the deserializer expects. */
    magic_nr = group_descriptor->magic_nr;
    name_hash = group_descriptor->name_hash;

    /* determine needed buffer size */
    addr_len = strlen(group_descriptor->addr_str);
    alloc_size = sizeof(magic_nr) + sizeof(name_hash) + addr_len + 1;

    gid_buf = malloc(alloc_size);
    if (!gid_buf)
        return;

    /* serialize; memcpy avoids type-punned stores through char pointers */
    p = gid_buf;
    memcpy(p, &magic_nr, sizeof(magic_nr));
    p += sizeof(magic_nr);
    memcpy(p, &name_hash, sizeof(name_hash));
    p += sizeof(name_hash);
    memcpy(p, group_descriptor->addr_str, addr_len + 1); /* includes '\0' */

    *buf_p = gid_buf;
    *buf_size_p = alloc_size;
}

/*
 * Rebuild a group ID from a buffer produced by ssg_group_id_serialize().
 *
 * Expected layout: [uint64_t magic_nr][uint64_t name_hash][addr_str]['\0'].
 * The buffer may come from an untrusted source (e.g. a file read by
 * ssg_group_id_load()), so it is validated before use:
 *  - it must hold the 16-byte header plus at least one byte,
 *  - the magic number must match SSG_MAGIC_NR,
 *  - the address string must be NUL-terminated within buf_size.
 *
 * On success, *group_id_p receives a new group ID with owner status
 * SSG_OWNER_IS_UNASSOCIATED. On any failure, it is SSG_GROUP_ID_NULL.
 */
void ssg_group_id_deserialize(
    const char * buf,
    size_t buf_size,
    ssg_group_id_t * group_id_p)
{
    const size_t hdr_size = 2 * sizeof(uint64_t);
    uint64_t magic_nr;
    uint64_t name_hash;
    const char *addr_str;
    ssg_group_descriptor_t *group_descriptor;

    *group_id_p = SSG_GROUP_ID_NULL;

    /* check to ensure the buffer contains enough data to make a group ID */
    if (buf == NULL || buf_size < hdr_size + 1)
    {
        fprintf(stderr, "Error: Serialized buffer does not contain a valid SSG group ID\n");
        return;
    }

    /* deserialize; memcpy because buf need not be suitably aligned for uint64_t */
    memcpy(&magic_nr, buf, sizeof(magic_nr));
    if (magic_nr != SSG_MAGIC_NR)
    {
        fprintf(stderr, "Error: Magic number mismatch when deserializing SSG group ID\n");
        return;
    }
    memcpy(&name_hash, buf + sizeof(uint64_t), sizeof(name_hash));
    addr_str = buf + hdr_size;

    /* The address must be NUL-terminated inside the buffer. Otherwise, using
     * it as a C string (ssg_group_descriptor_create() presumably copies it)
     * would read past buf + buf_size. */
    if (memchr(addr_str, '\0', buf_size - hdr_size) == NULL)
    {
        fprintf(stderr, "Error: Serialized buffer does not contain a valid SSG group ID\n");
        return;
    }

    group_descriptor = ssg_group_descriptor_create(name_hash, addr_str,
        SSG_OWNER_IS_UNASSOCIATED);
    if (!group_descriptor)
        return;

    *group_id_p = (ssg_group_id_t)group_descriptor;
}

/*
 * Serialize group_id (see ssg_group_id_serialize()) and write it to
 * file_name. The file is created with mode 0644, or truncated if it exists.
 *
 * Returns SSG_SUCCESS on success. Returns SSG_FAILURE if any of the following
 * fail: opening the file, serializing the ID, writing every byte, or the
 * final close() (which can report deferred write errors).
 */
int ssg_group_id_store(
    const char * file_name,
    ssg_group_id_t group_id)
{
    int fd;
    char *buf;
    size_t buf_size;
    size_t written = 0;

    fd = open(file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0)
    {
        fprintf(stderr, "Error: Unable to open file %s for storing SSG group ID\n",
            file_name);
        return SSG_FAILURE;
    }

    ssg_group_id_serialize(group_id, &buf, &buf_size);
    if (buf == NULL)
    {
        fprintf(stderr, "Error: Unable to serialize SSG group ID.\n");
        close(fd);
        return SSG_FAILURE;
    }

    /* write(2) may transfer fewer bytes than requested, or be interrupted by
     * a signal before writing anything, so loop until done or a hard error. */
    while (written < buf_size)
    {
        ssize_t n = write(fd, buf + written, buf_size - written);
        if (n <= 0)
        {
            if (n < 0 && errno == EINTR)
                continue;
            break;
        }
        written += (size_t)n;
    }
    free(buf);

    if (written != buf_size)
    {
        fprintf(stderr, "Error: Unable to write SSG group ID to file %s\n", file_name);
        close(fd);
        return SSG_FAILURE;
    }

    /* close() can surface write errors that were deferred (e.g. on NFS) */
    if (close(fd) != 0)
    {
        fprintf(stderr, "Error: Unable to write SSG group ID to file %s\n", file_name);
        return SSG_FAILURE;
    }

    return SSG_SUCCESS;
}

/*
 * Read the whole of file_name and deserialize it into a group ID (see
 * ssg_group_id_deserialize()).
 *
 * *group_id_p is set to SSG_GROUP_ID_NULL first and receives the new group ID
 * only on success. Returns SSG_SUCCESS on success. Returns SSG_FAILURE if the
 * file cannot be opened, stat'ed or fully read, if it is empty, if allocation
 * fails, or if the contents are not a valid serialized group ID.
 */
int ssg_group_id_load(
    const char * file_name,
    ssg_group_id_t * group_id_p)
{
    int fd;
    struct stat fstats;
    char *buf = NULL;
    size_t buf_size;
    size_t nread = 0;
    int ret = SSG_FAILURE;

    *group_id_p = SSG_GROUP_ID_NULL;

    fd = open(file_name, O_RDONLY);
    if (fd < 0)
    {
        fprintf(stderr, "Error: Unable to open file %s for loading SSG group ID\n",
            file_name);
        return SSG_FAILURE;
    }

    if (fstat(fd, &fstats) != 0)
    {
        fprintf(stderr, "Error: Unable to stat file %s\n", file_name);
        goto fini;
    }
    if (fstats.st_size <= 0)
    {
        fprintf(stderr, "Error: SSG group ID file %s is empty\n", file_name);
        goto fini;
    }
    buf_size = (size_t)fstats.st_size;

    buf = malloc(buf_size);
    if (buf == NULL)
        goto fini;

    /* read(2) may return fewer bytes than requested or fail with EINTR */
    while (nread < buf_size)
    {
        ssize_t n = read(fd, buf + nread, buf_size - nread);
        if (n <= 0)
        {
            if (n < 0 && errno == EINTR)
                continue;
            break; /* hard error, or EOF because the file shrank */
        }
        nread += (size_t)n;
    }
    if (nread != buf_size)
    {
        fprintf(stderr, "Error: Unable to read SSG group ID from file %s\n", file_name);
        goto fini;
    }

    ssg_group_id_deserialize(buf, buf_size, group_id_p);
    if (*group_id_p == SSG_GROUP_ID_NULL)
    {
        fprintf(stderr, "Error: Unable to deserialize SSG group ID\n");
        goto fini;
    }

    ret = SSG_SUCCESS;

fini:
    free(buf);
    close(fd);
    return ret;
}

983 984 985 986 987
/*
 * Print the membership information of the group identified by group_id to
 * stdout. The output lists the name, role, self ID (members only) and size,
 * followed by one "id / addr" line per member in the view. Errors go to stderr.
 *
 * Only groups the caller is a member of are currently supported; the attacher
 * branch is compiled out.
 *
 * Locking: ssg_inst->lock is held (read) for the whole dump so the group, and
 * the view/name/lock pointers taken from it, cannot be destroyed while in use.
 * The group's own lock is held while reading the view (size and members).
 * Lock order matches the other lookups in this file: instance lock, then
 * group lock.
 */
void ssg_group_dump(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    ssg_group_view_t *group_view = NULL;
    ABT_rwlock group_view_lock;
    int self_adj = 0; /* 1 when the caller is a member: it is not in its own view */
    int group_size;
    char *group_name = NULL;
    char group_role[32];
    char group_self_id[32];

    if (!ssg_inst || group_descriptor == NULL)
    {
        fprintf(stderr, "Error: SSG unable to find group view associated" \
            " with the given group ID\n");
        return;
    }

    ABT_rwlock_rdlock(ssg_inst->lock);

    if (group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *g;

        HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
            sizeof(uint64_t), g);
        if (g)
        {
            group_view = &g->view;
            group_view_lock = g->lock;
            self_adj = 1;
            group_name = g->name;
            strcpy(group_role, "member");
            snprintf(group_self_id, sizeof(group_self_id), "%llu",
                (unsigned long long)g->self_id);
        }
    }
#if 0
    else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *ag;

        HASH_FIND(hh, ssg_inst->attached_group_table, &group_descriptor->name_hash,
            sizeof(uint64_t), ag);
        if (ag)
        {
            group_view = &ag->view;
            group_view_lock = ag->lock;
            group_name = ag->name;
            strcpy(group_role, "attacher");
        }
    }
#endif
    else
    {
        ABT_rwlock_unlock(ssg_inst->lock);
        fprintf(stderr, "Error: SSG can only dump membership information for" \
            " groups that the caller is a member of or an attacher of\n");
        return;
    }

    if (group_view)
    {
        ssg_member_state_t *member_state, *tmp_ms;

        ABT_rwlock_rdlock(group_view_lock);
        group_size = (int)(group_view->size + self_adj);

        printf("SSG membership information for group '%s':\n", group_name);
        printf("\trole: '%s'\n", group_role);
        if (strcmp(group_role, "member") == 0)
            printf("\tself_id: %s\n", group_self_id);
        printf("\tsize: %d\n", group_size);
        printf("\tview:\n");
        HASH_ITER(hh, group_view->member_map, member_state, tmp_ms)
        {
            printf("\t\tid: %20llu\taddr: %s\n",
                (unsigned long long)member_state->id, member_state->addr_str);
        }
        ABT_rwlock_unlock(group_view_lock);
    }
    else
        fprintf(stderr, "Error: SSG unable to find group view associated" \
            " with the given group ID\n");

    ABT_rwlock_unlock(ssg_inst->lock);
}

1060 1061 1062
/************************************
 *** SSG internal helper routines ***
 ************************************/
1063

Shane Snyder's avatar
Shane Snyder committed
1064 1065 1066
/*
 * Create a new SSG group of which the caller is a member, register it in
 * ssg_inst->group_table (keyed by the 64-bit hash of group_name), and start
 * its SWIM failure detector.
 *
 * group_name      - name of the group
 * group_addr_strs - group_size member address strings; the caller's own
 *                   address must be among them, otherwise creation fails
 * update_cb       - membership update callback stored in the group
 * update_cb_dat   - user pointer passed back with update_cb
 *
 * Returns the new group on success. On failure, returns NULL and frees
 * everything allocated here, including removing the group from the table if
 * it had already been inserted. Creation also fails if a group with the same
 * name hash is already registered.
 */
static ssg_group_t * ssg_group_create_internal(
    const char * group_name, const char * const group_addr_strs[],
    int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat)
{
    uint64_t name_hash;
    char *self_addr_str = NULL;
    ssg_member_state_t *ms, *tmp_ms;
    unsigned int i = 0;
    int sret;
    int success = 0;
    ssg_group_t *g = NULL, *check_g;

    if (!ssg_inst) return NULL;

    name_hash = ssg_hash64_str(group_name);

    /* get my address string */
    SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
    if (self_addr_str == NULL) goto fini;

    /* Allocate an SSG group data structure and initialize some of it. The
     * memset ensures every pointer released in the fini path starts as NULL. */
    g = malloc(sizeof(*g));
    if (!g) goto fini;
    memset(g, 0, sizeof(*g));
    g->name = strdup(group_name);
    if (!g->name) goto fini;
    g->update_cb = update_cb;
    g->update_cb_dat = update_cb_dat;
    ABT_rwlock_create(&g->lock);

    /* generate unique descriptor for this group */
    g->descriptor = ssg_group_descriptor_create(name_hash, self_addr_str,
        SSG_OWNER_IS_MEMBER);
    if (g->descriptor == NULL) goto fini;

    /* initialize the group view and resolve our own member ID within it */
    sret = ssg_group_view_create(group_addr_strs, group_size, self_addr_str,
        g->lock, &g->view, &g->self_id);
    if (sret != SSG_SUCCESS) goto fini;
    if (g->self_id == SSG_MEMBER_ID_INVALID)
    {
        /* if unable to resolve my rank within the group, error out */
        fprintf(stderr, "Error: SSG unable to resolve rank in group %s\n",
            group_name);
        goto fini;
    }

    /* Build the SWIM ping target list from every member in the view.
     * A group whose only member is the caller has an empty view, and
     * malloc(0) may legitimately return NULL, so only allocate when there is
     * something to store. ssg_group_add_member() grows the list with realloc,
     * which also works starting from NULL. */
    if (g->view.size > 0)
    {
        g->target_list.targets = malloc(g->view.size * sizeof(*g->target_list.targets));
        if (g->target_list.targets == NULL) goto fini;
    }
    g->target_list.nslots = g->target_list.len = g->view.size;
    g->target_list.dping_ndx = 0;
    HASH_ITER(hh, g->view.member_map, ms, tmp_ms)
    {
        g->target_list.targets[i] = ms;
        i++;
    }

#ifdef DEBUG
    /* set debug output pointer */
    char *dbg_log_dir = getenv("SSG_DEBUG_LOGDIR");
    if (dbg_log_dir)
    {
        char dbg_log_path[PATH_MAX];
        snprintf(dbg_log_path, PATH_MAX, "%s/ssg-%s-%llu.log",
            dbg_log_dir, g->name, (unsigned long long)g->self_id);
        g->dbg_log = fopen(dbg_log_path, "a");
        if (!g->dbg_log) goto fini;
    }
    else
    {
        g->dbg_log = stdout;
    }
#endif

    /* make sure we aren't re-creating an existing group */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &name_hash, sizeof(uint64_t), check_g);
    if (check_g)
    {
        /* Drop the instance write lock before bailing out. Returning with it
         * held would deadlock every later SSG call that takes it. */
        ABT_rwlock_unlock(ssg_inst->lock);
        fprintf(stderr, "Error: SSG group with the same name hash as %s" \
            " already exists\n", group_name);
        goto fini;
    }

    /* add this group reference to the group table */
    HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash,
        sizeof(uint64_t), g);
    ABT_rwlock_unlock(ssg_inst->lock);

    /* initialize swim failure detector */
    swim_group_mgmt_callbacks_t swim_callbacks = {
        .get_dping_target = &ssg_get_swim_dping_target,
        .get_iping_targets = &ssg_get_swim_iping_targets,
        .get_member_addr = ssg_get_swim_member_addr,
        .get_member_state = ssg_get_swim_member_state,
        .apply_member_update = ssg_apply_swim_member_update,
        .apply_user_updates = ssg_apply_swim_user_updates,
    };
    g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id,
        swim_callbacks, 1);
    if (g->swim_ctx == NULL)
    {
        /* undo the table insertion before the group is freed below */
        ABT_rwlock_wrlock(ssg_inst->lock);
        HASH_DELETE(hh, ssg_inst->group_table, g);
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }

    SSG_DEBUG(g, "group create successful (size=%d, self=%s)\n", group_size, self_addr_str);
    success = 1;

fini:
    if (!success && g)
    {
#ifdef DEBUG
        /* close only a log file we opened ourselves: never stdout, and never
         * a NULL stream when we failed before/while opening it */
        if (g->dbg_log && g->dbg_log != stdout)
            fclose(g->dbg_log);
#endif
        if (g->descriptor) ssg_group_descriptor_free(g->descriptor);
        ssg_group_view_destroy(&g->view);
        ABT_rwlock_free(&g->lock);
        free(g->target_list.targets);
        free(g->name);
        free(g);
        g = NULL;
    }
    free(self_addr_str);

    return g;
}
1191

1192 1193 1194
/*
 * Add a member to group g: insert it into the view via
 * ssg_group_view_add_member() and append the resulting state to the SWIM ping
 * target list.
 *
 * Returns SSG_FAILURE in three cases:
 *  - member_id is in g->dead_members (a previously removed member),
 *  - the target list cannot be grown,
 *  - the view insertion fails.
 * The target list is grown *before* the view is modified, so an allocation
 * failure leaves the group unchanged.
 *
 * NOTE(review): callers presumably hold g->lock for writing; confirm.
 */
static int ssg_group_add_member(
    ssg_group_t *g, const char * addr_str, hg_addr_t addr,
    ssg_member_id_t member_id)
{
    enum { SSG_TARGET_LIST_GROW = 10 }; /* slots added per target list resize */
    ssg_member_state_t *ms;

    /* refuse to re-add a member that was previously removed */
    HASH_FIND(hh, g->dead_members, &member_id, sizeof(member_id), ms);
    if (ms) return SSG_FAILURE;

    /* Make room in the target list first. Realloc into a temporary so the
     * existing list is neither leaked nor lost if the allocation fails. */
    if (g->target_list.len == g->target_list.nslots)
    {
        void *new_targets = realloc(g->target_list.targets,
            (g->target_list.nslots + SSG_TARGET_LIST_GROW) *
            sizeof(*g->target_list.targets));
        if (new_targets == NULL) return SSG_FAILURE;
        g->target_list.targets = new_targets;
        g->target_list.nslots += SSG_TARGET_LIST_GROW;
    }

    /* group view add member */
    ms = ssg_group_view_add_member(addr_str, addr, member_id, &g->view);
    if (ms == NULL) return SSG_FAILURE;

    g->target_list.targets[g->target_list.len++] = ms;

    SSG_DEBUG(g, "successfully added member %lu\n", member_id);

    return SSG_SUCCESS;
}

/*
 * Retire member_state from group g. It is unlinked from the live view (whose
 * size shrinks by one), moved into g->dead_members keyed by its member ID, and
 * its Mercury address is released. Always returns SSG_SUCCESS.
 *
 * The state deliberately stays in g->target_list; the target list is cleaned
 * when it is shuffled after a complete traversal of ping targets.
 */
static int ssg_group_remove_member(
    ssg_group_t *g, ssg_member_state_t *member_state)
{
    hg_addr_t stale_addr;

    /* live view -> dead list */
    g->view.size--;
    HASH_DELETE(hh, g->view.member_map, member_state);
    HASH_ADD(hh, g->dead_members, id, sizeof(member_state->id), member_state);

    /* a dead member no longer needs an address */
    stale_addr = member_state->addr;
    member_state->addr = HG_ADDR_NULL;
    margo_addr_free(ssg_inst->mid, stale_addr);

    SSG_DEBUG(g, "successfully removed member %lu\n", member_state->id);
    return SSG_SUCCESS;
}

1241
static int ssg_group_view_create(
1242
    const char * const group_addr_strs[], int group_size,
Shane Snyder's avatar
Shane Snyder committed
1243 1244
    const char * self_addr_str, ABT_rwlock view_lock,
    ssg_group_view_t * view, ssg_member_id_t * self_id)
1245 1246
{
    int i, j, r;
1247 1248
    ABT_thread *lookup_ults = NULL;
    struct ssg_group_lookup_ult_args *lookup_ult_args = NULL;
1249 1250 1251
    const char *self_addr_substr = NULL;
    const char *addr_substr = NULL;
    int aret;
1252
    int sret = SSG_FAILURE;
1253

1254 1255 1256
    if (self_id)
        *self_id = SSG_MEMBER_ID_INVALID;
        
Shane Snyder's avatar
Shane Snyder committed
1257
    if ((self_id != NULL && self_addr_str == NULL) || !view) goto fini;
Shane Snyder's avatar
Shane Snyder committed
1258

1259 1260
    /* allocate lookup ULTs */
    lookup_ults = malloc(group_size * sizeof(*lookup_ults));
1261 1262
    if (lookup_ults == NULL) goto fini;
    for (i = 0; i < group_size; i++) lookup_ults[i] = ABT_THREAD_NULL;
1263
    lookup_ult_args = malloc(group_size * sizeof(*lookup_ult_args));
1264
    if (lookup_ult_args == NULL) goto fini;
Shane Snyder's avatar
Shane Snyder committed
1265

Shane Snyder's avatar
Shane Snyder committed
1266
    if(self_addr_str)
1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278
    {
        /* strstr is used here b/c there may be inconsistencies in whether the class
         * is included in the address or not (it should not be in HG_Addr_to_string,
         * but it's possible that it is in the list of group address strings)
         */
        self_addr_substr = strstr(self_addr_str, "://");
        if (self_addr_substr == NULL)
            self_addr_substr = self_addr_str;
        else
            self_addr_substr += 3;
    }

1279
    /* construct view using ULTs to lookup the address of each group member */
Shane Snyder's avatar
Shane Snyder committed
1280
    r = rand() % group_size;
1281 1282 1283 1284 1285
    for (i = 0; i < group_size; i++)
    {
        /* randomize our starting index so all group members aren't looking
         * up other group members in the same order
         */
Shane Snyder's avatar
Shane Snyder committed
1286
        j = (r + i) % group_size;
1287

1288 1289
        if (group_addr_strs[j] == NULL || strlen(group_addr_strs[j]) == 0) continue;

1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300
        /* resolve self id in group if caller asked for it */
        if (self_addr_substr)
        {
            addr_substr = strstr(group_addr_strs[j], "://");
            if (addr_substr == NULL)
                addr_substr = group_addr_strs[j];
            else
                addr_substr += 3;

            if (strcmp(self_addr_substr, addr_substr) == 0)
            {
1301
                if (self_id)               
1302
                    *self_id = ssg_gen_member_id(group_addr_strs[j]);
1303 1304 1305

                /* don't look up our own address, we already know it */
                continue;
1306 1307 1308 1309
            }
        }

        /* XXX limit outstanding lookups to some max */
1310
        lookup_ult_args[j].addr_str = group_addr_strs[j];
1311
        lookup_ult_args[j].view = view;
Shane Snyder's avatar
Shane Snyder committed
1312
        lookup_ult_args[j].lock = view_lock;
1313 1314
        ABT_pool pool;
        margo_get_handler_pool(ssg_inst->mid, &pool);
1315 1316
        aret = ABT_thread_create(pool, &ssg_group_lookup_ult,
            &lookup_ult_args[j], ABT_THREAD_ATTR_NULL,
1317
            &lookup_ults[j]);
1318
        if (aret != ABT_SUCCESS) goto fini;
1319 1320 1321 1322 1323 1324 1325 1326 1327 1328
    }

    /* wait on all lookup ULTs to terminate */