ssg.c 13.6 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

7
#include "ssg-config.h"
Shane Snyder's avatar
Shane Snyder committed
8

Jonathan Jenkins's avatar
Jonathan Jenkins committed
9 10 11 12
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
13
#include <errno.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
14 15
#include <stdlib.h>
#include <string.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
16
#include <assert.h>
Jonathan Jenkins's avatar
Jonathan Jenkins committed
17

Shane Snyder's avatar
Shane Snyder committed
18
#include <mercury.h>
19
#include <abt.h>
Shane Snyder's avatar
Shane Snyder committed
20
#include <margo.h>
21

22
#include "ssg.h"
23
#include "ssg-internal.h"
24 25 26 27
#if USE_SWIM_FD
#include "swim-fd/swim-fd.h"
#endif

28 29 30 31 32
/* SSG helper routine prototypes */

/* Resolve the Mercury address of every group member other than the caller
 * (one margo_addr_lookup per member, run in a ULT), storing each result in
 * g->view.member_states[rank].addr. addr_strs holds one address string per
 * rank.
 */
static hg_return_t ssg_group_lookup(
    ssg_group_t * g, const char * const addr_strs[]);
/* Build a malloc'd array of num_addrs pointers into buf, where buf holds
 * num_addrs NUL-terminated strings packed back to back. The array (not buf)
 * is what the callers in this file free().
 */
static const char ** ssg_setup_addr_str_list(
    char * buf, int num_addrs);
33

34
/* XXX: is this right? can this be a global? */
35
/* Margo instance used by the SSG routines in this file; assigned by ssg_init(). */
margo_instance_id ssg_mid = MARGO_INSTANCE_NULL;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
36

37 38 39
/* XXX: fix this */
/* The single group tracked by this file: set at the end of a successful
 * ssg_group_create() and read back by ssg_group_destroy(), which does not
 * consult its group_id argument.
 */
ssg_group_t *the_group = NULL;

40 41 42
/****************************************************
 *** SSG runtime initialization/shutdown routines ***
 ****************************************************/
43

44 45
/* Initialize the SSG runtime.
 *
 * mid - margo instance that later SSG calls in this file will use; it is
 *       stored in the file-level ssg_mid.
 *
 * Always returns SSG_SUCCESS.
 */
int ssg_init(margo_instance_id mid)
{
    ssg_mid = mid;
    return SSG_SUCCESS;
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
51

52 53 54 55
/* Shut down the SSG runtime. Currently a no-op: no state is released here. */
void ssg_finalize()
{
    /* nothing to tear down */
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
56

57 58 59
/*************************************
 *** SSG group management routines ***
 *************************************/
Jonathan Jenkins's avatar
Jonathan Jenkins committed
60

61 62 63 64 65
ssg_group_id_t ssg_group_create(
    const char * group_name,
    const char * const group_addr_strs[],
    int group_size)
{
66
    hg_class_t *hgcl = NULL;
Shane Snyder's avatar
Shane Snyder committed
67
    hg_addr_t self_addr = HG_ADDR_NULL;
68
    char *self_addr_str = NULL;
Shane Snyder's avatar
Shane Snyder committed
69
    hg_size_t self_addr_size = 0;
70 71 72
    const char *self_addr_substr = NULL;
    const char *addr_substr = NULL;
    int i;
73
    hg_return_t hret;
74 75
    ssg_group_t *g = NULL;
    ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
76

77 78
    hgcl = margo_get_class(ssg_mid);
    if (!hgcl) goto fini;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
79

80
    /* get my address */
81 82 83 84 85 86 87 88 89
    hret = HG_Addr_self(hgcl, &self_addr);
    if (hret != HG_SUCCESS) goto fini;
    hret = HG_Addr_to_string(hgcl, NULL, &self_addr_size, self_addr);
    if (hret != HG_SUCCESS) goto fini;
    self_addr_str = malloc(self_addr_size);
    if (self_addr_str == NULL) goto fini;
    hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_size, self_addr);
    if (hret != HG_SUCCESS) goto fini;

90 91 92 93
    /* strstr is used here b/c there may be inconsistencies in whether the class
     * is included in the address or not (it should not be in HG_Addr_to_string,
     * but it's possible that it is in the list of group address strings)
     */
94
    self_addr_substr = strstr(self_addr_str, "://");
Shane Snyder's avatar
Shane Snyder committed
95 96 97 98
    if (self_addr_substr == NULL)
        self_addr_substr = self_addr_str;
    else
        self_addr_substr += 3;
Shane Snyder's avatar
Shane Snyder committed
99

100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
    /* allocate an SSG group data structure and initialize some of it */
    g = malloc(sizeof(*g));
    if (!g) goto fini;
    memset(g, 0, sizeof(*g));
    g->name = strdup(group_name);
    if (!g->name) goto fini;
    g->self_rank = -1;
    g->view.group_size = group_size;
    g->view.member_states = malloc(group_size * sizeof(*g->view.member_states));
    if (!g->view.member_states) goto fini;
    memset(g->view.member_states, 0, group_size * sizeof(*g->view.member_states));

    /* resolve my rank within the group */
    for (i = 0; i < group_size; i++)
    {
        addr_substr = strstr(group_addr_strs[i], "://");
Shane Snyder's avatar
Shane Snyder committed
116 117 118 119
        if (addr_substr == NULL)
            addr_substr = group_addr_strs[i];
        else
            addr_substr += 3;
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
        if (strcmp(self_addr_substr, addr_substr) == 0)
        {
            /* this is my address -- my rank is the offset in the address array */
            g->self_rank = i;
            g->view.member_states[i].addr = self_addr;
        }
        else
        {
            /* initialize group member addresses to NULL before looking them up */
            g->view.member_states[i].addr = HG_ADDR_NULL;
        }
        g->view.member_states[i].is_member = 1;
    }
    /* if unable to resolve my rank within the group, error out */
    if(g->self_rank == -1)
    {
        fprintf(stderr, "Error: SSG unable to resolve rank in group %s\n",
            group_name);
        goto fini;
    }

    /* lookup hg addresses information for all group members */
    hret = ssg_group_lookup(g, group_addr_strs);
    if (hret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: SSG unable to complete lookup for group %s\n",
            group_name);
        goto fini;
    }
Shane Snyder's avatar
Shane Snyder committed
149
    SSG_DEBUG(g, "group lookup successful (size=%d)\n", group_size);
150 151

#if USE_SWIM_FD
152 153 154 155 156 157
    int swim_active = 1;
#ifdef SWIM_FORCE_FAIL
    if (g->self_rank == 1)
        swim_active = 0;
#endif

158
    /* initialize swim failure detector */
159 160
    // TODO: we should probably barrier or sync somehow to avoid rpc failures
    // due to timing skew of different ranks initializing swim
161
    g->fd_ctx = (void *)swim_init(g, swim_active);
162
    if (g->fd_ctx == NULL) goto fini;
163 164 165
#endif

    /* TODO: last step => add reference to this group to SSG runtime state */
166
    the_group = g;
167 168

    /* don't free these pointers on success */
169
    self_addr = HG_ADDR_NULL;
170 171
    g = NULL;
fini:
Shane Snyder's avatar
Shane Snyder committed
172 173
    if (hgcl && self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
    free(self_addr_str);
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
    if (g)
    {
        free(g->name);
        free(g->view.member_states);
        free(g);
    }

    return g_id;
}

/* Create an SSG group whose member addresses are listed in a config file.
 *
 * group_name - name of the group, forwarded to ssg_group_create()
 * file_name  - path of a text file containing whitespace-separated
 *              (space, tab, CR, LF) address strings, one per member, in
 *              rank order
 *
 * The whole file is read into memory, split into tokens, and the tokens are
 * packed into one buffer of NUL-terminated strings that is handed to
 * ssg_group_create() through ssg_setup_addr_str_list().
 *
 * Returns whatever ssg_group_create() returns, or SSG_GROUP_ID_NULL if the
 * file cannot be opened/stat'ed/read in full, contains no tokens, or an
 * allocation fails.
 */
ssg_group_id_t ssg_group_create_config(
    const char * group_name,
    const char * file_name)
{
    int fd;
    struct stat st;
    char *rd_buf = NULL;
    ssize_t rd_buf_sz;
    char *tok;
    char *addr_buf = NULL;
    size_t addr_buf_len = 0;
    int num_addrs = 0;
    int ret;
    const char **addr_strs = NULL;
    ssg_group_id_t g_id = SSG_GROUP_ID_NULL;

    /* open config file for reading */
    fd = open(file_name, O_RDONLY);
    if (fd == -1)
    {
        fprintf(stderr, "Error: SSG unable to open config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }

    /* get file size and allocate a buffer to store it (+1 for a terminator) */
    ret = fstat(fd, &st);
    if (ret == -1)
    {
        fprintf(stderr, "Error: SSG unable to stat config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }
    rd_buf = malloc((size_t)st.st_size + 1);
    if (rd_buf == NULL) goto fini;

    /* load it all in one fell swoop */
    rd_buf_sz = read(fd, rd_buf, st.st_size);
    if (rd_buf_sz != st.st_size)
    {
        fprintf(stderr, "Error: SSG unable to read config file %s for group %s\n",
            file_name, group_name);
        goto fini;
    }
    rd_buf[rd_buf_sz]='\0';

    /* strtok the result - each space-delimited address is assumed to be
     * a unique mercury address
     */
    tok = strtok(rd_buf, "\r\n\t ");
    if (tok == NULL) goto fini;

    /* build up the address buffer.
     *
     * Every token is copied together with its NUL terminator. Tokens are
     * disjoint pieces of rd_buf and each is followed by its own byte there
     * (a delimiter, or the terminator appended above), so the packed size is
     * at most rd_buf_sz + 1. It reaches rd_buf_sz + 1 when the file does not
     * end in a delimiter, hence the extra byte.
     */
    addr_buf = malloc((size_t)rd_buf_sz + 1);
    if (addr_buf == NULL) goto fini;
    do {
        size_t tok_sz = strlen(tok);
        memcpy(addr_buf + addr_buf_len, tok, tok_sz + 1);
        addr_buf_len += tok_sz + 1;
        num_addrs++;
        tok = strtok(NULL, "\r\n\t ");
    } while (tok != NULL);

    /* set up address string array for group members */
    addr_strs = ssg_setup_addr_str_list(addr_buf, num_addrs);
    if (!addr_strs) goto fini;

    /* invoke the generic group create routine using our list of addrs */
    g_id = ssg_group_create(group_name, addr_strs, num_addrs);

fini:
    /* cleanup before returning */
    if (fd != -1) close(fd);
    free(rd_buf);
    free(addr_buf);
    free(addr_strs);

    return g_id;
}

#ifdef HAVE_MPI
271 272 273
/* Create an SSG group whose members are the ranks of an MPI communicator.
 *
 * group_name - name of the group, forwarded to ssg_group_create()
 * comm       - communicator whose ranks become the group members
 *
 * Each rank converts its own Mercury address to a string; the string sizes
 * and then the strings themselves are exchanged with MPI_Allgather /
 * MPI_Allgatherv so that every rank ends up with the full, rank-ordered
 * address list, which is handed to ssg_group_create().
 *
 * Returns whatever ssg_group_create() returns, or SSG_GROUP_ID_NULL if a
 * Mercury call or an allocation fails first.
 */
ssg_group_id_t ssg_group_create_mpi(
    const char * group_name,
    MPI_Comm comm)
{
    ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
    hg_class_t *hgcl = NULL;
    hg_addr_t my_addr = HG_ADDR_NULL;
    hg_size_t my_addr_len = 0;
    char *my_addr_str = NULL;
    int my_addr_len_int = 0; /* int copy of the length, for the MPI calls */
    int nranks = 0, my_rank = 0;
    int *lens = NULL;     /* per-rank address string sizes */
    int *offsets = NULL;  /* exclusive prefix sum of lens, plus the total */
    char *packed = NULL;  /* all address strings, back to back */
    const char **addr_strs = NULL;
    hg_return_t hret;

    hgcl = margo_get_class(ssg_mid);
    if (hgcl == NULL) goto fini;

    /* obtain my own address and its string form (size query, then fetch) */
    hret = HG_Addr_self(hgcl, &my_addr);
    if (hret != HG_SUCCESS) goto fini;
    hret = HG_Addr_to_string(hgcl, NULL, &my_addr_len, my_addr);
    if (hret != HG_SUCCESS) goto fini;
    my_addr_str = malloc(my_addr_len);
    if (my_addr_str == NULL) goto fini;
    hret = HG_Addr_to_string(hgcl, my_addr_str, &my_addr_len, my_addr);
    if (hret != HG_SUCCESS) goto fini;
    my_addr_len_int = (int)my_addr_len; /* null char included in call */

    /* exchange the string sizes: each rank fills its own slot in place */
    MPI_Comm_size(comm, &nranks);
    MPI_Comm_rank(comm, &my_rank);
    lens = malloc(nranks * sizeof(*lens));
    if (lens == NULL) goto fini;
    lens[my_rank] = my_addr_len_int;
    MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, lens, 1, MPI_INT, comm);

    /* offsets[r] is where rank r's string starts; offsets[nranks] is the
     * total number of bytes
     */
    offsets = malloc((nranks+1) * sizeof(*offsets));
    if (offsets == NULL) goto fini;
    offsets[0] = 0;
    for (int r = 0; r < nranks; r++)
        offsets[r+1] = offsets[r] + lens[r];

    /* exchange the address strings themselves */
    packed = malloc(offsets[nranks]);
    if (packed == NULL) goto fini;
    MPI_Allgatherv(my_addr_str, my_addr_len_int, MPI_BYTE,
            packed, lens, offsets, MPI_BYTE, comm);

    /* index the packed strings and create the group from them */
    addr_strs = ssg_setup_addr_str_list(packed, nranks);
    if (addr_strs == NULL) goto fini;

    g_id = ssg_group_create(group_name, addr_strs, nranks);

fini:
    /* release all temporaries on both the success and the failure path */
    free(lens);
    free(offsets);
    free(packed);
    if (hgcl && my_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, my_addr);
    free(my_addr_str);
    free(addr_strs);

    return g_id;
}
#endif

345 346
/* Destroy the group created by ssg_group_create().
 *
 * group_id - not consulted: this file tracks a single group through the
 *            file-level the_group pointer, and that is the group destroyed.
 *
 * Finalizes the SWIM failure detector (when built with USE_SWIM_FD), frees
 * every non-null member address, then releases the group's name, member
 * state array and the group structure itself. the_group is reset to NULL so
 * no dangling pointer is left behind.
 *
 * Asserts that a group exists. Always returns SSG_SUCCESS.
 */
int ssg_group_destroy(
    ssg_group_id_t group_id)
{
    int i;
    ssg_group_t *g = the_group;
    hg_class_t *hgcl;

    (void)group_id;
    assert(g);

#if USE_SWIM_FD
    swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx;
    assert(swim_ctx);
    if(swim_ctx)
        swim_finalize(swim_ctx);
#endif

    /* the class does not change between iterations; fetch it once */
    hgcl = margo_get_class(ssg_mid);
    for (i = 0; i < g->view.group_size; i++) {
        if (g->view.member_states[i].addr != HG_ADDR_NULL)
            HG_Addr_free(hgcl, g->view.member_states[i].addr);
    }
    free(g->view.member_states);
    free(g->name); /* strdup'd in ssg_group_create() */
    free(g);
    the_group = NULL;

    return SSG_SUCCESS;
}

Shane Snyder's avatar
Shane Snyder committed
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
/* Everything up to the matching #endif is compiled out. */
#if 0
/*** SSG group membership view access routines */

/* Return the caller's rank as stored in the view.
 * NOTE(review): this reads s->view.self_rank, whereas ssg_group_create()
 * in this file stores the rank in g->self_rank -- presumably stale; confirm
 * before re-enabling.
 */
int ssg_get_group_rank(const ssg_t s)
{
    return s->view.self_rank;
}

/* Return the number of members recorded in the view. */
int ssg_get_group_size(const ssg_t s)
{
    return s->view.group_size;
}

/* Return the address stored for the given rank, or HG_ADDR_NULL when rank
 * is outside [0, group_size).
 */
hg_addr_t ssg_get_addr(const ssg_t s, int rank)
{
    if (rank >= 0 && rank < s->view.group_size)
        return s->view.member_states[rank].addr;
    else
        return HG_ADDR_NULL;
}
#endif

391 392 393 394 395
/***************************
 *** SSG helper routines ***
 ***************************/

static void ssg_lookup_ult(void * arg);
396 397
struct lookup_ult_args
{
398
    ssg_group_t *g;
399
    int rank;
400
    const char *addr_str;
401 402 403
    hg_return_t out;
};

404 405
/* Resolve the Mercury addresses of all group members other than the caller.
 *
 * g         - group being set up; g->self_rank and g->view.group_size must
 *             already be valid
 * addr_strs - one address string per rank
 *
 * Ranks are visited starting from self_rank + 1 and wrapping around. For
 * each one a ULT running ssg_lookup_ult() is created on the margo handler
 * pool and joined immediately, so lookups are performed one at a time. The
 * resolved address is written into g->view.member_states[rank].addr by the
 * ULT.
 *
 * Returns HG_SUCCESS if every lookup succeeded, HG_INVALID_PARAM for a NULL
 * argument, HG_NOMEM_ERROR on allocation failure, HG_OTHER_ERROR if a ULT
 * could not be created or joined, or the failing lookup's own return code.
 * Processing stops at the first failure.
 */
static hg_return_t ssg_group_lookup(
    ssg_group_t * g, const char * const addr_strs[])
{
    ABT_thread *ults;
    struct lookup_ult_args *args;
    int i, r;
    int aret;
    hg_return_t hret = HG_SUCCESS;

    if (g == NULL || addr_strs == NULL) return HG_INVALID_PARAM;

    /* initialize ULTs */
    ults = malloc(g->view.group_size * sizeof(*ults));
    if (ults == NULL) return HG_NOMEM_ERROR;
    args = malloc(g->view.group_size * sizeof(*args));
    if (args == NULL) {
        free(ults);
        return HG_NOMEM_ERROR;
    }
    for (i = 0; i < g->view.group_size; i++)
        ults[i] = ABT_THREAD_NULL;

    /* look up every rank but our own, one ULT at a time */
    for (i = 1; i < g->view.group_size; i++) {
        r = (g->self_rank + i) % g->view.group_size;
        args[r].g = g;
        args[r].rank = r;
        args[r].addr_str = addr_strs[r];

        aret = ABT_thread_create(*margo_get_handler_pool(ssg_mid), &ssg_lookup_ult,
                &args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
        if (aret != ABT_SUCCESS) {
            hret = HG_OTHER_ERROR;
            break;
        }

        aret = ABT_thread_join(ults[r]);
        ABT_thread_free(&ults[r]);
        ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
        if (aret != ABT_SUCCESS) {
            hret = HG_OTHER_ERROR;
            break;
        }
        else if (args[r].out != HG_SUCCESS) {
            fprintf(stderr, "Error: SSG unable to lookup HG address for rank %d"
                "(err=%d)\n", r, args[r].out);
            hret = args[r].out;
            break;
        }
    }

    /* cleanup: cancel and release any ULT handle that is still live.
     * ABT_thread_free takes the address of the handle, as in the loop above.
     */
    for (i = 0; i < g->view.group_size; i++) {
        if (ults[i] != ABT_THREAD_NULL) {
            ABT_thread_cancel(ults[i]);
            ABT_thread_free(&ults[i]);
        }
    }
    free(ults);
    free(args);

    return hret;
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
480

481 482
static void ssg_lookup_ult(
    void * arg)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
483
{
484 485
    struct lookup_ult_args *l = arg;
    ssg_group_t *g = l->g;
486

487 488 489 490
    l->out = margo_addr_lookup(ssg_mid, l->addr_str,
        &g->view.member_states[l->rank].addr);
    return;
}
491

492 493 494 495 496 497 498 499 500 501
/* Build an index over a buffer of packed address strings.
 *
 * buf       - num_addrs NUL-terminated strings stored back to back
 * num_addrs - number of strings in buf
 *
 * Returns a malloc'd array of num_addrs pointers, entry i pointing at the
 * start of the i-th string inside buf (nothing is copied). The caller frees
 * the array; buf must stay alive for as long as the array is used.
 *
 * Returns NULL if buf is NULL, num_addrs is not positive, or the allocation
 * fails. The positivity check matters because a zero-size allocation may
 * return a non-NULL pointer that must not be written through.
 */
static const char ** ssg_setup_addr_str_list(
    char * buf, int num_addrs)
{
    const char **ret;

    if (buf == NULL || num_addrs <= 0) return NULL;

    ret = malloc((size_t)num_addrs * sizeof(*ret));
    if (ret == NULL) return NULL;

    ret[0] = buf;
    for (int i = 1; i < num_addrs; i++) {
        /* the next string starts right after the previous one's terminator */
        const char * a = ret[i-1];
        ret[i] = a + strlen(a) + 1;
    }
    return ret;
}