ssg.c 44.8 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

7
#include "ssg-config.h"
Shane Snyder's avatar
Shane Snyder committed
8

Jonathan Jenkins's avatar
Jonathan Jenkins committed
9 10 11 12
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
13
#include <errno.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
14 15
#include <stdlib.h>
#include <string.h>
16
#include <time.h>
17
#include <linux/limits.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
18
#include <assert.h>
Shane Snyder's avatar
Shane Snyder committed
19
#ifdef SSG_HAVE_MPI
20 21
#include <mpi.h>
#endif
Jonathan Jenkins's avatar
Jonathan Jenkins committed
22

Shane Snyder's avatar
Shane Snyder committed
23
#include <mercury.h>
24
#include <abt.h>
Shane Snyder's avatar
Shane Snyder committed
25
#include <margo.h>
26

27
#include "ssg.h"
Shane Snyder's avatar
Shane Snyder committed
28 29 30
#ifdef SSG_HAVE_MPI
#include "ssg-mpi.h"
#endif
31
#include "ssg-internal.h"
Shane Snyder's avatar
Shane Snyder committed
32
#include "swim-fd/swim-fd.h"
33

34 35 36
/* arguments for group lookup ULTs */
struct ssg_group_lookup_ult_args
{
37
    const char *addr_str;
38
    ssg_group_view_t *view;
Shane Snyder's avatar
Shane Snyder committed
39
    ABT_rwlock lock;
40
    int out;
41 42 43
};
static void ssg_group_lookup_ult(void * arg);

44
/* SSG helper routine prototypes */
Shane Snyder's avatar
Shane Snyder committed
45 46 47 48 49 50 51
static ssg_group_t * ssg_group_create_internal(
    const char * group_name, const char * const group_addr_strs[],
    int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat);
static int ssg_group_view_create(
    const char * const group_addr_strs[], int group_size,
    const char * self_addr_str, ABT_rwlock view_lock,
    ssg_group_view_t * view, ssg_member_id_t * self_id);
52
static ssg_member_state_t * ssg_group_view_add_member(
53 54
    const char * addr_str, hg_addr_t addr, ssg_member_id_t member_id,
    ssg_group_view_t * view);
55
static ssg_group_descriptor_t * ssg_group_descriptor_create(
56
    uint64_t name_hash, const char * leader_addr_str, int owner_status);
57 58 59 60 61 62
static ssg_group_descriptor_t * ssg_group_descriptor_dup(
    ssg_group_descriptor_t * descriptor);
static void ssg_group_destroy_internal(
    ssg_group_t * g);
static void ssg_attached_group_destroy(
    ssg_attached_group_t * ag);
Shane Snyder's avatar
Shane Snyder committed
63 64 65 66 67 68
static void ssg_group_view_destroy(
    ssg_group_view_t * view);
static void ssg_group_descriptor_free(
    ssg_group_descriptor_t * descriptor);
static ssg_member_id_t ssg_gen_member_id(
    const char * addr_str);
69 70
static const char ** ssg_addr_str_buf_to_list(
    const char * buf, int num_addrs);
71

72 73
/* XXX: i think we ultimately need per-mid ssg instances rather than 1 global? */
ssg_instance_t *ssg_inst = NULL;
74

75 76 77
/***************************************************
 *** SSG runtime initialization/shutdown routines ***
 ***************************************************/
78

79 80
/*
 * Set up the single, process-wide SSG instance (ssg_inst).
 *
 * Allocates a zeroed instance, stores 'mid' in it, creates the instance
 * rwlock, calls ssg_register_rpcs() and seeds the libc RNG.
 *
 * mid: margo instance recorded in the new SSG instance.
 *
 * Returns SSG_SUCCESS on success.  Returns SSG_FAILURE if an instance
 * already exists, if allocation fails, or if the rwlock cannot be created;
 * in the failure cases ssg_inst is left unchanged (NULL).
 */
int ssg_init(
    margo_instance_id mid)
{
    struct timespec ts = {0, 0}; /* defined value even if clock_gettime fails */
    ssg_instance_t *inst;

    if (ssg_inst)
        return SSG_FAILURE;

    /* initialize an SSG instance for this margo instance */
    inst = calloc(1, sizeof(*inst));
    if (!inst)
        return SSG_FAILURE;
    inst->mid = mid;

    /* don't publish an instance whose lock could not be created: every
     * other entry point locks ssg_inst->lock unconditionally
     */
    if (ABT_rwlock_create(&inst->lock) != ABT_SUCCESS)
    {
        free(inst);
        return SSG_FAILURE;
    }
    ssg_inst = inst;

    ssg_register_rpcs();

    /* seed RNG */
    clock_gettime(CLOCK_MONOTONIC, &ts);
    srand(ts.tv_nsec + getpid());

    return SSG_SUCCESS;
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
103

104
int ssg_finalize()
105
{
106 107
    ssg_group_t *g, *g_tmp;
    ssg_attached_group_t *ag, *ag_tmp;
108 109 110 111

    if (!ssg_inst)
        return SSG_FAILURE;

112 113
    ABT_rwlock_wrlock(ssg_inst->lock);

114
    /* destroy all active groups */
115
    HASH_ITER(hh, ssg_inst->group_table, g, g_tmp)
116 117
    {
        HASH_DELETE(hh, ssg_inst->group_table, g);
118
        ABT_rwlock_unlock(ssg_inst->lock);
119
        ssg_group_destroy_internal(g);
120
        ABT_rwlock_wrlock(ssg_inst->lock);
121 122
    }

123 124 125 126 127 128
    /* detach from all attached groups */
    HASH_ITER(hh, ssg_inst->attached_group_table, ag, ag_tmp)
    {
        ssg_attached_group_destroy(ag);
    }

129 130 131
    ABT_rwlock_unlock(ssg_inst->lock);
    ABT_rwlock_free(&ssg_inst->lock);

132 133 134
    free(ssg_inst);
    ssg_inst = NULL;

135
    return SSG_SUCCESS;
136
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
137

138 139 140
/*************************************
 *** SSG group management routines ***
 *************************************/
Jonathan Jenkins's avatar
Jonathan Jenkins committed
141

Shane Snyder's avatar
Shane Snyder committed
142 143 144 145 146 147
/*
 * Create a group from an explicit list of member address strings.
 *
 * Forwards all arguments to ssg_group_create_internal().  Returns the new
 * group's descriptor as a group ID, or SSG_GROUP_ID_NULL when the internal
 * routine returns NULL.
 */
ssg_group_id_t ssg_group_create(
    const char * group_name,
    const char * const group_addr_strs[],
    int group_size,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    ssg_group_t *group = ssg_group_create_internal(group_name,
        group_addr_strs, group_size, update_cb, update_cb_dat);

    if (group == NULL)
        return SSG_GROUP_ID_NULL;

    /* the ID handed back to the caller is the group's descriptor */
    return (ssg_group_id_t)group->descriptor;
}
Shane Snyder's avatar
Shane Snyder committed
159

Shane Snyder's avatar
Shane Snyder committed
160 161 162 163 164
/*
 * Create a group whose member addresses are listed in a config file.
 *
 * The file is read in full and split on whitespace ("\r\n\t "); every
 * resulting token is treated as one member address string.  The addresses
 * are packed back-to-back (each NUL-terminated) into one buffer, turned into
 * a string list with ssg_addr_str_buf_to_list(), and passed to
 * ssg_group_create().
 *
 * group_name:    name forwarded to ssg_group_create().
 * file_name:     path of the config file.
 * update_cb:     membership callback forwarded to ssg_group_create().
 * update_cb_dat: callback data forwarded to ssg_group_create().
 *
 * Returns the ID produced by ssg_group_create(), or SSG_GROUP_ID_NULL if the
 * file cannot be opened/stat'ed/read, contains no addresses, or an
 * allocation fails.
 */
ssg_group_id_t ssg_group_create_config(
    const char * group_name,
    const char * file_name,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    static const char delims[] = "\r\n\t ";
    int fd;
    struct stat st;
    char *rd_buf = NULL;
    ssize_t rd_buf_size;
    const char *cursor;
    char *addr_str_buf = NULL;
    size_t addr_str_buf_cap;
    size_t addr_str_buf_len = 0;
    int num_addrs = 0;
    const char **addr_strs = NULL;
    ssg_group_id_t group_id = SSG_GROUP_ID_NULL;

    /* open config file for reading */
    fd = open(file_name, O_RDONLY);
    if (fd == -1)
    {
        fprintf(stderr, "Error: SSG unable to open config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }

    /* get file size and allocate a buffer to store it */
    if (fstat(fd, &st) == -1)
    {
        fprintf(stderr, "Error: SSG unable to stat config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }
    rd_buf = malloc(st.st_size+1);
    if (rd_buf == NULL) goto fini;

    /* load it all in one fell swoop */
    rd_buf_size = read(fd, rd_buf, st.st_size);
    if (rd_buf_size != st.st_size)
    {
        fprintf(stderr, "Error: SSG unable to read config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }
    rd_buf[rd_buf_size]='\0';

    /* skip leading whitespace; nothing left means the file has no addresses */
    cursor = rd_buf + strspn(rd_buf, delims);
    if (*cursor == '\0') goto fini;

    /* build up the address buffer
     *
     * Each token is stored with its own NUL.  Tokens are separated by at
     * least one delimiter in the file, but the last token may end at EOF
     * with no delimiter after it, so the packed form can need one byte more
     * than the file size.
     */
    addr_str_buf_cap = (size_t)rd_buf_size + 1;
    addr_str_buf = malloc(addr_str_buf_cap);
    if (addr_str_buf == NULL) goto fini;

    /* tokenize with strspn/strcspn rather than strtok, which keeps hidden
     * global state shared by every caller in the process
     */
    while (*cursor != '\0')
    {
        size_t tok_len = strcspn(cursor, delims);

        memcpy(addr_str_buf + addr_str_buf_len, cursor, tok_len);
        addr_str_buf[addr_str_buf_len + tok_len] = '\0';
        addr_str_buf_len += tok_len + 1;
        num_addrs++;

        cursor += tok_len;
        cursor += strspn(cursor, delims);
    }

    if (addr_str_buf_len < addr_str_buf_cap)
    {
        /* trim the unused tail; if shrinking fails the larger buffer is
         * still valid, so just keep using it
         */
        char *shrunk = realloc(addr_str_buf, addr_str_buf_len);
        if (shrunk != NULL)
            addr_str_buf = shrunk;
    }

    /* set up address string array for group members */
    addr_strs = ssg_addr_str_buf_to_list(addr_str_buf, num_addrs);
    if (!addr_strs) goto fini;

    /* invoke the generic group create routine using our list of addrs */
    group_id = ssg_group_create(group_name, addr_strs, num_addrs,
        update_cb, update_cb_dat);

fini:
    /* cleanup before returning */
    if (fd != -1) close(fd);
    free(rd_buf);
    free(addr_str_buf);
    free(addr_strs);

    return group_id;
}

Shane Snyder's avatar
Shane Snyder committed
250 251 252 253 254 255
#ifdef SSG_HAVE_MPI
/*
 * Create a group out of the processes of an MPI communicator.
 *
 * Every rank contributes its own address string (obtained through
 * SSG_GET_SELF_ADDR_STR); the strings are exchanged with
 * MPI_Allgather/MPI_Allgatherv, converted to a string list and passed to
 * ssg_group_create().
 *
 * Returns the ID produced by ssg_group_create(), or SSG_GROUP_ID_NULL if SSG
 * is not initialized, the self address is unavailable, or an allocation
 * fails.
 */
ssg_group_id_t ssg_group_create_mpi(
    const char * group_name,
    MPI_Comm comm,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    ssg_group_id_t group_id = SSG_GROUP_ID_NULL;
    char *self_addr_str = NULL;
    char *packed_addrs = NULL;
    const char **addr_strs = NULL;
    int *lens = NULL;
    int *offsets = NULL;
    int self_len = 0;
    int nprocs = 0, rank = 0;
    int i;

    if (!ssg_inst) goto fini;

    /* my own address string, NUL terminator included in its length */
    SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
    if (self_addr_str == NULL) goto fini;
    self_len = (int)strlen(self_addr_str) + 1;

    /* every rank learns the length of every other rank's string */
    MPI_Comm_size(comm, &nprocs);
    MPI_Comm_rank(comm, &rank);
    lens = malloc(nprocs * sizeof(*lens));
    if (lens == NULL) goto fini;
    lens[rank] = self_len;
    MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, lens, 1, MPI_INT, comm);

    /* exclusive prefix sum of the lengths: offsets[r] is where rank r's
     * string starts, offsets[nprocs] is the total byte count
     */
    offsets = malloc((nprocs+1) * sizeof(*offsets));
    if (offsets == NULL) goto fini;
    offsets[0] = 0;
    for (i = 0; i < nprocs; i++)
        offsets[i+1] = offsets[i] + lens[i];

    /* gather all address strings into one packed buffer */
    packed_addrs = malloc(offsets[nprocs]);
    if (packed_addrs == NULL) goto fini;
    MPI_Allgatherv(self_addr_str, self_len, MPI_BYTE,
            packed_addrs, lens, offsets, MPI_BYTE, comm);

    /* one pointer per member into the packed buffer */
    addr_strs = ssg_addr_str_buf_to_list(packed_addrs, nprocs);
    if (!addr_strs) goto fini;

    /* hand the address list to the generic create routine */
    group_id = ssg_group_create(group_name, addr_strs, nprocs,
        update_cb, update_cb_dat);

fini:
    /* release every temporary, whether or not creation succeeded */
    free(self_addr_str);
    free(lens);
    free(offsets);
    free(packed_addrs);
    free(addr_strs);

    return group_id;
}
Shane Snyder's avatar
Shane Snyder committed
315
#endif
Shane Snyder's avatar
Shane Snyder committed
316

Shane Snyder's avatar
Shane Snyder committed
317 318
/*
 * Destroy a group the caller is a member of.
 *
 * Looks the group up in ssg_inst->group_table by the descriptor's name hash,
 * removes it from the table under the instance write lock, and destroys it
 * after the lock has been released.
 *
 * Returns SSG_SUCCESS, or SSG_FAILURE if SSG is not initialized, the ID is
 * SSG_GROUP_ID_NULL, the descriptor is not owned by a member, or the group
 * is not found in the table.
 */
int ssg_group_destroy(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_group_t *group = NULL;

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL)
        return SSG_FAILURE;

    if (desc->owner_status != SSG_OWNER_IS_MEMBER)
    {
        fprintf(stderr, "Error: SSG unable to destroy a group it is not a member of\n");
        return SSG_FAILURE;
    }

    /* unlink the group from the table while holding the instance lock */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
        sizeof(uint64_t), group);
    if (group != NULL)
    {
        HASH_DELETE(hh, ssg_inst->group_table, group);
    }
    ABT_rwlock_unlock(ssg_inst->lock);

    if (group == NULL)
    {
        fprintf(stderr, "Error: SSG unable to find expected group reference\n");
        return SSG_FAILURE;
    }

    /* the group is no longer reachable through the table; tear it down */
    ssg_group_destroy_internal(group);

    return SSG_SUCCESS;
}

Shane Snyder's avatar
Shane Snyder committed
349 350
ssg_group_id_t ssg_group_join(
    ssg_group_id_t in_group_id,
351 352
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
353
{
Shane Snyder's avatar
Shane Snyder committed
354 355
    ssg_group_descriptor_t *in_group_descriptor = (ssg_group_descriptor_t *)in_group_id;
    char *self_addr_str = NULL;
356
    hg_addr_t group_target_addr = HG_ADDR_NULL;
Shane Snyder's avatar
Shane Snyder committed
357 358 359 360
    char *group_name = NULL;
    int group_size;
    void *view_buf = NULL;
    const char **addr_strs = NULL;
361
    hg_return_t hret;
362
    int sret;
Shane Snyder's avatar
Shane Snyder committed
363 364
    ssg_group_t *g = NULL;
    ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
365

Shane Snyder's avatar
Shane Snyder committed
366
    if (!ssg_inst || in_group_id == SSG_GROUP_ID_NULL) goto fini;
367

Shane Snyder's avatar
Shane Snyder committed
368
    if (in_group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
369
    {
Shane Snyder's avatar
Shane Snyder committed
370
        fprintf(stderr, "Error: SSG unable to join a group it is already a member of\n");
371 372
        goto fini;
    }
Shane Snyder's avatar
Shane Snyder committed
373
    else if (in_group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
374
    {
Shane Snyder's avatar
Shane Snyder committed
375
        fprintf(stderr, "Error: SSG unable to join a group it is attached to\n");
376 377
        goto fini;
    }
378

379 380 381 382 383 384 385 386 387
    /* lookup the address of the target group member in the GID */
    hret = margo_addr_lookup(ssg_inst->mid, in_group_descriptor->addr_str,
        &group_target_addr);
    if (hret != HG_SUCCESS) goto fini;

    sret = ssg_group_join_send(in_group_descriptor, group_target_addr,
        &group_name, &group_size, &view_buf);
    if (sret != SSG_SUCCESS || !group_name || !view_buf) goto fini;

Shane Snyder's avatar
Shane Snyder committed
388
    /* get my address string */
389
    SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
390 391
    if (self_addr_str == NULL) goto fini;

Shane Snyder's avatar
Shane Snyder committed
392 393
    /* set up address string array for all group members */
    addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size);
394 395
    if (!addr_strs) goto fini;

Shane Snyder's avatar
Shane Snyder committed
396 397 398 399 400 401 402 403 404
    /* append self address string to list of group member address strings */
    addr_strs = realloc(addr_strs, (group_size+1)*sizeof(char *));
    if(!addr_strs) goto fini;
    addr_strs[group_size++] = self_addr_str;

    g = ssg_group_create_internal(group_name, addr_strs, group_size,
            update_cb, update_cb_dat);
    if (g)
    {
Shane Snyder's avatar
Shane Snyder committed
405
        g_id = (ssg_group_id_t)g->descriptor;
406 407 408

        /* don't free on success */
        group_name = NULL;
Shane Snyder's avatar
Shane Snyder committed
409
    }
Shane Snyder's avatar
Shane Snyder committed
410 411

fini:
412 413
    if (group_target_addr != HG_ADDR_NULL)
        margo_addr_free(ssg_inst->mid, group_target_addr);
Shane Snyder's avatar
Shane Snyder committed
414 415 416
    free(addr_strs);
    free(view_buf);
    free(group_name);
417
    free(self_addr_str);
418

Shane Snyder's avatar
Shane Snyder committed
419
    return g_id;
Shane Snyder's avatar
Shane Snyder committed
420 421
}

Shane Snyder's avatar
Shane Snyder committed
422
int ssg_group_leave(
423
    ssg_group_id_t group_id)
Shane Snyder's avatar
Shane Snyder committed
424
{
Shane Snyder's avatar
Shane Snyder committed
425
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
426 427 428 429
    ssg_group_t *g = NULL;
    hg_addr_t group_target_addr = HG_ADDR_NULL;
    hg_return_t hret;
    int sret = SSG_FAILURE;
430

431
    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) goto fini;
432

433 434
    if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER)
    {
Shane Snyder's avatar
Shane Snyder committed
435
        fprintf(stderr, "Error: SSG unable to leave group it is not a member of\n");
436
        goto fini;
437 438
    }

439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
    ABT_rwlock_rdlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), g);
    if (!g)
    {
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }

    /* send the leave req to the first member in the view */
    hret = margo_addr_dup(ssg_inst->mid, g->view.member_map->addr, &group_target_addr);
    if (hret != HG_SUCCESS)
    {
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }
    ABT_rwlock_unlock(ssg_inst->lock);

    sret = ssg_group_leave_send(group_descriptor, g->self_id, group_target_addr);
    if (sret != SSG_SUCCESS) goto fini;

    /* at least one group member knows of the leave request -- safe to
     * shutdown the group locally
     */

    /* re-lookup the group as we don't hold the lock while sending the leave req */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), g);
    if (g)
    {
        HASH_DELETE(hh, ssg_inst->group_table, g);
        ABT_rwlock_unlock(ssg_inst->lock);
        ssg_group_destroy_internal(g);
    }
Shane Snyder's avatar
Shane Snyder committed
474 475
    else
        ABT_rwlock_unlock(ssg_inst->lock);
476 477 478 479 480 481 482 483

    sret = SSG_SUCCESS;

fini:
    if (group_target_addr != HG_ADDR_NULL)
        margo_addr_free(ssg_inst->mid, group_target_addr);

    return sret;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
484 485
}

486
#if 0
487 488 489
/*
 * Attach to a group as a non-member observer.
 * (This routine sits inside an '#if 0' region and is currently compiled out.)
 *
 * Requests the group's name and membership view from a group member
 * (ssg_group_attach_send), builds an attached-group structure holding a copy
 * of the name, a duplicate of the descriptor and a view, and inserts it into
 * ssg_inst->attached_group_table.
 *
 * Returns SSG_SUCCESS on success.  On failure returns the error reported by
 * the failing SSG call, or SSG_FAILURE for local errors (bad arguments,
 * allocation failure, incomplete reply).
 */
int ssg_group_attach(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    ssg_attached_group_t *ag = NULL;
    char *group_name = NULL;
    int group_size;
    void *view_buf = NULL;
    const char **addr_strs = NULL;
    int sret;
    /* overall result; kept separate from 'sret' so that a local failure
     * after a successful RPC is not reported as success
     */
    int ret = SSG_FAILURE;

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) goto fini;

    if (group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
    {
        fprintf(stderr, "Error: SSG unable to attach a group it is a member of\n");
        goto fini;
    }
    else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        fprintf(stderr, "Error: SSG unable to attach a group it is" \
            " already attached to\n");
        goto fini;
    }

    /* send the attach request to a group member to initiate a bulk transfer
     * of the group's membership view
     */
    sret = ssg_group_attach_send(group_descriptor, &group_name,
        &group_size, &view_buf);
    if (sret != SSG_SUCCESS)
    {
        ret = sret;
        goto fini;
    }
    if (!group_name || !view_buf) goto fini;

    /* set up address string array for all group members */
    addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size);
    if (!addr_strs) goto fini;

    /* allocate an SSG attached group data structure and initialize some of it */
    ag = malloc(sizeof(*ag));
    if (!ag) goto fini;
    memset(ag, 0, sizeof(*ag));
    /* the attached group keeps its own copy of the name; 'group_name' stays
     * owned by this function and is always freed at 'fini'
     */
    ag->name = strdup(group_name);
    if (!ag->name) goto fini;
    ag->descriptor = ssg_group_descriptor_dup(group_descriptor);
    if (!ag->descriptor) goto fini;
    ag->descriptor->owner_status = SSG_OWNER_IS_ATTACHER;

    /* create the view for the group
     * NOTE(review): ag->lock is still zeroed here (never created in this
     * routine) -- confirm whether ssg_group_view_create expects a live lock.
     */
    sret = ssg_group_view_create(addr_strs, group_size, NULL, ag->lock, &ag->view, NULL);
    if (sret != SSG_SUCCESS)
    {
        ret = sret;
        goto fini;
    }

    /* add this group reference to our group table */
    HASH_ADD(hh, ssg_inst->attached_group_table, descriptor->name_hash,
        sizeof(uint64_t), ag);

    ret = SSG_SUCCESS;

    /* the table now owns the attached group -- don't destroy it below */
    ag = NULL;
fini:
    if (ag) ssg_attached_group_destroy(ag);
    free(addr_strs);
    free(view_buf);
    free(group_name);

    return ret;
}

/*
 * Detach from a group previously attached to.
 * (This routine sits inside an '#if 0' region and is currently compiled out.)
 *
 * Finds the attached-group entry by the descriptor's name hash, removes it
 * from ssg_inst->attached_group_table and destroys it.
 *
 * Returns SSG_SUCCESS, or SSG_FAILURE if SSG is not initialized, the ID is
 * SSG_GROUP_ID_NULL, the descriptor is not owned by an attacher, or no
 * matching entry exists.
 */
int ssg_group_detach(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_attached_group_t *attached = NULL;

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL)
        return SSG_FAILURE;

    if (desc->owner_status != SSG_OWNER_IS_ATTACHER)
    {
        fprintf(stderr, "Error: SSG unable to detach from group that" \
            " was never attached\n");
        return SSG_FAILURE;
    }

    /* locate the attached group entry for this descriptor */
    HASH_FIND(hh, ssg_inst->attached_group_table, &desc->name_hash,
        sizeof(uint64_t), attached);
    if (attached == NULL)
    {
        fprintf(stderr, "Error: SSG unable to find expected group attached\n");
        return SSG_FAILURE;
    }

    /* unlink it from the table, then release it */
    HASH_DELETE(hh, ssg_inst->attached_group_table, attached);
    ssg_attached_group_destroy(attached);

    return SSG_SUCCESS;
}
582
#endif
583

584 585 586
/*********************************
 *** SSG group access routines ***
 *********************************/
Shane Snyder's avatar
Shane Snyder committed
587

588
ssg_member_id_t ssg_get_group_self_id(
589
    ssg_group_id_t group_id)
Shane Snyder's avatar
Shane Snyder committed
590
{
Shane Snyder's avatar
Shane Snyder committed
591
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
592
    ssg_group_t *g;
593
    ssg_member_id_t self_id;
594

Shane Snyder's avatar
Shane Snyder committed
595
    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return SSG_MEMBER_ID_INVALID;
596

597 598 599 600
    if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER)
    {
        fprintf(stderr, "Error: SSG can only obtain a self ID from a group the" \
            " caller is a member of\n");
601
        return SSG_MEMBER_ID_INVALID;
602 603
    }

604
    ABT_rwlock_rdlock(ssg_inst->lock);
Shane Snyder's avatar
Shane Snyder committed
605 606
    HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), g);
607 608 609 610 611
    if (g)
        self_id = g->self_id;
    else
        self_id = SSG_MEMBER_ID_INVALID;
    ABT_rwlock_unlock(ssg_inst->lock);
612

613
    return self_id;
Shane Snyder's avatar
Shane Snyder committed
614 615
}

616 617
/*
 * Return the number of members in a group the caller belongs to.
 *
 * For a member-owned ID the result is the local view size plus one (the view
 * does not count the caller itself).  Returns 0 if SSG is not initialized,
 * the ID is SSG_GROUP_ID_NULL, the group is not in the table, or the
 * descriptor is not member-owned (the attacher branch is compiled out).
 */
int ssg_get_group_size(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    int member_count = 0;

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL)
        return 0;

    if (desc->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *group;

        /* instance lock first, then the group's own lock around the view */
        ABT_rwlock_rdlock(ssg_inst->lock);
        HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
            sizeof(uint64_t), group);
        if (group != NULL)
        {
            ABT_rwlock_rdlock(group->lock);
            member_count = group->view.size + 1; /* count ourself as well */
            ABT_rwlock_unlock(group->lock);
        }
        ABT_rwlock_unlock(ssg_inst->lock);
    }
#if 0
    else if (desc->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *attached;

        HASH_FIND(hh, ssg_inst->attached_group_table, &desc->name_hash,
            sizeof(uint64_t), attached);
        if (attached != NULL)
        {
            ABT_rwlock_rdlock(attached->lock);
            member_count = attached->view.size;
            ABT_rwlock_unlock(attached->lock);
        }
    }
#endif
    else
    {
        fprintf(stderr, "Error: SSG can only obtain size of groups that the caller" \
            " is a member of or an attacher of\n");
        return 0;
    }

    return member_count;
}

664 665 666
/*
 * Look up the address of a group member by member ID.
 *
 * For a member-owned ID, searches the group's view member map for
 * 'member_id' and returns the stored address (not a duplicate).  Returns
 * HG_ADDR_NULL if SSG is not initialized, either ID is the null/invalid
 * value, the group or member is not found, or the descriptor is not
 * member-owned (the attacher branch is compiled out).
 */
hg_addr_t ssg_get_addr(
    ssg_group_id_t group_id,
    ssg_member_id_t member_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_member_state_t *member;
    hg_addr_t found_addr = HG_ADDR_NULL;

    if (!ssg_inst)
        return HG_ADDR_NULL;
    if (group_id == SSG_GROUP_ID_NULL || member_id == SSG_MEMBER_ID_INVALID)
        return HG_ADDR_NULL;

    if (desc->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *group;

        /* instance lock first, then the group's own lock around the view */
        ABT_rwlock_rdlock(ssg_inst->lock);
        HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
            sizeof(uint64_t), group);
        if (group != NULL)
        {
            ABT_rwlock_rdlock(group->lock);
            HASH_FIND(hh, group->view.member_map, &member_id,
                sizeof(ssg_member_id_t), member);
            if (member != NULL)
                found_addr = member->addr;
            ABT_rwlock_unlock(group->lock);
        }
        ABT_rwlock_unlock(ssg_inst->lock);
    }
#if 0
    else if (desc->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *attached;

        HASH_FIND(hh, ssg_inst->attached_group_table, &desc->name_hash,
            sizeof(uint64_t), attached);
        if (attached != NULL)
        {
            ABT_rwlock_rdlock(attached->lock);
            HASH_FIND(hh, attached->view.member_map, &member_id,
                sizeof(ssg_member_id_t), member);
            if (member != NULL)
                found_addr = member->addr;
            ABT_rwlock_unlock(attached->lock);
        }
    }
#endif
    else
    {
        fprintf(stderr, "Error: SSG can only obtain member addresses of groups" \
            " that the caller is a member of or an attacher of\n");
        return HG_ADDR_NULL;
    }

    return found_addr;
}

/*
 * Duplicate a group ID.
 *
 * Passes the ID's underlying descriptor to ssg_group_descriptor_dup() and
 * returns the result as a group ID.
 */
ssg_group_id_t ssg_group_id_dup(
    ssg_group_id_t group_id)
{
    return (ssg_group_id_t)ssg_group_descriptor_dup(
        (ssg_group_descriptor_t *)group_id);
}

/*
 * Release a group ID.
 *
 * Passes the ID's underlying descriptor to ssg_group_descriptor_free().
 * The ID is received by value, so the caller's own variable is NOT reset
 * here; callers should stop using (or overwrite) their copy afterwards.
 */
void ssg_group_id_free(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_free((ssg_group_descriptor_t *)group_id);
}

741 742 743 744 745 746 747 748
/*
 * Return a copy of the address string stored in a group ID.
 *
 * Returns a newly allocated string (from strdup) that the caller must
 * free(), or NULL if the ID is SSG_GROUP_ID_NULL, the descriptor holds no
 * address string, or the copy cannot be allocated.
 */
char *ssg_group_id_get_addr_str(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;

    /* reject the null ID up front, like the other accessors in this file */
    if (group_id == SSG_GROUP_ID_NULL || !group_descriptor->addr_str)
        return NULL;

    return strdup(group_descriptor->addr_str);
}

749 750 751 752 753 754 755
/*
 * Serialize a group ID into a newly allocated buffer.
 *
 * Layout: magic number (uint64_t), name hash (uint64_t), then the
 * NUL-terminated address string; integers are stored in host byte order.
 *
 * group_id:   ID to serialize.
 * buf_p:      receives the malloc'ed buffer (caller frees), or NULL on failure.
 * buf_size_p: receives the buffer size in bytes, or 0 on failure.
 *
 * Fails (NULL / 0 outputs) if the ID is SSG_GROUP_ID_NULL, has no address
 * string, or the buffer cannot be allocated.
 */
void ssg_group_id_serialize(
    ssg_group_id_t group_id,
    char ** buf_p,
    size_t * buf_size_p)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    uint64_t magic_nr;
    uint64_t name_hash;
    size_t addr_str_size;
    size_t alloc_size;
    char *gid_buf, *p;

    *buf_p = NULL;
    *buf_size_p = 0;

    if (group_id == SSG_GROUP_ID_NULL || !group_descriptor->addr_str)
        return;

    magic_nr = group_descriptor->magic_nr;
    name_hash = group_descriptor->name_hash;
    addr_str_size = strlen(group_descriptor->addr_str) + 1;

    /* determine needed buffer size */
    alloc_size = sizeof(magic_nr) + sizeof(name_hash) + addr_str_size;

    gid_buf = malloc(alloc_size);
    if (!gid_buf)
        return;

    /* serialize; memcpy keeps the stores independent of pointer alignment */
    p = gid_buf;
    memcpy(p, &magic_nr, sizeof(magic_nr));
    p += sizeof(magic_nr);
    memcpy(p, &name_hash, sizeof(name_hash));
    p += sizeof(name_hash);
    memcpy(p, group_descriptor->addr_str, addr_str_size);

    /* the rest of the descriptor is stateful and not appropriate for serializing... */

    *buf_p = gid_buf;
    *buf_size_p = alloc_size;

    return;
}

/* Rebuild a group ID from a buffer produced by ssg_group_id_serialize().
 *
 * Expected layout: magic number (uint64_t), name hash (uint64_t), then a
 * NUL-terminated address string. The buffer may come from a file, so it is
 * validated before use:
 *   - it must be large enough for both fixed-width fields plus one byte,
 *   - the magic number must equal SSG_MAGIC_NR,
 *   - the address string must be NUL-terminated within buf_size.
 *
 * On success *group_id_p receives a descriptor created with
 * SSG_OWNER_IS_UNASSOCIATED; on any failure it is left as SSG_GROUP_ID_NULL.
 */
void ssg_group_id_deserialize(
    const char * buf,
    size_t buf_size,
    ssg_group_id_t * group_id_p)
{
    uint64_t magic_nr;
    uint64_t name_hash;
    const size_t header_size = sizeof(magic_nr) + sizeof(name_hash);
    const char *addr_str;
    ssg_group_descriptor_t *group_descriptor;

    *group_id_p = SSG_GROUP_ID_NULL;

    /* check to ensure the buffer contains enough data to make a group ID */
    if (buf == NULL || buf_size < header_size + 1)
    {
        fprintf(stderr, "Error: Serialized buffer does not contain a valid SSG group ID\n");
        return;
    }

    /* deserialize; memcpy avoids misaligned reads from an arbitrary buffer */
    memcpy(&magic_nr, buf, sizeof(magic_nr));
    if (magic_nr != SSG_MAGIC_NR)
    {
        fprintf(stderr, "Error: Magic number mismatch when deserializing SSG group ID\n");
        return;
    }
    memcpy(&name_hash, buf + sizeof(magic_nr), sizeof(name_hash));
    addr_str = buf + header_size;

    /* the address is consumed as a C string below, so it must terminate
     * inside the buffer or we would read past its end
     */
    if (memchr(addr_str, '\0', buf_size - header_size) == NULL)
    {
        fprintf(stderr, "Error: Serialized buffer does not contain a valid SSG group ID\n");
        return;
    }

    group_descriptor = ssg_group_descriptor_create(name_hash, addr_str,
        SSG_OWNER_IS_UNASSOCIATED);
    if (!group_descriptor)
        return;

    *group_id_p = (ssg_group_id_t)group_descriptor;

    return;
}

/* Write a serialized group ID to a file.
 *
 * The file is created if needed (mode 0644) and truncated. The group ID is
 * packed with ssg_group_id_serialize() and written with a single write()
 * call; a short write is reported as an error.
 *
 * Returns SSG_SUCCESS when the whole buffer was written, SSG_FAILURE
 * otherwise.
 */
int ssg_group_id_store(
    const char * file_name,
    ssg_group_id_t group_id)
{
    char *gid_buf = NULL;
    size_t gid_buf_size = 0;
    int status = SSG_FAILURE;
    int fd;

    fd = open(file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0)
    {
        fprintf(stderr, "Error: Unable to open file %s for storing SSG group ID\n",
            file_name);
        return SSG_FAILURE;
    }

    ssg_group_id_serialize(group_id, &gid_buf, &gid_buf_size);
    if (gid_buf == NULL)
    {
        fprintf(stderr, "Error: Unable to serialize SSG group ID.\n");
    }
    else if (write(fd, gid_buf, gid_buf_size) != (ssize_t)gid_buf_size)
    {
        fprintf(stderr, "Error: Unable to write SSG group ID to file %s\n", file_name);
    }
    else
    {
        status = SSG_SUCCESS;
    }

    /* shared teardown for every path that got past open() */
    close(fd);
    free(gid_buf);
    return status;
}

/* Read a serialized group ID back from a file.
 *
 * The whole file (size taken from fstat()) is read with one read() call and
 * passed to ssg_group_id_deserialize(). *group_id_p is set to
 * SSG_GROUP_ID_NULL up front and only replaced on successful
 * deserialization.
 *
 * Returns SSG_SUCCESS if a group ID was produced, SSG_FAILURE if the file
 * cannot be opened, stat'ed or fully read, is empty, or does not
 * deserialize.
 */
int ssg_group_id_load(
    const char * file_name,
    ssg_group_id_t * group_id_p)
{
    struct stat file_info;
    char *contents = NULL;
    ssize_t nread;
    int status = SSG_FAILURE;
    int fd;

    *group_id_p = SSG_GROUP_ID_NULL;

    fd = open(file_name, O_RDONLY);
    if (fd < 0)
    {
        fprintf(stderr, "Error: Unable to open file %s for loading SSG group ID\n",
            file_name);
        return SSG_FAILURE;
    }

    if (fstat(fd, &file_info) != 0)
    {
        fprintf(stderr, "Error: Unable to stat file %s\n", file_name);
        goto done;
    }
    if (file_info.st_size == 0)
    {
        fprintf(stderr, "Error: SSG group ID file %s is empty\n", file_name);
        goto done;
    }

    contents = malloc(file_info.st_size);
    if (contents == NULL)
        goto done;

    nread = read(fd, contents, file_info.st_size);
    if (nread != (ssize_t)file_info.st_size)
    {
        fprintf(stderr, "Error: Unable to read SSG group ID from file %s\n", file_name);
        goto done;
    }

    ssg_group_id_deserialize(contents, (size_t)nread, group_id_p);
    if (*group_id_p == SSG_GROUP_ID_NULL)
    {
        fprintf(stderr, "Error: Unable to deserialize SSG group ID\n");
        goto done;
    }

    status = SSG_SUCCESS;

done:
    /* shared teardown for every path that got past open() */
    close(fd);
    free(contents);
    return status;
}

931 932 933 934 935
/* Print membership information for a group to stdout.
 *
 * Only groups the caller is a member of are supported (the attacher branch
 * is currently compiled out). The group is looked up in the instance's group
 * table by name hash; its role, self ID, size and every entry of its view
 * (member ID and address string) are printed. Errors go to stderr.
 */
void ssg_group_dump(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_group_view_t *view = NULL;
    ABT_rwlock view_lock;
    int size;
    char *name = NULL;
    char role[32];
    char self_id_str[32];
    ssg_member_state_t *member, *next_member;

    if (desc->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *grp;

        /* snapshot what we need from the group while holding the table lock */
        ABT_rwlock_rdlock(ssg_inst->lock);
        HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
            sizeof(uint64_t), grp);
        if (grp)
        {
            view = &grp->view;
            view_lock = grp->lock;
            /* the view does not count the caller itself, hence the +1 */
            size = grp->view.size + 1;
            name = grp->name;
            strcpy(role, "member");
            sprintf(self_id_str, "%lu", grp->self_id);
        }
        ABT_rwlock_unlock(ssg_inst->lock);
    }
#if 0
    else if (desc->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *ag;

        HASH_FIND(hh, ssg_inst->attached_group_table, &desc->name_hash,
            sizeof(uint64_t), ag);
        if (ag)
        {
            view = &ag->view;
            size = ag->view.size;
            name = ag->name;
            strcpy(role, "attacher");
        }
    }
#endif
    else
    {
        fprintf(stderr, "Error: SSG can only dump membership information for" \
            " groups that the caller is a member of or an attacher of\n");
        return;
    }

    if (view == NULL)
    {
        fprintf(stderr, "Error: SSG unable to find group view associated" \
            " with the given group ID\n");
        return;
    }

    printf("SSG membership information for group '%s':\n", name);
    printf("\trole: '%s'\n", role);
    if (strcmp(role, "member") == 0)
        printf("\tself_id: %s\n", self_id_str);
    printf("\tsize: %d\n", size);
    printf("\tview:\n");

    /* walk the member map under the group's view lock */
    ABT_rwlock_rdlock(view_lock);
    HASH_ITER(hh, view->member_map, member, next_member)
    {
        printf("\t\tid: %20lu\taddr: %s\n", member->id,
            member->addr_str);
    }
    ABT_rwlock_unlock(view_lock);

    return;
}

1008 1009 1010
/************************************
 *** SSG internal helper routines ***
 ************************************/
1011

Shane Snyder's avatar
Shane Snyder committed
1012 1013 1014
/* Create a new SSG group that the caller is a member of.
 *
 * Steps, in order:
 *   1. hash the group name and obtain this process's address string,
 *   2. allocate the group structure, its name copy and its view lock,
 *   3. create the group descriptor (owner status SSG_OWNER_IS_MEMBER),
 *   4. build the group view from group_addr_strs and resolve our own ID,
 *   5. register the group in the instance's group table (fails if a group
 *      with the same name hash already exists),
 *   6. start the SWIM failure detector for the group.
 *
 * update_cb / update_cb_dat are stored on the group as-is.
 *
 * Returns the new group, or NULL on any failure (in which case everything
 * allocated here has been released and the group table is unchanged).
 */
static ssg_group_t * ssg_group_create_internal(
    const char * group_name, const char * const group_addr_strs[],
    int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat)
{
    uint64_t name_hash;
    char *self_addr_str = NULL;
    int sret;
    int aret;
    int success = 0;
    ssg_group_t *g = NULL, *check_g;

    if (!ssg_inst) return NULL;

    name_hash = ssg_hash64_str(group_name);

    /* get my address string */
    SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
    if (self_addr_str == NULL) goto fini;

    /* allocate an SSG group data structure and initialize some of it */
    g = malloc(sizeof(*g));
    if (!g) goto fini;
    memset(g, 0, sizeof(*g));
    /* mark the lock as "not created" so cleanup knows whether to free it */
    g->lock = ABT_RWLOCK_NULL;
    g->name = strdup(group_name);
    if (!g->name) goto fini;
    g->update_cb = update_cb;
    g->update_cb_dat = update_cb_dat;
    aret = ABT_rwlock_create(&g->lock);
    if (aret != ABT_SUCCESS)
    {
        g->lock = ABT_RWLOCK_NULL;
        goto fini;
    }

    /* generate unique descriptor for this group  */
    g->descriptor = ssg_group_descriptor_create(name_hash, self_addr_str,
        SSG_OWNER_IS_MEMBER);
    if (g->descriptor == NULL) goto fini;

    /* initialize the group view */
    sret = ssg_group_view_create(group_addr_strs, group_size, self_addr_str,
        g->lock, &g->view, &g->self_id);
    if (sret != SSG_SUCCESS) goto fini;
    if (g->self_id == SSG_MEMBER_ID_INVALID)
    {
        /* if unable to resolve my rank within the group, error out */
        fprintf(stderr, "Error: SSG unable to resolve rank in group %s\n",
            group_name);
        goto fini;
    }

#ifdef DEBUG
    /* set debug output pointer */
    char *dbg_log_dir = getenv("SSG_DEBUG_LOGDIR");
    if (dbg_log_dir)
    {
        char dbg_log_path[PATH_MAX];
        snprintf(dbg_log_path, PATH_MAX, "%s/ssg-%s-%lu.log",
            dbg_log_dir, g->name, g->self_id);
        g->dbg_log = fopen(dbg_log_path, "a");
        if (!g->dbg_log) goto fini;
    }
    else
    {
        g->dbg_log = stdout;
    }
#endif

    /* make sure we aren't re-creating an existing group */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &name_hash, sizeof(uint64_t), check_g);
    if (check_g)
    {
        /* must drop the table lock before bailing out, otherwise every
         * later user of the group table would block forever
         */
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }

    /* add this group reference to the group table */
    HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash,
        sizeof(uint64_t), g);
    ABT_rwlock_unlock(ssg_inst->lock);

    /* initialize swim failure detector */
    sret = swim_init(g, ssg_inst->mid, 1);
    if (sret != SSG_SUCCESS)
    {
        /* undo the table registration before the group is torn down */
        ABT_rwlock_wrlock(ssg_inst->lock);
        HASH_DELETE(hh, ssg_inst->group_table, g);
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }

    SSG_DEBUG(g, "group create successful (size=%d, self=%s)\n", group_size, self_addr_str);
    success = 1;

fini:
    if (!success && g)
    {
#ifdef DEBUG
        /* if using logfile debug output, close the stream; dbg_log is NULL
         * when we failed before (or while) opening it, and stdout must not
         * be closed
         */
        if (g->dbg_log && g->dbg_log != stdout)
            fclose(g->dbg_log);
#endif
        if (g->descriptor) ssg_group_descriptor_free(g->descriptor);
        ssg_group_view_destroy(&g->view);
        if (g->lock != ABT_RWLOCK_NULL)
            ABT_rwlock_free(&g->lock);
        free(g->name);
        free(g);
        g = NULL;
    }
    free(self_addr_str);

    return g;
}
1116

1117
/* Populate a group view by looking up the address of every group member.
 *
 * One ULT per member address is created in the Margo handler pool; each ULT
 * resolves its address and inserts the member into `view` under `view_lock`
 * (see ssg_group_lookup_ult). NULL or empty address strings are skipped.
 *
 * If self_addr_str is given, any entry matching it (compared after stripping
 * an optional "<class>://" prefix) is treated as the caller: it is not
 * looked up or added to the view, and if self_id is non-NULL it receives the
 * member ID generated from that entry. If no entry matches, *self_id stays
 * SSG_MEMBER_ID_INVALID.
 *
 * Returns SSG_SUCCESS when every lookup succeeded. On failure any
 * outstanding ULTs are cancelled and freed, the view is destroyed, and
 * SSG_FAILURE is returned. Invalid arguments (NULL view, NULL address list,
 * non-positive group_size, self_id without self_addr_str) also yield
 * SSG_FAILURE.
 */
static int ssg_group_view_create(
    const char * const group_addr_strs[], int group_size,
    const char * self_addr_str, ABT_rwlock view_lock,
    ssg_group_view_t * view, ssg_member_id_t * self_id)
{
    int i, j, r;
    ABT_thread *lookup_ults = NULL;
    struct ssg_group_lookup_ult_args *lookup_ult_args = NULL;
    const char *self_addr_substr = NULL;
    const char *addr_substr = NULL;
    ABT_pool pool;
    int aret;
    int sret = SSG_FAILURE;

    if (self_id)
        *self_id = SSG_MEMBER_ID_INVALID;

    if ((self_id != NULL && self_addr_str == NULL) || !view) goto fini;

    /* a non-positive size would make the allocations meaningless and the
     * modulo used for the randomized start index below undefined
     */
    if (group_addr_strs == NULL || group_size <= 0) goto fini;

    /* allocate lookup ULTs */
    lookup_ults = malloc((size_t)group_size * sizeof(*lookup_ults));
    if (lookup_ults == NULL) goto fini;
    for (i = 0; i < group_size; i++) lookup_ults[i] = ABT_THREAD_NULL;
    lookup_ult_args = malloc((size_t)group_size * sizeof(*lookup_ult_args));
    if (lookup_ult_args == NULL) goto fini;

    if(self_addr_str)
    {
        /* strstr is used here b/c there may be inconsistencies in whether the class
         * is included in the address or not (it should not be in HG_Addr_to_string,
         * but it's possible that it is in the list of group address strings)
         */
        self_addr_substr = strstr(self_addr_str, "://");
        if (self_addr_substr == NULL)
            self_addr_substr = self_addr_str;
        else
            self_addr_substr += 3;
    }

    /* every lookup ULT runs in the same pool, so fetch it once */
    margo_get_handler_pool(ssg_inst->mid, &pool);

    /* construct view using ULTs to lookup the address of each group member */
    r = rand() % group_size;
    for (i = 0; i < group_size; i++)
    {
        /* randomize our starting index so all group members aren't looking
         * up other group members in the same order
         */
        j = (r + i) % group_size;

        if (group_addr_strs[j] == NULL || strlen(group_addr_strs[j]) == 0) continue;

        /* resolve self id in group if caller asked for it */
        if (self_addr_substr)
        {
            addr_substr = strstr(group_addr_strs[j], "://");
            if (addr_substr == NULL)
                addr_substr = group_addr_strs[j];
            else
                addr_substr += 3;

            if (strcmp(self_addr_substr, addr_substr) == 0)
            {
                if (self_id)
                    *self_id = ssg_gen_member_id(group_addr_strs[j]);

                /* don't look up our own address, we already know it */
                continue;
            }
        }

        /* XXX limit outstanding lookups to some max */
        lookup_ult_args[j].addr_str = group_addr_strs[j];
        lookup_ult_args[j].view = view;
        lookup_ult_args[j].lock = view_lock;
        aret = ABT_thread_create(pool, &ssg_group_lookup_ult,
            &lookup_ult_args[j], ABT_THREAD_ATTR_NULL,
            &lookup_ults[j]);
        if (aret != ABT_SUCCESS) goto fini;
    }

    /* wait on all lookup ULTs to terminate */
    for (i = 0; i < group_size; i++)
    {
        if (lookup_ults[i] == ABT_THREAD_NULL) continue;

        aret = ABT_thread_join(lookup_ults[i]);
        ABT_thread_free(&lookup_ults[i]);
        lookup_ults[i] = ABT_THREAD_NULL;
        if (aret != ABT_SUCCESS) goto fini;
        else if (lookup_ult_args[i].out != SSG_SUCCESS)
        {
            fprintf(stderr, "Error: SSG unable to lookup HG address %s\n",
                lookup_ult_args[i].addr_str);
            goto fini;
        }
    }

    /* clean exit */
    sret = SSG_SUCCESS;

fini:
    if (sret != SSG_SUCCESS)
    {
        /* lookup_ults is NULL if we failed before (or while) allocating it */
        if (lookup_ults != NULL)
        {
            for (i = 0; i < group_size; i++)
            {
                if (lookup_ults[i] != ABT_THREAD_NULL)
                {
                    ABT_thread_cancel(lookup_ults[i]);
                    ABT_thread_free(&lookup_ults[i]);
                }
            }
        }
        if (view != NULL)
            ssg_group_view_destroy(view);
    }
    free(lookup_ults);
    free(lookup_ult_args);

    return sret;
}
1235

1236 1237 1238 1239
static void ssg_group_lookup_ult(
    void * arg)
{
    struct ssg_group_lookup_ult_args *l = arg;
1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250
    ssg_member_id_t member_id = ssg_gen_member_id(l->addr_str);
    hg_addr_t member_addr;
    ssg_member_state_t *ms;
    hg_return_t hret;

    hret = margo_addr_lookup(ssg_inst->mid, l->addr_str, &member_addr);
    if (hret != HG_SUCCESS)
    {
        l->out = SSG_FAILURE;
        return;
    }
1251

1252 1253 1254
    ABT_rwlock_wrlock(l->lock);
    ms = ssg_group_view_add_member(l->addr_str, member_addr, member_id, l->view);
    if (ms)
1255 1256 1257
        l->out = SSG_SUCCESS;
    else
        l->out = SSG_FAILURE;
1258
    ABT_rwlock_unlock(l->lock);
1259 1260 1261 1262 1263

    return;
}

/* Allocate a member state record and insert it into a group view.
 *
 * The record gets its own copy of addr_str, plus the given Mercury address
 * and member ID, and is added to the view's member map keyed by ID; the
 * view's size counter is incremented. No locking is done here.
 *
 * Returns the new record, or NULL if either allocation fails (the view is
 * then left untouched).
 */
static ssg_member_state_t * ssg_group_view_add_member(
    const char * addr_str, hg_addr_t addr, ssg_member_id_t member_id,
    ssg_group_view_t * view)
{
    ssg_member_state_t *state = calloc(1, sizeof(*state));

    if (state == NULL)
        return NULL;

    state->addr_str = strdup(addr_str);
    if (state->addr_str == NULL)
    {
        free(state);
        return NULL;
    }
    state->addr = addr;
    state->id = member_id;

    HASH_ADD(hh, view->member_map, id, sizeof(ssg_member_id_t), state);
    view->size += 1;

    return state;
}

Shane Snyder's avatar
Shane Snyder committed
1286 1287
static ssg_group_descriptor_t * ssg_group_descriptor_create(
    uint64_t name_hash, const char * leader_addr_str, int owner_status)
1288
{