bake-client.c 20.2 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

9
#include <assert.h>
Philip Carns's avatar
Philip Carns committed
10
#include <margo.h>
11
#include <bake-client.h>
Philip Carns's avatar
Philip Carns committed
12
#include "uthash.h"
13
#include "bake-rpc.h"
Philip Carns's avatar
Philip Carns committed
14

15
#define BAKE_EAGER_LIMIT 2048
16

17
/* Refers to a single Margo initialization, for now this is shared by
18 19
 * all remote BAKE targets.  In the future we probably need to support
 * multiple in case we run atop more than one transport at a time.
20
 */
21
struct bake_client
22 23
{
    margo_instance_id mid;  
24

25 26 27 28 29 30
    hg_id_t bake_probe_id;
    hg_id_t bake_create_id;
    hg_id_t bake_eager_write_id;
    hg_id_t bake_eager_read_id;
    hg_id_t bake_write_id;
    hg_id_t bake_persist_id;
31
    hg_id_t bake_create_write_persist_id;
32 33 34
    hg_id_t bake_get_size_id;
    hg_id_t bake_read_id;
    hg_id_t bake_noop_id;
35 36

    uint64_t num_provider_handles;
37 38
};

39
struct bake_provider_handle {
40
    struct bake_client* client;
41 42 43
    hg_addr_t           addr;
    uint8_t             mplex_id;
    uint64_t            refcount;
44
};
Philip Carns's avatar
Philip Carns committed
45

46
static int bake_client_register(bake_client_t client, margo_instance_id mid)
Philip Carns's avatar
Philip Carns committed
47
{
48
    client->mid = mid;
Philip Carns's avatar
Philip Carns committed
49

50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
    /* check if RPCs have already been registered */
    hg_bool_t flag;
    hg_id_t id;
    margo_registered_name(mid, "bake_probe_rpc", &id, &flag);

    if(flag == HG_TRUE) { /* RPCs already registered */

        margo_registered_name(mid, "bake_probe_rpc",                &client->bake_probe_id,                &flag);
        margo_registered_name(mid, "bake_create_rpc",               &client->bake_create_id,               &flag);
        margo_registered_name(mid, "bake_write_rpc",                &client->bake_write_id,                &flag);
        margo_registered_name(mid, "bake_eager_write_rpc",          &client->bake_eager_write_id,          &flag);
        margo_registered_name(mid, "bake_eager_read_rpc",           &client->bake_eager_read_id,           &flag);
        margo_registered_name(mid, "bake_persist_rpc",              &client->bake_persist_id,              &flag);
        margo_registered_name(mid, "bake_create_write_persist_rpc", &client->bake_create_write_persist_id, &flag);
        margo_registered_name(mid, "bake_get_size_rpc",             &client->bake_get_size_id,             &flag);
        margo_registered_name(mid, "bake_read_rpc",                 &client->bake_read_id,                 &flag);
        margo_registered_name(mid, "bake_noop_rpc",                 &client->bake_noop_id,                 &flag);

    } else { /* RPCs not already registered */

        client->bake_probe_id = 
            MARGO_REGISTER(mid, "bake_probe_rpc",
                    bake_probe_in_t, bake_probe_out_t, NULL);
        client->bake_create_id = 
            MARGO_REGISTER(mid, "bake_create_rpc",
                    bake_create_in_t, bake_create_out_t, NULL);
        client->bake_write_id = 
            MARGO_REGISTER(mid, "bake_write_rpc",
                    bake_write_in_t, bake_write_out_t, NULL);
        client->bake_eager_write_id = 
            MARGO_REGISTER(mid, "bake_eager_write_rpc",
                    bake_eager_write_in_t, bake_eager_write_out_t, NULL);
        client->bake_eager_read_id = 
            MARGO_REGISTER(mid, "bake_eager_read_rpc",
                    bake_eager_read_in_t, bake_eager_read_out_t, NULL);
        client->bake_persist_id = 
            MARGO_REGISTER(mid, "bake_persist_rpc",
                    bake_persist_in_t, bake_persist_out_t, NULL);
        client->bake_create_write_persist_id =
            MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
                    bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
        client->bake_get_size_id = 
            MARGO_REGISTER(mid, "bake_get_size_rpc",
                    bake_get_size_in_t, bake_get_size_out_t, NULL);
        client->bake_read_id = 
            MARGO_REGISTER(mid, "bake_read_rpc",
                    bake_read_in_t, bake_read_out_t, NULL);
        client->bake_noop_id = 
            MARGO_REGISTER(mid, "bake_noop_rpc",
                    void, void, NULL);
    }
Philip Carns's avatar
Philip Carns committed
101

102 103 104
    return(0);
}

105 106 107 108 109
int bake_client_init(margo_instance_id mid, bake_client_t* client)
{
    bake_client_t c = (bake_client_t)calloc(1, sizeof(*c));
    if(!c) return -1;

110 111
    c->num_provider_handles = 0;

112 113 114 115 116 117 118 119 120
    int ret = bake_client_register(c, mid);
    if(ret != 0) return ret;

    *client = c;
    return 0;
}

int bake_client_finalize(bake_client_t client)
{
121 122 123 124 125
    if(client->num_provider_handles != 0) {
        fprintf(stderr, 
                "[BAKE] Warning: %d provider handles not released before bake_client_finalize was called\n",
                client->num_provider_handles);
    }
126 127 128 129
    free(client);
    return 0;
}

130 131 132 133 134
int bake_probe(
    bake_provider_handle_t provider,
    uint64_t max_targets,
    bake_target_id_t *bti,
    uint64_t* num_targets)
135 136 137
{
    hg_return_t hret;
    int ret;
138
    bake_probe_in_t in;
139
    bake_probe_out_t out;
140
    hg_handle_t handle;
Philip Carns's avatar
Philip Carns committed
141

142 143 144
    if(bti == NULL) max_targets = 0;
    in.max_targets = max_targets;

145
    /* create handle */
146 147 148 149 150
    hret = margo_create(
                provider->client->mid, 
                provider->addr, 
                provider->client->bake_probe_id, 
                &handle);
151

152 153 154 155 156 157 158
    if(hret != HG_SUCCESS) return -1;

    hret = margo_set_target_id(handle, provider->mplex_id);
    
    if(hret != HG_SUCCESS) {
        margo_destroy(handle);
        return -1;
159 160
    }

161
    hret = margo_forward(handle, &in);
162
    if(hret != HG_SUCCESS) {
163
        margo_destroy(handle);
164
        return -1;
165 166
    }

167
    hret = margo_get_output(handle, &out);
168
    if(hret != HG_SUCCESS) {
169
        margo_destroy(handle);
170
        return -1;
171 172 173 174
    }

    ret = out.ret;

175
    if(ret == HG_SUCCESS) {
176 177 178
        if(max_targets == 0) {
            *num_targets = out.num_targets;
        } else {
179
            uint64_t s = out.num_targets > max_targets ? max_targets : out.num_targets;
180 181 182
            if(s > 0) {
                memcpy(bti, out.targets, sizeof(*bti)*s);
            }
183
            *num_targets = s;
184
        }
185 186
    }

187 188 189
    margo_free_output(handle, &out);
    margo_destroy(handle);

190
    return ret;
Philip Carns's avatar
Philip Carns committed
191
}
192 193 194 195 196 197

int bake_provider_handle_create(
        bake_client_t client,
        hg_addr_t addr,
        uint8_t mplex_id,
        bake_provider_handle_t* handle)
Philip Carns's avatar
Philip Carns committed
198
{
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
    if(client == BAKE_CLIENT_NULL) return -1;

    bake_provider_handle_t provider = 
        (bake_provider_handle_t)calloc(1, sizeof(*provider));

    if(!provider) return -1;

    hg_return_t ret = margo_addr_dup(client->mid, addr, &(provider->addr));
    if(ret != HG_SUCCESS) {
        free(provider);
        return -1;
    }
    
    provider->client   = client;
    provider->mplex_id = mplex_id;
    provider->refcount = 1;
215

216 217
    client->num_provider_handles += 1;

218 219 220 221 222 223 224 225 226
    *handle = provider;
    return 0;
}

int bake_provider_handle_ref_incr(bake_provider_handle_t handle)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    handle->refcount += 1;
    return 0;
Philip Carns's avatar
Philip Carns committed
227 228
}

229 230 231 232 233 234
int bake_provider_handle_release(bake_provider_handle_t handle)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    handle->refcount -= 1;
    if(handle->refcount == 0) {
        margo_addr_free(handle->client->mid, handle->addr);
235
        handle->client->num_provider_handles -= 1;
236 237 238 239 240
        free(handle);
    }
    return 0;
}
  
241
int bake_shutdown_service(bake_client_t client, hg_addr_t addr)
Philip Carns's avatar
Philip Carns committed
242
{
243
    return margo_shutdown_remote_instance(client->mid, addr);
Philip Carns's avatar
Philip Carns committed
244
}
Philip Carns's avatar
Philip Carns committed
245

246
static int bake_eager_write(
247
    bake_provider_handle_t provider,
248
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
249 250 251 252 253
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
254
    hg_handle_t handle;
255 256
    bake_eager_write_in_t in;
    bake_eager_write_out_t out;
Philip Carns's avatar
Philip Carns committed
257 258 259 260 261 262
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;
    in.buffer = (char*)buf;
263
  
264 265 266
    hret = margo_create(provider->client->mid, provider->addr, 
                provider->client->bake_eager_write_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
267

Shane Snyder's avatar
Shane Snyder committed
268 269
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
270

Shane Snyder's avatar
Shane Snyder committed
271
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
272 273
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
274
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
275 276 277
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
278
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
279 280
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
281
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
282 283 284 285 286
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
287 288
    margo_free_output(handle, &out);
    margo_destroy(handle);
289

Philip Carns's avatar
Philip Carns committed
290 291 292
    return(ret);
}

293
int bake_write(
294
    bake_provider_handle_t provider,
295
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
296 297 298 299
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
300
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
301
    hg_handle_t handle;
302 303
    bake_write_in_t in;
    bake_write_out_t out;
304
    int ret;
305

306
    if(buf_size <= BAKE_EAGER_LIMIT)
307
        return(bake_eager_write(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
308

309 310
    in.rid = rid;
    in.region_offset = region_offset;
311 312 313
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
Philip Carns's avatar
Philip Carns committed
314

315
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
316
        HG_BULK_READ_ONLY, &in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
317 318 319
    if(hret != HG_SUCCESS)
        return(-1);
   
320 321 322
    hret = margo_create(provider->client->mid, provider->addr, 
        provider->client->bake_write_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
323

324 325
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
326
        margo_bulk_free(in.bulk_handle);
327 328 329
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
330
    hret = margo_forward(handle, &in);
331 332
    if(hret != HG_SUCCESS)
    {
333
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
334
        margo_destroy(handle);
335 336 337
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
338
    hret = margo_get_output(handle, &out);
339 340
    if(hret != HG_SUCCESS)
    {
341
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
342
        margo_destroy(handle);
343 344 345 346 347
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
348
    margo_free_output(handle, &out);
349
    margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
350
    margo_destroy(handle);
351
    return(ret);
Philip Carns's avatar
Philip Carns committed
352 353
}

354
int bake_proxy_write(
355
    bake_provider_handle_t provider,
356
    bake_region_id_t rid,
357 358 359
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
360
    const char* remote_addr,
361 362 363 364
    uint64_t size)
{
    hg_return_t hret;
    hg_handle_t handle;
365 366
    bake_write_in_t in;
    bake_write_out_t out;
367 368 369 370 371 372 373
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
374
    in.remote_addr_str = (char*)remote_addr;
Shane Snyder's avatar
Shane Snyder committed
375

376 377 378
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_write_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
379

380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }
    
    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

404
int bake_create(
405
    bake_provider_handle_t provider,
Philip Carns's avatar
Philip Carns committed
406 407
    bake_target_id_t bti,
    uint64_t region_size,
408
    bake_region_id_t *rid)
Philip Carns's avatar
Philip Carns committed
409
{
410
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
411
    hg_handle_t handle;
412 413
    bake_create_in_t in;
    bake_create_out_t out;
414
    int ret = 0;
415

416
    in.bti = bti;
417 418
    in.region_size = region_size;

419 420
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_id, &handle);
421 422

    if(hret != HG_SUCCESS) {
Shane Snyder's avatar
Shane Snyder committed
423
        return(-1);
424 425
    }

426
    margo_set_target_id(handle, provider->mplex_id);
427

Shane Snyder's avatar
Shane Snyder committed
428
    hret = margo_forward(handle, &in);
429 430
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
431
        margo_destroy(handle);
432 433
        return(-1);
    }
Philip Carns's avatar
Philip Carns committed
434

Shane Snyder's avatar
Shane Snyder committed
435
    hret = margo_get_output(handle, &out);
436 437
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
438
        margo_destroy(handle);
439 440 441 442
        return(-1);
    }

    ret = out.ret;
443 444
    if(ret == 0) 
        *rid = out.rid;
445

Shane Snyder's avatar
Shane Snyder committed
446 447
    margo_free_output(handle, &out);
    margo_destroy(handle);
448
    return(ret);
Philip Carns's avatar
Philip Carns committed
449 450 451
}


452
int bake_persist(
453
    bake_provider_handle_t provider,
454
    bake_region_id_t rid)
Philip Carns's avatar
Philip Carns committed
455
{
Philip Carns's avatar
Philip Carns committed
456
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
457
    hg_handle_t handle;
458 459
    bake_persist_in_t in;
    bake_persist_out_t out;
Philip Carns's avatar
Philip Carns committed
460
    int ret;
461

Philip Carns's avatar
Philip Carns committed
462 463
    in.rid = rid;

464 465 466
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_persist_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
467

Shane Snyder's avatar
Shane Snyder committed
468 469
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
470

Shane Snyder's avatar
Shane Snyder committed
471
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
472 473
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
474
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
475 476 477
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
478
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
479 480
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
481
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
482 483 484 485 486
        return(-1);
    }

    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
487 488
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
489
    return(ret);
Philip Carns's avatar
Philip Carns committed
490
}
Philip Carns's avatar
Philip Carns committed
491

492
int bake_create_write_persist(
493
    bake_provider_handle_t provider,
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

    /* XXX eager path? */

509
    in.bti = bti;
510 511 512 513 514 515
    in.region_size = region_size;
    in.region_offset = region_offset;
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */

516
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
517 518 519 520
        HG_BULK_READ_ONLY, &in.bulk_handle);
    if(hret != HG_SUCCESS)
        return(-1);

521 522
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
523

524 525 526 527 528 529
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        return(-1);
    }

530 531
    margo_set_target_id(handle, provider->mplex_id);

532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557
    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_bulk_free(in.bulk_handle);
    margo_destroy(handle);
    return(ret);
}

558
int bake_create_write_persist_proxy(
559
    bake_provider_handle_t provider,
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

575
    in.bti = bti;
576 577 578 579 580 581 582
    in.region_size = region_size;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
    in.remote_addr_str = (char*)remote_addr;

583 584 585
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
586

587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612
    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

613
int bake_get_size(
614
    bake_provider_handle_t provider,
615
    bake_region_id_t rid,
616 617 618
    uint64_t *region_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
619
    hg_handle_t handle;
620 621
    bake_get_size_in_t in;
    bake_get_size_out_t out;
622 623 624 625
    int ret;

    in.rid = rid;

626 627 628
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_get_size_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
629

Shane Snyder's avatar
Shane Snyder committed
630 631
    if(hret != HG_SUCCESS)
        return(-1);
632

Shane Snyder's avatar
Shane Snyder committed
633
    hret = margo_forward(handle, &in);
634 635
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
636
        margo_destroy(handle);
637 638 639
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
640
    hret = margo_get_output(handle, &out);
641 642
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
643
        margo_destroy(handle);
644 645 646 647 648 649
        return(-1);
    }

    ret = out.ret;
    *region_size = out.size;

Shane Snyder's avatar
Shane Snyder committed
650 651
    margo_free_output(handle, &out);
    margo_destroy(handle);
652 653 654
    return(ret);
}

655
int bake_noop(bake_provider_handle_t provider)
Philip Carns's avatar
Philip Carns committed
656 657
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
658
    hg_handle_t handle;
Philip Carns's avatar
Philip Carns committed
659

660 661 662
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_noop_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
Philip Carns's avatar
Philip Carns committed
663

Shane Snyder's avatar
Shane Snyder committed
664 665
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
666

Shane Snyder's avatar
Shane Snyder committed
667
    hret = margo_forward(handle, NULL);
Philip Carns's avatar
Philip Carns committed
668 669
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
670
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
671 672 673
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
674
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
675 676 677
    return(0);
}

678
static int bake_eager_read(
679
    bake_provider_handle_t provider,
680
    bake_region_id_t rid,
681 682 683 684 685 686
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
    hg_handle_t handle;
687 688
    bake_eager_read_in_t in;
    bake_eager_read_out_t out;
689 690 691 692 693 694
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;

695 696 697
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_eager_read_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
698

699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724
    if(hret != HG_SUCCESS)
        return(-1);
  
    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }
    
    ret = out.ret;
    if(ret == 0)
        memcpy(buf, out.buffer, out.size);

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

725
int bake_read(
726
    bake_provider_handle_t provider,
727
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
728 729 730 731
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
732
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
733
    hg_handle_t handle;
734 735
    bake_read_in_t in;
    bake_read_out_t out;
736
    int ret;
737

738
    if(buf_size <= BAKE_EAGER_LIMIT)
739
        return(bake_eager_read(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
740

741 742
    in.rid = rid;
    in.region_offset = region_offset;
743 744 745
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy read */
746

747
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
748
        HG_BULK_WRITE_ONLY, &in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
749 750 751
    if(hret != HG_SUCCESS)
        return(-1);
   
752 753 754
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
755

756 757
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
758
        margo_bulk_free(in.bulk_handle);
759 760 761
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
762
    hret = margo_forward(handle, &in);
763 764
    if(hret != HG_SUCCESS)
    {
765
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
766
        margo_destroy(handle);
767 768 769
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
770
    hret = margo_get_output(handle, &out);
771 772
    if(hret != HG_SUCCESS)
    {
773
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
774
        margo_destroy(handle);
775 776 777 778 779
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
780
    margo_free_output(handle, &out);
781
    margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
782
    margo_destroy(handle);
783
    return(ret);
Philip Carns's avatar
Philip Carns committed
784 785
}

786
int bake_proxy_read(
787
    bake_provider_handle_t provider,
788
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
789
    uint64_t region_offset,
790 791 792 793
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size)
Philip Carns's avatar
Philip Carns committed
794 795
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
796
    hg_handle_t handle;
797 798
    bake_read_in_t in;
    bake_read_out_t out;
799
    int ret;
Philip Carns's avatar
Philip Carns committed
800 801 802

    in.rid = rid;
    in.region_offset = region_offset;
803 804 805 806
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size; 
    in.remote_addr_str = (char*)remote_addr;
Philip Carns's avatar
Philip Carns committed
807

808 809 810
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
811

Shane Snyder's avatar
Shane Snyder committed
812 813
    if(hret != HG_SUCCESS)
        return(-1);
814

Shane Snyder's avatar
Shane Snyder committed
815
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
816 817
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
818
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
819 820 821
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
822
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
823 824
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
825
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
826 827
        return(-1);
    }
828

Philip Carns's avatar
Philip Carns committed
829 830
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
831 832
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
833 834 835
    return(ret);
}