/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

#include "ssg-config.h"

#include <assert.h>
#include <stdlib.h>
#include <string.h>

#include <mercury.h>
#include <abt.h>
#include <margo.h>

#include "ssg.h"
#include "ssg-internal.h"

19 20 21 22
#define SSG_VIEW_BUF_DEF_SIZE (128 * 1024)

/* SSG RPC types and (de)serialization routines */

Shane Snyder's avatar
Shane Snyder committed
23 24
/* TODO join and attach are nearly identical -- refactor */

25 26 27 28 29 30
/* NOTE: keep in sync with ssg_group_descriptor_t definition in ssg-internal.h */
MERCURY_GEN_STRUCT_PROC(ssg_group_descriptor_t, \
    ((uint64_t)     (magic_nr)) \
    ((uint64_t)     (name_hash)) \
    ((hg_string_t)  (addr_str)));

Shane Snyder's avatar
Shane Snyder committed
31 32
MERCURY_GEN_PROC(ssg_group_join_request_t, \
    ((ssg_group_descriptor_t)   (group_descriptor))
33
    ((hg_string_t)              (addr_str))
Shane Snyder's avatar
Shane Snyder committed
34 35 36 37
    ((hg_bulk_t)                (bulk_handle)));
MERCURY_GEN_PROC(ssg_group_join_response_t, \
    ((hg_string_t)  (group_name)) \
    ((uint32_t)     (group_size)) \
38 39
    ((hg_size_t)    (view_buf_size))
    ((uint8_t)  (ret)));
Shane Snyder's avatar
Shane Snyder committed
40 41

MERCURY_GEN_PROC(ssg_group_leave_request_t, \
42 43 44 45
    ((ssg_group_descriptor_t)   (group_descriptor))
    ((ssg_member_id_t)          (member_id)));
MERCURY_GEN_PROC(ssg_group_leave_response_t, \
    ((uint8_t)  (ret)));
Shane Snyder's avatar
Shane Snyder committed
46

47 48 49 50 51 52 53 54 55 56
MERCURY_GEN_PROC(ssg_group_attach_request_t, \
    ((ssg_group_descriptor_t)   (group_descriptor))
    ((hg_bulk_t)                (bulk_handle)));

MERCURY_GEN_PROC(ssg_group_attach_response_t, \
    ((hg_string_t)  (group_name)) \
    ((uint32_t)     (group_size)) \
    ((hg_size_t)    (view_buf_size)));

/* SSG RPC handler prototypes */
Shane Snyder's avatar
Shane Snyder committed
57 58
DECLARE_MARGO_RPC_HANDLER(ssg_group_join_recv_ult)
DECLARE_MARGO_RPC_HANDLER(ssg_group_leave_recv_ult)
59 60
DECLARE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)

61
/* internal helper routine prototypes */
62 63
static int ssg_group_serialize(
    ssg_group_t *g, void **buf, hg_size_t *buf_size);
Shane Snyder's avatar
Shane Snyder committed
64

Shane Snyder's avatar
Shane Snyder committed
65
/* SSG RPC IDs */
Shane Snyder's avatar
Shane Snyder committed
66 67
static hg_id_t ssg_group_join_rpc_id;
static hg_id_t ssg_group_leave_rpc_id;
68 69 70 71 72 73 74 75
static hg_id_t ssg_group_attach_rpc_id;

/* ssg_register_rpcs
 *
 *
 */
void ssg_register_rpcs()
{
76
    /* register RPCs for SSG */
Shane Snyder's avatar
Shane Snyder committed
77 78 79 80 81 82
    ssg_group_join_rpc_id =
        MARGO_REGISTER(ssg_inst->mid, "ssg_group_join",
        ssg_group_join_request_t, ssg_group_join_response_t,
        ssg_group_join_recv_ult);
    ssg_group_leave_rpc_id =
        MARGO_REGISTER(ssg_inst->mid, "ssg_group_leave",
83
        ssg_group_leave_request_t, ssg_group_leave_response_t,
Shane Snyder's avatar
Shane Snyder committed
84
        ssg_group_leave_recv_ult);
85 86
    ssg_group_attach_rpc_id =
		MARGO_REGISTER(ssg_inst->mid, "ssg_group_attach",
87
        ssg_group_attach_request_t, ssg_group_attach_response_t,
88
        ssg_group_attach_recv_ult);
89 90 91 92

    return;
}

Shane Snyder's avatar
Shane Snyder committed
93 94 95 96 97 98
/* ssg_group_join_send
 *
 *
 */
int ssg_group_join_send(
    ssg_group_descriptor_t * group_descriptor,
99
    hg_addr_t group_target_addr,
Shane Snyder's avatar
Shane Snyder committed
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
    char ** group_name,
    int * group_size,
    void ** view_buf)
{
    hg_handle_t handle = HG_HANDLE_NULL;
    hg_bulk_t bulk_handle = HG_BULK_NULL;
    void *tmp_view_buf = NULL, *b;
    hg_size_t tmp_view_buf_size = SSG_VIEW_BUF_DEF_SIZE;
    ssg_group_join_request_t join_req;
    ssg_group_join_response_t join_resp;
    hg_return_t hret;
    int sret = SSG_FAILURE;

    *group_name = NULL;
    *group_size = 0;
    *view_buf = NULL;

117
    hret = margo_create(ssg_inst->mid, group_target_addr,
Shane Snyder's avatar
Shane Snyder committed
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
        ssg_group_join_rpc_id, &handle);
    if (hret != HG_SUCCESS) goto fini;

    /* allocate a buffer to try to store the group view in */
    /* NOTE: We don't know if this buffer is big enough to store the complete
     * view. If the buffer is not large enough, the group member we are
     * attaching too will send a NACK indicating the necessary buffer size
     */
    tmp_view_buf = malloc(tmp_view_buf_size);
    if (!tmp_view_buf) goto fini;

    hret = margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size,
        HG_BULK_WRITE_ONLY, &bulk_handle);
    if (hret != HG_SUCCESS) goto fini;

    /* send a join request to the given group member address */
134
    /* XXX is the whole descriptor really needed? */
Shane Snyder's avatar
Shane Snyder committed
135
    memcpy(&join_req.group_descriptor, group_descriptor, sizeof(*group_descriptor));
136
    join_req.addr_str = ssg_inst->self_addr_str;
Shane Snyder's avatar
Shane Snyder committed
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
    join_req.bulk_handle = bulk_handle;
    hret = margo_forward(handle, &join_req);
    if (hret != HG_SUCCESS) goto fini;

    hret = margo_get_output(handle, &join_resp);
    if (hret != HG_SUCCESS) goto fini;

    /* if our initial buffer is too small, reallocate to the exact size & rejoin */
    if (join_resp.view_buf_size > tmp_view_buf_size)
    {
        b = realloc(tmp_view_buf, join_resp.view_buf_size);
        if(!b)
        {
            margo_free_output(handle, &join_resp);
            goto fini;
        }
        tmp_view_buf = b;
        tmp_view_buf_size = join_resp.view_buf_size;
        margo_free_output(handle, &join_resp);

        /* free old bulk handle and recreate it */
        margo_bulk_free(bulk_handle);
        hret = margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size,
            HG_BULK_WRITE_ONLY, &bulk_handle);
        if (hret != HG_SUCCESS) goto fini;

        join_req.bulk_handle = bulk_handle;
        hret = margo_forward(handle, &join_req);
        if (hret != HG_SUCCESS) goto fini;

        hret = margo_get_output(handle, &join_resp);
        if (hret != HG_SUCCESS) goto fini;
    }

    /* readjust view buf size if initial guess was too large */
    if (join_resp.view_buf_size < tmp_view_buf_size)
    {
        b = realloc(tmp_view_buf, join_resp.view_buf_size);
        if(!b)
        {
177
            margo_free_output(handle, &join_resp);
Shane Snyder's avatar
Shane Snyder committed
178 179 180 181 182 183 184 185 186
            goto fini;
        }
        tmp_view_buf = b;
    }

    /* set output pointers according to the returned view parameters */
    *group_name = strdup(join_resp.group_name);
    *group_size = (int)join_resp.group_size;
    *view_buf = tmp_view_buf;
187
    sret = join_resp.ret;
Shane Snyder's avatar
Shane Snyder committed
188
    margo_free_output(handle, &join_resp);
189 190 191

    if (sret == SSG_SUCCESS)
        tmp_view_buf = NULL; /* don't free on success */
Shane Snyder's avatar
Shane Snyder committed
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
fini:
    if (handle != HG_HANDLE_NULL) margo_destroy(handle);
    if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
    free(tmp_view_buf);

    return sret;
}

/* ssg_group_join_recv_ult
 *
 * Handler for the "ssg_group_join" RPC.
 *
 * Looks up the requested group by the descriptor's name hash and serializes
 * its view. If the requester's bulk buffer is at least as large as the
 * serialized view, the view is pushed into it and the join is applied to
 * the local group state. If the buffer is too small nothing is transferred
 * or applied; the response still reports success, with view_buf_size
 * telling the requester how large its buffer must be.
 *
 * A response is always sent. On failure every field holds a defined value
 * (empty name, zero sizes, ret == SSG_FAILURE), so the encoder never reads
 * uninitialized memory.
 */
static void ssg_group_join_recv_ult(
    hg_handle_t handle)
{
    static char empty_name[] = ""; /* placeholder name for failure responses */
    const struct hg_info *hgi = NULL;
    ssg_group_t *g = NULL;
    ssg_group_join_request_t join_req;
    ssg_group_join_response_t join_resp;
    hg_size_t view_size_requested;
    void *view_buf = NULL;
    hg_size_t view_buf_size;
    hg_bulk_t bulk_handle = HG_BULK_NULL;
    ssg_member_update_t join_update;
    int have_input = 0; /* non-zero once join_req must be freed */
    int sret;
    hg_return_t hret;

    memset(&join_resp, 0, sizeof(join_resp));
    join_resp.group_name = empty_name;
    join_resp.ret = SSG_FAILURE;

    if (!ssg_inst) goto fini;

    hgi = margo_get_info(handle);
    if (!hgi) goto fini;

    hret = margo_get_input(handle, &join_req);
    if (hret != HG_SUCCESS) goto fini;
    have_input = 1;
    view_size_requested = margo_bulk_get_size(join_req.bulk_handle);

    /* look for the given group in my local table of groups */
    HASH_FIND(hh, ssg_inst->group_table, &join_req.group_descriptor.name_hash,
        sizeof(uint64_t), g);
    if (!g) goto fini;

    sret = ssg_group_serialize(g, &view_buf, &view_buf_size);
    if (sret != SSG_SUCCESS) goto fini;

    if (view_size_requested >= view_buf_size)
    {
        /* if joiner's buf is large enough, transfer the view */
        hret = margo_bulk_create(ssg_inst->mid, 1, &view_buf, &view_buf_size,
            HG_BULK_READ_ONLY, &bulk_handle);
        if (hret != HG_SUCCESS)
        {
            /* make sure cleanup never frees a handle that was not created */
            bulk_handle = HG_BULK_NULL;
            goto fini;
        }

        hret = margo_bulk_transfer(ssg_inst->mid, HG_BULK_PUSH, hgi->addr,
            join_req.bulk_handle, 0, bulk_handle, 0, view_buf_size);
        if (hret != HG_SUCCESS) goto fini;

        /* apply group join locally */
        join_update.type = SSG_MEMBER_JOINED;
        join_update.u.member_addr_str = join_req.addr_str;
        ssg_apply_member_updates(g, &join_update, 1);
    }

    /* set the response and send back */
    join_resp.group_name = g->name;
    join_resp.group_size = (int)g->view.size;
    join_resp.view_buf_size = view_buf_size;
    join_resp.ret = SSG_SUCCESS;
fini:
    /* the response does not reference the decoded input, so release it now */
    if (have_input) margo_free_input(handle, &join_req);

    /* respond */
    margo_respond(handle, &join_resp);

    /* cleanup */
    if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
    margo_destroy(handle);
    free(view_buf);

    return;
}
DEFINE_MARGO_RPC_HANDLER(ssg_group_join_recv_ult)

/* ssg_group_leave_send
 *
 *
 */
int ssg_group_leave_send(
    ssg_group_descriptor_t * group_descriptor,
292 293
    ssg_member_id_t self_id,
    hg_addr_t group_target_addr)
Shane Snyder's avatar
Shane Snyder committed
294 295
{
    hg_handle_t handle = HG_HANDLE_NULL;
296 297 298 299
    ssg_group_leave_request_t leave_req;
    ssg_group_leave_response_t leave_resp;
    hg_return_t hret;
    int sret = SSG_FAILURE;
Shane Snyder's avatar
Shane Snyder committed
300

301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
    hret = margo_create(ssg_inst->mid, group_target_addr,
        ssg_group_leave_rpc_id, &handle);
    if (hret != HG_SUCCESS) goto fini;

    /* send a leave request to the given group member */
    /* XXX is the whole descriptor really needed? */
    memcpy(&leave_req.group_descriptor, group_descriptor, sizeof(*group_descriptor));
    leave_req.member_id = self_id;
    hret = margo_forward(handle, &leave_req);
    if (hret != HG_SUCCESS) goto fini;

    hret = margo_get_output(handle, &leave_resp);
    if (hret != HG_SUCCESS) goto fini;

    sret = leave_resp.ret;
    margo_free_output(handle, &leave_resp);
fini:
    if (handle != HG_HANDLE_NULL) margo_destroy(handle);

    return sret;
}

static void ssg_group_leave_recv_ult(
    hg_handle_t handle)
{
    const struct hg_info *hgi = NULL;
    ssg_group_t *g = NULL;
    ssg_group_leave_request_t leave_req;
    ssg_group_leave_response_t leave_resp;
Shane Snyder's avatar
Shane Snyder committed
330
    ssg_member_update_t leave_update;
331 332 333 334 335 336 337 338
    hg_return_t hret;

    leave_resp.ret = SSG_FAILURE;

    if (!ssg_inst) goto fini;

    hgi = margo_get_info(handle);
    if (!hgi) goto fini;
Shane Snyder's avatar
Shane Snyder committed
339

340 341 342 343 344 345 346 347 348 349 350 351
    hret = margo_get_input(handle, &leave_req);
    if (hret != HG_SUCCESS) goto fini;

    /* look for the given group in my local table of groups */
    HASH_FIND(hh, ssg_inst->group_table, &leave_req.group_descriptor.name_hash,
        sizeof(uint64_t), g);
    if (!g)
    {
        margo_free_input(handle, &leave_req);
        goto fini;
    }

Shane Snyder's avatar
Shane Snyder committed
352 353 354 355
    /* apply group leave locally */
    leave_update.type = SSG_MEMBER_LEFT;
    leave_update.u.member_id = leave_req.member_id;
    ssg_apply_member_updates(g, &leave_update, 1);
356

Shane Snyder's avatar
Shane Snyder committed
357
    margo_free_input(handle, &leave_req);
358 359 360 361 362 363 364 365 366
    leave_resp.ret = SSG_SUCCESS;
fini:
    /* respond */
    margo_respond(handle, &leave_resp);

    /* cleanup */
    margo_destroy(handle);

    return;
Shane Snyder's avatar
Shane Snyder committed
367
}
368 369
DEFINE_MARGO_RPC_HANDLER(ssg_group_leave_recv_ult)

Shane Snyder's avatar
Shane Snyder committed
370

371 372 373 374
/* ssg_group_attach_send
 *
 *
 */
375 376 377 378 379
int ssg_group_attach_send(
    ssg_group_descriptor_t * group_descriptor,
    char ** group_name,
    int * group_size,
    void ** view_buf)
380 381 382
{
    hg_addr_t member_addr = HG_ADDR_NULL;
    hg_handle_t handle = HG_HANDLE_NULL;
383 384 385 386 387
    hg_bulk_t bulk_handle = HG_BULK_NULL;
    void *tmp_view_buf = NULL, *b;
    hg_size_t tmp_view_buf_size = SSG_VIEW_BUF_DEF_SIZE;
    ssg_group_attach_request_t attach_req;
    ssg_group_attach_response_t attach_resp;
388
    hg_return_t hret;
389 390 391 392 393
    int sret = SSG_FAILURE;

    *group_name = NULL;
    *group_size = 0;
    *view_buf = NULL;
394 395

    /* lookup the address of the given group member */
Shane Snyder's avatar
Shane Snyder committed
396 397
    hret = margo_addr_lookup(ssg_inst->mid, group_descriptor->addr_str,
        &member_addr);
398 399
    if (hret != HG_SUCCESS) goto fini;

400
    hret = margo_create(ssg_inst->mid, member_addr,
401 402 403
        ssg_group_attach_rpc_id, &handle);
    if (hret != HG_SUCCESS) goto fini;

Shane Snyder's avatar
Shane Snyder committed
404
    /* allocate a buffer to try to store the group view in */
405
    /* NOTE: We don't know if this buffer is big enough to store the complete
406
     * view. If the buffer is not large enough, the group member we are
407 408 409 410 411
     * attaching too will send a NACK indicating the necessary buffer size
     */
    tmp_view_buf = malloc(tmp_view_buf_size);
    if (!tmp_view_buf) goto fini;

412
    hret = margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size,
413 414 415
        HG_BULK_WRITE_ONLY, &bulk_handle);
    if (hret != HG_SUCCESS) goto fini;

416
    /* send an attach request to the given group member address */
417 418
    memcpy(&attach_req.group_descriptor, group_descriptor, sizeof(*group_descriptor));
    attach_req.bulk_handle = bulk_handle;
419
    hret = margo_forward(handle, &attach_req);
420 421
    if (hret != HG_SUCCESS) goto fini;

422
    hret = margo_get_output(handle, &attach_resp);
423 424 425 426 427 428 429 430
    if (hret != HG_SUCCESS) goto fini;

    /* if our initial buffer is too small, reallocate to the exact size & reattach */
    if (attach_resp.view_buf_size > tmp_view_buf_size)
    {
        b = realloc(tmp_view_buf, attach_resp.view_buf_size);
        if(!b)
        {
431
            margo_free_output(handle, &attach_resp);
432 433 434 435
            goto fini;
        }
        tmp_view_buf = b;
        tmp_view_buf_size = attach_resp.view_buf_size;
Shane Snyder's avatar
Shane Snyder committed
436
        margo_free_output(handle, &attach_resp);
437 438

        /* free old bulk handle and recreate it */
439
        margo_bulk_free(bulk_handle);
440
        hret = margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size,
441 442 443 444
            HG_BULK_WRITE_ONLY, &bulk_handle);
        if (hret != HG_SUCCESS) goto fini;

        attach_req.bulk_handle = bulk_handle;
445
        hret = margo_forward(handle, &attach_req);
446 447
        if (hret != HG_SUCCESS) goto fini;

448
        hret = margo_get_output(handle, &attach_resp);
449 450 451 452 453 454 455 456 457
        if (hret != HG_SUCCESS) goto fini;
    }

    /* readjust view buf size if initial guess was too large */
    if (attach_resp.view_buf_size < tmp_view_buf_size)
    {
        b = realloc(tmp_view_buf, attach_resp.view_buf_size);
        if(!b)
        {
458
            margo_free_output(handle, &attach_resp);
459 460 461 462 463 464 465 466 467 468
            goto fini;
        }
        tmp_view_buf = b;
    }

    /* set output pointers according to the returned view parameters */
    *group_name = strdup(attach_resp.group_name);
    *group_size = (int)attach_resp.group_size;
    *view_buf = tmp_view_buf;

469
    margo_free_output(handle, &attach_resp);
470 471
    tmp_view_buf = NULL;
    sret = SSG_SUCCESS;
472
fini:
473
    if (member_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, member_addr);
474
    if (handle != HG_HANDLE_NULL) margo_destroy(handle);
475
    if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
476
    free(tmp_view_buf);
477

478
    return sret;
479 480
}

481 482
static void ssg_group_attach_recv_ult(
    hg_handle_t handle)
483
{
484
    const struct hg_info *hgi = NULL;
Shane Snyder's avatar
Shane Snyder committed
485
    ssg_group_t *g = NULL;
486 487 488 489 490 491 492
    ssg_group_attach_request_t attach_req;
    ssg_group_attach_response_t attach_resp;
    hg_size_t view_size_requested;
    void *view_buf = NULL;
    hg_size_t view_buf_size;
    hg_bulk_t bulk_handle = HG_BULK_NULL;
    int sret;
Shane Snyder's avatar
Shane Snyder committed
493 494 495 496
    hg_return_t hret;

    if (!ssg_inst) goto fini;

497
    hgi = margo_get_info(handle);
498
    if (!hgi) goto fini;
499 500

    hret = margo_get_input(handle, &attach_req);
Shane Snyder's avatar
Shane Snyder committed
501
    if (hret != HG_SUCCESS) goto fini;
502
    view_size_requested = margo_bulk_get_size(attach_req.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
503 504

    /* look for the given group in my local table of groups */
505
    HASH_FIND(hh, ssg_inst->group_table, &attach_req.group_descriptor.name_hash,
Shane Snyder's avatar
Shane Snyder committed
506 507 508
        sizeof(uint64_t), g);
    if (!g)
    {
509
        margo_free_input(handle, &attach_req);
Shane Snyder's avatar
Shane Snyder committed
510 511 512
        goto fini;
    }

513
    sret = ssg_group_serialize(g, &view_buf, &view_buf_size);
514 515
    if (sret != SSG_SUCCESS)
    {
516
        margo_free_input(handle, &attach_req);
517 518
        goto fini;
    }
Shane Snyder's avatar
Shane Snyder committed
519

520 521 522
    if (view_size_requested >= view_buf_size)
    {
        /* if attacher's buf is large enough, transfer the view */
523 524
        hret = margo_bulk_create(ssg_inst->mid, 1, &view_buf, &view_buf_size,
            HG_BULK_READ_ONLY, &bulk_handle);
525 526
        if (hret != HG_SUCCESS)
        {
527
            margo_free_input(handle, &attach_req);
528 529
            goto fini;
        }
Shane Snyder's avatar
Shane Snyder committed
530

531 532 533 534
        hret = margo_bulk_transfer(ssg_inst->mid, HG_BULK_PUSH, hgi->addr,
            attach_req.bulk_handle, 0, bulk_handle, 0, view_buf_size);
        if (hret != HG_SUCCESS)
        {
535
            margo_free_input(handle, &attach_req);
536 537 538 539 540 541 542 543
            goto fini;
        }
    }

    /* set the response and send back */
    attach_resp.group_name = g->name;
    attach_resp.group_size = (int)g->view.size;
    attach_resp.view_buf_size = view_buf_size;
544
    margo_respond(handle, &attach_resp);
545

546
    margo_free_input(handle, &attach_req);
Shane Snyder's avatar
Shane Snyder committed
547
fini:
Shane Snyder's avatar
Shane Snyder committed
548
    free(view_buf);
549
    if (handle != HG_HANDLE_NULL) margo_destroy(handle);
550
    if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
551

552 553 554
    return;
}
DEFINE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)
Shane Snyder's avatar
Shane Snyder committed
555

556 557
static int ssg_group_serialize(
    ssg_group_t *g, void **buf, hg_size_t *buf_size)
558
{
Shane Snyder's avatar
Shane Snyder committed
559
    ssg_member_state_t *member_state, *tmp;
560 561
    hg_size_t group_buf_size = 0;
    void *group_buf;
562
    void *buf_p, *str_p;
563 564 565

    *buf = NULL;
    *buf_size = 0;
Shane Snyder's avatar
Shane Snyder committed
566

567
    /* first determine size */
568
    group_buf_size = strlen(ssg_inst->self_addr_str) + 1;
569
    HASH_ITER(hh, g->view.member_map, member_state, tmp)
570
    {
571
        group_buf_size += strlen(member_state->addr_str) + 1;
572 573
    }

574 575 576
    group_buf = malloc(group_buf_size);
    if(!group_buf)
    {
577
        return SSG_FAILURE;
578
    }
579

580
    buf_p = group_buf;
581 582
    strcpy(buf_p, ssg_inst->self_addr_str);
    buf_p += strlen(ssg_inst->self_addr_str) + 1;
583
    HASH_ITER(hh, g->view.member_map, member_state, tmp)
584
    {
585 586 587
        str_p = member_state->addr_str;
        strcpy(buf_p, str_p);
        buf_p += strlen(member_state->addr_str) + 1;
588 589
    }

590 591
    *buf = group_buf;
    *buf_size = group_buf_size;
592 593 594 595 596 597 598 599

    return SSG_SUCCESS;
}

/* custom SSG RPC proc routines */

/* hg_proc_ssg_group_id_t
 *
 * Mercury proc routine for a group id. `data` is treated as a pointer to an
 * `ssg_group_descriptor_t *`.
 *
 *   HG_ENCODE - encodes the descriptor the id points at.
 *   HG_DECODE - allocates a zeroed descriptor, decodes into it and sets its
 *               ref_count to 1. If decoding fails the allocation is released
 *               and the id is reset to NULL.
 *   HG_FREE   - frees the descriptor and its addr_str when ref_count is 1,
 *               resetting the id to NULL; otherwise decrements ref_count.
 *               A NULL id is accepted and ignored.
 *
 * Returns HG_SUCCESS on success, HG_NOMEM_ERROR if the decode allocation
 * fails, and HG_PROTOCOL_ERROR for any other failure, including an attempt
 * to encode a NULL id or an unrecognized proc operation.
 */
hg_return_t hg_proc_ssg_group_id_t(
    hg_proc_t proc, void *data)
{
    ssg_group_descriptor_t **group_descriptor = (ssg_group_descriptor_t **)data;
    hg_return_t hret = HG_PROTOCOL_ERROR;

    switch(hg_proc_get_op(proc))
    {
        case HG_ENCODE:
            /* nothing to encode for a NULL id; report instead of crashing */
            if (!(*group_descriptor))
            {
                return HG_PROTOCOL_ERROR;
            }
            hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
            if (hret != HG_SUCCESS)
            {
                return HG_PROTOCOL_ERROR;
            }
            break;
        case HG_DECODE:
            *group_descriptor = calloc(1, sizeof(**group_descriptor));
            if (!(*group_descriptor))
            {
                return HG_NOMEM_ERROR;
            }
            hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
            if (hret != HG_SUCCESS)
            {
                /* don't leak the descriptor or hand back a half-built one */
                free(*group_descriptor);
                *group_descriptor = NULL;
                return HG_PROTOCOL_ERROR;
            }
            (*group_descriptor)->ref_count = 1;
            break;
        case HG_FREE:
            if (*group_descriptor)
            {
                if ((*group_descriptor)->ref_count == 1)
                {
                    free((*group_descriptor)->addr_str);
                    free(*group_descriptor);
                    /* don't leave a dangling id behind */
                    *group_descriptor = NULL;
                }
                else
                {
                    (*group_descriptor)->ref_count--;
                }
            }
            hret = HG_SUCCESS;
            break;
        default:
            break;
    }

    return hret;
}

/* hg_proc_ssg_member_update_t
 *
 * Mercury proc routine for a member update. `data` is treated as a pointer
 * to an ssg_member_update_t.
 *
 *   HG_ENCODE / HG_DECODE - processes the type byte, then the union member
 *       selected by it: the address string for SSG_MEMBER_JOINED, the
 *       member id for SSG_MEMBER_LEFT. Any other type is an error.
 *   HG_FREE - runs the string proc for SSG_MEMBER_JOINED updates; other
 *       types need no work.
 *
 * Returns HG_SUCCESS on success and HG_PROTOCOL_ERROR on any failure or for
 * an unrecognized proc operation.
 */
hg_return_t hg_proc_ssg_member_update_t(
    hg_proc_t proc, void *data)
{
    ssg_member_update_t *upd = (ssg_member_update_t *)data;
    hg_return_t status = HG_PROTOCOL_ERROR;

    switch(hg_proc_get_op(proc))
    {
        case HG_ENCODE:
        case HG_DECODE:
            /* both directions walk exactly the same fields */
            if (hg_proc_uint8_t(proc, &(upd->type)) != HG_SUCCESS)
                return HG_PROTOCOL_ERROR;

            if (upd->type == SSG_MEMBER_JOINED)
                status = hg_proc_hg_string_t(proc, &(upd->u.member_addr_str));
            else if (upd->type == SSG_MEMBER_LEFT)
                status = hg_proc_ssg_member_id_t(proc, &(upd->u.member_id));
            else
                status = HG_PROTOCOL_ERROR;

            /* every failure is reported as a protocol error */
            if (status != HG_SUCCESS)
                status = HG_PROTOCOL_ERROR;
            break;
        case HG_FREE:
            if (upd->type != SSG_MEMBER_JOINED)
            {
                status = HG_SUCCESS;
                break;
            }
            status = hg_proc_hg_string_t(proc, &(upd->u.member_addr_str));
            if (status != HG_SUCCESS)
                status = HG_PROTOCOL_ERROR;
            break;
        default:
            break;
    }

    return status;
}