/* ssg-rpc.c */
/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

#include "ssg-config.h"

#include <stdlib.h>
#include <string.h>
#include <assert.h>

#include <mercury.h>
#include <abt.h>
#include <margo.h>

#include "ssg.h"
#include "ssg-internal.h"

/* Initial size, in bytes, of the buffer an attaching client offers for the
 * serialized group view. If the view turns out to be larger, the client
 * retries with a buffer of the size reported by the group member.
 */
#define SSG_VIEW_BUF_DEF_SIZE (128 * 1024)

/* SSG RPC types and (de)serialization routines */

/* Proc routine for the on-the-wire part of a group descriptor. Only the
 * fields listed below are (de)serialized; other members of the struct
 * (e.g. ref_count) are handled by hand elsewhere.
 * NOTE: keep in sync with ssg_group_descriptor_t definition in ssg-internal.h
 */
MERCURY_GEN_STRUCT_PROC(ssg_group_descriptor_t,
    ((uint64_t)     (magic_nr))
    ((uint64_t)     (name_hash))
    ((hg_string_t)  (addr_str)));

/* Attach request: which group to attach to, plus a bulk handle exposing the
 * requester's buffer that the group member pushes the serialized view into.
 */
MERCURY_GEN_PROC(ssg_group_attach_request_t,
    ((ssg_group_descriptor_t)   (group_descriptor))
    ((hg_bulk_t)                (bulk_handle)));

/* Attach response: group name, member count, and the size of the serialized
 * view. When view_buf_size exceeds the requester's buffer, no view data was
 * transferred and the requester must retry with a larger buffer.
 */
MERCURY_GEN_PROC(ssg_group_attach_response_t,
    ((hg_string_t)  (group_name))
    ((uint32_t)     (group_size))
    ((hg_size_t)    (view_buf_size)));

/* SSG RPC handler prototypes */
39 40
DECLARE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)

41 42 43
/* internal helper routine prototypes */
static int ssg_group_view_serialize(
    ssg_group_view_t *view, void **buf, hg_size_t *buf_size);
Shane Snyder's avatar
Shane Snyder committed
44

Shane Snyder's avatar
Shane Snyder committed
45
/* SSG RPC IDs */
46 47 48 49 50 51 52 53 54
static hg_id_t ssg_group_attach_rpc_id;

/* ssg_register_rpcs
 *
 *
 */
void ssg_register_rpcs()
{
    /* register HG RPCs for SSG */
Matthieu Dorier's avatar
Matthieu Dorier committed
55 56
    ssg_group_attach_rpc_id =
		MARGO_REGISTER(ssg_inst->mid, "ssg_group_attach",
57
        ssg_group_attach_request_t, ssg_group_attach_response_t,
Matthieu Dorier's avatar
Matthieu Dorier committed
58
        ssg_group_attach_recv_ult);
59 60 61 62 63 64 65 66

    return;
}

/* ssg_group_attach_send
 *
 *
 */
67 68 69 70 71
int ssg_group_attach_send(
    ssg_group_descriptor_t * group_descriptor,
    char ** group_name,
    int * group_size,
    void ** view_buf)
72 73 74
{
    hg_addr_t member_addr = HG_ADDR_NULL;
    hg_handle_t handle = HG_HANDLE_NULL;
75 76 77 78 79
    hg_bulk_t bulk_handle = HG_BULK_NULL;
    void *tmp_view_buf = NULL, *b;
    hg_size_t tmp_view_buf_size = SSG_VIEW_BUF_DEF_SIZE;
    ssg_group_attach_request_t attach_req;
    ssg_group_attach_response_t attach_resp;
80
    hg_return_t hret;
81 82 83 84 85
    int sret = SSG_FAILURE;

    *group_name = NULL;
    *group_size = 0;
    *view_buf = NULL;
86 87

    /* lookup the address of the given group member */
Shane Snyder's avatar
Shane Snyder committed
88 89
    hret = margo_addr_lookup(ssg_inst->mid, group_descriptor->addr_str,
        &member_addr);
90 91
    if (hret != HG_SUCCESS) goto fini;

92
    hret = margo_create(ssg_inst->mid, member_addr,
93 94 95
        ssg_group_attach_rpc_id, &handle);
    if (hret != HG_SUCCESS) goto fini;

96 97 98 99 100 101 102 103
    /* allocate a buffer of the given size to try to store the group view in */
    /* NOTE: We don't know if this buffer is big enough to store the complete
     * view. If the buffers is not large enough, the group member we are
     * attaching too will send a NACK indicating the necessary buffer size
     */
    tmp_view_buf = malloc(tmp_view_buf_size);
    if (!tmp_view_buf) goto fini;

104
    hret = margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size,
105 106 107
        HG_BULK_WRITE_ONLY, &bulk_handle);
    if (hret != HG_SUCCESS) goto fini;

108
    /* send an attach request to the given group member address */
109 110
    memcpy(&attach_req.group_descriptor, group_descriptor, sizeof(*group_descriptor));
    attach_req.bulk_handle = bulk_handle;
111
    hret = margo_forward(handle, &attach_req);
112 113
    if (hret != HG_SUCCESS) goto fini;

114
    hret = margo_get_output(handle, &attach_resp);
115 116 117 118 119 120 121 122
    if (hret != HG_SUCCESS) goto fini;

    /* if our initial buffer is too small, reallocate to the exact size & reattach */
    if (attach_resp.view_buf_size > tmp_view_buf_size)
    {
        b = realloc(tmp_view_buf, attach_resp.view_buf_size);
        if(!b)
        {
123
            margo_free_output(handle, &attach_resp);
124 125 126 127 128 129
            goto fini;
        }
        tmp_view_buf = b;
        tmp_view_buf_size = attach_resp.view_buf_size;

        /* free old bulk handle and recreate it */
130
        margo_bulk_free(bulk_handle);
131
        hret = margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size,
132 133 134 135
            HG_BULK_WRITE_ONLY, &bulk_handle);
        if (hret != HG_SUCCESS) goto fini;

        attach_req.bulk_handle = bulk_handle;
136
        hret = margo_forward(handle, &attach_req);
137 138
        if (hret != HG_SUCCESS) goto fini;

139
        margo_free_output(handle, &attach_resp);
140
        hret = margo_get_output(handle, &attach_resp);
141 142 143 144 145 146 147 148 149
        if (hret != HG_SUCCESS) goto fini;
    }

    /* readjust view buf size if initial guess was too large */
    if (attach_resp.view_buf_size < tmp_view_buf_size)
    {
        b = realloc(tmp_view_buf, attach_resp.view_buf_size);
        if(!b)
        {
150
            margo_free_output(handle, &attach_resp);
151 152 153 154 155 156 157 158 159 160
            goto fini;
        }
        tmp_view_buf = b;
    }

    /* set output pointers according to the returned view parameters */
    *group_name = strdup(attach_resp.group_name);
    *group_size = (int)attach_resp.group_size;
    *view_buf = tmp_view_buf;

161
    margo_free_output(handle, &attach_resp);
162 163
    tmp_view_buf = NULL;
    sret = SSG_SUCCESS;
164
fini:
165
    if (member_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, member_addr);
166
    if (handle != HG_HANDLE_NULL) margo_destroy(handle);
167
    if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
168
    free(tmp_view_buf);
169

170
    return sret;
171 172
}

173 174
static void ssg_group_attach_recv_ult(
    hg_handle_t handle)
175
{
176
    const struct hg_info *hgi = NULL;
Shane Snyder's avatar
Shane Snyder committed
177
    ssg_group_t *g = NULL;
178 179 180 181 182 183 184
    ssg_group_attach_request_t attach_req;
    ssg_group_attach_response_t attach_resp;
    hg_size_t view_size_requested;
    void *view_buf = NULL;
    hg_size_t view_buf_size;
    hg_bulk_t bulk_handle = HG_BULK_NULL;
    int sret;
Shane Snyder's avatar
Shane Snyder committed
185 186 187 188
    hg_return_t hret;

    if (!ssg_inst) goto fini;

189
    hgi = margo_get_info(handle);
190
    if (!hgi) goto fini;
191 192

    hret = margo_get_input(handle, &attach_req);
Shane Snyder's avatar
Shane Snyder committed
193
    if (hret != HG_SUCCESS) goto fini;
194
    view_size_requested = margo_bulk_get_size(attach_req.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
195 196

    /* look for the given group in my local table of groups */
197
    HASH_FIND(hh, ssg_inst->group_table, &attach_req.group_descriptor.name_hash,
Shane Snyder's avatar
Shane Snyder committed
198 199 200
        sizeof(uint64_t), g);
    if (!g)
    {
201
        margo_free_input(handle, &attach_req);
Shane Snyder's avatar
Shane Snyder committed
202 203 204
        goto fini;
    }

205 206 207
    sret = ssg_group_view_serialize(&g->view, &view_buf, &view_buf_size);
    if (sret != SSG_SUCCESS)
    {
208
        margo_free_input(handle, &attach_req);
209 210
        goto fini;
    }
Shane Snyder's avatar
Shane Snyder committed
211

212 213 214
    if (view_size_requested >= view_buf_size)
    {
        /* if attacher's buf is large enough, transfer the view */
215 216
        hret = margo_bulk_create(ssg_inst->mid, 1, &view_buf, &view_buf_size,
            HG_BULK_READ_ONLY, &bulk_handle);
217 218
        if (hret != HG_SUCCESS)
        {
219
            margo_free_input(handle, &attach_req);
220 221
            goto fini;
        }
Shane Snyder's avatar
Shane Snyder committed
222

223 224 225 226
        hret = margo_bulk_transfer(ssg_inst->mid, HG_BULK_PUSH, hgi->addr,
            attach_req.bulk_handle, 0, bulk_handle, 0, view_buf_size);
        if (hret != HG_SUCCESS)
        {
227
            margo_free_input(handle, &attach_req);
228 229 230 231 232 233 234 235
            goto fini;
        }
    }

    /* set the response and send back */
    attach_resp.group_name = g->name;
    attach_resp.group_size = (int)g->view.size;
    attach_resp.view_buf_size = view_buf_size;
236
    margo_respond(handle, &attach_resp);
237

238
    margo_free_input(handle, &attach_req);
Shane Snyder's avatar
Shane Snyder committed
239
fini:
240
    free(view_buf); /* TODO: cache this */
241
    if (handle != HG_HANDLE_NULL) margo_destroy(handle);
242
    if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
243

244 245 246
    return;
}
DEFINE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)
Shane Snyder's avatar
Shane Snyder committed
247

248 249 250 251 252 253
static int ssg_group_view_serialize(
    ssg_group_view_t *view, void **buf, hg_size_t *buf_size)
{
    unsigned int i;
    hg_size_t view_size = 0;
    int tmp_size;
254
    void *view_buf, *buf_p, *str_p;
255 256 257

    *buf = NULL;
    *buf_size = 0;
Shane Snyder's avatar
Shane Snyder committed
258

259 260 261
    /* first determine view size */
    for (i = 0; i < view->size; i++)
    {
262 263 264 265
        if (view->member_states[i].addr_str)
            view_size += (strlen(view->member_states[i].addr_str) + 1);
        else
            view_size += 1;
266 267 268 269 270 271
    }

    view_buf = malloc(view_size);
    if(!view_buf)
        return SSG_FAILURE;

272
    buf_p = view_buf;
273 274
    for (i = 0; i < view->size; i++)
    {
275 276 277 278 279 280 281 282 283 284 285 286 287 288
        char null = '\0';

        if (view->member_states[i].addr_str)
        {
            tmp_size = strlen(view->member_states[i].addr_str) + 1;
            str_p = view->member_states[i].addr_str;
        }
        else
        {
            tmp_size = 1;
            str_p = &null;
        }
        memcpy(buf_p, str_p, tmp_size);
        buf_p += tmp_size;
289 290 291 292 293 294 295 296 297 298 299 300
    }

    *buf = view_buf;
    *buf_size = view_size;

    return SSG_SUCCESS;
}

/* custom SSG RPC proc routines */

/* hg_proc_ssg_group_id_t
 *
 * Mercury proc routine for a pointer to a group descriptor: data points to
 * an ssg_group_descriptor_t *.
 *   - HG_ENCODE: encodes the pointed-to descriptor.
 *   - HG_DECODE: allocates a zeroed descriptor, decodes into it, and sets
 *     its ref_count to 1. On decode failure the allocation is released and
 *     the pointer is reset to NULL.
 *   - HG_FREE: drops one reference. When the last reference goes away, the
 *     descriptor and its addr_str are freed and the pointer is reset to NULL.
 *
 * Returns:
 *   - HG_SUCCESS on success.
 *   - HG_NOMEM_ERROR if the descriptor cannot be allocated.
 *   - HG_PROTOCOL_ERROR on a (de)serialization failure or an unknown
 *     operation.
 */
hg_return_t hg_proc_ssg_group_id_t(
    hg_proc_t proc, void *data)
{
    ssg_group_descriptor_t **group_descriptor = (ssg_group_descriptor_t **)data;
    hg_return_t hret = HG_PROTOCOL_ERROR;

    switch(hg_proc_get_op(proc))
    {
        case HG_ENCODE:
            hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
            if (hret != HG_SUCCESS)
                return HG_PROTOCOL_ERROR;
            break;
        case HG_DECODE:
            *group_descriptor = calloc(1, sizeof(**group_descriptor));
            if (!(*group_descriptor))
                return HG_NOMEM_ERROR;
            hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
            if (hret != HG_SUCCESS)
            {
                /* don't leak the descriptor; addr_str is freed in case it
                 * was decoded before the failure (free(NULL) is a no-op)
                 */
                free((*group_descriptor)->addr_str);
                free(*group_descriptor);
                *group_descriptor = NULL;
                return HG_PROTOCOL_ERROR;
            }
            (*group_descriptor)->ref_count = 1;
            break;
        case HG_FREE:
            if (*group_descriptor == NULL)
            {
                /* nothing to release (e.g. decoding failed) */
                hret = HG_SUCCESS;
                break;
            }
            if ((*group_descriptor)->ref_count == 1)
            {
                free((*group_descriptor)->addr_str);
                free(*group_descriptor);
                *group_descriptor = NULL;
            }
            else
            {
                (*group_descriptor)->ref_count--;
            }
            hret = HG_SUCCESS;
            break;
        default:
            break;
    }

    return hret;
}