bake-client.c 23.1 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

9
#include <assert.h>
10
#include <margo.h>
11
#include <bake-client.h>
Philip Carns's avatar
Philip Carns committed
12
#include "uthash.h"
13
#include "bake-rpc.h"
14

15
#define BAKE_DEFAULT_EAGER_LIMIT 2048
16

17
/* Refers to a single Margo initialization, for now this is shared by
18 19
 * all remote BAKE targets.  In the future we probably need to support
 * multiple in case we run atop more than one transport at a time.
20
 */
21
struct bake_client
22 23
{
    margo_instance_id mid;  
24

25 26 27 28 29 30
    hg_id_t bake_probe_id;
    hg_id_t bake_create_id;
    hg_id_t bake_eager_write_id;
    hg_id_t bake_eager_read_id;
    hg_id_t bake_write_id;
    hg_id_t bake_persist_id;
31
    hg_id_t bake_create_write_persist_id;
32
    hg_id_t bake_get_size_id;
33
    hg_id_t bake_get_data_id;
34 35
    hg_id_t bake_read_id;
    hg_id_t bake_noop_id;
36
    hg_id_t bake_remove_id;
37 38

    uint64_t num_provider_handles;
39 40
};

41
struct bake_provider_handle {
42
    struct bake_client* client;
43
    hg_addr_t           addr;
44
    uint16_t            provider_id;
45
    uint64_t            refcount;
46
    uint64_t            eager_limit;
47
};
48

49
static int bake_client_register(bake_client_t client, margo_instance_id mid)
50
{
51
    client->mid = mid;
52

53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
    /* check if RPCs have already been registered */
    hg_bool_t flag;
    hg_id_t id;
    margo_registered_name(mid, "bake_probe_rpc", &id, &flag);

    if(flag == HG_TRUE) { /* RPCs already registered */

        margo_registered_name(mid, "bake_probe_rpc",                &client->bake_probe_id,                &flag);
        margo_registered_name(mid, "bake_create_rpc",               &client->bake_create_id,               &flag);
        margo_registered_name(mid, "bake_write_rpc",                &client->bake_write_id,                &flag);
        margo_registered_name(mid, "bake_eager_write_rpc",          &client->bake_eager_write_id,          &flag);
        margo_registered_name(mid, "bake_eager_read_rpc",           &client->bake_eager_read_id,           &flag);
        margo_registered_name(mid, "bake_persist_rpc",              &client->bake_persist_id,              &flag);
        margo_registered_name(mid, "bake_create_write_persist_rpc", &client->bake_create_write_persist_id, &flag);
        margo_registered_name(mid, "bake_get_size_rpc",             &client->bake_get_size_id,             &flag);
68
        margo_registered_name(mid, "bake_get_data_rpc",             &client->bake_get_data_id,             &flag);
69 70
        margo_registered_name(mid, "bake_read_rpc",                 &client->bake_read_id,                 &flag);
        margo_registered_name(mid, "bake_noop_rpc",                 &client->bake_noop_id,                 &flag);
71
        margo_registered_name(mid, "bake_remove_rpc",               &client->bake_remove_id,               &flag);
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98

    } else { /* RPCs not already registered */

        client->bake_probe_id = 
            MARGO_REGISTER(mid, "bake_probe_rpc",
                    bake_probe_in_t, bake_probe_out_t, NULL);
        client->bake_create_id = 
            MARGO_REGISTER(mid, "bake_create_rpc",
                    bake_create_in_t, bake_create_out_t, NULL);
        client->bake_write_id = 
            MARGO_REGISTER(mid, "bake_write_rpc",
                    bake_write_in_t, bake_write_out_t, NULL);
        client->bake_eager_write_id = 
            MARGO_REGISTER(mid, "bake_eager_write_rpc",
                    bake_eager_write_in_t, bake_eager_write_out_t, NULL);
        client->bake_eager_read_id = 
            MARGO_REGISTER(mid, "bake_eager_read_rpc",
                    bake_eager_read_in_t, bake_eager_read_out_t, NULL);
        client->bake_persist_id = 
            MARGO_REGISTER(mid, "bake_persist_rpc",
                    bake_persist_in_t, bake_persist_out_t, NULL);
        client->bake_create_write_persist_id =
            MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
                    bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
        client->bake_get_size_id = 
            MARGO_REGISTER(mid, "bake_get_size_rpc",
                    bake_get_size_in_t, bake_get_size_out_t, NULL);
99 100 101
        client->bake_get_data_id =
            MARGO_REGISTER(mid, "bake_get_data_rpc",
                    bake_get_data_in_t, bake_get_data_out_t, NULL);
102 103 104 105 106 107
        client->bake_read_id = 
            MARGO_REGISTER(mid, "bake_read_rpc",
                    bake_read_in_t, bake_read_out_t, NULL);
        client->bake_noop_id = 
            MARGO_REGISTER(mid, "bake_noop_rpc",
                    void, void, NULL);
108 109 110
        client->bake_remove_id =
            MARGO_REGISTER(mid, "bake_remove_rpc",
                    bake_remove_in_t, bake_remove_out_t, NULL);
111
    }
112

113 114 115
    return(0);
}

116 117 118 119 120
int bake_client_init(margo_instance_id mid, bake_client_t* client)
{
    bake_client_t c = (bake_client_t)calloc(1, sizeof(*c));
    if(!c) return -1;

121 122
    c->num_provider_handles = 0;

123 124 125 126 127 128 129 130 131
    int ret = bake_client_register(c, mid);
    if(ret != 0) return ret;

    *client = c;
    return 0;
}

int bake_client_finalize(bake_client_t client)
{
132 133 134 135 136
    if(client->num_provider_handles != 0) {
        fprintf(stderr, 
                "[BAKE] Warning: %d provider handles not released before bake_client_finalize was called\n",
                client->num_provider_handles);
    }
137 138 139 140
    free(client);
    return 0;
}

141 142 143 144 145
int bake_probe(
    bake_provider_handle_t provider,
    uint64_t max_targets,
    bake_target_id_t *bti,
    uint64_t* num_targets)
146 147 148
{
    hg_return_t hret;
    int ret;
149
    bake_probe_in_t in;
150
    bake_probe_out_t out;
151
    hg_handle_t handle;
152

153 154 155
    if(bti == NULL) max_targets = 0;
    in.max_targets = max_targets;

156
    /* create handle */
157 158 159 160 161
    hret = margo_create(
                provider->client->mid, 
                provider->addr, 
                provider->client->bake_probe_id, 
                &handle);
162

163 164
    if(hret != HG_SUCCESS) return -1;

165
    hret = margo_provider_forward(provider->provider_id, handle, &in);
166
    if(hret != HG_SUCCESS) {
167
        margo_destroy(handle);
168
        return -1;
169 170
    }

171
    hret = margo_get_output(handle, &out);
172
    if(hret != HG_SUCCESS) {
173
        margo_destroy(handle);
174
        return -1;
175 176 177 178
    }

    ret = out.ret;

179
    if(ret == HG_SUCCESS) {
180 181 182
        if(max_targets == 0) {
            *num_targets = out.num_targets;
        } else {
183
            uint64_t s = out.num_targets > max_targets ? max_targets : out.num_targets;
184 185 186
            if(s > 0) {
                memcpy(bti, out.targets, sizeof(*bti)*s);
            }
187
            *num_targets = s;
188
        }
189 190
    }

191 192 193
    margo_free_output(handle, &out);
    margo_destroy(handle);

194
    return ret;
195
}
196 197 198 199

int bake_provider_handle_create(
        bake_client_t client,
        hg_addr_t addr,
200
        uint16_t provider_id,
201
        bake_provider_handle_t* handle)
202
{
203 204 205 206 207 208 209 210 211 212 213 214 215 216
    if(client == BAKE_CLIENT_NULL) return -1;

    bake_provider_handle_t provider = 
        (bake_provider_handle_t)calloc(1, sizeof(*provider));

    if(!provider) return -1;

    hg_return_t ret = margo_addr_dup(client->mid, addr, &(provider->addr));
    if(ret != HG_SUCCESS) {
        free(provider);
        return -1;
    }
    
    provider->client   = client;
217
    provider->provider_id = provider_id;
218
    provider->refcount = 1;
219
    provider->eager_limit = BAKE_DEFAULT_EAGER_LIMIT;
220

221 222
    client->num_provider_handles += 1;

223 224 225 226
    *handle = provider;
    return 0;
}

227 228 229 230 231 232 233 234 235 236 237 238 239 240
int bake_provider_handle_get_eager_limit(bake_provider_handle_t handle, uint64_t* limit)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    *limit = handle->eager_limit;
    return 0;
}

int bake_provider_handle_set_eager_limit(bake_provider_handle_t handle, uint64_t limit)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    handle->eager_limit = limit;
    return 0;
}

241 242 243 244 245
int bake_provider_handle_ref_incr(bake_provider_handle_t handle)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    handle->refcount += 1;
    return 0;
246 247
}

248 249 250 251 252 253
int bake_provider_handle_release(bake_provider_handle_t handle)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    handle->refcount -= 1;
    if(handle->refcount == 0) {
        margo_addr_free(handle->client->mid, handle->addr);
254
        handle->client->num_provider_handles -= 1;
255 256 257 258 259
        free(handle);
    }
    return 0;
}
  
260
int bake_shutdown_service(bake_client_t client, hg_addr_t addr)
261
{
262
    return margo_shutdown_remote_instance(client->mid, addr);
263
}
Philip Carns's avatar
Philip Carns committed
264

265
static int bake_eager_write(
266
    bake_provider_handle_t provider,
267
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
268 269 270 271 272
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
273
    hg_handle_t handle;
274 275
    bake_eager_write_in_t in;
    bake_eager_write_out_t out;
Philip Carns's avatar
Philip Carns committed
276 277 278 279 280 281
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;
    in.buffer = (char*)buf;
282
  
283 284
    hret = margo_create(provider->client->mid, provider->addr, 
                provider->client->bake_eager_write_id, &handle);
285

286 287
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
288

289
    hret = margo_provider_forward(provider->provider_id, handle, &in);
Philip Carns's avatar
Philip Carns committed
290 291
    if(hret != HG_SUCCESS)
    {
292
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
293 294 295
        return(-1);
    }

296
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
297 298
    if(hret != HG_SUCCESS)
    {
299
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
300 301 302 303 304
        return(-1);
    }
    
    ret = out.ret;

305 306
    margo_free_output(handle, &out);
    margo_destroy(handle);
307

Philip Carns's avatar
Philip Carns committed
308 309 310
    return(ret);
}

311
int bake_write(
312
    bake_provider_handle_t provider,
313
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
314 315 316 317
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
318
    hg_return_t hret;
319
    hg_handle_t handle;
320 321
    bake_write_in_t in;
    bake_write_out_t out;
322
    int ret;
323

324
    if(buf_size <= provider->eager_limit)
325
        return(bake_eager_write(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
326

327 328
    in.rid = rid;
    in.region_offset = region_offset;
329 330 331
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
Philip Carns's avatar
Philip Carns committed
332

333
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
334
        HG_BULK_READ_ONLY, &in.bulk_handle);
335 336 337
    if(hret != HG_SUCCESS)
        return(-1);
   
338 339
    hret = margo_create(provider->client->mid, provider->addr, 
        provider->client->bake_write_id, &handle);
340

341 342
    if(hret != HG_SUCCESS)
    {
343
        margo_bulk_free(in.bulk_handle);
344 345 346
        return(-1);
    }

347
    hret = margo_provider_forward(provider->provider_id, handle, &in);
348 349
    if(hret != HG_SUCCESS)
    {
350
        margo_bulk_free(in.bulk_handle);
351
        margo_destroy(handle);
352 353 354
        return(-1);
    }

355
    hret = margo_get_output(handle, &out);
356 357
    if(hret != HG_SUCCESS)
    {
358
        margo_bulk_free(in.bulk_handle);
359
        margo_destroy(handle);
360 361 362 363 364
        return(-1);
    }
    
    ret = out.ret;

365
    margo_free_output(handle, &out);
366
    margo_bulk_free(in.bulk_handle);
367
    margo_destroy(handle);
368
    return(ret);
Philip Carns's avatar
Philip Carns committed
369 370
}

371
int bake_proxy_write(
372
    bake_provider_handle_t provider,
373
    bake_region_id_t rid,
374 375 376
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
377
    const char* remote_addr,
378 379 380 381
    uint64_t size)
{
    hg_return_t hret;
    hg_handle_t handle;
382 383
    bake_write_in_t in;
    bake_write_out_t out;
384 385 386 387 388 389 390
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
391
    in.remote_addr_str = (char*)remote_addr;
Shane Snyder's avatar
Shane Snyder committed
392

393 394
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_write_id, &handle);
395

396 397 398
    if(hret != HG_SUCCESS)
        return(-1);

399
    hret = margo_provider_forward(provider->provider_id, handle, &in);
400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }
    
    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

420
int bake_create(
421
    bake_provider_handle_t provider,
Philip Carns's avatar
Philip Carns committed
422 423
    bake_target_id_t bti,
    uint64_t region_size,
424
    bake_region_id_t *rid)
Philip Carns's avatar
Philip Carns committed
425
{
426
    hg_return_t hret;
427
    hg_handle_t handle;
428 429
    bake_create_in_t in;
    bake_create_out_t out;
430
    int ret = 0;
431

432
    in.bti = bti;
433 434
    in.region_size = region_size;

435 436
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_id, &handle);
437 438

    if(hret != HG_SUCCESS) {
439
        return(-1);
440 441
    }

442
    hret = margo_provider_forward(provider->provider_id, handle, &in);
443 444
    if(hret != HG_SUCCESS)
    {
445
        margo_destroy(handle);
446 447
        return(-1);
    }
Philip Carns's avatar
Philip Carns committed
448

449
    hret = margo_get_output(handle, &out);
450 451
    if(hret != HG_SUCCESS)
    {
452
        margo_destroy(handle);
453 454 455 456
        return(-1);
    }

    ret = out.ret;
457 458
    if(ret == 0) 
        *rid = out.rid;
459

460 461
    margo_free_output(handle, &out);
    margo_destroy(handle);
462
    return(ret);
Philip Carns's avatar
Philip Carns committed
463 464 465
}


466
int bake_persist(
467
    bake_provider_handle_t provider,
468
    bake_region_id_t rid)
Philip Carns's avatar
Philip Carns committed
469
{
470
    hg_return_t hret;
471
    hg_handle_t handle;
472 473
    bake_persist_in_t in;
    bake_persist_out_t out;
474
    int ret;
475

476 477
    in.rid = rid;

478 479
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_persist_id, &handle);
480

481 482
    if(hret != HG_SUCCESS)
        return(-1);
483

484
    hret = margo_provider_forward(provider->provider_id, handle, &in);
485 486
    if(hret != HG_SUCCESS)
    {
487
        margo_destroy(handle);
488 489 490
        return(-1);
    }

491
    hret = margo_get_output(handle, &out);
492 493
    if(hret != HG_SUCCESS)
    {
494
        margo_destroy(handle);
495 496 497 498 499
        return(-1);
    }

    ret = out.ret;

500 501
    margo_free_output(handle, &out);
    margo_destroy(handle);
502
    return(ret);
Philip Carns's avatar
Philip Carns committed
503
}
Philip Carns's avatar
Philip Carns committed
504

505
int bake_create_write_persist(
506
    bake_provider_handle_t provider,
507 508 509 510 511 512 513 514 515 516 517 518 519
    bake_target_id_t bti,
    void const *buf,
    uint64_t buf_size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

    /* XXX eager path? */

520
    in.bti = bti;
521 522 523 524
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */

525
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
526 527 528 529
        HG_BULK_READ_ONLY, &in.bulk_handle);
    if(hret != HG_SUCCESS)
        return(-1);

530 531
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
532

533 534 535 536 537 538
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        return(-1);
    }

539
    hret = margo_provider_forward(provider->provider_id, handle, &in);
540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_bulk_free(in.bulk_handle);
    margo_destroy(handle);
    return(ret);
}

565
int bake_create_write_persist_proxy(
566
    bake_provider_handle_t provider,
567 568 569 570 571 572 573 574 575 576 577 578 579
    bake_target_id_t bti,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

580
    in.bti = bti;
581 582 583 584 585
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
    in.remote_addr_str = (char*)remote_addr;

586 587
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
588

589 590 591
    if(hret != HG_SUCCESS)
        return(-1);

592
    hret = margo_provider_forward(provider->provider_id, handle, &in);
593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

615
int bake_get_size(
616
    bake_provider_handle_t provider,
617
    bake_region_id_t rid,
618 619 620
    uint64_t *region_size)
{
    hg_return_t hret;
621
    hg_handle_t handle;
622 623
    bake_get_size_in_t in;
    bake_get_size_out_t out;
624 625 626 627
    int ret;

    in.rid = rid;

628 629
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_get_size_id, &handle);
630

631 632
    if(hret != HG_SUCCESS)
        return(-1);
633

634
    hret = margo_provider_forward(provider->provider_id, handle, &in);
635 636
    if(hret != HG_SUCCESS)
    {
637
        margo_destroy(handle);
638 639 640
        return(-1);
    }

641
    hret = margo_get_output(handle, &out);
642 643
    if(hret != HG_SUCCESS)
    {
644
        margo_destroy(handle);
645 646 647 648 649 650
        return(-1);
    }

    ret = out.ret;
    *region_size = out.size;

651 652
    margo_free_output(handle, &out);
    margo_destroy(handle);
653 654 655
    return(ret);
}

656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
int bake_get_data(
    bake_provider_handle_t provider,
    bake_region_id_t rid,
    void** ptr)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_get_data_in_t in;
    bake_get_data_out_t out;
    int ret;

    // make sure the target provider is on the same address space
    hg_addr_t self_addr;
    if(HG_SUCCESS != margo_addr_self(provider->client->mid, &self_addr)) return -1;
    hg_addr_t trgt_addr = provider->addr;
    hg_size_t addr_size = 128;
    char self_addr_str[128];
    char trgt_addr_str[128];

    if(HG_SUCCESS != margo_addr_to_string(provider->client->mid, self_addr_str, &addr_size, self_addr)) {
        margo_addr_free(provider->client->mid, self_addr);
        return -1;
    }
    if(HG_SUCCESS != margo_addr_to_string(provider->client->mid, trgt_addr_str, &addr_size, trgt_addr)) {
        margo_addr_free(provider->client->mid, self_addr);
        return -1;
    }
    if(strcmp(self_addr_str, trgt_addr_str) != 0) {
        margo_addr_free(provider->client->mid, self_addr);
        return -1;
    }
    margo_addr_free(provider->client->mid, self_addr);

    in.rid = rid;

    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_get_data_id, &handle);

    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_provider_forward(provider->provider_id, handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    *ptr = (void*)out.ptr;

    margo_free_output(handle, &out);
715
    margo_destroy(handle);
716 717 718
    return(ret);
}

719
int bake_noop(bake_provider_handle_t provider)
Philip Carns's avatar
Philip Carns committed
720 721
{
    hg_return_t hret;
722
    hg_handle_t handle;
Philip Carns's avatar
Philip Carns committed
723

724 725
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_noop_id, &handle);
Philip Carns's avatar
Philip Carns committed
726

727 728
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
729

730
    hret = margo_provider_forward(provider->provider_id, handle, NULL);
Philip Carns's avatar
Philip Carns committed
731 732
    if(hret != HG_SUCCESS)
    {
733
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
734 735 736
        return(-1);
    }

737
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
738 739 740
    return(0);
}

741
static int bake_eager_read(
742
    bake_provider_handle_t provider,
743
    bake_region_id_t rid,
744 745 746 747 748 749
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
    hg_handle_t handle;
750 751
    bake_eager_read_in_t in;
    bake_eager_read_out_t out;
752 753 754 755 756 757
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;

758 759
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_eager_read_id, &handle);
760

761 762 763
    if(hret != HG_SUCCESS)
        return(-1);
  
764
    hret = margo_provider_forward(provider->provider_id, handle, &in);
765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }
    
    ret = out.ret;
    if(ret == 0)
        memcpy(buf, out.buffer, out.size);

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

787
int bake_read(
788
    bake_provider_handle_t provider,
789
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
790 791 792 793
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
794
    hg_return_t hret;
795
    hg_handle_t handle;
796 797
    bake_read_in_t in;
    bake_read_out_t out;
798
    int ret;
799

800
    if(buf_size <= provider->eager_limit)
801
        return(bake_eager_read(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
802

803 804
    in.rid = rid;
    in.region_offset = region_offset;
805 806 807
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy read */
808

809
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
810
        HG_BULK_WRITE_ONLY, &in.bulk_handle);
811 812 813
    if(hret != HG_SUCCESS)
        return(-1);
   
814 815
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
816

817 818
    if(hret != HG_SUCCESS)
    {
819
        margo_bulk_free(in.bulk_handle);
820 821 822
        return(-1);
    }

823
    hret = margo_provider_forward(provider->provider_id, handle, &in);
824 825
    if(hret != HG_SUCCESS)
    {
826
        margo_bulk_free(in.bulk_handle);
827
        margo_destroy(handle);
828 829 830
        return(-1);
    }

831
    hret = margo_get_output(handle, &out);
832 833
    if(hret != HG_SUCCESS)
    {
834
        margo_bulk_free(in.bulk_handle);
835
        margo_destroy(handle);
836 837 838 839 840
        return(-1);
    }
    
    ret = out.ret;

841
    margo_free_output(handle, &out);
842
    margo_bulk_free(in.bulk_handle);
843
    margo_destroy(handle);
844
    return(ret);
Philip Carns's avatar
Philip Carns committed
845 846
}

847
int bake_proxy_read(
848
    bake_provider_handle_t provider,
849
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
850
    uint64_t region_offset,
851 852 853 854
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size)
Philip Carns's avatar
Philip Carns committed
855 856
{
    hg_return_t hret;
857
    hg_handle_t handle;
858 859
    bake_read_in_t in;
    bake_read_out_t out;
860
    int ret;
Philip Carns's avatar
Philip Carns committed
861 862 863

    in.rid = rid;
    in.region_offset = region_offset;
864 865 866 867
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size; 
    in.remote_addr_str = (char*)remote_addr;
Philip Carns's avatar
Philip Carns committed
868

869 870
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
871

872 873
    if(hret != HG_SUCCESS)
        return(-1);
874

875
    hret = margo_provider_forward(provider->provider_id, handle, &in);
Philip Carns's avatar
Philip Carns committed
876 877
    if(hret != HG_SUCCESS)
    {
878
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
879 880 881
        return(-1);
    }

882
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
883 884
    if(hret != HG_SUCCESS)
    {
885
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
886 887
        return(-1);
    }
888

Philip Carns's avatar
Philip Carns committed
889 890
    ret = out.ret;

891 892
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
893 894 895
    return(ret);
}

896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913
int bake_remove(
    bake_provider_handle_t provider,
    bake_region_id_t rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_remove_in_t in;
    bake_remove_out_t out;
    int ret;

    in.rid = rid;

    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_remove_id, &handle);

    if(hret != HG_SUCCESS)
        return(-1);

914
    hret = margo_provider_forward(provider->provider_id, handle, &in);
915 916 917 918 919 920 921 922 923 924 925 926 927 928 929
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;

930
    margo_free_output(handle, &out);
931 932 933
    margo_destroy(handle);
    return(ret);
}