ssg.c 11.9 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

Shane Snyder's avatar
Shane Snyder committed
7 8
#include <ssg-config.h>

Jonathan Jenkins's avatar
Jonathan Jenkins committed
9 10 11 12
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
13
#include <errno.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
14 15
#include <stdlib.h>
#include <string.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
16
#include <assert.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
17

Shane Snyder's avatar
Shane Snyder committed
18 19
#include <mercury.h>
#include <margo.h>
20

21
#include <ssg.h>
Shane Snyder's avatar
Shane Snyder committed
22
//#include "ssg-internal.h"
Jonathan Jenkins's avatar
Jonathan Jenkins committed
23

24 25 26 27
#if USE_SWIM_FD
#include "swim-fd/swim-fd.h"
#endif

Shane Snyder's avatar
Shane Snyder committed
28
#if 0
Shane Snyder's avatar
Shane Snyder committed
29
// internal initialization of ssg data structures
30
static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
31
    int group_size, hg_addr_t self_addr, char *addr_str_buf);
32 33

// lookup peer addresses
34
static hg_return_t ssg_lookup(ssg_t s, char **addr_strs);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
35 36
static char** setup_addr_str_list(int num_addrs, char * buf);

37

38
/*
 * Initialize an SSG group from a plain-text config file.
 *
 * The whole file at fname is read and split on whitespace (space, tab, CR,
 * LF). Each token is taken to be one member's mercury address of the form
 * "<class>://<addr>", and a member's rank is its position in the file.
 * This process's rank is found by comparing the part of its own address
 * after "://" with the same part of every token.
 *
 * Returns the new group handle, or NULL on failure. Failure cases are:
 * - the file cannot be opened or read;
 * - allocation fails;
 * - a token, or the self address, lacks "://";
 * - the self address is not listed;
 * - ssg_init_internal fails.
 */
ssg_t ssg_init_config(margo_instance_id mid, const char * fname)
{
    // file to read
    int fd = -1;
    struct stat st;

    // file content to parse
    char *rdbuf = NULL;
    size_t rdtotal = 0;
    ssize_t rdsz;

    // parse metadata (strtok)
    char *tok;

    // vars to build up the packed addr string list
    int addr_cap = 128;
    int addr_len = 0;
    int num_addrs = 0;
    void *addr_buf = NULL;

    // self rank/addr resolution helpers
    hg_class_t *hgcl = NULL;
    hg_addr_t self_addr = HG_ADDR_NULL;
    char * self_addr_str = NULL;
    const char * self_addr_substr = NULL;
    hg_size_t self_addr_size = 0;
    const char * addr_substr = NULL;
    int rank = -1;

    // misc return codes
    int ret;
    hg_return_t hret;

    // return data
    ssg_t s = NULL;

    // open file for reading
    fd = open(fname, O_RDONLY);
    if (fd == -1) goto fini;

    // get file size
    ret = fstat(fd, &st);
    if (ret == -1) goto fini;

    // slurp file in all at once (+1 for the terminating NUL)
    rdbuf = malloc(st.st_size+1);
    if (rdbuf == NULL) goto fini;

    // read() may legally return fewer bytes than requested or be interrupted
    // by a signal, so keep reading until the whole file is in memory
    while (rdtotal < (size_t)st.st_size) {
        rdsz = read(fd, rdbuf + rdtotal, (size_t)st.st_size - rdtotal);
        if (rdsz == -1) {
            if (errno == EINTR) continue;
            goto fini;
        }
        if (rdsz == 0) break; // EOF before the size fstat reported
        rdtotal += (size_t)rdsz;
    }
    if (rdtotal != (size_t)st.st_size) goto fini;
    rdbuf[rdtotal]='\0';

    hgcl = margo_get_class(mid);
    if(!hgcl) goto fini;

    // get my address: the first call sizes the buffer, the second fills it
    hret = HG_Addr_self(hgcl, &self_addr);
    if (hret != HG_SUCCESS) goto fini;
    hret = HG_Addr_to_string(hgcl, NULL, &self_addr_size, self_addr);
    if (hret != HG_SUCCESS) goto fini;
    self_addr_str = malloc(self_addr_size);
    if (self_addr_str == NULL) goto fini;
    hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_size, self_addr);
    if (hret != HG_SUCCESS) goto fini;

    // Only the part after "://" is compared, so differences in the class
    // prefix between the self address and the file entries do not matter.
    // NOTE(review): the original comment said HG_Addr_to_string omits the
    // class, yet this code requires "://" in the self address -- confirm
    // against the Mercury version in use.
    self_addr_substr = strstr(self_addr_str, "://");
    if (self_addr_substr == NULL) goto fini;
    self_addr_substr += 3;

    // strtok the result - each whitespace-delimited token is assumed to be
    // a unique mercury address (strtok modifies rdbuf in place)
    tok = strtok(rdbuf, "\r\n\t ");
    if (tok == NULL) goto fini;

    // build up the packed address buffer: "addr0\0addr1\0..."
    addr_buf = malloc(addr_cap);
    if (addr_buf == NULL) goto fini;
    do {
        int tok_sz = (int)strlen(tok);

        // Grow until the token plus its NUL fits. A single doubling is not
        // enough when one address is longer than the current capacity.
        while (tok_sz + addr_len + 1 > addr_cap) {
            void * tmp;
            addr_cap *= 2;
            tmp = realloc(addr_buf, addr_cap);
            if (tmp == NULL) goto fini;
            addr_buf = tmp;
        }

        // check if this is my addr to resolve rank
        addr_substr = strstr(tok, "://");
        if (addr_substr == NULL) goto fini;
        addr_substr += 3;
        if (strcmp(self_addr_substr, addr_substr) == 0)
            rank = num_addrs;

        memcpy((char*)addr_buf + addr_len, tok, tok_sz+1);
        addr_len += tok_sz+1;
        num_addrs++;
        tok = strtok(NULL, "\r\n\t ");
    } while (tok != NULL);

    // if rank not resolved, fail
    if (rank == -1) goto fini;

    // init ssg internal structures
    s = ssg_init_internal(mid, rank, num_addrs, self_addr, addr_buf);
    if (s == NULL) goto fini;

    // self_addr now belongs to s; don't free it below
    self_addr = HG_ADDR_NULL;

fini:
    if (fd != -1) close(fd);
    free(rdbuf);
    free(addr_buf);
    if (hgcl && self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
    free(self_addr_str);
    return s;
}

#ifdef HAVE_MPI
160
/*
 * Initialize an SSG group over all processes of an MPI communicator.
 *
 * This call is collective over comm. Every process publishes its own mercury
 * address string to all others (Allgather of lengths, then Allgatherv of the
 * strings). Each member's group rank equals its rank in comm.
 *
 * Returns the group handle, or NULL on failure.
 */
ssg_t ssg_init_mpi(margo_instance_id mid, MPI_Comm comm)
{
    hg_class_t *cls;
    hg_addr_t my_addr = HG_ADDR_NULL;
    char *my_addr_str = NULL;
    hg_size_t my_addr_len = 0;
    int my_addr_len_int = 0;   // MPI counts are plain ints

    int nprocs = 0;
    int me = 0;
    int *counts = NULL;        // per-rank string length (NUL included)
    int *displs = NULL;        // exclusive prefix sum of counts, total last
    int running = 0;
    char *all_addrs = NULL;

    ssg_t group = NULL;

    cls = margo_get_class(mid);
    if (cls == NULL)
        goto cleanup;

    // own address as a string: size it, then fill it
    if (HG_Addr_self(cls, &my_addr) != HG_SUCCESS ||
        HG_Addr_to_string(cls, NULL, &my_addr_len, my_addr) != HG_SUCCESS)
        goto cleanup;
    my_addr_str = malloc(my_addr_len);
    if (my_addr_str == NULL ||
        HG_Addr_to_string(cls, my_addr_str, &my_addr_len, my_addr) != HG_SUCCESS)
        goto cleanup;
    my_addr_len_int = (int)my_addr_len; // length already counts the NUL

    // everyone learns everyone's string length
    MPI_Comm_size(comm, &nprocs);
    MPI_Comm_rank(comm, &me);
    counts = malloc(nprocs * sizeof(*counts));
    if (counts == NULL)
        goto cleanup;
    counts[me] = my_addr_len_int;
    MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, counts, 1, MPI_INT, comm);

    // displacement of each rank's string; the extra slot is the total size
    displs = malloc((nprocs + 1) * sizeof(*displs));
    if (displs == NULL)
        goto cleanup;
    for (int p = 0; p <= nprocs; p++) {
        displs[p] = running;
        if (p < nprocs)
            running += counts[p];
    }

    // everyone receives the packed "addr0\0addr1\0..." buffer in rank order
    all_addrs = malloc(displs[nprocs]);
    if (all_addrs == NULL)
        goto cleanup;
    MPI_Allgatherv(my_addr_str, my_addr_len_int, MPI_BYTE,
            all_addrs, counts, displs, MPI_BYTE, comm);

    group = ssg_init_internal(mid, me, nprocs, my_addr, all_addrs);
    if (group != NULL)
        my_addr = HG_ADDR_NULL; // ownership moved into the group

cleanup:
    free(counts);
    free(displs);
    free(all_addrs);
    if (cls && my_addr != HG_ADDR_NULL)
        HG_Addr_free(cls, my_addr);
    free(my_addr_str);
    return group;
}
#endif

234
/*
 * Allocate and populate a group handle, then resolve all peer addresses.
 *
 * Parameters:
 *   mid          - margo instance stored in the handle.
 *   self_rank    - this process's rank; must be in [0, group_size).
 *   group_size   - number of members.
 *   self_addr    - this process's resolved address; must not be
 *                  HG_ADDR_NULL.
 *   addr_str_buf - packed buffer of group_size NUL-terminated address
 *                  strings indexed by rank. It is only read during this
 *                  call and is not retained.
 *
 * Ownership:
 *   On success, self_addr is owned by the returned handle.
 *   On failure, NULL is returned and self_addr is left untouched. Both
 *   callers free self_addr themselves when NULL comes back.
 */
static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
    int group_size, hg_addr_t self_addr, char *addr_str_buf)
{
    // array of peer address strings (pointers into addr_str_buf)
    char **addr_strs = NULL;

    // misc return codes
    hg_return_t hret;

    // return data
    ssg_t s = NULL;

    if (self_rank < 0 || self_rank >= group_size || self_addr == HG_ADDR_NULL)
        goto fini;

    // set peer address strings
    addr_strs = setup_addr_str_list(group_size, addr_str_buf);
    if (addr_strs == NULL) goto fini;

    // set up the output (zeroed, so optional fields such as swim_ctx start
    // out empty and ssg_finalize can be used safely on partial state)
    s = calloc(1, sizeof(*s));
    if (s == NULL) goto fini;
    s->mid = mid;

    // initialize the group "view"
    s->view.self_rank = self_rank;
    s->view.group_size = group_size;
    s->view.member_states = calloc(group_size, sizeof(*(s->view.member_states)));
    if (s->view.member_states == NULL)
    {
        free(s);
        s = NULL;
        goto fini;
    }
    for (int i = 1; i < group_size; i++)
    {
        int r = (self_rank + i) % group_size;
        // NOTE: remote addrs are set in ssg_lookup
        s->view.member_states[r].addr = HG_ADDR_NULL;
        s->view.member_states[r].is_member = 1;
    }
    // set view info for self
    s->view.member_states[self_rank].addr = self_addr;
    s->view.member_states[self_rank].is_member = 1;

#ifdef DEBUG
    // TODO: log file debug option, instead of just stdout
    s->dbg_strm = stdout;
#endif

    // lookup hg addr information for all group members
    hret = ssg_lookup(s, addr_strs);
    if (hret != HG_SUCCESS)
    {
        // ssg_finalize frees every address in the view, but on failure the
        // caller still owns (and frees) self_addr -- detach it first so it
        // is not freed twice
        s->view.member_states[self_rank].addr = HG_ADDR_NULL;
        ssg_finalize(s);
        s = NULL;
        goto fini;
    }
    SSG_DEBUG(s, "group lookup successful (size=%d)\n", group_size);

#if USE_SWIM_FD
    // initialize swim failure detector
    // TODO: we should probably barrier or sync somehow to avoid rpc failures
    // due to timing skew of different ranks initializing swim
    s->swim_ctx = swim_init(s, 1);
    if (s->swim_ctx == NULL)
    {
        // same ownership rule as above: the caller frees self_addr on failure
        s->view.member_states[self_rank].addr = HG_ADDR_NULL;
        ssg_finalize(s);
        s = NULL;
    }
#endif

fini:
    free(addr_strs);
    return s;
}

313 314 315 316
struct lookup_ult_args
{
    ssg_t ssg;
    int rank;
317
    char *addr_str;
318 319 320 321 322 323
    hg_return_t out;
};

static void lookup_ult(void *arg)
{
    struct lookup_ult_args *l = arg;
324
    ssg_t s = l->ssg;
325

326 327 328
    l->out = margo_addr_lookup(s->mid, l->addr_str,
        &s->view.member_states[l->rank].addr);
    if(l->out != HG_SUCCESS)
Shane Snyder's avatar
Shane Snyder committed
329
        SSG_DEBUG(s, "look up on member %d failed [%d]\n", l->rank, l->out);
330 331
}

332
/*
 * Resolve the mercury address of every group member except self.
 *
 * For each remote rank r, a lookup_ult stores its result in
 * s->view.member_states[r].addr. addr_strs must hold group_size strings
 * indexed by rank.
 *
 * Returns:
 *   HG_SUCCESS       - every lookup succeeded.
 *   HG_INVALID_PARAM - s is SSG_NULL.
 *   HG_NOMEM_ERROR   - an allocation failed.
 *   HG_OTHER_ERROR   - creating or joining a ULT failed.
 *   otherwise        - the first failing lookup's own return code.
 * Addresses resolved before a failure are left in the view; ssg_finalize
 * frees them.
 */
static hg_return_t ssg_lookup(ssg_t s, char **addr_strs)
{
    ABT_thread *ults;
    struct lookup_ult_args *args;
    int aret;
    hg_return_t hret = HG_SUCCESS;

    if (s == SSG_NULL) return HG_INVALID_PARAM;

    // initialize ULT handles and argument records
    ults = malloc(s->view.group_size * sizeof(*ults));
    if (ults == NULL) return HG_NOMEM_ERROR;
    args = malloc(s->view.group_size * sizeof(*args));
    if (args == NULL) {
        free(ults);
        return HG_NOMEM_ERROR;
    }
    for (int i = 0; i < s->view.group_size; i++)
        ults[i] = ABT_THREAD_NULL;

    // fill in a request for each remote member, starting just after self
    // and wrapping around (i == 0 would be self)
    for (int i = 1; i < s->view.group_size; i++) {
        int r = (s->view.self_rank + i) % s->view.group_size;
        args[r].ssg = s;
        args[r].rank = r;
        args[r].addr_str = addr_strs[r];
#if 0
        // Concurrent variant (disabled): launch every lookup ULT up front and
        // join them all in the loop below. Flip this guard together with
        // the `#if 1` guard below, or each ULT would be created twice.
        aret = ABT_thread_create(*margo_get_handler_pool(s->mid), &lookup_ult,
                &args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
        if (aret != ABT_SUCCESS) {
            hret = HG_OTHER_ERROR;
            goto fini;
        }
#endif
    }

    // wait on all
    for (int i = 1; i < s->view.group_size; i++) {
        int r = (s->view.self_rank + i) % s->view.group_size;
#if 1
        // Serialized variant (enabled): each ULT is created here and joined
        // immediately, so lookups run one at a time.
        // NOTE(review): the reason for serializing is not recorded here.
        aret = ABT_thread_create(*margo_get_handler_pool(s->mid), &lookup_ult,
                &args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
        if (aret != ABT_SUCCESS) {
            hret = HG_OTHER_ERROR;
            goto fini;
        }
#endif
        aret = ABT_thread_join(ults[r]);
        ABT_thread_free(&ults[r]);
        ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
        if (aret != ABT_SUCCESS) {
            hret = HG_OTHER_ERROR;
            break;
        }
        else if (args[r].out != HG_SUCCESS) {
            hret = args[r].out;
            break;
        }
    }

fini:
    // Cancel and reclaim any ULT that was created but never joined.
    // ABT_thread_free takes a pointer to the handle, not the handle itself.
    for (int i = 0; i < s->view.group_size; i++) {
        if (ults[i] != ABT_THREAD_NULL) {
            ABT_thread_cancel(ults[i]);
            ABT_thread_free(&ults[i]);
        }
    }
    free(ults);
    free(args);

    return hret;
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
406

Jonathan Jenkins's avatar
Jonathan Jenkins committed
407 408
/*
 * Release a group handle. Passing SSG_NULL is a no-op. Otherwise:
 * 1. stop the SWIM failure detector, if built in and started;
 * 2. free every member address still held in the view;
 * 3. free the view array and the handle itself.
 */
void ssg_finalize(ssg_t s)
{
    int nmembers;

    if (s == SSG_NULL)
        return;

#if USE_SWIM_FD
    if (s->swim_ctx != NULL)
        swim_finalize(s->swim_ctx);
#endif

    nmembers = s->view.group_size;
    for (int m = 0; m < nmembers; m++) {
        hg_addr_t a = s->view.member_states[m].addr;
        if (a == HG_ADDR_NULL)
            continue;
        HG_Addr_free(margo_get_class(s->mid), a);
    }

    free(s->view.member_states);
    free(s);
}

424
int ssg_get_group_rank(const ssg_t s)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
425
{
426
    return s->view.self_rank;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
427 428
}

429
int ssg_get_group_size(const ssg_t s)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
430
{
431
    return s->view.group_size;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
432 433
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
434
hg_addr_t ssg_get_addr(const ssg_t s, int rank)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
435
{
436 437
    if (rank >= 0 && rank < s->view.group_size)
        return s->view.member_states[rank].addr;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
438
    else
Jonathan Jenkins's avatar
Jonathan Jenkins committed
439
        return HG_ADDR_NULL;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
440 441
}

442
/* -------- */
443

Jonathan Jenkins's avatar
Jonathan Jenkins committed
444 445 446 447 448 449 450 451 452 453 454 455
/*
 * Split a packed buffer of num_addrs consecutive NUL-terminated strings
 * ("a\0bc\0def\0") into an array of pointers into that buffer.
 *
 * The returned array is heap-allocated and must be freed by the caller.
 * The strings themselves still live in buf, which must outlive the array.
 * Returns NULL when num_addrs is not positive, buf is NULL, or allocation
 * fails. Without this guard, a zero count would write list[0] past a
 * zero-size allocation.
 */
static char** setup_addr_str_list(int num_addrs, char * buf)
{
    char **list;

    if (num_addrs <= 0 || buf == NULL) return NULL;

    list = malloc((size_t)num_addrs * sizeof(*list));
    if (list == NULL) return NULL;

    // each string starts just past the previous string's terminating NUL
    list[0] = buf;
    for (int i = 1; i < num_addrs; i++)
        list[i] = list[i - 1] + strlen(list[i - 1]) + 1;

    return list;
}
Shane Snyder's avatar
Shane Snyder committed
456
#endif