ssg.c 51.6 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

7
#include "ssg-config.h"
Shane Snyder's avatar
Shane Snyder committed
8

Jonathan Jenkins's avatar
Jonathan Jenkins committed
9 10 11 12
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
13
#include <errno.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
14 15
#include <stdlib.h>
#include <string.h>
16
#include <time.h>
17
#include <linux/limits.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
18
#include <assert.h>
Shane Snyder's avatar
Shane Snyder committed
19
#ifdef SSG_HAVE_MPI
20 21
#include <mpi.h>
#endif
Shane Snyder's avatar
Shane Snyder committed
22 23 24
#ifdef SSG_HAVE_PMIX
#include <pmix.h>
#endif
Jonathan Jenkins's avatar
Jonathan Jenkins committed
25

Shane Snyder's avatar
Shane Snyder committed
26
#include <mercury.h>
27
#include <abt.h>
Shane Snyder's avatar
Shane Snyder committed
28
#include <margo.h>
29

30
#include "ssg.h"
Shane Snyder's avatar
Shane Snyder committed
31 32 33
#ifdef SSG_HAVE_MPI
#include "ssg-mpi.h"
#endif
Shane Snyder's avatar
Shane Snyder committed
34 35 36
#ifdef SSG_HAVE_PMIX
#include "ssg-pmix.h"
#endif
37
#include "ssg-internal.h"
Shane Snyder's avatar
Shane Snyder committed
38
#include "swim-fd/swim-fd.h"
39

40 41 42
/* arguments for group lookup ULTs */
struct ssg_group_lookup_ult_args
{
43
    const char *addr_str;
44
    ssg_group_view_t *view;
Shane Snyder's avatar
Shane Snyder committed
45
    ABT_rwlock lock;
46
    int out;
47 48 49
};
static void ssg_group_lookup_ult(void * arg);

50
/* SSG helper routine prototypes */
Shane Snyder's avatar
Shane Snyder committed
51 52 53 54 55 56
static ssg_group_t * ssg_group_create_internal(
    const char * group_name, const char * const group_addr_strs[],
    int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat);
static int ssg_group_view_create(
    const char * const group_addr_strs[], int group_size,
    const char * self_addr_str, ABT_rwlock view_lock,
57
    ssg_group_view_t * view);
58
static ssg_member_state_t * ssg_group_view_add_member(
59 60
    const char * addr_str, hg_addr_t addr, ssg_member_id_t member_id,
    ssg_group_view_t * view);
61
static ssg_group_descriptor_t * ssg_group_descriptor_create(
62
    uint64_t name_hash, const char * leader_addr_str, int owner_status);
63 64 65 66 67 68
static ssg_group_descriptor_t * ssg_group_descriptor_dup(
    ssg_group_descriptor_t * descriptor);
static void ssg_group_destroy_internal(
    ssg_group_t * g);
static void ssg_attached_group_destroy(
    ssg_attached_group_t * ag);
Shane Snyder's avatar
Shane Snyder committed
69 70 71 72 73 74
static void ssg_group_view_destroy(
    ssg_group_view_t * view);
static void ssg_group_descriptor_free(
    ssg_group_descriptor_t * descriptor);
static ssg_member_id_t ssg_gen_member_id(
    const char * addr_str);
75 76
static const char ** ssg_addr_str_buf_to_list(
    const char * buf, int num_addrs);
77 78 79 80 81 82 83 84
#ifdef SSG_HAVE_PMIX
void ssg_pmix_proc_failure_notify_fn(
    size_t evhdlr_registration_id, pmix_status_t status, const pmix_proc_t *source,
    pmix_info_t info[], size_t ninfo, pmix_info_t results[], size_t nresults,
    pmix_event_notification_cbfunc_fn_t cbfunc, void *cbdata);
void ssg_pmix_proc_failure_reg_cb(
    pmix_status_t status, size_t evhdlr_ref, void *cbdata);
#endif 
85

86
/* XXX: we ultimately need per-mid ssg instances rather than 1 global */
87
ssg_instance_t *ssg_inst = NULL;
88

89 90 91
/****************************************************
 *** SSG runtime initialization/shutdown routines ***
 ****************************************************/
92

93 94
/*
 * Initialize the (single, global) SSG instance for the given margo instance.
 *
 * Allocates the global ssg_inst, creates its lock, registers SSG's RPCs,
 * seeds the C RNG, and caches this process's address string and member ID.
 *
 * Returns SSG_SUCCESS on success. Returns SSG_FAILURE if SSG is already
 * initialized or if any step fails; on failure every resource acquired
 * here is released and ssg_inst is reset to NULL so that a later
 * ssg_init() call can retry and no other routine sees a dangling instance.
 */
int ssg_init(
    margo_instance_id mid)
{
    struct timespec ts;
    hg_addr_t self_addr = HG_ADDR_NULL;
    hg_size_t self_addr_str_size = 0;

    if (ssg_inst)
        return SSG_FAILURE;

    /* initialize an SSG instance for this margo instance */
    ssg_inst = calloc(1, sizeof(*ssg_inst));
    if (!ssg_inst)
        return SSG_FAILURE;
    ssg_inst->mid = mid;
    if (ABT_rwlock_create(&ssg_inst->lock) != ABT_SUCCESS)
    {
        free(ssg_inst);
        ssg_inst = NULL;
        return SSG_FAILURE;
    }

    /* ssg_register_rpcs() takes no arguments, so the global instance (and
     * its mid) must already be in place before it is called
     */
    ssg_register_rpcs();

    /* seed RNG */
    clock_gettime(CLOCK_MONOTONIC, &ts);
    srand(ts.tv_nsec + getpid());

    /* get my self address string and ID (which are constant per-mid) */
    if (margo_addr_self(mid, &self_addr) != HG_SUCCESS)
        goto fail;

    /* first call only queries the buffer size needed for the string */
    if (margo_addr_to_string(mid, NULL, &self_addr_str_size, self_addr) != HG_SUCCESS)
        goto fail;

    ssg_inst->self_addr_str = malloc(self_addr_str_size);
    if (ssg_inst->self_addr_str == NULL)
        goto fail;

    if (margo_addr_to_string(mid, ssg_inst->self_addr_str, &self_addr_str_size,
            self_addr) != HG_SUCCESS)
        goto fail;

    ssg_inst->self_id = ssg_gen_member_id(ssg_inst->self_addr_str);

    margo_addr_free(mid, self_addr);
    return SSG_SUCCESS;

fail:
    /* unwind everything acquired above and clear the global so callers
     * never observe a freed instance
     */
    if (self_addr != HG_ADDR_NULL)
        margo_addr_free(mid, self_addr);
    free(ssg_inst->self_addr_str);
    ABT_rwlock_free(&ssg_inst->lock);
    free(ssg_inst);
    ssg_inst = NULL;
    return SSG_FAILURE;
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
148

149
int ssg_finalize()
150
{
151 152
    ssg_group_t *g, *g_tmp;
    ssg_attached_group_t *ag, *ag_tmp;
153 154 155 156

    if (!ssg_inst)
        return SSG_FAILURE;

157 158
    ABT_rwlock_wrlock(ssg_inst->lock);

159 160 161 162 163
#ifdef SSG_HAVE_PMIX
    if (ssg_inst->pmix_failure_evhdlr_ref)
        PMIx_Deregister_event_handler(ssg_inst->pmix_failure_evhdlr_ref, NULL, NULL);
#endif

164
    /* destroy all active groups */
165
    HASH_ITER(hh, ssg_inst->group_table, g, g_tmp)
166 167
    {
        HASH_DELETE(hh, ssg_inst->group_table, g);
168
        ABT_rwlock_unlock(ssg_inst->lock);
169
        ssg_group_destroy_internal(g);
170
        ABT_rwlock_wrlock(ssg_inst->lock);
171 172
    }

173 174 175 176 177 178
    /* detach from all attached groups */
    HASH_ITER(hh, ssg_inst->attached_group_table, ag, ag_tmp)
    {
        ssg_attached_group_destroy(ag);
    }

179 180 181
    ABT_rwlock_unlock(ssg_inst->lock);
    ABT_rwlock_free(&ssg_inst->lock);

182
    free(ssg_inst->self_addr_str);
183 184 185
    free(ssg_inst);
    ssg_inst = NULL;

186
    return SSG_SUCCESS;
187
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
188

189 190 191
/*************************************
 *** SSG group management routines ***
 *************************************/
Jonathan Jenkins's avatar
Jonathan Jenkins committed
192

Shane Snyder's avatar
Shane Snyder committed
193 194 195 196 197 198
/*
 * Create a group from an explicit list of member address strings.
 *
 * Thin wrapper over ssg_group_create_internal(): returns the new group's
 * descriptor as an opaque group ID, or SSG_GROUP_ID_NULL if the internal
 * create routine did not produce a group.
 */
ssg_group_id_t ssg_group_create(
    const char * group_name,
    const char * const group_addr_strs[],
    int group_size,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    ssg_group_t *grp = ssg_group_create_internal(group_name, group_addr_strs,
        group_size, update_cb, update_cb_dat);

    if (grp == NULL)
        return SSG_GROUP_ID_NULL;

    return (ssg_group_id_t)grp->descriptor;
}
Shane Snyder's avatar
Shane Snyder committed
210

Shane Snyder's avatar
Shane Snyder committed
211 212 213 214 215
/*
 * Create a group whose member addresses are listed in a config file.
 *
 * The file is read in full and split on whitespace (space, tab, CR, LF);
 * every resulting token is treated as one member address string. The
 * tokens are packed back-to-back (each NUL-terminated) into a single
 * buffer, converted to a string list, and passed to ssg_group_create().
 *
 * Returns the new group ID, or SSG_GROUP_ID_NULL if the file cannot be
 * opened/stat'ed/read, contains no addresses, an allocation fails, or the
 * underlying group create fails.
 */
ssg_group_id_t ssg_group_create_config(
    const char * group_name,
    const char * file_name,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    static const char delims[] = "\r\n\t ";
    int fd;
    struct stat st;
    char *rd_buf = NULL;
    ssize_t rd_buf_size;
    char *tok;
    char *tok_state = NULL;
    char *addr_str_buf = NULL;
    size_t addr_str_buf_cap;
    size_t addr_str_buf_len = 0;
    int num_addrs = 0;
    const char **addr_strs = NULL;
    int ret;
    ssg_group_id_t group_id = SSG_GROUP_ID_NULL;

    /* open config file for reading */
    fd = open(file_name, O_RDONLY);
    if (fd == -1)
    {
        fprintf(stderr, "Error: SSG unable to open config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }

    /* get file size and allocate a buffer to store it */
    ret = fstat(fd, &st);
    if (ret == -1)
    {
        fprintf(stderr, "Error: SSG unable to stat config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }
    rd_buf = malloc(st.st_size+1);
    if (rd_buf == NULL) goto fini;

    /* load it all in one fell swoop */
    rd_buf_size = read(fd, rd_buf, st.st_size);
    if (rd_buf_size != st.st_size)
    {
        fprintf(stderr, "Error: SSG unable to read config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }
    rd_buf[rd_buf_size]='\0';

    /* tokenize the result - each whitespace-delimited address is assumed
     * to be a unique mercury address. strtok_r keeps the tokenizer state
     * local rather than in the hidden static state used by strtok.
     */
    tok = strtok_r(rd_buf, delims, &tok_state);
    if (tok == NULL) goto fini;

    /* build up the address buffer. Every token is copied together with a
     * NUL terminator; consecutive tokens are separated by at least one
     * delimiter in the file, but the last token need not be followed by
     * one, so the packed form can need one byte more than the file size.
     */
    addr_str_buf_cap = (size_t)rd_buf_size + 1;
    addr_str_buf = malloc(addr_str_buf_cap);
    if (addr_str_buf == NULL) goto fini;
    do
    {
        size_t tok_size = strlen(tok) + 1;
        memcpy(addr_str_buf + addr_str_buf_len, tok, tok_size);
        addr_str_buf_len += tok_size;
        num_addrs++;
        tok = strtok_r(NULL, delims, &tok_state);
    } while (tok != NULL);
    if (addr_str_buf_len != addr_str_buf_cap)
    {
        /* trim the unused tail; if the shrink fails the original (larger)
         * buffer is still perfectly usable, so just keep it
         */
        char *tmp = realloc(addr_str_buf, addr_str_buf_len);
        if (tmp != NULL)
            addr_str_buf = tmp;
    }

    /* set up address string array for group members */
    addr_strs = ssg_addr_str_buf_to_list(addr_str_buf, num_addrs);
    if (!addr_strs) goto fini;

    /* invoke the generic group create routine using our list of addrs */
    group_id = ssg_group_create(group_name, addr_strs, num_addrs,
        update_cb, update_cb_dat);

fini:
    /* cleanup before returning */
    if (fd != -1) close(fd);
    free(rd_buf);
    free(addr_str_buf);
    free(addr_strs);

    return group_id;
}

Shane Snyder's avatar
Shane Snyder committed
301 302 303 304 305 306
#ifdef SSG_HAVE_MPI
/*
 * Create a group containing every rank of an MPI communicator.
 *
 * Each rank contributes its own SSG address string; the strings are
 * exchanged with an allgather of lengths followed by an allgatherv of the
 * string bytes, and the resulting list is passed to ssg_group_create().
 *
 * Returns the new group ID, or SSG_GROUP_ID_NULL if SSG is uninitialized,
 * an allocation fails, or the underlying group create fails.
 */
ssg_group_id_t ssg_group_create_mpi(
    const char * group_name,
    MPI_Comm comm,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    int rank = 0, nranks = 0;
    int my_len = 0;
    int *lens = NULL;
    int *displs = NULL;
    char *packed_addrs = NULL;
    const char **addr_list = NULL;
    ssg_group_id_t gid = SSG_GROUP_ID_NULL;
    int r;

    if (ssg_inst == NULL)
        goto cleanup;

    /* exchange the length (including the NUL) of every rank's address */
    MPI_Comm_size(comm, &nranks);
    MPI_Comm_rank(comm, &rank);
    lens = malloc(nranks * sizeof(*lens));
    if (lens == NULL)
        goto cleanup;
    my_len = (int)strlen(ssg_inst->self_addr_str) + 1;
    lens[rank] = my_len;
    MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, lens, 1, MPI_INT, comm);

    /* exclusive prefix sum of the lengths: displs[r] is the byte offset of
     * rank r's string and displs[nranks] is the total byte count
     */
    displs = malloc((nranks+1) * sizeof(*displs));
    if (displs == NULL)
        goto cleanup;
    displs[0] = 0;
    for (r = 0; r < nranks; r++)
        displs[r+1] = displs[r] + lens[r];

    /* gather every rank's address string into one packed buffer */
    packed_addrs = malloc(displs[nranks]);
    if (packed_addrs == NULL)
        goto cleanup;
    MPI_Allgatherv(ssg_inst->self_addr_str, my_len, MPI_BYTE,
            packed_addrs, lens, displs, MPI_BYTE, comm);

    /* turn the packed buffer into an array of string pointers */
    addr_list = ssg_addr_str_buf_to_list(packed_addrs, nranks);
    if (addr_list == NULL)
        goto cleanup;

    /* hand the address list to the generic group create routine */
    gid = ssg_group_create(group_name, addr_list, nranks,
        update_cb, update_cb_dat);

cleanup:
    free(lens);
    free(displs);
    free(packed_addrs);
    free(addr_list);

    return gid;
}
#endif
Shane Snyder's avatar
Shane Snyder committed
361

Shane Snyder's avatar
Shane Snyder committed
362 363 364
#ifdef SSG_HAVE_PMIX
/*
 * Create a group containing every process of the caller's PMIx namespace.
 *
 * On first use this also registers a PMIx event handler for process
 * termination/abort events and publishes this process's SSG member ID.
 * Each process then publishes its SSG address string under a well-known
 * key, commits, fences with data collection, fetches every other rank's
 * address, and passes the resulting list to ssg_group_create().
 *
 * Returns the new group ID, or SSG_GROUP_ID_NULL if SSG or PMIx is not
 * initialized, a key does not fit in the key buffer, or any PMIx call,
 * allocation, or the underlying group create fails.
 */
ssg_group_id_t ssg_group_create_pmix(
    const char * group_name,
    const pmix_proc_t proc,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    pmix_proc_t tmp_proc;
    pmix_value_t value;
    pmix_value_t *val_p;
    pmix_value_t *addr_vals = NULL;
    /* initialized because the cleanup path reads it even on early exit */
    unsigned int nprocs = 0;
    char key[512];
    int key_len;
    pmix_info_t *info = NULL;
    bool flag;
    const char **addr_strs = NULL;
    unsigned int n;
    pmix_status_t ret;
    ssg_group_id_t group_id = SSG_GROUP_ID_NULL;

    if (!ssg_inst || !PMIx_Initialized()) goto fini;

    /* XXX config switch for this functionality */
    /* if not already done, register for PMIx process failure notifications */
    if (!ssg_inst->pmix_failure_evhdlr_ref)
    {
        /* use PMIx event registrations to inform us of terminated/aborted procs */
        pmix_status_t err_codes[2] = {PMIX_PROC_TERMINATED, PMIX_ERR_PROC_ABORTED};
        PMIx_Register_event_handler(err_codes, 2, NULL, 0,
            ssg_pmix_proc_failure_notify_fn, ssg_pmix_proc_failure_reg_cb,
            &ssg_inst->pmix_failure_evhdlr_ref);

        /* exchange information needed to map PMIx ranks to SSG member IDs */
        snprintf(key, sizeof(key), "ssg-%s-%d-id", proc.nspace, proc.rank);
        PMIX_VALUE_LOAD(&value, &ssg_inst->self_id, PMIX_UINT64);
        ret = PMIx_Put(PMIX_GLOBAL, key, &value);
        if (ret != PMIX_SUCCESS)
        {
            fprintf(stderr, "Warning: skipping PMIx event notification registration -- "\
                "Unable to put PMIx rank mapping\n");
            PMIx_Deregister_event_handler(ssg_inst->pmix_failure_evhdlr_ref, NULL, NULL);
        }
    }

    /* XXX note we are assuming every process in the job wants to join this group... */
    /* get the total nprocs in the job */
    PMIX_PROC_LOAD(&tmp_proc, proc.nspace, PMIX_RANK_WILDCARD);
    ret = PMIx_Get(&tmp_proc, PMIX_JOB_SIZE, NULL, 0, &val_p);
    if (ret != PMIX_SUCCESS) goto fini;
    nprocs = val_p->data.uint32;
    PMIX_VALUE_RELEASE(val_p);

    /* put my address string using a well-known key; the same size bound
     * and truncation check is used for the matching gets below so that a
     * key which can be published can also be looked up
     */
    key_len = snprintf(key, sizeof(key), "ssg-%s-%s-%d-hg-addr", group_name,
        proc.nspace, proc.rank);
    if (key_len < 0 || (size_t)key_len >= sizeof(key)) goto fini;
    PMIX_VALUE_LOAD(&value, ssg_inst->self_addr_str, PMIX_STRING);
    ret = PMIx_Put(PMIX_GLOBAL, key, &value);
    if (ret != PMIX_SUCCESS) goto fini;

    /* commit the put data to the local pmix server */
    ret = PMIx_Commit();
    if (ret != PMIX_SUCCESS) goto fini;

    /* barrier, additionally requesting to collect relevant process data */
    PMIX_INFO_CREATE(info, 1);
    if (info == NULL) goto fini;
    flag = true;
    PMIX_INFO_LOAD(info, PMIX_COLLECT_DATA, &flag, PMIX_BOOL);
    ret = PMIx_Fence(&proc, 1, info, 1);
    /* release the info array before checking the result so that it is not
     * leaked when the fence fails
     */
    PMIX_INFO_FREE(info, 1);
    if (ret != PMIX_SUCCESS) goto fini;

    addr_strs = malloc(nprocs * sizeof(*addr_strs));
    if (addr_strs == NULL) goto fini;

    /* finalize exchange by getting each member's address */
    PMIX_VALUE_CREATE(addr_vals, nprocs);
    for (n = 0; n < nprocs; n++)
    {
        /* skip ourselves */
        if(n == proc.rank)
        {
            addr_strs[n] = ssg_inst->self_addr_str;
            continue;
        }

        key_len = snprintf(key, sizeof(key), "ssg-%s-%s-%d-hg-addr", group_name,
            proc.nspace, n);
        if (key_len < 0 || (size_t)key_len >= sizeof(key)) goto fini;

        tmp_proc.rank = n;
        /* NOTE(review): PMIx_Get is documented to return a newly allocated
         * value through its last argument, in which case addr_vals[n] is
         * never filled in and the returned value is not released here --
         * confirm and, if so, release the values after group creation.
         */
        val_p = &addr_vals[n];
        ret = PMIx_Get(&tmp_proc, key, NULL, 0, &val_p);
        if (ret != PMIX_SUCCESS) goto fini;

        addr_strs[n] = val_p->data.string;
    }

    /* invoke the generic group create routine using our list of addrs */
    group_id = ssg_group_create(group_name, addr_strs, nprocs,
        update_cb, update_cb_dat);

fini:
    /* cleanup before returning */
    free(addr_strs);
    PMIX_VALUE_FREE(addr_vals, nprocs);

    return group_id;
}
#endif

Shane Snyder's avatar
Shane Snyder committed
470 471
/*
 * Destroy a group that the caller is a member of.
 *
 * Looks the group up by the name hash stored in its descriptor, removes it
 * from the instance's group table under the instance write lock, and then
 * destroys it outside of that lock.
 *
 * Returns SSG_SUCCESS on success; SSG_FAILURE if SSG is uninitialized, the
 * ID is null, the caller is not a member, or the group cannot be found.
 */
int ssg_group_destroy(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_group_t *grp = NULL;

    if (ssg_inst == NULL || group_id == SSG_GROUP_ID_NULL)
        return SSG_FAILURE;

    if (desc->owner_status != SSG_OWNER_IS_MEMBER)
    {
        fprintf(stderr, "Error: SSG unable to destroy a group it is not a member of\n");
        return SSG_FAILURE;
    }

    /* detach the group from the table while holding the instance lock */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
        sizeof(uint64_t), grp);
    if (grp != NULL)
    {
        HASH_DELETE(hh, ssg_inst->group_table, grp);
    }
    ABT_rwlock_unlock(ssg_inst->lock);

    if (grp == NULL)
    {
        fprintf(stderr, "Error: SSG unable to find expected group reference\n");
        return SSG_FAILURE;
    }

    /* the group is no longer reachable from the table; tear it down */
    ssg_group_destroy_internal(grp);

    return SSG_SUCCESS;
}

Shane Snyder's avatar
Shane Snyder committed
502 503
ssg_group_id_t ssg_group_join(
    ssg_group_id_t in_group_id,
504 505
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
506
{
Shane Snyder's avatar
Shane Snyder committed
507
    ssg_group_descriptor_t *in_group_descriptor = (ssg_group_descriptor_t *)in_group_id;
508
    hg_addr_t group_target_addr = HG_ADDR_NULL;
Shane Snyder's avatar
Shane Snyder committed
509 510 511 512
    char *group_name = NULL;
    int group_size;
    void *view_buf = NULL;
    const char **addr_strs = NULL;
513
    hg_return_t hret;
514
    int sret;
Shane Snyder's avatar
Shane Snyder committed
515 516
    ssg_group_t *g = NULL;
    ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
517

Shane Snyder's avatar
Shane Snyder committed
518
    if (!ssg_inst || in_group_id == SSG_GROUP_ID_NULL) goto fini;
519

Shane Snyder's avatar
Shane Snyder committed
520
    if (in_group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
521
    {
Shane Snyder's avatar
Shane Snyder committed
522
        fprintf(stderr, "Error: SSG unable to join a group it is already a member of\n");
523 524
        goto fini;
    }
Shane Snyder's avatar
Shane Snyder committed
525
    else if (in_group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
526
    {
Shane Snyder's avatar
Shane Snyder committed
527
        fprintf(stderr, "Error: SSG unable to join a group it is attached to\n");
528 529
        goto fini;
    }
530

531 532 533 534 535 536 537 538 539
    /* lookup the address of the target group member in the GID */
    hret = margo_addr_lookup(ssg_inst->mid, in_group_descriptor->addr_str,
        &group_target_addr);
    if (hret != HG_SUCCESS) goto fini;

    sret = ssg_group_join_send(in_group_descriptor, group_target_addr,
        &group_name, &group_size, &view_buf);
    if (sret != SSG_SUCCESS || !group_name || !view_buf) goto fini;

Shane Snyder's avatar
Shane Snyder committed
540 541
    /* set up address string array for all group members */
    addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size);
542 543
    if (!addr_strs) goto fini;

Shane Snyder's avatar
Shane Snyder committed
544 545 546
    /* append self address string to list of group member address strings */
    addr_strs = realloc(addr_strs, (group_size+1)*sizeof(char *));
    if(!addr_strs) goto fini;
547
    addr_strs[group_size++] = ssg_inst->self_addr_str;
Shane Snyder's avatar
Shane Snyder committed
548 549 550 551 552

    g = ssg_group_create_internal(group_name, addr_strs, group_size,
            update_cb, update_cb_dat);
    if (g)
    {
Shane Snyder's avatar
Shane Snyder committed
553
        g_id = (ssg_group_id_t)g->descriptor;
554 555 556

        /* don't free on success */
        group_name = NULL;
Shane Snyder's avatar
Shane Snyder committed
557
    }
Shane Snyder's avatar
Shane Snyder committed
558 559

fini:
560 561
    if (group_target_addr != HG_ADDR_NULL)
        margo_addr_free(ssg_inst->mid, group_target_addr);
Shane Snyder's avatar
Shane Snyder committed
562 563 564
    free(addr_strs);
    free(view_buf);
    free(group_name);
565

Shane Snyder's avatar
Shane Snyder committed
566
    return g_id;
Shane Snyder's avatar
Shane Snyder committed
567 568
}

Shane Snyder's avatar
Shane Snyder committed
569
int ssg_group_leave(
570
    ssg_group_id_t group_id)
Shane Snyder's avatar
Shane Snyder committed
571
{
Shane Snyder's avatar
Shane Snyder committed
572
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
573 574 575 576
    ssg_group_t *g = NULL;
    hg_addr_t group_target_addr = HG_ADDR_NULL;
    hg_return_t hret;
    int sret = SSG_FAILURE;
577

578
    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) goto fini;
579

580 581
    if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER)
    {
Shane Snyder's avatar
Shane Snyder committed
582
        fprintf(stderr, "Error: SSG unable to leave group it is not a member of\n");
583
        goto fini;
584 585
    }

586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603
    ABT_rwlock_rdlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), g);
    if (!g)
    {
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }

    /* send the leave req to the first member in the view */
    hret = margo_addr_dup(ssg_inst->mid, g->view.member_map->addr, &group_target_addr);
    if (hret != HG_SUCCESS)
    {
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }
    ABT_rwlock_unlock(ssg_inst->lock);

604
    sret = ssg_group_leave_send(group_descriptor, ssg_inst->self_id, group_target_addr);
605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620
    if (sret != SSG_SUCCESS) goto fini;

    /* at least one group member knows of the leave request -- safe to
     * shutdown the group locally
     */

    /* re-lookup the group as we don't hold the lock while sending the leave req */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), g);
    if (g)
    {
        HASH_DELETE(hh, ssg_inst->group_table, g);
        ABT_rwlock_unlock(ssg_inst->lock);
        ssg_group_destroy_internal(g);
    }
Shane Snyder's avatar
Shane Snyder committed
621 622
    else
        ABT_rwlock_unlock(ssg_inst->lock);
623 624 625 626 627 628 629 630

    sret = SSG_SUCCESS;

fini:
    if (group_target_addr != HG_ADDR_NULL)
        margo_addr_free(ssg_inst->mid, group_target_addr);

    return sret;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
631 632
}

633
#if 0
634 635 636
/*
 * Attach to a group as a non-member observer. (This routine currently sits
 * inside a disabled '#if 0' region.)
 *
 * Sends an attach request to a group member to obtain the group's name,
 * size, and packed member address strings, then builds a local attached
 * group record (name, descriptor copy, view) and adds it to the instance's
 * attached group table.
 *
 * Returns SSG_SUCCESS only if every step succeeds; otherwise SSG_FAILURE
 * (or the failing status of the attach send / view create call).
 */
int ssg_group_attach(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    ssg_attached_group_t *ag = NULL;
    char *group_name = NULL;
    int group_size = 0;
    void *view_buf = NULL;
    const char **addr_strs = NULL;
    int sret;
    /* overall result; only set to SSG_SUCCESS once the group is in the table */
    int result = SSG_FAILURE;

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) goto fini;

    if (group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
    {
        fprintf(stderr, "Error: SSG unable to attach a group it is a member of\n");
        goto fini;
    }
    else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        fprintf(stderr, "Error: SSG unable to attach a group it is" \
            " already attached to\n");
        goto fini;
    }

    /* send the attach request to a group member to initiate a bulk transfer
     * of the group's membership view
     */
    sret = ssg_group_attach_send(group_descriptor, &group_name,
        &group_size, &view_buf);
    if (sret != SSG_SUCCESS)
    {
        result = sret;
        goto fini;
    }
    if (!group_name || !view_buf) goto fini;

    /* set up address string array for all group members */
    addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size);
    if (!addr_strs) goto fini;

    /* allocate an SSG attached group data structure and initialize some of it */
    ag = malloc(sizeof(*ag));
    if (!ag) goto fini;
    memset(ag, 0, sizeof(*ag));
    ag->name = strdup(group_name);
    if (!ag->name) goto fini;
    ag->descriptor = ssg_group_descriptor_dup(group_descriptor);
    if (!ag->descriptor) goto fini;
    ag->descriptor->owner_status = SSG_OWNER_IS_ATTACHER;

    /* create the view for the group */
    /* NOTE(review): ag->lock is still zeroed from the memset above when it
     * is passed here -- confirm where the lock is meant to be created.
     */
    sret = ssg_group_view_create(addr_strs, group_size, NULL, ag->lock, &ag->view);
    if (sret != SSG_SUCCESS)
    {
        result = sret;
        goto fini;
    }

    /* add this group reference to our group table */
    HASH_ADD(hh, ssg_inst->attached_group_table, descriptor->name_hash,
        sizeof(uint64_t), ag);

    result = SSG_SUCCESS;

    /* the table now owns the attached group; don't destroy it below */
    ag = NULL;

fini:
    if (ag) ssg_attached_group_destroy(ag);
    free(addr_strs);
    free(view_buf);
    /* ag->name holds its own strdup'd copy, so the received name buffer is
     * always released here
     */
    free(group_name);

    return result;
}

/*
 * Detach from a group previously attached to. (This routine currently sits
 * inside a disabled '#if 0' region.)
 *
 * Finds the attached group by the name hash in its descriptor, removes it
 * from the attached group table, and destroys it.
 *
 * Returns SSG_SUCCESS on success; SSG_FAILURE if SSG is uninitialized, the
 * ID is null, the caller is not an attacher, or the group is not found.
 */
int ssg_group_detach(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_attached_group_t *att = NULL;

    if (ssg_inst == NULL || group_id == SSG_GROUP_ID_NULL)
        return SSG_FAILURE;

    if (desc->owner_status != SSG_OWNER_IS_ATTACHER)
    {
        fprintf(stderr, "Error: SSG unable to detach from group that" \
            " was never attached\n");
        return SSG_FAILURE;
    }

    /* locate the attached group record for this descriptor */
    HASH_FIND(hh, ssg_inst->attached_group_table, &desc->name_hash,
        sizeof(uint64_t), att);
    if (att == NULL)
    {
        fprintf(stderr, "Error: SSG unable to find expected group attached\n");
        return SSG_FAILURE;
    }

    /* unlink it from the table, then release it */
    HASH_DELETE(hh, ssg_inst->attached_group_table, att);
    ssg_attached_group_destroy(att);

    return SSG_SUCCESS;
}
729
#endif
730

731 732 733
/*********************************************************
 *** SSG routines for obtaining self/group information ***
 *********************************************************/
Shane Snyder's avatar
Shane Snyder committed
734

735 736
/*
 * Return this process's SSG member ID, or SSG_MEMBER_ID_INVALID if SSG has
 * not been initialized.
 */
ssg_member_id_t ssg_get_self_id(
    margo_instance_id mid)
{
    /* XXX eventually mid needed to distinguish multiple ssg contexts */
    (void)mid;

    return (ssg_inst != NULL) ? ssg_inst->self_id : SSG_MEMBER_ID_INVALID;
}

745 746
/*
 * Return the number of members in a group the caller is a member of.
 *
 * The reported size is the local view's size plus one, to count the caller
 * itself. Returns 0 if SSG is uninitialized, the ID is null, the caller is
 * not a member of the group (an error is printed in that case), or the
 * group is not present in the group table.
 */
int ssg_get_group_size(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    int nmembers = 0;

    if (ssg_inst == NULL || group_id == SSG_GROUP_ID_NULL)
        return 0;

    if (desc->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *grp = NULL;

        /* instance lock guards the table lookup, group lock guards the view */
        ABT_rwlock_rdlock(ssg_inst->lock);
        HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
            sizeof(uint64_t), grp);
        if (grp != NULL)
        {
            ABT_rwlock_rdlock(grp->lock);
            nmembers = grp->view.size + 1; /* add ourself to view size */
            ABT_rwlock_unlock(grp->lock);
        }
        ABT_rwlock_unlock(ssg_inst->lock);
    }
#if 0
    else if (desc->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *att = NULL;

        HASH_FIND(hh, ssg_inst->attached_group_table, &desc->name_hash,
            sizeof(uint64_t), att);
        if (att != NULL)
        {
            ABT_rwlock_rdlock(att->lock);
            nmembers = att->view.size;
            ABT_rwlock_unlock(att->lock);
        }
    }
#endif
    else
    {
        fprintf(stderr, "Error: SSG can only obtain size of groups that the caller" \
            " is a member of or an attacher of\n");
        return 0;
    }

    return nmembers;
}

793
hg_addr_t ssg_get_group_addr(
794 795
    ssg_group_id_t group_id,
    ssg_member_id_t member_id)
Shane Snyder's avatar
Shane Snyder committed
796
{
Shane Snyder's avatar
Shane Snyder committed
797
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
798
    ssg_member_state_t *member_state;
Shane Snyder's avatar
Shane Snyder committed
799
    hg_addr_t member_addr = HG_ADDR_NULL;
800

801 802
    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL ||
            member_id == SSG_MEMBER_ID_INVALID)
803 804
        return HG_ADDR_NULL;

805 806 807 808
    if (group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *g;

809
        ABT_rwlock_rdlock(ssg_inst->lock);
810 811 812
        HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
            sizeof(uint64_t), g);
        if (g)
Shane Snyder's avatar
Shane Snyder committed
813 814 815 816 817 818 819 820
        {
            ABT_rwlock_rdlock(g->lock);
            HASH_FIND(hh, g->view.member_map, &member_id, sizeof(ssg_member_id_t),
                member_state);
            if (member_state) 
                member_addr = member_state->addr;
            ABT_rwlock_unlock(g->lock);
        }
821
        ABT_rwlock_unlock(ssg_inst->lock);
822
    }
823
#if 0
824 825 826 827 828 829 830
    else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *ag;

        HASH_FIND(hh, ssg_inst->attached_group_table, &group_descriptor->name_hash,
            sizeof(uint64_t), ag);
        if (ag)
Shane Snyder's avatar
Shane Snyder committed
831 832 833 834 835 836 837 838
        {
            ABT_rwlock_rdlock(ag->lock);
            HASH_FIND(hh, ag->view.member_map, &member_id, sizeof(ssg_member_id_t),
                member_state);
            if (member_state) 
                member_addr = member_state->addr;
            ABT_rwlock_unlock(ag->lock);
        }
839
    }
840
#endif
841 842 843 844 845 846 847
    else
    {
        fprintf(stderr, "Error: SSG can only obtain member addresses of groups" \
            " that the caller is a member of or an attacher of\n");
        return HG_ADDR_NULL;
    }

Shane Snyder's avatar
Shane Snyder committed
848
    return member_addr;
849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865
}

/*
 * Duplicate a group ID by handing its underlying descriptor to
 * ssg_group_descriptor_dup() and returning the result as a group ID.
 */
ssg_group_id_t ssg_group_id_dup(
    ssg_group_id_t group_id)
{
    return (ssg_group_id_t)ssg_group_descriptor_dup(
        (ssg_group_descriptor_t *)group_id);
}

/*
 * Release the descriptor behind a group ID via ssg_group_descriptor_free().
 *
 * group_id is passed by value, so this routine cannot reset the caller's
 * handle; the caller must not use its copy of the ID after this call.
 */
void ssg_group_id_free(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_free((ssg_group_descriptor_t *)group_id);
}

870 871 872 873 874 875 876 877
/*
 * Return a heap-allocated copy of the address string stored in a group ID.
 *
 * Returns a strdup()'d string that the caller must free(), or NULL when
 * group_id is SSG_GROUP_ID_NULL or the copy cannot be allocated.
 */
char *ssg_group_id_get_addr_str(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;

    /* same NULL-ID guard the other group ID accessors apply */
    if (group_id == SSG_GROUP_ID_NULL)
        return NULL;

    return strdup(group_descriptor->addr_str);
}

878 879 880 881 882 883 884
/*
 * Serialize a group ID into a newly allocated buffer.
 *
 * Buffer layout: uint64_t magic number, uint64_t name hash, then the
 * NUL-terminated address string. The remaining descriptor fields are
 * stateful and are intentionally not serialized.
 *
 * group_id:   group ID to serialize
 * buf_p:      out; set to the malloc()'d buffer (caller frees), or NULL on
 *             failure
 * buf_size_p: out; set to the buffer size in bytes, or 0 on failure
 *
 * Failure means group_id is SSG_GROUP_ID_NULL or the allocation failed.
 */
void ssg_group_id_serialize(
    ssg_group_id_t group_id,
    char ** buf_p,
    size_t * buf_size_p)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    uint64_t magic_nr;
    uint64_t name_hash;
    size_t addr_len;
    size_t alloc_size;
    char *gid_buf, *p;

    *buf_p = NULL;
    *buf_size_p = 0;

    if (group_id == SSG_GROUP_ID_NULL)
        return;

    magic_nr = group_descriptor->magic_nr;
    name_hash = group_descriptor->name_hash;
    addr_len = strlen(group_descriptor->addr_str) + 1; /* include the NUL */

    /* size the buffer from the same uint64_t fields that are written below */
    alloc_size = sizeof(magic_nr) + sizeof(name_hash) + addr_len;

    gid_buf = malloc(alloc_size);
    if (!gid_buf)
        return;

    /* serialize */
    p = gid_buf;
    memcpy(p, &magic_nr, sizeof(magic_nr));
    p += sizeof(magic_nr);
    memcpy(p, &name_hash, sizeof(name_hash));
    p += sizeof(name_hash);
    memcpy(p, group_descriptor->addr_str, addr_len);

    /* the rest of the descriptor is stateful and not appropriate for serializing... */

    *buf_p = gid_buf;
    *buf_size_p = alloc_size;

    return;
}

/*
 * Rebuild a group ID from a buffer laid out as: uint64_t magic number,
 * uint64_t name hash, NUL-terminated address string.
 *
 * buf:        serialized group ID
 * buf_size:   number of valid bytes in buf
 * group_id_p: out; set to the new group ID (owner status
 *             SSG_OWNER_IS_UNASSOCIATED), or SSG_GROUP_ID_NULL on failure
 *
 * The buffer is validated before use: it must be large enough to hold the
 * header plus at least one string byte, the magic number must equal
 * SSG_MAGIC_NR, and the address string must be NUL-terminated within
 * buf_size. Errors are reported on stderr.
 */
void ssg_group_id_deserialize(
    const char * buf,
    size_t buf_size,
    ssg_group_id_t * group_id_p)
{
    const size_t hdr_size = 2 * sizeof(uint64_t);
    uint64_t magic_nr;
    uint64_t name_hash;
    const char *addr_str;
    ssg_group_descriptor_t *group_descriptor;

    *group_id_p = SSG_GROUP_ID_NULL;

    /* check to ensure the buffer contains enough data to make a group ID */
    if (buf == NULL || buf_size < hdr_size + 1)
    {
        fprintf(stderr, "Error: Serialized buffer does not contain a valid SSG group ID\n");
        return;
    }

    /* deserialize; memcpy avoids misaligned loads from an arbitrary buffer */
    memcpy(&magic_nr, buf, sizeof(magic_nr));
    if (magic_nr != SSG_MAGIC_NR)
    {
        fprintf(stderr, "Error: Magic number mismatch when deserializing SSG group ID\n");
        return;
    }
    memcpy(&name_hash, buf + sizeof(magic_nr), sizeof(name_hash));
    addr_str = buf + hdr_size;

    /* the string is consumed as a C string below, so it must terminate
     * inside the bytes we were actually given
     */
    if (memchr(addr_str, '\0', buf_size - hdr_size) == NULL)
    {
        fprintf(stderr, "Error: Serialized SSG group ID address string is not"
            " NUL-terminated\n");
        return;
    }

    group_descriptor = ssg_group_descriptor_create(name_hash, addr_str,
        SSG_OWNER_IS_UNASSOCIATED);
    if (!group_descriptor)
        return;

    *group_id_p = (ssg_group_id_t)group_descriptor;

    return;
}

/*
 * Write the serialized form of a group ID to a file.
 *
 * The file is opened write-only, created with mode 0644 if missing and
 * truncated if present, before the group ID is serialized.
 *
 * Returns SSG_SUCCESS when the whole serialized buffer was written with a
 * single write() call; SSG_FAILURE (with a message on stderr) when the file
 * cannot be opened, serialization yields no buffer, or write() reports a
 * different byte count.
 */
int ssg_group_id_store(
    const char * file_name,
    ssg_group_id_t group_id)
{
    int out_fd;
    char *gid_buf = NULL;
    size_t gid_len;
    int rc = SSG_FAILURE;

    out_fd = open(file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (out_fd < 0)
    {
        fprintf(stderr, "Error: Unable to open file %s for storing SSG group ID\n",
            file_name);
        return SSG_FAILURE;
    }

    ssg_group_id_serialize(group_id, &gid_buf, &gid_len);
    if (gid_buf == NULL)
    {
        fprintf(stderr, "Error: Unable to serialize SSG group ID.\n");
        goto done;
    }

    if (write(out_fd, gid_buf, gid_len) != (ssize_t)gid_len)
    {
        fprintf(stderr, "Error: Unable to write SSG group ID to file %s\n", file_name);
        goto done;
    }

    rc = SSG_SUCCESS;

done:
    /* shared exit for every path that got past open() */
    close(out_fd);
    free(gid_buf);
    return rc;
}

/*
 * Read a serialized group ID from a file and deserialize it.
 *
 * file_name:  path of the file to read
 * group_id_p: out; set to the loaded group ID, or left as SSG_GROUP_ID_NULL
 *             on failure
 *
 * The whole file (size taken from fstat()) is read with one read() call and
 * handed to ssg_group_id_deserialize(). Returns SSG_SUCCESS on success and
 * SSG_FAILURE when the file cannot be opened or stat'd, is empty, cannot be
 * buffered or fully read, or does not deserialize to a group ID.
 */
int ssg_group_id_load(
    const char * file_name,
    ssg_group_id_t * group_id_p)
{
    struct stat st;
    char *contents = NULL;
    ssize_t nread;
    int in_fd;
    int rc = SSG_FAILURE;

    *group_id_p = SSG_GROUP_ID_NULL;

    in_fd = open(file_name, O_RDONLY);
    if (in_fd < 0)
    {
        fprintf(stderr, "Error: Unable to open file %s for loading SSG group ID\n",
            file_name);
        return SSG_FAILURE;
    }

    if (fstat(in_fd, &st) != 0)
    {
        fprintf(stderr, "Error: Unable to stat file %s\n", file_name);
        goto done;
    }
    if (st.st_size == 0)
    {
        fprintf(stderr, "Error: SSG group ID file %s is empty\n", file_name);
        goto done;
    }

    contents = malloc(st.st_size);
    if (contents == NULL)
        goto done;

    nread = read(in_fd, contents, st.st_size);
    if (nread != (ssize_t)st.st_size)
    {
        fprintf(stderr, "Error: Unable to read SSG group ID from file %s\n", file_name);
        goto done;
    }

    ssg_group_id_deserialize(contents, (size_t)nread, group_id_p);
    if (*group_id_p == SSG_GROUP_ID_NULL)
    {
        fprintf(stderr, "Error: Unable to deserialize SSG group ID\n");
        goto done;
    }

    rc = SSG_SUCCESS;

done:
    /* shared exit for every path that got past open() */
    close(in_fd);
    free(contents);
    return rc;
}

1060 1061 1062 1063 1064
/*
 * Print membership information for a group to stdout: group name, the
 * caller's role, host name, the caller's member ID, the group size, and one
 * line (ID and address string) per member in the group view.
 *
 * Only groups whose descriptor is marked SSG_OWNER_IS_MEMBER are supported.
 * Errors (no SSG instance, NULL group ID, unsupported owner status, group
 * not found) are reported on stderr and nothing is dumped.
 */
void ssg_group_dump(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    ssg_group_view_t *group_view = NULL;
    ABT_rwlock group_view_lock = ABT_RWLOCK_NULL;
    int group_size = 0;
    char *group_name = NULL;
    const char *group_role = "";
    char group_self_id[32] = "";

    /* same guard the other group ID accessors apply before dereferencing */
    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL)
    {
        fprintf(stderr, "Error: SSG unable to find group view associated" \
            " with the given group ID\n");
        return;
    }

    if (group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *g;

        ABT_rwlock_rdlock(ssg_inst->lock);
        HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
            sizeof(uint64_t), g);
        if (g)
        {
            group_view = &g->view;
            group_view_lock = g->lock;
            /* read the view size under the group lock, as
             * ssg_get_group_size() does
             */
            ABT_rwlock_rdlock(g->lock);
            group_size = g->view.size + 1; /* add ourself to view size */
            ABT_rwlock_unlock(g->lock);
            group_name = g->name;
            group_role = "member";
            snprintf(group_self_id, sizeof(group_self_id), "%lu",
                ssg_inst->self_id);
        }
        ABT_rwlock_unlock(ssg_inst->lock);
    }
#if 0
    else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *ag;

        HASH_FIND(hh, ssg_inst->attached_group_table, &group_descriptor->name_hash,
            sizeof(uint64_t), ag);
        if (ag)
        {
            /* NOTE: group_view_lock must also be set here before this
             * branch is re-enabled; the view is iterated under it below
             */
            group_view = &ag->view;
            group_size = ag->view.size;
            group_name = ag->name;
            group_role = "attacher";
        }
    }
#endif
    else
    {
        fprintf(stderr, "Error: SSG can only dump membership information for" \
            " groups that the caller is a member of or an attacher of\n");
        return;
    }

    if (group_view)
    {
        ssg_member_state_t *member_state, *tmp_ms;
        char hostname[1024];

        /* gethostname() can fail, and on truncation it is unspecified
         * whether the result is NUL-terminated; never print a bad buffer
         */
        if (gethostname(hostname, sizeof(hostname)) != 0)
            strcpy(hostname, "(unknown)");
        hostname[sizeof(hostname) - 1] = '\0';

        printf("SSG membership information for group '%s':\n", group_name);
        printf("\trole: %s\n", group_role);
        printf("\thost: %s\n", hostname);
        if (strcmp(group_role, "member") == 0)
            printf("\tself_id: %s\n", group_self_id);
        printf("\tsize: %d\n", group_size);
        printf("\tview:\n");
        ABT_rwlock_rdlock(group_view_lock);
        HASH_ITER(hh, group_view->member_map, member_state, tmp_ms)
        {
            printf("\t\tid: %20lu\taddr: %s\n", member_state->id,
                member_state->addr_str);
        }
        ABT_rwlock_unlock(group_view_lock);
    }
    else
        fprintf(stderr, "Error: SSG unable to find group view associated" \
            " with the given group ID\n");

    return;
}

1140 1141 1142
/************************************
 *** SSG internal helper routines ***
 ************************************/
1143

Shane Snyder's avatar
Shane Snyder committed
1144 1145 1146
/*
 * Allocate and register a new SSG group that the caller is a member of.
 *
 * group_name:      name of the group; hashed to form the group table key
 * group_addr_strs: address strings of the group members
 * group_size:      number of entries in group_addr_strs
 * update_cb:       membership update callback stored on the group
 * update_cb_dat:   opaque pointer stored alongside update_cb
 *
 * Steps: allocate the group, create its lock and descriptor, build the group
 * view, (DEBUG builds) open the debug log, insert the group into the
 * instance's group table unless a group with the same name hash already
 * exists, then initialize the SWIM failure detector.
 *
 * Returns the new group, or NULL on any failure; on failure everything that
 * was set up is torn down and the group is not left in the group table.
 */
static ssg_group_t * ssg_group_create_internal(
    const char * group_name, const char * const group_addr_strs[],
    int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat)
{
    uint64_t name_hash;
    int sret;
    int lock_created = 0;
    int success = 0;
    ssg_group_t *g = NULL, *check_g;

    if (!ssg_inst) return NULL;

    name_hash = ssg_hash64_str(group_name);

    /* allocate an SSG group data structure and initialize some of it */
    g = calloc(1, sizeof(*g));
    if (!g) goto fini;
    g->name = strdup(group_name);
    if (!g->name) goto fini;
    g->ssg_inst = ssg_inst;
    g->update_cb = update_cb;
    g->update_cb_dat = update_cb_dat;
    if (ABT_rwlock_create(&g->lock) != ABT_SUCCESS) goto fini;
    lock_created = 1;

    /* generate unique descriptor for this group  */
    g->descriptor = ssg_group_descriptor_create(name_hash, ssg_inst->self_addr_str,
        SSG_OWNER_IS_MEMBER);
    if (g->descriptor == NULL) goto fini;

    /* initialize the group view */
    sret = ssg_group_view_create(group_addr_strs, group_size, ssg_inst->self_addr_str,
        g->lock, &g->view);
    if (sret != SSG_SUCCESS) goto fini;

#ifdef DEBUG
    {
        /* set debug output pointer */
        char *dbg_log_dir = getenv("SSG_DEBUG_LOGDIR");
        if (dbg_log_dir)
        {
            char dbg_log_path[PATH_MAX];
            snprintf(dbg_log_path, PATH_MAX, "%s/ssg-%s-%lu.log",
                dbg_log_dir, g->name, g->ssg_inst->self_id);
            g->dbg_log = fopen(dbg_log_path, "a");
            if (!g->dbg_log) goto fini;
        }
        else
        {
            g->dbg_log = stdout;
        }
    }
#endif

    /* make sure we aren't re-creating an existing group */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &name_hash, sizeof(uint64_t), check_g);
    if (check_g)
    {
        /* drop the table lock before bailing out; leaving it held would
         * block every later user of the group table
         */
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }

    /* add this group reference to the group table */
    HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash,
        sizeof(uint64_t), g);
    ABT_rwlock_unlock(ssg_inst->lock);

    /* initialize swim failure detector */
    sret = swim_init(g, ssg_inst->mid, 1);
    if (sret != SSG_SUCCESS)
    {
        /* the group is already published in the table; take it back out */
        ABT_rwlock_wrlock(ssg_inst->lock);
        HASH_DELETE(hh, ssg_inst->group_table, g);
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }

    SSG_DEBUG(g, "group create successful (size=%d, self=%s)\n",
        group_size, ssg_inst->self_addr_str);
    success = 1;

fini:
    if (!success && g)
    {
#ifdef DEBUG
        /* if using logfile debug output, close the stream; it is NULL when
         * we failed before (or while) opening it, and stdout is not ours
         */
        if (g->dbg_log && g->dbg_log != stdout)
            fclose(g->dbg_log);
#endif
        if (g->descriptor) ssg_group_descriptor_free(g->descriptor);
        ssg_group_view_destroy(&g->view);
        if (lock_created) ABT_rwlock_free(&g->lock);
        free(g->name);
        free(g);
        g = NULL;
    }

    return g;
}
1237

1238
static int ssg_group_view_create(
1239
    const char * const group_addr_strs[], int group_size,
Shane Snyder's avatar
Shane Snyder committed
1240
    const char * self_addr_str, ABT_rwlock view_lock,
1241
    ssg_group_view_t * view)
1242 1243
{
    int i, j, r;
1244 1245
    ABT_thread *lookup_ults = NULL;
    struct ssg_group_lookup_ult_args *lookup_ult_args = NULL;
1246 1247
    const char *self_addr_substr = NULL;
    const char *addr_substr = NULL;
1248
    int self_found = 0;
1249
    int aret;
1250
    int sret = SSG_FAILURE;
1251 1252 1253

    /* allocate lookup ULTs */
    lookup_ults = malloc(group_size * sizeof(*lookup_ults));
1254 1255
    if (lookup_ults == NULL) goto fini;
    for (i = 0; i < group_size; i++) lookup_ults[i] = ABT_THREAD_NULL;
1256
    lookup_ult_args = malloc(group_size * sizeof(*lookup_ult_args));
1257
    if (lookup_ult_args == NULL) goto fini;
Shane Snyder's avatar
Shane Snyder committed
1258

Shane Snyder's avatar
Shane Snyder committed
1259
    if(self_addr_str)
1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271
    {
        /* strstr is used here b/c there may be inconsistencies in whether the class
         * is included in the address or not (it should not be in HG_Addr_to_string,
         * but it's possible that it is in the list of group address strings)
         */
        self_addr_substr = strstr(self_addr_str, "://");
        if (self_addr_substr == NULL)
            self_addr_substr = self_addr_str;
        else
            self_addr_substr += 3;
    }

1272
    /* construct view using ULTs to lookup the address of each group member */
Shane Snyder's avatar
Shane Snyder committed
1273
    r = rand() % group_size;
1274 1275 1276 1277 1278
    for (i = 0; i < group_size; i++)
    {
        /* randomize our starting index so all group members aren't looking
         * up other group members in the same order
         */
Shane Snyder's avatar
Shane Snyder committed
1279
        j = (r + i) % group_size;
1280