ssg.c 52.5 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

7
#include "ssg-config.h"
Shane Snyder's avatar
Shane Snyder committed
8

Jonathan Jenkins's avatar
Jonathan Jenkins committed
9 10 11 12
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
13
#include <errno.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
14 15
#include <stdlib.h>
#include <string.h>
16
#include <time.h>
17
#include <linux/limits.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
18
#include <assert.h>
Shane Snyder's avatar
Shane Snyder committed
19
#ifdef SSG_HAVE_MPI
20 21
#include <mpi.h>
#endif
Shane Snyder's avatar
Shane Snyder committed
22 23 24
#ifdef SSG_HAVE_PMIX
#include <pmix.h>
#endif
Jonathan Jenkins's avatar
Jonathan Jenkins committed
25

Shane Snyder's avatar
Shane Snyder committed
26
#include <mercury.h>
27
#include <abt.h>
Shane Snyder's avatar
Shane Snyder committed
28
#include <margo.h>
29

30
#include "ssg.h"
Shane Snyder's avatar
Shane Snyder committed
31 32 33
#ifdef SSG_HAVE_MPI
#include "ssg-mpi.h"
#endif
Shane Snyder's avatar
Shane Snyder committed
34 35 36
#ifdef SSG_HAVE_PMIX
#include "ssg-pmix.h"
#endif
37
#include "ssg-internal.h"
Shane Snyder's avatar
Shane Snyder committed
38
#include "swim-fd/swim-fd.h"
39

40 41 42
/* arguments for group lookup ULTs */
struct ssg_group_lookup_ult_args
{
43
    const char *addr_str;
44
    ssg_group_view_t *view;
Shane Snyder's avatar
Shane Snyder committed
45
    ABT_rwlock lock;
46
    int out;
47 48 49
};
static void ssg_group_lookup_ult(void * arg);

50
/* SSG helper routine prototypes */
Shane Snyder's avatar
Shane Snyder committed
51 52 53 54 55 56
static ssg_group_t * ssg_group_create_internal(
    const char * group_name, const char * const group_addr_strs[],
    int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat);
static int ssg_group_view_create(
    const char * const group_addr_strs[], int group_size,
    const char * self_addr_str, ABT_rwlock view_lock,
57
    ssg_group_view_t * view);
58
static ssg_member_state_t * ssg_group_view_add_member(
59 60
    const char * addr_str, hg_addr_t addr, ssg_member_id_t member_id,
    ssg_group_view_t * view);
61
static ssg_group_descriptor_t * ssg_group_descriptor_create(
62
    uint64_t name_hash, const char * leader_addr_str, int owner_status);
63 64 65 66 67 68
static ssg_group_descriptor_t * ssg_group_descriptor_dup(
    ssg_group_descriptor_t * descriptor);
static void ssg_group_destroy_internal(
    ssg_group_t * g);
static void ssg_attached_group_destroy(
    ssg_attached_group_t * ag);
Shane Snyder's avatar
Shane Snyder committed
69 70 71 72 73 74
static void ssg_group_view_destroy(
    ssg_group_view_t * view);
static void ssg_group_descriptor_free(
    ssg_group_descriptor_t * descriptor);
static ssg_member_id_t ssg_gen_member_id(
    const char * addr_str);
75 76
static const char ** ssg_addr_str_buf_to_list(
    const char * buf, int num_addrs);
77 78 79 80 81 82 83 84
#ifdef SSG_HAVE_PMIX
void ssg_pmix_proc_failure_notify_fn(
    size_t evhdlr_registration_id, pmix_status_t status, const pmix_proc_t *source,
    pmix_info_t info[], size_t ninfo, pmix_info_t results[], size_t nresults,
    pmix_event_notification_cbfunc_fn_t cbfunc, void *cbdata);
void ssg_pmix_proc_failure_reg_cb(
    pmix_status_t status, size_t evhdlr_ref, void *cbdata);
#endif 
85

86
/* XXX: we ultimately need per-mid ssg instances rather than 1 global */
87
ssg_instance_t *ssg_inst = NULL;
88

89 90 91
/****************************************************
 *** SSG runtime initialization/shutdown routines ***
 ****************************************************/
92

93 94
/**
 * Initialize the global SSG instance on top of a margo instance.
 *
 * Allocates ssg_inst, creates its lock, registers the SSG RPCs, seeds the
 * C RNG, and caches this process's self address string and member ID.
 *
 * @param mid  margo instance SSG communicates through
 * @return SSG_SUCCESS on success; SSG_FAILURE if SSG is already
 *         initialized or any setup step fails. On failure ssg_inst is
 *         reset to NULL so no caller can observe a freed instance.
 */
int ssg_init(
    margo_instance_id mid)
{
    struct timespec ts;
    hg_addr_t self_addr = HG_ADDR_NULL;
    hg_size_t self_addr_str_size = 0;

    if (ssg_inst)
        return SSG_FAILURE;

    /* initialize an SSG instance for this margo instance */
    ssg_inst = malloc(sizeof(*ssg_inst));
    if (!ssg_inst)
        return SSG_FAILURE;
    memset(ssg_inst, 0, sizeof(*ssg_inst));
    ssg_inst->mid = mid;
    ABT_rwlock_create(&ssg_inst->lock);

    ssg_register_rpcs();

    /* seed RNG */
    clock_gettime(CLOCK_MONOTONIC, &ts);
    srand(ts.tv_nsec + getpid());

    /* get my self address string and ID (which are constant per-mid);
     * the first margo_addr_to_string() call only queries the needed size */
    if (margo_addr_self(mid, &self_addr) != HG_SUCCESS)
        goto err;
    if (margo_addr_to_string(mid, NULL, &self_addr_str_size, self_addr) != HG_SUCCESS)
        goto err;
    ssg_inst->self_addr_str = malloc(self_addr_str_size);
    if (ssg_inst->self_addr_str == NULL)
        goto err;
    if (margo_addr_to_string(mid, ssg_inst->self_addr_str, &self_addr_str_size,
            self_addr) != HG_SUCCESS)
        goto err;

    ssg_inst->self_id = ssg_gen_member_id(ssg_inst->self_addr_str);

    margo_addr_free(mid, self_addr);
    return SSG_SUCCESS;

err:
    /* NOTE(review): RPCs registered by ssg_register_rpcs() stay registered
     * with margo; this file shows no matching deregistration routine. */
    if (self_addr != HG_ADDR_NULL)
        margo_addr_free(mid, self_addr);
    free(ssg_inst->self_addr_str);
    ABT_rwlock_free(&ssg_inst->lock);
    free(ssg_inst);
    /* reset the global so later SSG calls see "not initialized" instead of
     * dereferencing freed memory */
    ssg_inst = NULL;
    return SSG_FAILURE;
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
148

149
int ssg_finalize()
150
{
151 152
    ssg_group_t *g, *g_tmp;
    ssg_attached_group_t *ag, *ag_tmp;
153 154 155 156

    if (!ssg_inst)
        return SSG_FAILURE;

157 158
    ABT_rwlock_wrlock(ssg_inst->lock);

159 160 161 162 163
#ifdef SSG_HAVE_PMIX
    if (ssg_inst->pmix_failure_evhdlr_ref)
        PMIx_Deregister_event_handler(ssg_inst->pmix_failure_evhdlr_ref, NULL, NULL);
#endif

164
    /* destroy all active groups */
165
    HASH_ITER(hh, ssg_inst->group_table, g, g_tmp)
166 167
    {
        HASH_DELETE(hh, ssg_inst->group_table, g);
168
        ABT_rwlock_unlock(ssg_inst->lock);
169
        ssg_group_destroy_internal(g);
170
        ABT_rwlock_wrlock(ssg_inst->lock);
171 172
    }

173 174 175 176 177 178
    /* detach from all attached groups */
    HASH_ITER(hh, ssg_inst->attached_group_table, ag, ag_tmp)
    {
        ssg_attached_group_destroy(ag);
    }

179 180 181
    ABT_rwlock_unlock(ssg_inst->lock);
    ABT_rwlock_free(&ssg_inst->lock);

182
    free(ssg_inst->self_addr_str);
183 184 185
    free(ssg_inst);
    ssg_inst = NULL;

186
    return SSG_SUCCESS;
187
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
188

189 190 191
/*************************************
 *** SSG group management routines ***
 *************************************/
Jonathan Jenkins's avatar
Jonathan Jenkins committed
192

Shane Snyder's avatar
Shane Snyder committed
193 194 195 196 197 198
/**
 * Create an SSG group from an explicit list of member address strings.
 *
 * @param group_name       name of the new group
 * @param group_addr_strs  address strings of all members
 * @param group_size       number of entries in group_addr_strs
 * @param update_cb        membership update callback
 * @param update_cb_dat    user data forwarded to update_cb
 * @return the group ID, or SSG_GROUP_ID_NULL on failure
 */
ssg_group_id_t ssg_group_create(
    const char * group_name,
    const char * const group_addr_strs[],
    int group_size,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    ssg_group_t *new_group = ssg_group_create_internal(group_name,
        group_addr_strs, group_size, update_cb, update_cb_dat);

    /* the group's descriptor pointer serves as its opaque public ID */
    return new_group ? (ssg_group_id_t)new_group->descriptor
                     : SSG_GROUP_ID_NULL;
}
Shane Snyder's avatar
Shane Snyder committed
210

Shane Snyder's avatar
Shane Snyder committed
211 212 213 214 215
/**
 * Create an SSG group whose member addresses are listed in a config file.
 *
 * The file holds mercury address strings separated by any mix of spaces,
 * tabs, carriage returns and newlines; each token is one group member.
 *
 * @param group_name     name of the new group
 * @param file_name      path of the config file to read
 * @param update_cb      membership update callback
 * @param update_cb_dat  user data forwarded to update_cb
 * @return the group ID, or SSG_GROUP_ID_NULL on any failure
 */
ssg_group_id_t ssg_group_create_config(
    const char * group_name,
    const char * file_name,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    int fd;
    struct stat st;
    char *rd_buf = NULL;
    ssize_t rd_buf_size;
    char *tok;
    char *addr_str_buf = NULL;
    size_t addr_str_buf_len = 0;
    int num_addrs = 0;
    const char **addr_strs = NULL;
    ssg_group_id_t group_id = SSG_GROUP_ID_NULL;

    /* open config file for reading */
    fd = open(file_name, O_RDONLY);
    if (fd == -1)
    {
        fprintf(stderr, "Error: SSG unable to open config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }

    /* get file size and allocate a buffer to store it plus a terminator */
    if (fstat(fd, &st) == -1)
    {
        fprintf(stderr, "Error: SSG unable to stat config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }
    rd_buf = malloc(st.st_size + 1);
    if (rd_buf == NULL) goto fini;

    /* load it all in one fell swoop; a short read is treated as an error */
    rd_buf_size = read(fd, rd_buf, st.st_size);
    if (rd_buf_size != st.st_size)
    {
        fprintf(stderr, "Error: SSG unable to read config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }
    rd_buf[rd_buf_size] = '\0';

    /* strtok the result - each whitespace-delimited address is assumed to be
     * a unique mercury address.
     * NOTE: strtok keeps hidden static state and is not reentrant.
     */
    tok = strtok(rd_buf, "\r\n\t ");
    if (tok == NULL) goto fini;

    /* pack the tokens back to back, each NUL-terminated. k tokens are
     * separated by at least k-1 delimiter bytes in the file, so the packed
     * size is at most rd_buf_size + 1 -- the +1 is needed when the file does
     * not end with a delimiter (e.g. a file containing only "abc").
     */
    addr_str_buf = malloc((size_t)rd_buf_size + 1);
    if (addr_str_buf == NULL) goto fini;
    do
    {
        size_t tok_size = strlen(tok) + 1; /* include the NUL */
        memcpy(addr_str_buf + addr_str_buf_len, tok, tok_size);
        addr_str_buf_len += tok_size;
        num_addrs++;
        tok = strtok(NULL, "\r\n\t ");
    } while (tok != NULL);

    /* set up address string array for group members */
    addr_strs = ssg_addr_str_buf_to_list(addr_str_buf, num_addrs);
    if (!addr_strs) goto fini;

    /* invoke the generic group create routine using our list of addrs */
    group_id = ssg_group_create(group_name, addr_strs, num_addrs,
        update_cb, update_cb_dat);

fini:
    /* cleanup before returning */
    if (fd != -1) close(fd);
    free(rd_buf);
    free(addr_str_buf);
    free(addr_strs);

    return group_id;
}

Shane Snyder's avatar
Shane Snyder committed
301 302 303 304 305 306
#ifdef SSG_HAVE_MPI
/**
 * Create an SSG group containing every process of an MPI communicator.
 *
 * Collective over comm: each rank contributes its self address string via
 * MPI_Allgather/MPI_Allgatherv, then every rank creates the group locally.
 *
 * @return the group ID, or SSG_GROUP_ID_NULL on failure
 */
ssg_group_id_t ssg_group_create_mpi(
    const char * group_name,
    MPI_Comm comm,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    int nranks = 0, my_rank = 0;
    int my_len = 0;
    int r;
    int *lens = NULL;
    int *displs = NULL;
    char *all_addrs = NULL;
    const char **addr_strs = NULL;
    ssg_group_id_t group_id = SSG_GROUP_ID_NULL;

    if (ssg_inst == NULL)
        goto fini;

    /* every rank learns every rank's address length (NUL included) */
    MPI_Comm_size(comm, &nranks);
    MPI_Comm_rank(comm, &my_rank);
    lens = malloc(nranks * sizeof(*lens));
    if (lens == NULL)
        goto fini;
    my_len = (int)strlen(ssg_inst->self_addr_str) + 1;
    lens[my_rank] = my_len;
    MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, lens, 1, MPI_INT, comm);

    /* exclusive prefix sum of lens; displs[nranks] holds the grand total */
    displs = malloc((nranks + 1) * sizeof(*displs));
    if (displs == NULL)
        goto fini;
    displs[0] = 0;
    for (r = 0; r < nranks; r++)
        displs[r + 1] = displs[r] + lens[r];

    /* gather all address strings back to back into one buffer */
    all_addrs = malloc(displs[nranks]);
    if (all_addrs == NULL)
        goto fini;
    MPI_Allgatherv(ssg_inst->self_addr_str, my_len, MPI_BYTE,
            all_addrs, lens, displs, MPI_BYTE, comm);

    /* split the packed buffer into per-member strings */
    addr_strs = ssg_addr_str_buf_to_list(all_addrs, nranks);
    if (addr_strs == NULL)
        goto fini;

    group_id = ssg_group_create(group_name, addr_strs, nranks,
        update_cb, update_cb_dat);

fini:
    free(lens);
    free(displs);
    free(all_addrs);
    free(addr_strs);

    return group_id;
}
#endif
Shane Snyder's avatar
Shane Snyder committed
361

Shane Snyder's avatar
Shane Snyder committed
362 363 364
#ifdef SSG_HAVE_PMIX
/**
 * Create an SSG group containing every process in proc's PMIx namespace.
 *
 * Each process publishes its self address under a group/namespace/rank key,
 * commits and fences, then fetches every other rank's address. On the first
 * call it also registers for PMIx process-failure events and publishes this
 * process's SSG member ID under a per-rank key.
 *
 * @param group_name     name of the new group
 * @param proc           this process's PMIx identity (namespace and rank)
 * @param update_cb      membership update callback
 * @param update_cb_dat  user data forwarded to update_cb
 * @return the group ID, or SSG_GROUP_ID_NULL on failure
 */
ssg_group_id_t ssg_group_create_pmix(
    const char * group_name,
    const pmix_proc_t proc,
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
{
    pmix_proc_t tmp_proc;
    pmix_value_t value;
    pmix_value_t *val_p;
    pmix_value_t **addr_vals = NULL; /* values returned by PMIx_Get, per rank */
    unsigned int nprocs = 0;
    char key[512];
    pmix_info_t *info;
    bool flag;
    const char **addr_strs = NULL;
    unsigned int n;
    pmix_status_t ret;
    ssg_group_id_t group_id = SSG_GROUP_ID_NULL;

    if (!ssg_inst || !PMIx_Initialized()) goto fini;

    /* XXX config switch for this functionality */
    /* if not already done, register for PMIx process failure notifications */
    if (!ssg_inst->pmix_failure_evhdlr_ref)
    {
        /* use PMIx event registrations to inform us of terminated/aborted procs */
        pmix_status_t err_codes[2] = {PMIX_PROC_TERMINATED, PMIX_ERR_PROC_ABORTED};
        int mapping_ok = 1;

        PMIx_Register_event_handler(err_codes, 2, NULL, 0,
            ssg_pmix_proc_failure_notify_fn, ssg_pmix_proc_failure_reg_cb,
            &ssg_inst->pmix_failure_evhdlr_ref);

        /* exchange information needed to map PMIx ranks to SSG member IDs */
        snprintf(key, sizeof(key), "ssg-%s-%u-id", proc.nspace,
            (unsigned int)proc.rank);
        PMIX_VALUE_LOAD(&value, &ssg_inst->self_id, PMIX_UINT64);
        ret = PMIx_Put(PMIX_GLOBAL, key, &value);
        if (ret != PMIX_SUCCESS)
        {
            fprintf(stderr, "Warning: skipping PMIx event notification registration -- "\
                "Unable to put PMIx rank mapping\n");
            mapping_ok = 0;
        }

        /* commit and fence even after a local failure, as the original flow
         * did, so this process still takes part in the fence */
        ret = PMIx_Commit();
        if (ret != PMIX_SUCCESS)
        {
            fprintf(stderr, "Warning: skipping PMIx event notification registration -- "\
                "Unable to commit PMIx rank mapping\n");
            mapping_ok = 0;
        }

        /* barrier, additionally requesting to collect relevant process data */
        PMIX_INFO_CREATE(info, 1);
        flag = true;
        PMIX_INFO_LOAD(info, PMIX_COLLECT_DATA, &flag, PMIX_BOOL);
        ret = PMIx_Fence(&proc, 1, info, 1);
        PMIX_INFO_FREE(info, 1);
        if (ret != PMIX_SUCCESS)
        {
            fprintf(stderr, "Warning: skipping PMIx event notification registration -- "\
                "Unable to exchange PMIx rank mapping\n");
            mapping_ok = 0;
        }

        /* deregister exactly once, no matter how many steps failed */
        if (!mapping_ok)
            PMIx_Deregister_event_handler(ssg_inst->pmix_failure_evhdlr_ref, NULL, NULL);
    }

    /* XXX note we are assuming every process in the job wants to join this group... */
    /* get the total nprocs in the job */
    PMIX_PROC_LOAD(&tmp_proc, proc.nspace, PMIX_RANK_WILDCARD);
    ret = PMIx_Get(&tmp_proc, PMIX_JOB_SIZE, NULL, 0, &val_p);
    if (ret != PMIX_SUCCESS) goto fini;
    nprocs = val_p->data.uint32;
    PMIX_VALUE_RELEASE(val_p);
    if (nprocs == 0) goto fini;

    /* put my address string using a well-known key; the same buffer size and
     * truncation check are used for the put and get keys so they always match */
    if (snprintf(key, sizeof(key), "ssg-%s-%s-%u-hg-addr", group_name,
        proc.nspace, (unsigned int)proc.rank) >= (int)sizeof(key)) goto fini;
    PMIX_VALUE_LOAD(&value, ssg_inst->self_addr_str, PMIX_STRING);
    ret = PMIx_Put(PMIX_GLOBAL, key, &value);
    /* PMIX_VALUE_LOAD duplicated the string; PMIx_Put stores its own copy
     * (per the PMIx standard -- TODO confirm for the PMIx version in use) */
    PMIX_VALUE_DESTRUCT(&value);
    if (ret != PMIX_SUCCESS) goto fini;

    /* commit the put data to the local pmix server */
    ret = PMIx_Commit();
    if (ret != PMIX_SUCCESS) goto fini;

    /* barrier, additionally requesting to collect relevant process data;
     * info is freed before the status check so the error path can't leak it */
    PMIX_INFO_CREATE(info, 1);
    flag = true;
    PMIX_INFO_LOAD(info, PMIX_COLLECT_DATA, &flag, PMIX_BOOL);
    ret = PMIx_Fence(&proc, 1, info, 1);
    PMIX_INFO_FREE(info, 1);
    if (ret != PMIX_SUCCESS) goto fini;

    addr_strs = malloc(nprocs * sizeof(*addr_strs));
    if (addr_strs == NULL) goto fini;

    /* PMIx_Get allocates each returned value; addr_strs points into them, so
     * they are kept until after the group is created and released at fini */
    addr_vals = calloc(nprocs, sizeof(*addr_vals));
    if (addr_vals == NULL) goto fini;

    /* finalize exchange by getting each member's address */
    for (n = 0; n < nprocs; n++)
    {
        /* skip ourselves */
        if (n == proc.rank)
        {
            addr_strs[n] = ssg_inst->self_addr_str;
            continue;
        }

        if (snprintf(key, sizeof(key), "ssg-%s-%s-%u-hg-addr", group_name,
            proc.nspace, n) >= (int)sizeof(key)) goto fini;

        tmp_proc.rank = n;
        ret = PMIx_Get(&tmp_proc, key, NULL, 0, &addr_vals[n]);
        if (ret != PMIX_SUCCESS) goto fini;
        if (addr_vals[n]->type != PMIX_STRING) goto fini;

        addr_strs[n] = addr_vals[n]->data.string;
    }

    /* invoke the generic group create routine using our list of addrs */
    group_id = ssg_group_create(group_name, addr_strs, (int)nprocs,
        update_cb, update_cb_dat);

fini:
    /* cleanup before returning */
    free(addr_strs);
    if (addr_vals)
    {
        for (n = 0; n < nprocs; n++)
        {
            if (addr_vals[n])
                PMIX_VALUE_RELEASE(addr_vals[n]);
        }
        free(addr_vals);
    }

    return group_id;
}
#endif

Shane Snyder's avatar
Shane Snyder committed
492 493
/**
 * Destroy a group this process is a member of.
 *
 * The group is unlinked from the instance's group table while holding the
 * instance write lock, and torn down after the lock is released.
 *
 * @param group_id  ID of a group this process belongs to
 * @return SSG_SUCCESS, or SSG_FAILURE if SSG is uninitialized, the ID is
 *         null, this process is not a member, or the group is not found
 */
int ssg_group_destroy(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_group_t *target = NULL;

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL)
        return SSG_FAILURE;

    if (desc->owner_status != SSG_OWNER_IS_MEMBER)
    {
        fprintf(stderr, "Error: SSG unable to destroy a group it is not a member of\n");
        return SSG_FAILURE;
    }

    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
        sizeof(uint64_t), target);
    if (target)
    {
        HASH_DELETE(hh, ssg_inst->group_table, target);
    }
    ABT_rwlock_unlock(ssg_inst->lock);

    if (!target)
    {
        fprintf(stderr, "Error: SSG unable to find expected group reference\n");
        return SSG_FAILURE;
    }

    ssg_group_destroy_internal(target);
    return SSG_SUCCESS;
}

Shane Snyder's avatar
Shane Snyder committed
524 525
ssg_group_id_t ssg_group_join(
    ssg_group_id_t in_group_id,
526 527
    ssg_membership_update_cb update_cb,
    void * update_cb_dat)
528
{
Shane Snyder's avatar
Shane Snyder committed
529
    ssg_group_descriptor_t *in_group_descriptor = (ssg_group_descriptor_t *)in_group_id;
530
    hg_addr_t group_target_addr = HG_ADDR_NULL;
Shane Snyder's avatar
Shane Snyder committed
531 532 533 534
    char *group_name = NULL;
    int group_size;
    void *view_buf = NULL;
    const char **addr_strs = NULL;
535
    hg_return_t hret;
536
    int sret;
Shane Snyder's avatar
Shane Snyder committed
537 538
    ssg_group_t *g = NULL;
    ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
539

Shane Snyder's avatar
Shane Snyder committed
540
    if (!ssg_inst || in_group_id == SSG_GROUP_ID_NULL) goto fini;
541

Shane Snyder's avatar
Shane Snyder committed
542
    if (in_group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
543
    {
Shane Snyder's avatar
Shane Snyder committed
544
        fprintf(stderr, "Error: SSG unable to join a group it is already a member of\n");
545 546
        goto fini;
    }
Shane Snyder's avatar
Shane Snyder committed
547
    else if (in_group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
548
    {
Shane Snyder's avatar
Shane Snyder committed
549
        fprintf(stderr, "Error: SSG unable to join a group it is attached to\n");
550 551
        goto fini;
    }
552

553 554 555 556 557 558 559 560 561
    /* lookup the address of the target group member in the GID */
    hret = margo_addr_lookup(ssg_inst->mid, in_group_descriptor->addr_str,
        &group_target_addr);
    if (hret != HG_SUCCESS) goto fini;

    sret = ssg_group_join_send(in_group_descriptor, group_target_addr,
        &group_name, &group_size, &view_buf);
    if (sret != SSG_SUCCESS || !group_name || !view_buf) goto fini;

Shane Snyder's avatar
Shane Snyder committed
562 563
    /* set up address string array for all group members */
    addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size);
564 565
    if (!addr_strs) goto fini;

Shane Snyder's avatar
Shane Snyder committed
566 567 568
    /* append self address string to list of group member address strings */
    addr_strs = realloc(addr_strs, (group_size+1)*sizeof(char *));
    if(!addr_strs) goto fini;
569
    addr_strs[group_size++] = ssg_inst->self_addr_str;
Shane Snyder's avatar
Shane Snyder committed
570 571 572 573 574

    g = ssg_group_create_internal(group_name, addr_strs, group_size,
            update_cb, update_cb_dat);
    if (g)
    {
Shane Snyder's avatar
Shane Snyder committed
575
        g_id = (ssg_group_id_t)g->descriptor;
576 577 578

        /* don't free on success */
        group_name = NULL;
Shane Snyder's avatar
Shane Snyder committed
579
    }
Shane Snyder's avatar
Shane Snyder committed
580 581

fini:
582 583
    if (group_target_addr != HG_ADDR_NULL)
        margo_addr_free(ssg_inst->mid, group_target_addr);
Shane Snyder's avatar
Shane Snyder committed
584 585 586
    free(addr_strs);
    free(view_buf);
    free(group_name);
587

Shane Snyder's avatar
Shane Snyder committed
588
    return g_id;
Shane Snyder's avatar
Shane Snyder committed
589 590
}

Shane Snyder's avatar
Shane Snyder committed
591
int ssg_group_leave(
592
    ssg_group_id_t group_id)
Shane Snyder's avatar
Shane Snyder committed
593
{
Shane Snyder's avatar
Shane Snyder committed
594
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
595 596 597 598
    ssg_group_t *g = NULL;
    hg_addr_t group_target_addr = HG_ADDR_NULL;
    hg_return_t hret;
    int sret = SSG_FAILURE;
599

600
    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) goto fini;
601

602 603
    if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER)
    {
Shane Snyder's avatar
Shane Snyder committed
604
        fprintf(stderr, "Error: SSG unable to leave group it is not a member of\n");
605
        goto fini;
606 607
    }

608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625
    ABT_rwlock_rdlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), g);
    if (!g)
    {
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }

    /* send the leave req to the first member in the view */
    hret = margo_addr_dup(ssg_inst->mid, g->view.member_map->addr, &group_target_addr);
    if (hret != HG_SUCCESS)
    {
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }
    ABT_rwlock_unlock(ssg_inst->lock);

626
    sret = ssg_group_leave_send(group_descriptor, ssg_inst->self_id, group_target_addr);
627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
    if (sret != SSG_SUCCESS) goto fini;

    /* at least one group member knows of the leave request -- safe to
     * shutdown the group locally
     */

    /* re-lookup the group as we don't hold the lock while sending the leave req */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
        sizeof(uint64_t), g);
    if (g)
    {
        HASH_DELETE(hh, ssg_inst->group_table, g);
        ABT_rwlock_unlock(ssg_inst->lock);
        ssg_group_destroy_internal(g);
    }
Shane Snyder's avatar
Shane Snyder committed
643 644
    else
        ABT_rwlock_unlock(ssg_inst->lock);
645 646 647 648 649 650 651 652

    sret = SSG_SUCCESS;

fini:
    if (group_target_addr != HG_ADDR_NULL)
        margo_addr_free(ssg_inst->mid, group_target_addr);

    return sret;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
653 654
}

655
#if 0
/*
 * Attach to (observe without joining) a group described by group_id.
 * Returns SSG_SUCCESS or SSG_FAILURE.
 */
int ssg_group_attach(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_attached_group_t *ag = NULL;
    char *name = NULL;
    int size;
    void *packed_view = NULL;
    const char **addrs = NULL;
    int rc = SSG_FAILURE;

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL)
        goto done;

    if (desc->owner_status == SSG_OWNER_IS_MEMBER)
    {
        fprintf(stderr, "Error: SSG unable to attach a group it is a member of\n");
        goto done;
    }
    if (desc->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        fprintf(stderr, "Error: SSG unable to attach a group it is" \
            " already attached to\n");
        goto done;
    }

    /* ask a group member to bulk-transfer the group's membership view */
    rc = ssg_group_attach_send(desc, &name, &size, &packed_view);
    if (rc != SSG_SUCCESS || !name || !packed_view)
        goto done;

    /* split the packed view into one address string per member */
    addrs = ssg_addr_str_buf_to_list(packed_view, size);
    if (!addrs)
        goto done;

    /* allocate and partially initialize the attached-group record */
    ag = malloc(sizeof(*ag));
    if (!ag)
        goto done;
    memset(ag, 0, sizeof(*ag));
    ag->name = strdup(name);
    ag->descriptor = ssg_group_descriptor_dup(desc);
    if (!ag->descriptor)
        goto done;
    ag->descriptor->owner_status = SSG_OWNER_IS_ATTACHER;

    /* NOTE(review): ag->lock is only zeroed by memset, never created with
     * ABT_rwlock_create(), before being handed to the view -- confirm before
     * re-enabling this code. */
    rc = ssg_group_view_create(addrs, size, NULL, ag->lock, &ag->view);
    if (rc != SSG_SUCCESS)
        goto done;

    /* register the attached group */
    HASH_ADD(hh, ssg_inst->attached_group_table, descriptor->name_hash,
        sizeof(uint64_t), ag);

    rc = SSG_SUCCESS;

    /* ownership moved into ag / the table: skip freeing below */
    name = NULL;
    ag = NULL;
done:
    if (ag)
        ssg_attached_group_destroy(ag);
    free(addrs);
    free(packed_view);
    free(name);

    return rc;
}

/*
 * Detach from a previously attached group.
 * Returns SSG_SUCCESS or SSG_FAILURE.
 */
int ssg_group_detach(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    ssg_attached_group_t *ag;

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL)
        return SSG_FAILURE;

    if (desc->owner_status != SSG_OWNER_IS_ATTACHER)
    {
        fprintf(stderr, "Error: SSG unable to detach from group that" \
            " was never attached\n");
        return SSG_FAILURE;
    }

    /* locate the attached group, unlink it, then destroy it */
    HASH_FIND(hh, ssg_inst->attached_group_table, &desc->name_hash,
        sizeof(uint64_t), ag);
    if (ag == NULL)
    {
        fprintf(stderr, "Error: SSG unable to find expected group attached\n");
        return SSG_FAILURE;
    }
    HASH_DELETE(hh, ssg_inst->attached_group_table, ag);
    ssg_attached_group_destroy(ag);

    return SSG_SUCCESS;
}
#endif
752

753 754 755
/*********************************************************
 *** SSG routines for obtaining self/group information ***
 *********************************************************/
Shane Snyder's avatar
Shane Snyder committed
756

757 758
/*
 * Return this process's SSG member ID, or SSG_MEMBER_ID_INVALID if SSG
 * has not been initialized.
 */
ssg_member_id_t ssg_get_self_id(
    margo_instance_id mid)
{
    /* XXX: mid is currently unused; it will eventually be needed to
     * distinguish between multiple SSG contexts */
    (void)mid;

    return (ssg_inst != NULL) ? ssg_inst->self_id : SSG_MEMBER_ID_INVALID;
}

767 768
/*
 * Return the number of members in the given group, counting the caller.
 *
 * Only groups the caller is a member of are currently supported. Returns
 * 0 if SSG is not initialized, the ID is null, the group is not found, or
 * the caller does not belong to the group.
 */
int ssg_get_group_size(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *desc = (ssg_group_descriptor_t *)group_id;
    int size = 0;

    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL)
        return 0;

    if (desc->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *grp = NULL;

        /* lock order: instance table lock, then the group's own lock */
        ABT_rwlock_rdlock(ssg_inst->lock);
        HASH_FIND(hh, ssg_inst->group_table, &desc->name_hash,
            sizeof(uint64_t), grp);
        if (grp != NULL)
        {
            ABT_rwlock_rdlock(grp->lock);
            /* the view excludes the caller itself, so count ourselves too */
            size = grp->view.size + 1;
            ABT_rwlock_unlock(grp->lock);
        }
        ABT_rwlock_unlock(ssg_inst->lock);

        return size;
    }
#if 0
    if (desc->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *ag = NULL;

        HASH_FIND(hh, ssg_inst->attached_group_table, &desc->name_hash,
            sizeof(uint64_t), ag);
        if (ag != NULL)
        {
            ABT_rwlock_rdlock(ag->lock);
            size = ag->view.size;
            ABT_rwlock_unlock(ag->lock);
        }

        return size;
    }
#endif

    fprintf(stderr, "Error: SSG can only obtain size of groups that the caller" \
        " is a member of or an attacher of\n");
    return 0;
}

815
hg_addr_t ssg_get_group_addr(
816 817
    ssg_group_id_t group_id,
    ssg_member_id_t member_id)
Shane Snyder's avatar
Shane Snyder committed
818
{
Shane Snyder's avatar
Shane Snyder committed
819
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
820
    ssg_member_state_t *member_state;
Shane Snyder's avatar
Shane Snyder committed
821
    hg_addr_t member_addr = HG_ADDR_NULL;
822

823 824
    if (!ssg_inst || group_id == SSG_GROUP_ID_NULL ||
            member_id == SSG_MEMBER_ID_INVALID)
825 826
        return HG_ADDR_NULL;

827 828 829 830
    if (group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
    {
        ssg_group_t *g;

831
        ABT_rwlock_rdlock(ssg_inst->lock);
832 833 834
        HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
            sizeof(uint64_t), g);
        if (g)
Shane Snyder's avatar
Shane Snyder committed
835 836 837 838 839 840 841 842
        {
            ABT_rwlock_rdlock(g->lock);
            HASH_FIND(hh, g->view.member_map, &member_id, sizeof(ssg_member_id_t),
                member_state);
            if (member_state) 
                member_addr = member_state->addr;
            ABT_rwlock_unlock(g->lock);
        }
843
        ABT_rwlock_unlock(ssg_inst->lock);
844
    }
845
#if 0
846 847 848 849 850 851 852
    else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *ag;

        HASH_FIND(hh, ssg_inst->attached_group_table, &group_descriptor->name_hash,
            sizeof(uint64_t), ag);
        if (ag)
Shane Snyder's avatar
Shane Snyder committed
853 854 855 856 857 858 859 860
        {
            ABT_rwlock_rdlock(ag->lock);
            HASH_FIND(hh, ag->view.member_map, &member_id, sizeof(ssg_member_id_t),
                member_state);
            if (member_state) 
                member_addr = member_state->addr;
            ABT_rwlock_unlock(ag->lock);
        }
861
    }
862
#endif
863 864 865 866 867 868 869
    else
    {
        fprintf(stderr, "Error: SSG can only obtain member addresses of groups" \
            " that the caller is a member of or an attacher of\n");
        return HG_ADDR_NULL;
    }

Shane Snyder's avatar
Shane Snyder committed
870
    return member_addr;
871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887
}

/*
 * Duplicate a group ID.
 *
 * A group ID is an opaque handle around a group descriptor, so duplicating
 * the ID means duplicating the descriptor via ssg_group_descriptor_dup().
 * The result must be released with ssg_group_id_free().
 */
ssg_group_id_t ssg_group_id_dup(
    ssg_group_id_t group_id)
{
    return (ssg_group_id_t)ssg_group_descriptor_dup(
        (ssg_group_descriptor_t *)group_id);
}

/*
 * Release a group ID obtained from ssg_group_id_dup(),
 * ssg_group_id_deserialize() or ssg_group_id_load().
 *
 * The handle is passed by value, so this routine cannot clear the
 * caller's copy. Callers must not use group_id after this returns.
 */
void ssg_group_id_free(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_free((ssg_group_descriptor_t *)group_id);
}

892 893 894 895 896 897 898 899
/*
 * Return a newly allocated copy of the address string stored in a group ID.
 *
 * The caller owns the returned string and must free() it. Returns NULL if
 * group_id is SSG_GROUP_ID_NULL, if the descriptor has no address string,
 * or if allocation fails.
 */
char *ssg_group_id_get_addr_str(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;

    /* avoid dereferencing a null handle (or passing NULL to strdup) */
    if (group_id == SSG_GROUP_ID_NULL || group_descriptor->addr_str == NULL)
        return NULL;

    return strdup(group_descriptor->addr_str);
}

900 901 902 903 904 905 906
/*
 * Serialize a group ID into a newly malloc'd buffer.
 *
 * Buffer layout: [magic_nr : uint64_t][name_hash : uint64_t][addr_str + NUL].
 * On success *buf_p holds the buffer (caller frees) and *buf_size_p its
 * size. On allocation failure *buf_p is NULL and *buf_size_p is 0.
 */
void ssg_group_id_serialize(
    ssg_group_id_t group_id,
    char ** buf_p,
    size_t * buf_size_p)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    size_t total;
    char *out;
    char *cursor;
    uint64_t word;

    *buf_p = NULL;
    *buf_size_p = 0;

    total = sizeof(group_descriptor->magic_nr)
        + sizeof(group_descriptor->name_hash)
        + strlen(group_descriptor->addr_str) + 1;

    out = malloc(total);
    if (out == NULL)
        return;

    cursor = out;

    word = group_descriptor->magic_nr;
    memcpy(cursor, &word, sizeof(word));
    cursor += sizeof(word);

    word = group_descriptor->name_hash;
    memcpy(cursor, &word, sizeof(word));
    cursor += sizeof(word);

    strcpy(cursor, group_descriptor->addr_str);

    /* the remaining descriptor fields are process-local state and are
     * deliberately not serialized */
    *buf_p = out;
    *buf_size_p = total;
}

/*
 * Reconstruct a group ID from a buffer produced by ssg_group_id_serialize().
 *
 * Expected layout: [magic_nr : uint64_t][name_hash : uint64_t][addr_str + NUL].
 * On success *group_id_p holds a new descriptor marked
 * SSG_OWNER_IS_UNASSOCIATED (release with ssg_group_id_free()). On any
 * validation or allocation failure *group_id_p is SSG_GROUP_ID_NULL.
 *
 * The buffer may come from an untrusted file, so it is fully validated.
 * The header fields are copied out with memcpy because buf carries no
 * alignment guarantee. The address string must be NUL-terminated within
 * buf_size bytes.
 */
void ssg_group_id_deserialize(
    const char * buf,
    size_t buf_size,
    ssg_group_id_t * group_id_p)
{
    const size_t hdr_size = 2 * sizeof(uint64_t);
    uint64_t magic_nr;
    uint64_t name_hash;
    const char *addr_str;
    ssg_group_descriptor_t *group_descriptor;

    *group_id_p = SSG_GROUP_ID_NULL;

    /* check to ensure the buffer contains enough data to make a group ID */
    if (buf == NULL || buf_size < hdr_size + 1)
    {
        fprintf(stderr, "Error: Serialized buffer does not contain a valid SSG group ID\n");
        return;
    }

    /* deserialize */
    memcpy(&magic_nr, buf, sizeof(magic_nr));
    if (magic_nr != SSG_MAGIC_NR)
    {
        fprintf(stderr, "Error: Magic number mismatch when deserializing SSG group ID\n");
        return;
    }
    memcpy(&name_hash, buf + sizeof(uint64_t), sizeof(name_hash));
    addr_str = buf + hdr_size;

    /* addr_str is handed on as a C string; reject it if its terminator is
     * not inside the buffer, otherwise it would be read past buf's end */
    if (memchr(addr_str, '\0', buf_size - hdr_size) == NULL)
    {
        fprintf(stderr, "Error: Serialized buffer does not contain a valid SSG group ID\n");
        return;
    }

    group_descriptor = ssg_group_descriptor_create(name_hash, addr_str,
        SSG_OWNER_IS_UNASSOCIATED);
    if (!group_descriptor)
        return;

    *group_id_p = (ssg_group_id_t)group_descriptor;
}

/*
 * Serialize a group ID and write it to file_name (created or truncated,
 * mode 0644).
 *
 * The ID is serialized before the file is opened, so a serialization
 * failure leaves any existing file untouched. Partial writes and EINTR
 * are retried. The result of close() is checked because it can report
 * deferred write errors.
 *
 * Returns SSG_SUCCESS or SSG_FAILURE.
 */
int ssg_group_id_store(
    const char * file_name,
    ssg_group_id_t group_id)
{
    int fd;
    char *buf = NULL;
    size_t buf_size = 0;
    size_t written = 0;
    int sret = SSG_FAILURE;

    ssg_group_id_serialize(group_id, &buf, &buf_size);
    if (buf == NULL)
    {
        fprintf(stderr, "Error: Unable to serialize SSG group ID.\n");
        return SSG_FAILURE;
    }

    fd = open(file_name, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0)
    {
        fprintf(stderr, "Error: Unable to open file %s for storing SSG group ID\n",
            file_name);
        free(buf);
        return SSG_FAILURE;
    }

    /* write(2) may transfer fewer bytes than requested or be interrupted */
    while (written < buf_size)
    {
        ssize_t n = write(fd, buf + written, buf_size - written);
        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            break;
        written += (size_t)n;
    }

    if (written == buf_size)
        sret = SSG_SUCCESS;
    else
        fprintf(stderr, "Error: Unable to write SSG group ID to file %s\n", file_name);

    if (close(fd) != 0 && sret == SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to write SSG group ID to file %s\n", file_name);
        sret = SSG_FAILURE;
    }

    free(buf);
    return sret;
}

/*
 * Read a group ID previously written by ssg_group_id_store() from file_name.
 *
 * On success *group_id_p holds a new group ID (release with
 * ssg_group_id_free()) and SSG_SUCCESS is returned. On failure
 * *group_id_p is SSG_GROUP_ID_NULL and SSG_FAILURE is returned.
 *
 * Short reads and EINTR are retried. The buffer gets one extra trailing
 * NUL byte (not counted in the size passed to the deserializer), so a
 * corrupt file can never make a C-string read run off the allocation.
 */
int ssg_group_id_load(
    const char * file_name,
    ssg_group_id_t * group_id_p)
{
    int fd;
    struct stat fstats;
    char *buf = NULL;
    size_t buf_size;
    size_t nread = 0;
    int sret = SSG_FAILURE;

    *group_id_p = SSG_GROUP_ID_NULL;

    fd = open(file_name, O_RDONLY);
    if (fd < 0)
    {
        fprintf(stderr, "Error: Unable to open file %s for loading SSG group ID\n",
            file_name);
        return SSG_FAILURE;
    }

    if (fstat(fd, &fstats) != 0)
    {
        fprintf(stderr, "Error: Unable to stat file %s\n", file_name);
        goto fini;
    }
    if (fstats.st_size <= 0)
    {
        fprintf(stderr, "Error: SSG group ID file %s is empty\n", file_name);
        goto fini;
    }
    buf_size = (size_t)fstats.st_size;

    buf = malloc(buf_size + 1);
    if (buf == NULL)
        goto fini;

    /* read(2) may return fewer bytes than requested or be interrupted */
    while (nread < buf_size)
    {
        ssize_t n = read(fd, buf + nread, buf_size - nread);
        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            break; /* error, or file shrank since fstat */
        nread += (size_t)n;
    }
    if (nread != buf_size)
    {
        fprintf(stderr, "Error: Unable to read SSG group ID from file %s\n", file_name);
        goto fini;
    }
    buf[buf_size] = '\0';

    ssg_group_id_deserialize(buf, buf_size, group_id_p);
    if (*group_id_p == SSG_GROUP_ID_NULL)
    {
        fprintf(stderr, "Error: Unable to deserialize SSG group ID\n");
        goto fini;
    }

    sret = SSG_SUCCESS;

fini:
    close(fd);
    free(buf);
    return sret;
}

1082 1083 1084 1085 1086
void ssg_group_dump(
    ssg_group_id_t group_id)
{
    ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
    ssg_group_view_t *group_view = NULL;
1087 1088
    ABT_rwlock group_view_lock;
    int group_size;
1089 1090 1091 1092 1093 1094
    char *group_name = NULL;
    char group_role[32];
    char group_self_id[32];

    if (group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
    {
1095
        fprintf(stderr, "MEMBER DUMP\n");
1096 1097
        ssg_group_t *g;

1098
        ABT_rwlock_rdlock(ssg_inst->lock);
1099 1100 1101 1102 1103
        HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
            sizeof(uint64_t), g);
        if (g)
        {
            group_view = &g->view;
1104 1105
            group_view_lock = g->lock;
            group_size = g->view.size + 1;
1106 1107
            group_name = g->name;
            strcpy(group_role, "member");
1108
            sprintf(group_self_id, "%lu", ssg_inst->self_id);
1109
        }
1110
        ABT_rwlock_unlock(ssg_inst->lock);
1111
    }
1112
#if 0
1113 1114 1115 1116 1117 1118 1119 1120 1121
    else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER)
    {
        ssg_attached_group_t *ag;

        HASH_FIND(hh, ssg_inst->attached_group_table, &group_descriptor->name_hash,
            sizeof(uint64_t), ag);
        if (ag)
        {
            group_view = &ag->view;
1122
            group_size = ag->view.size;
1123 1124 1125 1126
            group_name = ag->name;
            strcpy(group_role, "attacher");
        }
    }
1127
#endif
1128 1129 1130 1131 1132 1133 1134 1135 1136
    else
    {
        fprintf(stderr, "Error: SSG can only dump membership information for" \
            " groups that the caller is a member of or an attacher of\n");
        return;
    }

    if (group_view)
    {
1137
        ssg_member_state_t *member_state, *tmp_ms;
1138 1139 1140 1141 1142

        printf("SSG membership information for group '%s':\n", group_name);
        printf("\trole: '%s'\n", group_role);
        if (strcmp(group_role, "member") == 0)
            printf("\tself_id: %s\n", group_self_id);
1143
        printf("\tsize: %d\n", group_size);
1144
        printf("\tview:\n");
1145
        ABT_rwlock_rdlock(group_view_lock);
1146
        HASH_ITER(hh, group_view->member_map, member_state, tmp_ms)
1147
        {
Shane Snyder's avatar
Shane Snyder committed
1148
            printf("\t\tid: %20lu\taddr: %s\n", member_state->id,
1149
                member_state->addr_str);
1150
        }
1151
        ABT_rwlock_unlock(group_view_lock);
1152 1153 1154 1155
    }
    else
        fprintf(stderr, "Error: SSG unable to find group view associated" \
            " with the given group ID\n");
1156

1157
    return;
Shane Snyder's avatar
Shane Snyder committed
1158 1159
}

1160 1161 1162
/************************************
 *** SSG internal helper routines ***
 ************************************/
1163

Shane Snyder's avatar
Shane Snyder committed
1164 1165 1166
/*
 * Create and register a group that this process is a member of.
 *
 * Builds the group structure, its descriptor and initial view, inserts it
 * into ssg_inst->group_table (failing if a group with the same name hash
 * already exists) and starts the SWIM failure detector for it.
 *
 * Returns the new group, or NULL on any failure; on failure every
 * partially initialized resource is released and the table is unchanged.
 */
static ssg_group_t * ssg_group_create_internal(
    const char * group_name, const char * const group_addr_strs[],
    int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat)
{
    uint64_t name_hash;
    int sret;
    int success = 0;
    ssg_group_t *g = NULL, *check_g;

    if (!ssg_inst) return NULL;

    name_hash = ssg_hash64_str(group_name);

    /* allocate a zeroed SSG group data structure and initialize some of it */
    g = calloc(1, sizeof(*g));
    if (!g) goto fini;
    g->name = strdup(group_name);
    if (!g->name) goto fini;
    g->ssg_inst = ssg_inst;
    g->update_cb = update_cb;
    g->update_cb_dat = update_cb_dat;
    if (ABT_rwlock_create(&g->lock) != ABT_SUCCESS) goto fini;

    /* generate unique descriptor for this group */
    g->descriptor = ssg_group_descriptor_create(name_hash, ssg_inst->self_addr_str,
        SSG_OWNER_IS_MEMBER);
    if (g->descriptor == NULL) goto fini;

    /* initialize the group view */
    sret = ssg_group_view_create(group_addr_strs, group_size, ssg_inst->self_addr_str,
        g->lock, &g->view);
    if (sret != SSG_SUCCESS) goto fini;

#ifdef DEBUG
    /* set debug output pointer */
    {
        char *dbg_log_dir = getenv("SSG_DEBUG_LOGDIR");
        if (dbg_log_dir)
        {
            char dbg_log_path[PATH_MAX];
            snprintf(dbg_log_path, PATH_MAX, "%s/ssg-%s-%lu.log",
                dbg_log_dir, g->name, g->ssg_inst->self_id);
            g->dbg_log = fopen(dbg_log_path, "a");
            if (!g->dbg_log) goto fini;
        }
        else
        {
            g->dbg_log = stdout;
        }
    }
#endif

    /* make sure we aren't re-creating an existing group */
    ABT_rwlock_wrlock(ssg_inst->lock);
    HASH_FIND(hh, ssg_inst->group_table, &name_hash, sizeof(uint64_t), check_g);
    if (check_g)
    {
        /* release the table lock before bailing out; jumping to fini while
         * holding it would leave ssg_inst->lock write-locked forever */
        ABT_rwlock_unlock(ssg_inst->lock);
        fprintf(stderr, "Error: SSG group %s already exists\n", group_name);
        goto fini;
    }

    /* add this group reference to the group table */
    HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash,
        sizeof(uint64_t), g);
    ABT_rwlock_unlock(ssg_inst->lock);

    /* initialize swim failure detector */
    sret = swim_init(g, ssg_inst->mid, 1);
    if (sret != SSG_SUCCESS)
    {
        ABT_rwlock_wrlock(ssg_inst->lock);
        HASH_DELETE(hh, ssg_inst->group_table, g);
        ABT_rwlock_unlock(ssg_inst->lock);
        goto fini;
    }

    SSG_DEBUG(g, "group create successful (size=%d, self=%s)\n",
        group_size, ssg_inst->self_addr_str);
    success = 1;

fini:
    if (!success && g)
    {
#ifdef DEBUG
        /* close the debug log only if a file was actually opened for it:
         * dbg_log may still be NULL (never opened / fopen failed) or stdout */
        if (g->dbg_log && g->dbg_log != stdout)
            fclose(g->dbg_log);
#endif
        if (g->descriptor) ssg_group_descriptor_free(g->descriptor);
        ssg_group_view_destroy(&g->view);
        ABT_rwlock_free(&g->lock);
        free(g->name);
        free(g);
        g = NULL;
    }

    return g;
}
1257

1258
static int ssg_group_view_create(
1259
    const char * const group_addr_strs[], int group_size,
Shane Snyder's avatar
Shane Snyder committed
1260
    const char * self_addr_str, ABT_rwlock view_lock,
1261
    ssg_group_view_t * view)
1262 1263
{
    int i, j, r;
1264 1265
    ABT_thread *lookup_ults = NULL;
    struct ssg_group_lookup_ult_args *lookup_ult_args = NULL;
1266 1267
    const char *self_addr_substr = NULL;
    const char *addr_substr = NULL;
1268
    int self_found = 0;
1269
    int aret;
1270
    int sret = SSG_FAILURE;
1271 1272 1273

    /* allocate lookup ULTs */
    lookup_ults = malloc(group_size * sizeof(*lookup_ults));
1274 1275
    if (lookup_ults == NULL) goto fini;
    for (i = 0; i < group_size; i++) lookup_ults[i] = ABT_THREAD_NULL;
1276
    lookup_ult_args = malloc(group_size * sizeof(*lookup_ult_args));
1277
    if (lookup_ult_args == NULL) goto fini;
Shane Snyder's avatar
Shane Snyder committed
1278

Shane Snyder's avatar
Shane Snyder committed
1279
    if(self_addr_str)
1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291
    {
        /* strstr is used here b/c there may be inconsistencies in whether the class
         * is included in the address or not (it should not be in HG_Addr_to_string,
         * but it's possible that it is in the list of group address strings)
         */
        self_addr_substr = strstr(self_addr_str, "://");
        if (self_addr_substr == NULL)
            self_addr_substr = self_addr_str;
        else
            self_addr_substr += 3;
    }

1292
    /* construct view using ULTs to lookup the address of each group member */
Shane Snyder's avatar
Shane Snyder committed
1293
    r = rand() % group_size;
1294