ssg-rpc.c 9.86 KB
Newer Older
1 2 3 4 5 6 7 8 9
/*
 * Copyright (c) 2016 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

#include "ssg-config.h"

#include <stdlib.h>
10
#include <assert.h>
11 12 13 14 15 16 17 18

#include <mercury.h>
#include <abt.h>
#include <margo.h>

#include "ssg.h"
#include "ssg-internal.h"

19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
/* Initial size, in bytes (128 KiB), of the buffer ssg_group_attach_send()
 * allocates to receive a serialized group view before it knows the real size.
 */
#define SSG_VIEW_BUF_DEF_SIZE (128 * 1024)

/* SSG RPC types and (de)serialization routines */

/* Proc routine (hg_proc_ssg_group_descriptor_t) for the group descriptor.
 * Only the three fields listed here are (de)serialized; other members of the
 * struct (e.g. ref_count, which hg_proc_ssg_group_id_t sets after decoding)
 * are not listed and so are not sent over the wire.
 *
 * NOTE: keep in sync with ssg_group_descriptor_t definition in ssg-internal.h
 */
MERCURY_GEN_STRUCT_PROC(ssg_group_descriptor_t, \
    ((uint64_t)     (magic_nr)) \
    ((uint64_t)     (name_hash)) \
    ((hg_string_t)  (addr_str)));

/* Attach request: the descriptor of the group to attach to, plus the bulk
 * handle of the attacher's buffer that the serialized view is pushed into.
 */
MERCURY_GEN_PROC(ssg_group_attach_request_t, \
    ((ssg_group_descriptor_t)   (group_descriptor))
    ((hg_bulk_t)                (bulk_handle)));

/* Attach response: the group's name, its view size (member count), and the
 * size in bytes of the serialized view. The attacher compares view_buf_size
 * against its own buffer size to decide whether it must retry with a larger
 * buffer.
 */
MERCURY_GEN_PROC(ssg_group_attach_response_t, \
    ((hg_string_t)  (group_name)) \
    ((uint32_t)     (group_size)) \
    ((hg_size_t)    (view_buf_size)));

/* SSG RPC handler prototypes */
39 40
/* handler for the "ssg_group_attach" RPC; the ULT body and the matching
 * DEFINE_MARGO_RPC_HANDLER() appear later in this file
 */
DECLARE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)

41 42 43
/* internal helper routine prototypes */
/* Pack the address string of every member in 'view' into one newly
 * allocated buffer, returned through 'buf' with its length in 'buf_size'.
 * Returns SSG_SUCCESS or SSG_FAILURE.
 */
static int ssg_group_view_serialize(
    ssg_group_view_t *view, void **buf, hg_size_t *buf_size);
Shane Snyder's avatar
Shane Snyder committed
44

Shane Snyder's avatar
Shane Snyder committed
45
/* SSG RPC IDs */
46 47 48 49 50 51 52 53 54
/* ID of the "ssg_group_attach" RPC; assigned in ssg_register_rpcs() and used
 * by ssg_group_attach_send() when creating the request handle
 */
static hg_id_t ssg_group_attach_rpc_id;

/* ssg_register_rpcs
 *
 *
 */
void ssg_register_rpcs()
{
    /* register HG RPCs for SSG */
Matthieu Dorier's avatar
Matthieu Dorier committed
55 56
    ssg_group_attach_rpc_id =
		MARGO_REGISTER(ssg_inst->mid, "ssg_group_attach",
57
        ssg_group_attach_request_t, ssg_group_attach_response_t,
Matthieu Dorier's avatar
Matthieu Dorier committed
58
        ssg_group_attach_recv_ult);
59 60 61 62 63 64 65 66

    return;
}

/* ssg_group_attach_send
 *
 *
 */
67 68 69 70 71
int ssg_group_attach_send(
    ssg_group_descriptor_t * group_descriptor,
    char ** group_name,
    int * group_size,
    void ** view_buf)
72 73 74 75
{
    hg_class_t *hgcl = NULL;
    hg_addr_t member_addr = HG_ADDR_NULL;
    hg_handle_t handle = HG_HANDLE_NULL;
76 77 78 79 80
    hg_bulk_t bulk_handle = HG_BULK_NULL;
    void *tmp_view_buf = NULL, *b;
    hg_size_t tmp_view_buf_size = SSG_VIEW_BUF_DEF_SIZE;
    ssg_group_attach_request_t attach_req;
    ssg_group_attach_response_t attach_resp;
81
    hg_return_t hret;
82 83 84 85 86
    int sret = SSG_FAILURE;

    *group_name = NULL;
    *group_size = 0;
    *view_buf = NULL;
87 88

    /* lookup the address of the given group member */
Shane Snyder's avatar
Shane Snyder committed
89 90
    hret = margo_addr_lookup(ssg_inst->mid, group_descriptor->addr_str,
        &member_addr);
91 92
    if (hret != HG_SUCCESS) goto fini;

93
    hret = margo_create(ssg_inst->mid, member_addr,
94 95 96
        ssg_group_attach_rpc_id, &handle);
    if (hret != HG_SUCCESS) goto fini;

97 98 99 100 101 102 103 104
    /* allocate a buffer of the given size to try to store the group view in */
    /* NOTE: We don't know if this buffer is big enough to store the complete
     * view. If the buffers is not large enough, the group member we are
     * attaching too will send a NACK indicating the necessary buffer size
     */
    tmp_view_buf = malloc(tmp_view_buf_size);
    if (!tmp_view_buf) goto fini;

105
    hret = margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size,
106 107 108
        HG_BULK_WRITE_ONLY, &bulk_handle);
    if (hret != HG_SUCCESS) goto fini;

109
    /* send an attach request to the given group member address */
110 111 112
    memcpy(&attach_req.group_descriptor, group_descriptor, sizeof(*group_descriptor));
    attach_req.bulk_handle = bulk_handle;
    hret = margo_forward(ssg_inst->mid, handle, &attach_req);
113 114
    if (hret != HG_SUCCESS) goto fini;

115
    hret = margo_get_output(handle, &attach_resp);
116 117 118 119 120 121 122 123
    if (hret != HG_SUCCESS) goto fini;

    /* if our initial buffer is too small, reallocate to the exact size & reattach */
    if (attach_resp.view_buf_size > tmp_view_buf_size)
    {
        b = realloc(tmp_view_buf, attach_resp.view_buf_size);
        if(!b)
        {
124
            margo_free_output(handle, &attach_resp);
125 126 127 128 129 130
            goto fini;
        }
        tmp_view_buf = b;
        tmp_view_buf_size = attach_resp.view_buf_size;

        /* free old bulk handle and recreate it */
131
        margo_bulk_free(bulk_handle);
132 133 134 135 136 137 138 139
        hret = HG_Bulk_create(hgcl, 1, &tmp_view_buf, &tmp_view_buf_size,
            HG_BULK_WRITE_ONLY, &bulk_handle);
        if (hret != HG_SUCCESS) goto fini;

        attach_req.bulk_handle = bulk_handle;
        hret = margo_forward(ssg_inst->mid, handle, &attach_req);
        if (hret != HG_SUCCESS) goto fini;

140
        margo_free_output(handle, &attach_resp);
141 142 143 144 145 146 147 148 149 150
        hret = HG_Get_output(handle, &attach_resp);
        if (hret != HG_SUCCESS) goto fini;
    }

    /* readjust view buf size if initial guess was too large */
    if (attach_resp.view_buf_size < tmp_view_buf_size)
    {
        b = realloc(tmp_view_buf, attach_resp.view_buf_size);
        if(!b)
        {
151
            margo_free_output(handle, &attach_resp);
152 153 154 155 156 157 158 159 160 161
            goto fini;
        }
        tmp_view_buf = b;
    }

    /* set output pointers according to the returned view parameters */
    *group_name = strdup(attach_resp.group_name);
    *group_size = (int)attach_resp.group_size;
    *view_buf = tmp_view_buf;

162
    margo_free_output(handle, &attach_resp);
163 164
    tmp_view_buf = NULL;
    sret = SSG_SUCCESS;
165
fini:
166 167 168
    if (member_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, member_addr);
    if (handle != HG_HANDLE_NULL) margo_destroy(ssg_inst->mid, handle);
    if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
169
    free(tmp_view_buf);
170

171
    return sret;
172 173
}

174 175
/* ssg_group_attach_recv_ult
 *
 * Handler for the "ssg_group_attach" RPC. Looks up the requested group by
 * the name hash in the request's descriptor, serializes its view, and -- if
 * the attacher's bulk buffer is at least as large as the serialized view --
 * pushes the view into that buffer. The response carries the group name,
 * the view size, and the serialized view size in bytes, so an attacher with
 * too small a buffer can retry with a larger one.
 *
 * NOTE(review): on every error path (no SSG instance, unknown group,
 * serialization or bulk failure) the handler returns without calling
 * margo_respond(); presumably the attacher then waits for a reply that
 * never arrives -- confirm and consider adding an error response.
 */
static void ssg_group_attach_recv_ult(
    hg_handle_t handle)
{
    const struct hg_info *hgi = NULL;
    ssg_group_t *g = NULL;
    ssg_group_attach_request_t attach_req;
    ssg_group_attach_response_t attach_resp;
    hg_size_t view_size_requested;
    void *view_buf = NULL;
    hg_size_t view_buf_size;
    hg_bulk_t bulk_handle = HG_BULK_NULL;
    int have_input = 0; /* nonzero while attach_req holds decoded input */
    int sret;
    hg_return_t hret;

    if (!ssg_inst) goto fini;

    hgi = margo_get_info(handle);
    if (!hgi) goto fini;

    hret = margo_get_input(handle, &attach_req);
    if (hret != HG_SUCCESS) goto fini;
    have_input = 1;
    view_size_requested = margo_bulk_get_size(attach_req.bulk_handle);

    /* look for the given group in my local table of groups */
    HASH_FIND(hh, ssg_inst->group_table, &attach_req.group_descriptor.name_hash,
        sizeof(uint64_t), g);
    if (!g) goto fini;

    sret = ssg_group_view_serialize(&g->view, &view_buf, &view_buf_size);
    if (sret != SSG_SUCCESS) goto fini;

    if (view_size_requested >= view_buf_size)
    {
        /* if attacher's buf is large enough, transfer the view */
        hret = margo_bulk_create(ssg_inst->mid, 1, &view_buf, &view_buf_size,
            HG_BULK_READ_ONLY, &bulk_handle);
        if (hret != HG_SUCCESS) goto fini;

        hret = margo_bulk_transfer(ssg_inst->mid, HG_BULK_PUSH, hgi->addr,
            attach_req.bulk_handle, 0, bulk_handle, 0, view_buf_size);
        if (hret != HG_SUCCESS) goto fini;
    }

    /* set the response and send back */
    attach_resp.group_name = g->name;
    attach_resp.group_size = (uint32_t)g->view.size;
    attach_resp.view_buf_size = view_buf_size;
    margo_respond(ssg_inst->mid, handle, &attach_resp);

fini:
    if (have_input) margo_free_input(handle, &attach_req);
    /* release the bulk handle before the memory it describes */
    if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
    free(view_buf); /* TODO: cache this */
    /* ssg_inst may be NULL here (see the first check above), in which case
     * there is no margo instance to hand the handle back to
     */
    if (ssg_inst && handle != HG_HANDLE_NULL)
        margo_destroy(ssg_inst->mid, handle);

    return;
}
DEFINE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)
Shane Snyder's avatar
Shane Snyder committed
248

249 250 251 252 253 254
/* ssg_group_view_serialize
 *
 * Pack a group view into a single heap buffer: the address string of each
 * member, in member_states order, each terminated by its NUL byte. A member
 * with no address string contributes a lone NUL byte.
 *
 * view     - view to serialize (view->size entries of view->member_states)
 * buf      - out: newly allocated buffer, to be released with free()
 * buf_size - out: number of bytes of serialized data in *buf
 *
 * Returns SSG_SUCCESS, or SSG_FAILURE if the buffer cannot be allocated, in
 * which case *buf is NULL and *buf_size is 0.
 */
static int ssg_group_view_serialize(
    ssg_group_view_t *view, void **buf, hg_size_t *buf_size)
{
    unsigned int i;
    hg_size_t view_size = 0;
    char *view_buf, *buf_p;

    *buf = NULL;
    *buf_size = 0;

    /* first determine view size */
    for (i = 0; i < view->size; i++)
    {
        const char *addr_str = view->member_states[i].addr_str;

        view_size += (addr_str ? strlen(addr_str) : 0) + 1;
    }

    /* never request zero bytes: malloc(0) may legitimately return NULL,
     * which would be misreported as an allocation failure for an empty view
     */
    view_buf = malloc(view_size ? view_size : 1);
    if (!view_buf)
        return SSG_FAILURE;

    /* then copy each address string, terminator included */
    buf_p = view_buf;
    for (i = 0; i < view->size; i++)
    {
        const char *addr_str = view->member_states[i].addr_str;
        size_t len = addr_str ? strlen(addr_str) : 0;

        if (len > 0)
            memcpy(buf_p, addr_str, len);
        buf_p[len] = '\0';
        buf_p += len + 1;
    }

    *buf = view_buf;
    *buf_size = view_size;

    return SSG_SUCCESS;
}

/* custom SSG RPC proc routines */

/* hg_proc_ssg_group_id_t
 *
 * Proc routine for a group ID, which is handled here as a pointer to a
 * heap-allocated ssg_group_descriptor_t. 'data' points at that pointer.
 *
 *   HG_ENCODE - encode the descriptor the pointer refers to
 *   HG_DECODE - allocate a zeroed descriptor, decode into it, and set its
 *               ref_count to 1
 *   HG_FREE   - drop one reference; when the last reference goes, free the
 *               address string and the descriptor and clear the pointer
 *
 * Returns HG_SUCCESS on success, HG_NOMEM_ERROR if the descriptor cannot be
 * allocated, and HG_PROTOCOL_ERROR for any other failure (including an
 * unrecognized proc operation).
 */
hg_return_t hg_proc_ssg_group_id_t(
    hg_proc_t proc, void *data)
{
    ssg_group_descriptor_t **group_descriptor = (ssg_group_descriptor_t **)data;
    hg_return_t hret = HG_PROTOCOL_ERROR;

    switch(hg_proc_get_op(proc))
    {
        case HG_ENCODE:
            /* nothing to encode without a descriptor */
            if (!(*group_descriptor))
                return HG_PROTOCOL_ERROR;

            hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
            if (hret != HG_SUCCESS)
                return HG_PROTOCOL_ERROR;
            break;
        case HG_DECODE:
            *group_descriptor = calloc(1, sizeof(**group_descriptor));
            if (!(*group_descriptor))
                return HG_NOMEM_ERROR;

            hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
            if (hret != HG_SUCCESS)
            {
                /* don't leak the descriptor allocated above, and don't hand
                 * a half-decoded one back to the caller
                 */
                free(*group_descriptor);
                *group_descriptor = NULL;
                return HG_PROTOCOL_ERROR;
            }
            (*group_descriptor)->ref_count = 1;
            break;
        case HG_FREE:
            if (*group_descriptor)
            {
                if ((*group_descriptor)->ref_count == 1)
                {
                    free((*group_descriptor)->addr_str);
                    free(*group_descriptor);
                    /* avoid leaving a dangling pointer behind */
                    *group_descriptor = NULL;
                }
                else
                {
                    (*group_descriptor)->ref_count--;
                }
            }
            hret = HG_SUCCESS;
            break;
        default:
            break;
    }

    return hret;
}