bake-client.c 20.2 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

9
#include <assert.h>
10
#include <margo.h>
11
#include <bake-client.h>
Philip Carns's avatar
Philip Carns committed
12
#include "uthash.h"
13
#include "bake-rpc.h"
14

15
/* Transfers of at most this many bytes are sent by bake_write() and
 * bake_read() through the eager RPCs instead of through a bulk handle. */
#define BAKE_EAGER_LIMIT 2048
16

17
/* Refers to a single Margo initialization, for now this is shared by
18 19
 * all remote BAKE targets.  In the future we probably need to support
 * multiple in case we run atop more than one transport at a time.
20
 */
21
struct bake_client
22 23
{
    margo_instance_id mid;  
24

25 26 27 28 29 30
    hg_id_t bake_probe_id;
    hg_id_t bake_create_id;
    hg_id_t bake_eager_write_id;
    hg_id_t bake_eager_read_id;
    hg_id_t bake_write_id;
    hg_id_t bake_persist_id;
31
    hg_id_t bake_create_write_persist_id;
32 33 34
    hg_id_t bake_get_size_id;
    hg_id_t bake_read_id;
    hg_id_t bake_noop_id;
35 36

    uint64_t num_provider_handles;
37 38
};

39
struct bake_provider_handle {
40
    struct bake_client* client;
41 42 43
    hg_addr_t           addr;
    uint8_t             mplex_id;
    uint64_t            refcount;
44
};
45

46
/* Binds `client` to `mid` and fills in the id of every BAKE RPC.
 *
 * "bake_probe_rpc" is used as the indicator: if Margo already knows that
 * name, all ids are looked up by name; otherwise all RPCs are registered
 * here with a NULL handler.
 *
 * Always returns 0.
 */
static int bake_client_register(bake_client_t client, margo_instance_id mid)
{
    client->mid = mid;

    /* check if RPCs have already been registered */
    hg_bool_t flag;
    hg_id_t id;
    margo_registered_name(mid, "bake_probe_rpc", &id, &flag);

    if(flag == HG_TRUE) { /* RPCs already registered */

        margo_registered_name(mid, "bake_probe_rpc",                &client->bake_probe_id,                &flag);
        margo_registered_name(mid, "bake_create_rpc",               &client->bake_create_id,               &flag);
        margo_registered_name(mid, "bake_write_rpc",                &client->bake_write_id,                &flag);
        margo_registered_name(mid, "bake_eager_write_rpc",          &client->bake_eager_write_id,          &flag);
        margo_registered_name(mid, "bake_eager_read_rpc",           &client->bake_eager_read_id,           &flag);
        margo_registered_name(mid, "bake_persist_rpc",              &client->bake_persist_id,              &flag);
        margo_registered_name(mid, "bake_create_write_persist_rpc", &client->bake_create_write_persist_id, &flag);
        margo_registered_name(mid, "bake_get_size_rpc",             &client->bake_get_size_id,             &flag);
        margo_registered_name(mid, "bake_read_rpc",                 &client->bake_read_id,                 &flag);
        margo_registered_name(mid, "bake_noop_rpc",                 &client->bake_noop_id,                 &flag);

    } else { /* RPCs not already registered */

        client->bake_probe_id =
            MARGO_REGISTER(mid, "bake_probe_rpc",
                    bake_probe_in_t, bake_probe_out_t, NULL);
        client->bake_create_id =
            MARGO_REGISTER(mid, "bake_create_rpc",
                    bake_create_in_t, bake_create_out_t, NULL);
        client->bake_write_id =
            MARGO_REGISTER(mid, "bake_write_rpc",
                    bake_write_in_t, bake_write_out_t, NULL);
        client->bake_eager_write_id =
            MARGO_REGISTER(mid, "bake_eager_write_rpc",
                    bake_eager_write_in_t, bake_eager_write_out_t, NULL);
        client->bake_eager_read_id =
            MARGO_REGISTER(mid, "bake_eager_read_rpc",
                    bake_eager_read_in_t, bake_eager_read_out_t, NULL);
        client->bake_persist_id =
            MARGO_REGISTER(mid, "bake_persist_rpc",
                    bake_persist_in_t, bake_persist_out_t, NULL);
        client->bake_create_write_persist_id =
            MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
                    bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
        client->bake_get_size_id =
            MARGO_REGISTER(mid, "bake_get_size_rpc",
                    bake_get_size_in_t, bake_get_size_out_t, NULL);
        client->bake_read_id =
            MARGO_REGISTER(mid, "bake_read_rpc",
                    bake_read_in_t, bake_read_out_t, NULL);
        client->bake_noop_id =
            MARGO_REGISTER(mid, "bake_noop_rpc",
                    void, void, NULL);
    }

    return(0);
}

105 106 107 108 109
/* Allocates a client bound to `mid` and stores it in *client.
 *
 * Returns 0 on success, -1 if allocation fails, or the non-zero result of
 * bake_client_register(). On failure *client is left untouched and nothing
 * stays allocated.
 */
int bake_client_init(margo_instance_id mid, bake_client_t* client)
{
    /* calloc zeroes the struct, so num_provider_handles starts at 0 */
    bake_client_t c = calloc(1, sizeof(*c));
    if(!c) return -1;

    int ret = bake_client_register(c, mid);
    if(ret != 0) {
        /* do not leak the client when registration fails */
        free(c);
        return ret;
    }

    *client = c;
    return 0;
}

/* Frees `client`. If provider handles created from it are still
 * outstanding, a warning with their count is printed to stderr first;
 * the client is freed regardless.
 *
 * Always returns 0.
 */
int bake_client_finalize(bake_client_t client)
{
    if(client->num_provider_handles != 0) {
        /* the counter is a uint64_t: print it with a matching conversion */
        fprintf(stderr,
                "[BAKE] Warning: %llu provider handles not released before bake_client_finalize was called\n",
                (unsigned long long)client->num_provider_handles);
    }
    free(client);
    return 0;
}

130 131 132 133 134
/* Sends a bake_probe RPC to `provider` and reports its targets.
 *
 * provider:    provider to query
 * max_targets: capacity of `bti`; forced to 0 when `bti` is NULL
 * bti:         output array for target ids, may be NULL
 * num_targets: output; with max_targets == 0 it receives the count from
 *              the reply, otherwise the number of ids copied into `bti`
 *
 * Returns -1 if a Margo call fails, otherwise the `ret` field of the
 * reply. The outputs are only written when that field is 0.
 */
int bake_probe(
    bake_provider_handle_t provider,
    uint64_t max_targets,
    bake_target_id_t *bti,
    uint64_t* num_targets)
{
    hg_return_t hret;
    int ret = -1;
    bake_probe_in_t in;
    bake_probe_out_t out;
    hg_handle_t handle;

    /* without an output array only the target count can be reported */
    if(bti == NULL) max_targets = 0;
    in.max_targets = max_targets;

    /* create handle */
    hret = margo_create(
                provider->client->mid,
                provider->addr,
                provider->client->bake_probe_id,
                &handle);
    if(hret != HG_SUCCESS) return -1;

    hret = margo_set_target_id(handle, provider->mplex_id);
    if(hret != HG_SUCCESS) goto done;

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS) goto done;

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS) goto done;

    ret = out.ret;

    /* out.ret is a BAKE status (0 on success), not an hg_return_t */
    if(ret == 0) {
        if(max_targets == 0) {
            *num_targets = out.num_targets;
        } else {
            /* never copy more ids than the caller's array can hold */
            uint64_t s = out.num_targets > max_targets ? max_targets : out.num_targets;
            if(s > 0) {
                memcpy(bti, out.targets, sizeof(*bti)*s);
            }
            *num_targets = s;
        }
    }

    margo_free_output(handle, &out);

done:
    margo_destroy(handle);
    return ret;
}
192 193 194 195 196 197

/* Creates a provider handle for (`addr`, `mplex_id`) owned by `client`.
 *
 * The address is duplicated with margo_addr_dup(), so the handle holds
 * its own copy. The new handle starts with a reference count of 1 and is
 * counted in client->num_provider_handles.
 *
 * Returns 0 and stores the handle in *handle on success; returns -1 if
 * `client` is BAKE_CLIENT_NULL, allocation fails, or the address cannot
 * be duplicated.
 */
int bake_provider_handle_create(
        bake_client_t client,
        hg_addr_t addr,
        uint8_t mplex_id,
        bake_provider_handle_t* handle)
{
    if(client == BAKE_CLIENT_NULL) return -1;

    bake_provider_handle_t provider = calloc(1, sizeof(*provider));
    if(!provider) return -1;

    hg_return_t ret = margo_addr_dup(client->mid, addr, &(provider->addr));
    if(ret != HG_SUCCESS) {
        free(provider);
        return -1;
    }

    provider->client   = client;
    provider->mplex_id = mplex_id;
    provider->refcount = 1;

    client->num_provider_handles += 1;

    *handle = provider;
    return 0;
}

/* Adds one reference to `handle`.
 *
 * Returns 0, or -1 if `handle` is BAKE_PROVIDER_HANDLE_NULL.
 */
int bake_provider_handle_ref_incr(bake_provider_handle_t handle)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    handle->refcount += 1;
    return 0;
}

229 230 231 232 233 234
/* Drops one reference to `handle`. When the count reaches zero the
 * duplicated address is freed, the owning client's handle counter is
 * decremented and the handle itself is freed.
 *
 * Returns 0, or -1 if `handle` is BAKE_PROVIDER_HANDLE_NULL.
 */
int bake_provider_handle_release(bake_provider_handle_t handle)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    handle->refcount -= 1;
    if(handle->refcount == 0) {
        margo_addr_free(handle->client->mid, handle->addr);
        handle->client->num_provider_handles -= 1;
        free(handle);
    }
    return 0;
}
  
241
/* Forwards to margo_shutdown_remote_instance() for `addr`, using the
 * client's Margo instance, and returns its result unchanged. */
int bake_shutdown_service(bake_client_t client, hg_addr_t addr)
{
    return margo_shutdown_remote_instance(client->mid, addr);
}
Philip Carns's avatar
Philip Carns committed
245

246
static int bake_eager_write(
247
    bake_provider_handle_t provider,
248
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
249 250 251 252 253
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
254
    hg_handle_t handle;
255 256
    bake_eager_write_in_t in;
    bake_eager_write_out_t out;
Philip Carns's avatar
Philip Carns committed
257 258 259 260 261 262
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;
    in.buffer = (char*)buf;
263
  
264 265 266
    hret = margo_create(provider->client->mid, provider->addr, 
                provider->client->bake_eager_write_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
267

268 269
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
270

271
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
272 273
    if(hret != HG_SUCCESS)
    {
274
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
275 276 277
        return(-1);
    }

278
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
279 280
    if(hret != HG_SUCCESS)
    {
281
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
282 283 284 285 286
        return(-1);
    }
    
    ret = out.ret;

287 288
    margo_free_output(handle, &out);
    margo_destroy(handle);
289

Philip Carns's avatar
Philip Carns committed
290 291 292
    return(ret);
}

293
int bake_write(
294
    bake_provider_handle_t provider,
295
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
296 297 298 299
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
300
    hg_return_t hret;
301
    hg_handle_t handle;
302 303
    bake_write_in_t in;
    bake_write_out_t out;
304
    int ret;
305

306
    if(buf_size <= BAKE_EAGER_LIMIT)
307
        return(bake_eager_write(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
308

309 310
    in.rid = rid;
    in.region_offset = region_offset;
311 312 313
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
Philip Carns's avatar
Philip Carns committed
314

315
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
316
        HG_BULK_READ_ONLY, &in.bulk_handle);
317 318 319
    if(hret != HG_SUCCESS)
        return(-1);
   
320 321 322
    hret = margo_create(provider->client->mid, provider->addr, 
        provider->client->bake_write_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
323

324 325
    if(hret != HG_SUCCESS)
    {
326
        margo_bulk_free(in.bulk_handle);
327 328 329
        return(-1);
    }

330
    hret = margo_forward(handle, &in);
331 332
    if(hret != HG_SUCCESS)
    {
333
        margo_bulk_free(in.bulk_handle);
334
        margo_destroy(handle);
335 336 337
        return(-1);
    }

338
    hret = margo_get_output(handle, &out);
339 340
    if(hret != HG_SUCCESS)
    {
341
        margo_bulk_free(in.bulk_handle);
342
        margo_destroy(handle);
343 344 345 346 347
        return(-1);
    }
    
    ret = out.ret;

348
    margo_free_output(handle, &out);
349
    margo_bulk_free(in.bulk_handle);
350
    margo_destroy(handle);
351
    return(ret);
Philip Carns's avatar
Philip Carns committed
352 353
}

354
int bake_proxy_write(
355
    bake_provider_handle_t provider,
356
    bake_region_id_t rid,
357 358 359
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
360
    const char* remote_addr,
361 362 363 364
    uint64_t size)
{
    hg_return_t hret;
    hg_handle_t handle;
365 366
    bake_write_in_t in;
    bake_write_out_t out;
367 368 369 370 371 372 373
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
374
    in.remote_addr_str = (char*)remote_addr;
Shane Snyder's avatar
Shane Snyder committed
375

376 377 378
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_write_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
379

380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }
    
    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

404
int bake_create(
405
    bake_provider_handle_t provider,
Philip Carns's avatar
Philip Carns committed
406 407
    bake_target_id_t bti,
    uint64_t region_size,
408
    bake_region_id_t *rid)
Philip Carns's avatar
Philip Carns committed
409
{
410
    hg_return_t hret;
411
    hg_handle_t handle;
412 413
    bake_create_in_t in;
    bake_create_out_t out;
414
    int ret = 0;
415

416
    in.bti = bti;
417 418
    in.region_size = region_size;

419 420
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_id, &handle);
421 422

    if(hret != HG_SUCCESS) {
423
        return(-1);
424 425
    }

426
    margo_set_target_id(handle, provider->mplex_id);
427

428
    hret = margo_forward(handle, &in);
429 430
    if(hret != HG_SUCCESS)
    {
431
        margo_destroy(handle);
432 433
        return(-1);
    }
Philip Carns's avatar
Philip Carns committed
434

435
    hret = margo_get_output(handle, &out);
436 437
    if(hret != HG_SUCCESS)
    {
438
        margo_destroy(handle);
439 440 441 442
        return(-1);
    }

    ret = out.ret;
443 444
    if(ret == 0) 
        *rid = out.rid;
445

446 447
    margo_free_output(handle, &out);
    margo_destroy(handle);
448
    return(ret);
Philip Carns's avatar
Philip Carns committed
449 450 451
}


452
int bake_persist(
453
    bake_provider_handle_t provider,
454
    bake_region_id_t rid)
Philip Carns's avatar
Philip Carns committed
455
{
456
    hg_return_t hret;
457
    hg_handle_t handle;
458 459
    bake_persist_in_t in;
    bake_persist_out_t out;
460
    int ret;
461

462 463
    in.rid = rid;

464 465 466
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_persist_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
467

468 469
    if(hret != HG_SUCCESS)
        return(-1);
470

471
    hret = margo_forward(handle, &in);
472 473
    if(hret != HG_SUCCESS)
    {
474
        margo_destroy(handle);
475 476 477
        return(-1);
    }

478
    hret = margo_get_output(handle, &out);
479 480
    if(hret != HG_SUCCESS)
    {
481
        margo_destroy(handle);
482 483 484 485 486
        return(-1);
    }

    ret = out.ret;

487 488
    margo_free_output(handle, &out);
    margo_destroy(handle);
489
    return(ret);
Philip Carns's avatar
Philip Carns committed
490
}
Philip Carns's avatar
Philip Carns committed
491

492
int bake_create_write_persist(
493
    bake_provider_handle_t provider,
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

    /* XXX eager path? */

509
    in.bti = bti;
510 511 512 513 514 515
    in.region_size = region_size;
    in.region_offset = region_offset;
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */

516
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
517 518 519 520
        HG_BULK_READ_ONLY, &in.bulk_handle);
    if(hret != HG_SUCCESS)
        return(-1);

521 522
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
523

524 525 526 527 528 529
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        return(-1);
    }

530 531
    margo_set_target_id(handle, provider->mplex_id);

532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557
    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_bulk_free(in.bulk_handle);
    margo_destroy(handle);
    return(ret);
}

558
int bake_create_write_persist_proxy(
559
    bake_provider_handle_t provider,
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

575
    in.bti = bti;
576 577 578 579 580 581 582
    in.region_size = region_size;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
    in.remote_addr_str = (char*)remote_addr;

583 584 585
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
586

587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612
    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

613
int bake_get_size(
614
    bake_provider_handle_t provider,
615
    bake_region_id_t rid,
616 617 618
    uint64_t *region_size)
{
    hg_return_t hret;
619
    hg_handle_t handle;
620 621
    bake_get_size_in_t in;
    bake_get_size_out_t out;
622 623 624 625
    int ret;

    in.rid = rid;

626 627 628
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_get_size_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
629

630 631
    if(hret != HG_SUCCESS)
        return(-1);
632

633
    hret = margo_forward(handle, &in);
634 635
    if(hret != HG_SUCCESS)
    {
636
        margo_destroy(handle);
637 638 639
        return(-1);
    }

640
    hret = margo_get_output(handle, &out);
641 642
    if(hret != HG_SUCCESS)
    {
643
        margo_destroy(handle);
644 645 646 647 648 649
        return(-1);
    }

    ret = out.ret;
    *region_size = out.size;

650 651
    margo_free_output(handle, &out);
    margo_destroy(handle);
652 653 654
    return(ret);
}

655
/* Sends a bake_noop RPC, which carries no input, to `provider`. No
 * output is decoded.
 *
 * Returns 0 once the RPC has been forwarded, -1 if a Margo call fails.
 */
int bake_noop(bake_provider_handle_t provider)
{
    hg_return_t hret;
    hg_handle_t handle;
    int ret = -1;

    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_noop_id, &handle);
    if(hret != HG_SUCCESS)
        return(-1);

    /* the handle is only valid once margo_create() has succeeded */
    hret = margo_set_target_id(handle, provider->mplex_id);
    if(hret != HG_SUCCESS)
        goto done;

    hret = margo_forward(handle, NULL);
    if(hret != HG_SUCCESS)
        goto done;

    ret = 0;

done:
    margo_destroy(handle);
    return(ret);
}

678
static int bake_eager_read(
679
    bake_provider_handle_t provider,
680
    bake_region_id_t rid,
681 682 683 684 685 686
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
    hg_handle_t handle;
687 688
    bake_eager_read_in_t in;
    bake_eager_read_out_t out;
689 690 691 692 693 694
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;

695 696 697
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_eager_read_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
698

699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724
    if(hret != HG_SUCCESS)
        return(-1);
  
    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }
    
    ret = out.ret;
    if(ret == 0)
        memcpy(buf, out.buffer, out.size);

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

725
int bake_read(
726
    bake_provider_handle_t provider,
727
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
728 729 730 731
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
732
    hg_return_t hret;
733
    hg_handle_t handle;
734 735
    bake_read_in_t in;
    bake_read_out_t out;
736
    int ret;
737

738
    if(buf_size <= BAKE_EAGER_LIMIT)
739
        return(bake_eager_read(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
740

741 742
    in.rid = rid;
    in.region_offset = region_offset;
743 744 745
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy read */
746

747
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
748
        HG_BULK_WRITE_ONLY, &in.bulk_handle);
749 750 751
    if(hret != HG_SUCCESS)
        return(-1);
   
752 753 754
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
755

756 757
    if(hret != HG_SUCCESS)
    {
758
        margo_bulk_free(in.bulk_handle);
759 760 761
        return(-1);
    }

762
    hret = margo_forward(handle, &in);
763 764
    if(hret != HG_SUCCESS)
    {
765
        margo_bulk_free(in.bulk_handle);
766
        margo_destroy(handle);
767 768 769
        return(-1);
    }

770
    hret = margo_get_output(handle, &out);
771 772
    if(hret != HG_SUCCESS)
    {
773
        margo_bulk_free(in.bulk_handle);
774
        margo_destroy(handle);
775 776 777 778 779
        return(-1);
    }
    
    ret = out.ret;

780
    margo_free_output(handle, &out);
781
    margo_bulk_free(in.bulk_handle);
782
    margo_destroy(handle);
783
    return(ret);
Philip Carns's avatar
Philip Carns committed
784 785
}

786
int bake_proxy_read(
787
    bake_provider_handle_t provider,
788
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
789
    uint64_t region_offset,
790 791 792 793
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size)
Philip Carns's avatar
Philip Carns committed
794 795
{
    hg_return_t hret;
796
    hg_handle_t handle;
797 798
    bake_read_in_t in;
    bake_read_out_t out;
799
    int ret;
Philip Carns's avatar
Philip Carns committed
800 801 802

    in.rid = rid;
    in.region_offset = region_offset;
803 804 805 806
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size; 
    in.remote_addr_str = (char*)remote_addr;
Philip Carns's avatar
Philip Carns committed
807

808 809 810
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
811

812 813
    if(hret != HG_SUCCESS)
        return(-1);
814

815
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
816 817
    if(hret != HG_SUCCESS)
    {
818
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
819 820 821
        return(-1);
    }

822
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
823 824
    if(hret != HG_SUCCESS)
    {
825
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
826 827
        return(-1);
    }
828

Philip Carns's avatar
Philip Carns committed
829 830
    ret = out.ret;

831 832
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
833 834 835
    return(ret);
}