ssg.c 22.1 KB
Newer Older
1
2
3
4
5
6
/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

Shane Snyder's avatar
Shane Snyder committed
7
8
#include <ssg-config.h>

Jonathan Jenkins's avatar
Jonathan Jenkins committed
9
10
11
12
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
13
#include <errno.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
14
15
#include <stdlib.h>
#include <string.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
16
#include <assert.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
17

Shane Snyder's avatar
Shane Snyder committed
18
19
#include <mercury.h>
#include <margo.h>
20

21
#include <ssg.h>
22
#include "ssg-internal.h"
Jonathan Jenkins's avatar
Jonathan Jenkins committed
23

24
25
26
27
#if USE_SWIM_FD
#include "swim-fd/swim-fd.h"
#endif

Shane Snyder's avatar
Shane Snyder committed
28
// internal initialization of ssg data structures
29
static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
30
    int group_size, hg_addr_t self_addr, char *addr_str_buf);
31
32

// lookup peer addresses
33
static hg_return_t ssg_lookup(ssg_t s, char **addr_strs);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
34
35
static char** setup_addr_str_list(int num_addrs, char * buf);

36
#if 0
Jonathan Jenkins's avatar
Jonathan Jenkins committed
37
38
39
40
41
42
43
44
45
46
47
// NOTE: this section is compiled out by the enclosing `#if 0`; it belongs to
// the disabled barrier implementation further down the file.

// helper for hashing (don't want to pull in jenkins hash)
// see http://www.isthe.com/chongo/tech/comp/fnv/index.html
// 64-bit FNV-1a hash of `size` bytes starting at `data`.
static uint64_t fnv1a_64(void *data, size_t size);

// Input of the barrier RPC: the barrier round being entered and the rank of
// the sender.
MERCURY_GEN_PROC(barrier_in_t,
        ((int32_t)(barrier_id)) \
        ((int32_t)(rank)))

// barrier RPC decls
// proc_barrier is the handler body; the macro below wraps it for margo
// (ssg_register_barrier registers the result as proc_barrier_handler).
static void proc_barrier(void *arg);
DEFINE_MARGO_RPC_HANDLER(proc_barrier)
48
#endif
Jonathan Jenkins's avatar
Jonathan Jenkins committed
49

50

51
// Initialize an SSG group from a config file listing member addresses.
//
// The whole file at `fname` is read and split on whitespace. Each token is
// one member's mercury address and must contain "://". A member's rank is
// its position in the file. This process's rank is the position of the
// token whose text after "://" equals the text after "://" in this
// process's own address (if several match, the last one wins).
//
// Returns the new group, or NULL if:
//   - the file cannot be read or has no tokens,
//   - a token lacks "://",
//   - this process is not listed, or
//   - group initialization fails.
ssg_t ssg_init_config(margo_instance_id mid, const char * fname)
{
    // file to read
    int fd = -1;
    struct stat st;

    // file content to parse
    char *rdbuf = NULL;
    ssize_t rdsz;

    // parse metadata (strtok)
    char *tok;

    // vars to build up the addr string list: the tokens are packed back to
    // back, each with its NUL terminator, into addr_buf
    size_t addr_cap = 128;
    size_t addr_len = 0;
    int num_addrs = 0;
    char *addr_buf = NULL;

    // self rank/addr resolution helpers
    hg_class_t *hgcl = NULL;
    hg_addr_t self_addr = HG_ADDR_NULL;
    char * self_addr_str = NULL;
    const char * self_addr_substr = NULL;
    hg_size_t self_addr_size = 0;
    const char * addr_substr = NULL;
    int rank = -1;

    // misc return codes
    int ret;
    hg_return_t hret;

    // return data
    ssg_t s = NULL;

    // open file for reading
    fd = open(fname, O_RDONLY);
    if (fd == -1) goto fini;

    // get file size
    ret = fstat(fd, &st);
    if (ret == -1) goto fini;

    // slurp file in all at once (+1 for the NUL terminator strtok relies on)
    rdbuf = malloc(st.st_size+1);
    if (rdbuf == NULL) goto fini;

    // load it all in one fell swoop; a short read is treated as failure
    rdsz = read(fd, rdbuf, st.st_size);
    if (rdsz != st.st_size) goto fini;
    rdbuf[rdsz]='\0';

    hgcl = margo_get_class(mid);
    if(!hgcl) goto fini;

    // get my address as a string (first call sizes it, second fills it)
    hret = HG_Addr_self(hgcl, &self_addr);
    if (hret != HG_SUCCESS) goto fini;
    hret = HG_Addr_to_string(hgcl, NULL, &self_addr_size, self_addr);
    if (hret != HG_SUCCESS) goto fini;
    self_addr_str = malloc(self_addr_size);
    if (self_addr_str == NULL) goto fini;
    hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_size, self_addr);
    if (hret != HG_SUCCESS) goto fini;

    // only the part after "://" is compared, so the class/protocol prefix of
    // the self address and of the file entries need not agree; both strings
    // must still contain "://"
    self_addr_substr = strstr(self_addr_str, "://");
    if (self_addr_substr == NULL) goto fini;
    self_addr_substr += 3;

    // strtok the result - each whitespace-delimited address is assumed to be
    // a unique mercury address
    tok = strtok(rdbuf, "\r\n\t ");
    if (tok == NULL) goto fini;

    // build up the address buffer
    addr_buf = malloc(addr_cap);
    if (addr_buf == NULL) goto fini;
    do {
        size_t tok_sz = strlen(tok);
        size_t need = addr_len + tok_sz + 1;

        // grow until the token and its NUL fit: a single doubling is not
        // enough when the token is longer than the current capacity
        if (need > addr_cap) {
            char *tmp;
            size_t new_cap = addr_cap;
            while (need > new_cap)
                new_cap *= 2;
            tmp = realloc(addr_buf, new_cap);
            if (tmp == NULL) goto fini;
            addr_buf = tmp;
            addr_cap = new_cap;
        }

        // check if this is my addr to resolve rank
        addr_substr = strstr(tok, "://");
        if (addr_substr == NULL) goto fini;
        addr_substr += 3;
        if (strcmp(self_addr_substr, addr_substr) == 0)
            rank = num_addrs;

        memcpy(addr_buf + addr_len, tok, tok_sz+1);
        addr_len += tok_sz+1;
        num_addrs++;
        tok = strtok(NULL, "\r\n\t ");
    } while (tok != NULL);

    // if rank not resolved, fail
    if (rank == -1) goto fini;

    // init ssg internal structures
    s = ssg_init_internal(mid, rank, num_addrs, self_addr, addr_buf);
    if (s == NULL) goto fini;

    // don't free this on success: the group now owns self_addr
    self_addr = HG_ADDR_NULL;

fini:
    if (fd != -1) close(fd);
    free(rdbuf);
    free(addr_buf);
    if (hgcl && self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
    free(self_addr_str);
    return s;
}

#ifdef HAVE_MPI
173
// Initialize an SSG group from an MPI communicator. Every process in `comm`
// becomes a member whose group rank equals its rank in `comm`.
//
// Each process stringifies its own mercury address, and the string lengths
// are allgathered. The strings themselves are then allgatherv'd into one
// buffer of back-to-back NUL-terminated addresses ordered by MPI rank.
// Collective over `comm`. Returns the new group, or NULL on failure.
ssg_t ssg_init_mpi(margo_instance_id mid, MPI_Comm comm)
{
    hg_class_t *cls;
    hg_addr_t my_addr = HG_ADDR_NULL;
    char *my_addr_str = NULL;
    hg_size_t my_addr_len = 0;
    int my_addr_len_int;
    int nprocs = 0;
    int me = 0;
    int *lens = NULL;
    int *displs = NULL;
    char *all_addrs = NULL;
    ssg_t group = NULL;

    cls = margo_get_class(mid);
    if (cls == NULL)
        goto cleanup;

    // stringify my own address: the first call reports the size, the second
    // fills the buffer
    if (HG_Addr_self(cls, &my_addr) != HG_SUCCESS)
        goto cleanup;
    if (HG_Addr_to_string(cls, NULL, &my_addr_len, my_addr) != HG_SUCCESS)
        goto cleanup;
    my_addr_str = malloc(my_addr_len);
    if (my_addr_str == NULL)
        goto cleanup;
    if (HG_Addr_to_string(cls, my_addr_str, &my_addr_len, my_addr) != HG_SUCCESS)
        goto cleanup;
    // the reported length counts the trailing NUL, so it travels with the string
    my_addr_len_int = (int)my_addr_len;

    MPI_Comm_size(comm, &nprocs);
    MPI_Comm_rank(comm, &me);

    // every process learns every process's string length
    lens = malloc(nprocs * sizeof(*lens));
    if (lens == NULL)
        goto cleanup;
    lens[me] = my_addr_len_int;
    MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, lens, 1, MPI_INT, comm);

    // displs[p] is where rank p's string starts; displs[nprocs] is the total
    displs = malloc((nprocs + 1) * sizeof(*displs));
    if (displs == NULL)
        goto cleanup;
    displs[0] = 0;
    for (int p = 0; p < nprocs; p++)
        displs[p + 1] = displs[p] + lens[p];

    // every process gathers all address strings, ordered by rank
    all_addrs = malloc(displs[nprocs]);
    if (all_addrs == NULL)
        goto cleanup;
    MPI_Allgatherv(my_addr_str, my_addr_len_int, MPI_BYTE,
            all_addrs, lens, displs, MPI_BYTE, comm);

    group = ssg_init_internal(mid, me, nprocs, my_addr, all_addrs);
    if (group != NULL)
        my_addr = HG_ADDR_NULL; // now held by the group; must not be freed below

cleanup:
    free(lens);
    free(displs);
    free(all_addrs);
    if (cls != NULL && my_addr != HG_ADDR_NULL)
        HG_Addr_free(cls, my_addr);
    free(my_addr_str);
    return group;
}
#endif

247
// Build an ssg_t for a group of `group_size` members in which this process
// has rank `self_rank`.
//
// `addr_str_buf` must hold `group_size` back-to-back NUL-terminated address
// strings ordered by rank. It is only read during this call and stays owned
// by the caller.
//
// `self_addr` is this process's own address. On success it is stored in the
// group and later released by ssg_finalize. On failure it is NOT freed here:
// the caller keeps ownership and is responsible for freeing it.
//
// Returns the new group, or NULL on:
//   - invalid arguments or allocation failure,
//   - a failed peer lookup, or
//   - (USE_SWIM_FD builds) a failed swim initialization.
static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
    int group_size, hg_addr_t self_addr, char *addr_str_buf)
{
    // arrays of peer address strings (pointers into addr_str_buf)
    char **addr_strs = NULL;

    // misc return codes
    hg_return_t hret;

    // return data
    ssg_t s = NULL;

    if (self_rank < 0 || self_rank >= group_size || self_addr == HG_ADDR_NULL)
        goto fini;

    // set peer address strings
    addr_strs = setup_addr_str_list(group_size, addr_str_buf);
    if (addr_strs == NULL) goto fini;

    // set up the output
    s = malloc(sizeof(*s));
    if (s == NULL) goto fini;
    memset(s, 0, sizeof(*s));
    s->mid = mid;

    // initialize the group "view"
    s->view.self_rank = self_rank;
    s->view.group_size = group_size;
    s->view.member_states = calloc(group_size, sizeof(*(s->view.member_states)));
    if (s->view.member_states == NULL)
    {
        free(s);
        s = NULL;
        goto fini;
    }
    for (int i = 0; i < group_size; i++)
    {
        // NOTE: remote addrs are set in ssg_lookup
        s->view.member_states[i].addr = HG_ADDR_NULL;
    }
    // set view info for self
    s->view.member_states[self_rank].addr = self_addr;

#ifdef DEBUG
    // TODO: log file debug option, instead of just stdout
    s->dbg_strm = stdout;
#endif

#if 0
    s->barrier_mutex = ABT_MUTEX_NULL;
    s->barrier_cond  = ABT_COND_NULL;
    s->barrier_eventual = ABT_EVENTUAL_NULL;
#endif

    // lookup hg addr information for all group members
    hret = ssg_lookup(s, addr_strs);
    if (hret != HG_SUCCESS)
    {
        // ssg_finalize frees every member address, but on failure self_addr
        // still belongs to the caller (which frees it when we return NULL):
        // detach it first so it is not freed twice
        s->view.member_states[self_rank].addr = HG_ADDR_NULL;
        ssg_finalize(s);
        s = NULL;
        goto fini;
    }
    SSG_DEBUG(s, "group lookup successful\n");

#if USE_SWIM_FD
    // initialize swim failure detector
    // TODO: we should probably barrier or sync somehow to avoid rpc failures
    // due to timing skew of different ranks initializing swim
    s->swim_ctx = swim_init(s, 1);
    if (s->swim_ctx == NULL)
    {
        // same ownership rule as the lookup failure: the caller frees
        // self_addr, so keep ssg_finalize from freeing it too
        s->view.member_states[self_rank].addr = HG_ADDR_NULL;
        ssg_finalize(s);
        s = NULL;
    }
#endif

fini:
    free(addr_strs);
    return s;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
330
331
332
333
struct lookup_ult_args
{
    ssg_t ssg;
    int rank;
334
    char *addr_str;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
335
336
337
338
339
340
    hg_return_t out;
};

static void lookup_ult(void *arg)
{
    struct lookup_ult_args *l = arg;
341
    ssg_t s = l->ssg;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
342

343
344
345
346
    l->out = margo_addr_lookup(s->mid, l->addr_str,
        &s->view.member_states[l->rank].addr);
    if(l->out != HG_SUCCESS)
        SSG_DEBUG(s, "look up on rank %d failed [%d]\n", l->rank, l->out);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
347
348
}

349
// Resolve the hg_addr_t of every remote member of `s` (every rank except
// s->view.self_rank), storing each in s->view.member_states[rank].addr.
//
// `addr_strs[r]` is the address string of rank r. Each lookup runs in its
// own ULT on margo's handler pool. The ULTs are created and joined one at a
// time, starting with the rank after self, so lookups happen sequentially.
// The loop stops at the first failure.
//
// Returns:
//   - HG_SUCCESS when every lookup succeeds,
//   - HG_INVALID_PARAM for a NULL group or a missing margo context,
//   - HG_NOMEM_ERROR on allocation failure,
//   - HG_OTHER_ERROR if a ULT cannot be created or joined,
//   - otherwise the status of the first failing lookup.
// Addresses resolved before a failure stay in the view (ssg_finalize frees
// them).
static hg_return_t ssg_lookup(ssg_t s, char **addr_strs)
{
    hg_context_t *hgctx;
    ABT_thread *ults;
    struct lookup_ult_args *args;
    hg_return_t hret = HG_SUCCESS;

    if (s == SSG_NULL) return HG_INVALID_PARAM;

    // sanity check that margo has a context; the context itself is not used
    hgctx = margo_get_context(s->mid);
    if (hgctx == NULL) return HG_INVALID_PARAM;

    // initialize ULT bookkeeping, one slot per rank
    ults = malloc(s->view.group_size * sizeof(*ults));
    if (ults == NULL) return HG_NOMEM_ERROR;
    args = malloc(s->view.group_size * sizeof(*args));
    if (args == NULL) {
        free(ults);
        return HG_NOMEM_ERROR;
    }
    for (int i = 0; i < s->view.group_size; i++)
        ults[i] = ABT_THREAD_NULL;

    // visit the remote ranks starting just after self, wrapping around
    for (int i = 1; i < s->view.group_size; i++) {
        int r = (s->view.self_rank + i) % s->view.group_size;
        int aret;

        args[r].ssg = s;
        args[r].rank = r;
        args[r].addr_str = addr_strs[r];

        aret = ABT_thread_create(*margo_get_handler_pool(s->mid), &lookup_ult,
                &args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
        if (aret != ABT_SUCCESS) {
            hret = HG_OTHER_ERROR;
            goto fini;
        }
        aret = ABT_thread_join(ults[r]);
        ABT_thread_free(&ults[r]);
        ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
        if (aret != ABT_SUCCESS) {
            hret = HG_OTHER_ERROR;
            break;
        }
        else if (args[r].out != HG_SUCCESS) {
            hret = args[r].out;
            break;
        }
    }

fini:
    // cleanup: release any ULT still outstanding. ABT_thread_free takes a
    // pointer to the handle (and resets it), as in the join path above.
    for (int i = 0; i < s->view.group_size; i++) {
        if (ults[i] != ABT_THREAD_NULL) {
            ABT_thread_cancel(ults[i]);
            ABT_thread_free(&ults[i]);
        }
    }
    free(ults);
    free(args);

    return hret;
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
426

427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
#if 0
// Register the barrier RPC for group `s` with mercury class `hgcl` and
// create the Argobots mutex, condition variable and eventual the barrier
// uses. Does nothing for a singleton group. Failures only trip asserts.
// TODO: handle hash collision, misc errors
void ssg_register_barrier(ssg_t s, hg_class_t *hgcl)
{
    hg_return_t hret;
    int aret;

    if (s->num_addrs == 1)
        return;

    // the RPC id is a hash of the group's address buffer (presumably so every
    // member derives the same id without coordination -- TODO confirm)
    s->barrier_rpc_id = fnv1a_64(s->backing_buf, s->buf_size);

    hret = HG_Register(hgcl, s->barrier_rpc_id,
            hg_proc_barrier_in_t, NULL, &proc_barrier_handler);
    assert(hret == HG_SUCCESS);
    // attach the group so proc_barrier can fetch it via HG_Registered_data
    hret = HG_Register_data(hgcl, s->barrier_rpc_id, s, NULL);
    assert(hret == HG_SUCCESS);

    aret = ABT_mutex_create(&s->barrier_mutex);
    assert(aret == ABT_SUCCESS);
    aret = ABT_cond_create(&s->barrier_cond);
    assert(aret == ABT_SUCCESS);
    aret = ABT_eventual_create(0, &s->barrier_eventual);
    assert(aret == ABT_SUCCESS);
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
// TODO: process errors in a sane manner
//
// Barrier RPC handler. Only rank 0 receives barrier RPCs (asserted below);
// one instance runs per request from a non-zero rank. Each instance:
//   1. waits until rank 0 itself has entered the requested round
//      (s->barrier_id >= in.barrier_id; ssg_barrier_margo advances it),
//   2. bumps s->barrier_count and waits until it reaches num_addrs-1, i.e.
//      one request per non-zero rank has arrived,
//   3. responds to its sender, then bumps barrier_count again.
// The instance that brings the count to 2*(num_addrs-1) resets it to 0 and
// sets s->barrier_eventual, which rank 0 waits on in ssg_barrier_margo.
// Errors are checked with assert only, and some return codes are ignored.
static void proc_barrier(void *arg)
{
    barrier_in_t in;
    hg_return_t hret;
    int aret;
    hg_handle_t h = arg;
    struct hg_info *info = HG_Get_info(h);
    // the group attached to this RPC id by HG_Register_data in
    // ssg_register_barrier
    ssg_t s = HG_Registered_data(info->hg_class, info->id);

    assert(s->rank == 0);

    hret = HG_Get_input(h, &in);
    assert(hret == HG_SUCCESS);

    DEBUG("%d: barrier ult: rx round %d from %d\n", s->rank, in.barrier_id,
            in.rank);
    // first wait until the nth barrier has been processed
    // (i.e. rank 0 has itself entered round in.barrier_id)
    aret = ABT_mutex_lock(s->barrier_mutex);
    assert(aret == ABT_SUCCESS);
    while (s->barrier_id < in.barrier_id) {
        DEBUG("%d: barrier ult: waiting to enter round %d\n", s->rank,
                in.barrier_id);
        aret = ABT_cond_wait(s->barrier_cond, s->barrier_mutex);
        assert(aret == ABT_SUCCESS);
    }

    // inform all other ULTs waiting on this
    // (ABT_cond_signal wakes a single waiter; every ULT signals again after
    // leaving a wait loop, so the wakeup is passed along)
    aret = ABT_cond_signal(s->barrier_cond);
    assert(aret == ABT_SUCCESS);
    // now wait until all barriers have been rx'd
    DEBUG("%d: barrier ult: out, incr count to %d\n", s->rank,
            s->barrier_count+1);
    s->barrier_count++;
    while (s->barrier_count < s->num_addrs-1) {
        DEBUG("%d: barrier ult: waiting (count at %d)\n", s->rank,
                s->barrier_count);
        aret = ABT_cond_wait(s->barrier_cond, s->barrier_mutex);
        assert(aret == ABT_SUCCESS);
    }
    DEBUG("%d: barrier ult: count compl, signal and respond\n", s->rank);
    // all barriers rx'd, inform other ULTs
    ABT_cond_signal(s->barrier_cond);
    ABT_mutex_unlock(s->barrier_mutex);

    // release the sender, which is blocked in margo_forward
    hret = margo_respond(s->mid, h, NULL);
    assert(hret == HG_SUCCESS);
    HG_Destroy(h);

    // NOTE(review): barrier_count is read here without holding the mutex
    DEBUG("%d: barrier ult: respond compl, count at %d\n", s->rank,
            s->barrier_count);

    aret = ABT_mutex_lock(s->barrier_mutex);
    assert(aret == ABT_SUCCESS);
    // done -> I'm the last ULT to enter, I do the eventual set
    // (count reaches 2*(n-1) once every handler has both arrived and
    // responded; resetting it readies the next round)
    int is_done = (++s->barrier_count) == 2*(s->num_addrs-1);
    if (is_done) s->barrier_count = 0;
    aret = ABT_mutex_unlock(s->barrier_mutex);
    assert(aret == ABT_SUCCESS);
    if (is_done) {
        aret = ABT_eventual_set(s->barrier_eventual, NULL, 0);
        assert(aret == ABT_SUCCESS);
    }
}

// Enter a group-wide barrier, coordinated by rank 0.
//
// Non-zero ranks send a barrier RPC to rank 0, which proc_barrier handles.
// margo_forward is expected to return once rank 0 responds, and proc_barrier
// responds only after a request has arrived from every non-zero rank for the
// round. Rank 0 itself waits on s->barrier_eventual, which the last
// proc_barrier instance sets.
//
// Returns HG_INVALID_PARAM for a non-member (rank < 0), HG_SUCCESS for a
// singleton group or on completion, and HG_OTHER_ERROR or the mercury/margo
// error code on failure.
hg_return_t ssg_barrier_margo(ssg_t s)
{
    // non-members can't barrier
    if (s->rank < 0) return HG_INVALID_PARAM;

    // return immediately if a singleton group
    if (s->num_addrs == 1) return HG_SUCCESS;

    // re-arm the eventual that rank 0 waits on for this round
    int aret = ABT_eventual_reset(s->barrier_eventual);
    if (aret != ABT_SUCCESS) return HG_OTHER_ERROR;

    DEBUG("%d: barrier: lock and incr id to %d\n", s->rank, s->barrier_id+1);
    int bid;
    // init the barrier state
    // (start a new round: bump barrier_id and wake any proc_barrier ULTs
    // waiting for this round to begin)
    aret = ABT_mutex_lock(s->barrier_mutex);
    if (aret != ABT_SUCCESS) return HG_OTHER_ERROR;
    bid = ++s->barrier_id;
    aret = ABT_cond_broadcast(s->barrier_cond);
    if (aret != ABT_SUCCESS) {
        ABT_mutex_unlock(s->barrier_mutex); return HG_OTHER_ERROR;
    }
    aret = ABT_mutex_unlock(s->barrier_mutex);
    if (aret != ABT_SUCCESS) return HG_OTHER_ERROR;

    if (s->rank > 0) {
        // non-zero rank: announce this round to rank 0 and wait for its reply
        DEBUG("%d: barrier: create and forward to 0\n", s->rank);
        barrier_in_t in;
        hg_handle_t h;
        hg_return_t hret = HG_Create(margo_get_context(s->mid),
                ssg_get_addr(s, 0), s->barrier_rpc_id, &h);
        if (hret != HG_SUCCESS) return hret;

        in.rank = s->rank;
        in.barrier_id = bid;
        hret = margo_forward(s->mid, h, &in);
        DEBUG("%d: barrier: finish\n", s->rank);
        HG_Destroy(h);
        if (hret != HG_SUCCESS) return hret;
    }
    else {
        // rank 0: wait for the last proc_barrier instance to set the eventual
        DEBUG("%d: barrier: wait on eventual\n", s->rank);
        aret = ABT_eventual_wait(s->barrier_eventual, NULL);
        if (aret != ABT_SUCCESS) return HG_OTHER_ERROR;
    }

    return HG_SUCCESS;
}
560
#endif
Jonathan Jenkins's avatar
Jonathan Jenkins committed
561

Jonathan Jenkins's avatar
Jonathan Jenkins committed
562
563
// Destroy a group created by one of the ssg_init_* functions.
//
// Shuts down the swim failure detector (USE_SWIM_FD builds) and frees every
// non-null member address in the view, including this process's own. It
// then releases the view and the group. Passing SSG_NULL is a no-op.
void ssg_finalize(ssg_t s)
{
    hg_class_t *cls;
    int nmembers;

    if (s == SSG_NULL)
        return;

#if USE_SWIM_FD
    if (s->swim_ctx != NULL)
        swim_finalize(s->swim_ctx);
#endif

#if 0
    if (s->barrier_mutex != ABT_MUTEX_NULL)
        ABT_mutex_free(&s->barrier_mutex);
    if (s->barrier_cond != ABT_COND_NULL)
        ABT_cond_free(&s->barrier_cond);
    if (s->barrier_eventual != ABT_EVENTUAL_NULL)
        ABT_eventual_free(&s->barrier_eventual);
#endif

    cls = margo_get_class(s->mid);
    nmembers = s->view.group_size;
    for (int r = 0; r < nmembers; r++) {
        hg_addr_t a = s->view.member_states[r].addr;
        if (a == HG_ADDR_NULL)
            continue;
        HG_Addr_free(cls, a);
    }

    free(s->view.member_states);
    free(s);
}

588
int ssg_get_group_rank(const ssg_t s)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
589
{
590
    return s->view.self_rank;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
591
592
}

593
int ssg_get_group_size(const ssg_t s)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
594
{
595
    return s->view.group_size;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
596
597
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
598
hg_addr_t ssg_get_addr(const ssg_t s, int rank)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
599
{
600
601
    if (rank >= 0 && rank < s->view.group_size)
        return s->view.member_states[rank].addr;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
602
    else
Jonathan Jenkins's avatar
Jonathan Jenkins committed
603
        return HG_ADDR_NULL;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
604
605
}

606
#if 0
Jonathan Jenkins's avatar
Jonathan Jenkins committed
607
608
// serialization format looks like:
// < num members, buffer size, buffer... >
Jonathan Jenkins's avatar
Jonathan Jenkins committed
609
// doesn't attempt to grab hg_addr's, string buffers, etc. - client will be
Jonathan Jenkins's avatar
Jonathan Jenkins committed
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
// responsible for doing a separate address lookup routine
hg_return_t hg_proc_ssg_t(hg_proc_t proc, ssg_t *s)
{
    // error and return handling
    hg_return_t hret = HG_SUCCESS;
    char * err_str = NULL;

    // input/output vars + helpers for ssg decode setup
    ssg_t ss = NULL;

    switch(hg_proc_get_op(proc)) {
        case HG_ENCODE:
            ss = *s;
            // encode address count
            hret = hg_proc_int32_t(proc, &ss->num_addrs);
            if (hret != HG_SUCCESS) { err_str = "ssg num addrs"; goto end; }
            // encode addr
            hret = hg_proc_int32_t(proc, &ss->buf_size);
            if (hret != HG_SUCCESS) { err_str = "ssg buf size"; goto end; }
            // encode addr string, simple as blitting the backing buffer
            hret = hg_proc_memcpy(proc, ss->backing_buf, ss->buf_size);
            if (hret != HG_SUCCESS) { err_str = "ssg addr buf"; goto end; }
            break;

        case HG_DECODE:
            // create the output
            *s = NULL;
            ss = malloc(sizeof(*ss));
            if (ss == NULL) {
                err_str = "ssg alloc";
                hret = HG_NOMEM_ERROR;
                goto end;
            }
            ss->addr_strs = NULL;
            ss->addrs = NULL;
            ss->backing_buf = NULL;
            // get address count
            hret = hg_proc_int32_t(proc, &ss->num_addrs);
            if (hret != HG_SUCCESS) { err_str = "ssg num addrs"; goto end; }
            // get number of bytes for the address
            hret = hg_proc_int32_t(proc, &ss->buf_size);
            if (hret != HG_SUCCESS) { err_str = "ssg buf size"; goto end; }
            // allocate output buffer
            ss->backing_buf = malloc(ss->buf_size);
            if (hret != HG_SUCCESS) {
                err_str = "ssg buf alloc";
                hret = HG_NOMEM_ERROR;
                goto end;
            }
            hret = hg_proc_memcpy(proc, ss->backing_buf, ss->buf_size);
            if (hret != HG_SUCCESS) { err_str = "ssg addr buf"; goto end; }

            // set the remaining ssg vars

            ss->addr_strs = NULL; ss->addrs = NULL;
            ss->rank = -1; // receivers aren't part of the group

            ss->addr_strs = setup_addr_str_list(ss->num_addrs, ss->backing_buf);
            if (ss->addr_strs == NULL) {
                err_str = "ssg addr strs alloc";
                hret = HG_NOMEM_ERROR;
                goto end;
            }

            ss->addrs = malloc(ss->num_addrs * sizeof(*ss->addrs));
            if (ss->addrs == NULL) {
                err_str = "ssg addrs alloc";
                hret = HG_NOMEM_ERROR;
                goto end;
            }
            for (int i = 0; i < ss->num_addrs; i++) {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
681
                ss->addrs[i] = HG_ADDR_NULL;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
            }

            // success: set the output
            *s = ss;
            break;

        case HG_FREE:
            if (s != NULL && *s != NULL) {
                err_str = "ssg shouldn't be freed via HG_Free_*";
                hret = HG_INVALID_PARAM;
            }
            goto end;

        default:
            err_str = "bad proc mode";
            hret = HG_INVALID_PARAM;
    }
end:
    if (err_str) {
        HG_LOG_ERROR("Proc error: %s", err_str);
        if (hg_proc_get_op(proc) == HG_DECODE) {
            free(ss->addr_strs);
            free(ss->addrs);
            free(ss->backing_buf);
            free(ss);
        }
    }
    return hret;
}

712
713
714
715
716
717
718
719
720
721
722
723
724
725
// Write the group's address list to a new file `fname`, one address per
// line. The file is created with mode 0644 and O_EXCL, so the call fails if
// `fname` already exists.
//
// The backing buffer stores the num_addrs addresses as back-to-back
// NUL-terminated strings. A copy is made with each terminator replaced by
// '\n', and the whole copy is written with a single write().
//
// Returns 0 on success, or -1 with errno set on failure: EINVAL if the buffer
// holds fewer than num_addrs strings, EIO on a short write.
int ssg_dump(const ssg_t s, const char *fname)
{
    // file to write to
    int fd = -1;
    ssize_t written;

    // string to xform and dump
    char * addrs_dup = NULL;
    char * tok = NULL;
    char * addrs_dup_end = NULL;

    // return code
    int ret = 0;

    // copy the backing buffer, replacing all null chars with newlines
    addrs_dup = malloc(s->buf_size);
    if (addrs_dup == NULL) { errno = ENOMEM; ret = -1; goto end; }
    memcpy(addrs_dup, s->backing_buf, s->buf_size);
    tok = addrs_dup;
    addrs_dup_end = addrs_dup + s->buf_size;
    for (int i = 0; i < s->num_addrs; i++) {
        tok = memchr(tok, '\0', addrs_dup_end - tok);
        if (tok == NULL) { errno = EINVAL; ret = -1; goto end; }
        *tok = '\n';
    }

    // open the file and dump in a single call
    fd = open(fname, O_WRONLY | O_CREAT | O_EXCL, 0644);
    if (fd == -1) { ret = -1; goto end; }
    // write the whole copy: the last terminator is now the trailing newline
    written = write(fd, addrs_dup, s->buf_size);
    if (written != s->buf_size) {
        // a short write does not set errno by itself
        if (written >= 0) errno = EIO;
        ret = -1;
    }

end:
    {
        // keep the errno describing the failure across cleanup
        int saved_errno = errno;
        free(addrs_dup);
        if (fd != -1) close(fd);
        errno = saved_errno;
    }

    return ret;
}
752
#endif
753

Jonathan Jenkins's avatar
Jonathan Jenkins committed
754
755
756
757
758
759
760
761
762
763
764
765
// Split `buf`, which holds `num_addrs` NUL-terminated strings stored back to
// back, into a newly malloc'd array of `num_addrs` pointers into `buf`.
// The strings are not copied: the array is valid only while `buf` is, and
// the caller frees just the array. Returns NULL if `num_addrs` is not
// positive or if the allocation fails.
static char** setup_addr_str_list(int num_addrs, char * buf)
{
    char **ret;

    // with no entries, the ret[0] store below would be out of bounds
    if (num_addrs <= 0) return NULL;

    ret = malloc((size_t)num_addrs * sizeof(*ret));
    if (ret == NULL) return NULL;

    // each string starts right after the previous one's terminator
    ret[0] = buf;
    for (int i = 1; i < num_addrs; i++) {
        char * a = ret[i-1];
        ret[i] = a + strlen(a) + 1;
    }
    return ret;
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
766

767
#if 0
Jonathan Jenkins's avatar
Jonathan Jenkins committed
768
769
770
771
772
773
774
775
776
777
778
// 64-bit FNV-1a hash of `size` bytes at `data`. Starts from the FNV-64
// offset basis; for each byte, XOR it into the hash, then multiply by the
// FNV-64 prime.
static uint64_t fnv1a_64(void *data, size_t size)
{
    const unsigned char *p = data;
    uint64_t h = 14695981039346656037ul;

    for (size_t left = size; left > 0; left--) {
        h ^= (uint64_t)*p++;
        h *= 1099511628211;
    }
    return h;
}
779
780
#endif