bake-client.c 19 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

9
#include <assert.h>
Philip Carns's avatar
Philip Carns committed
10
#include <margo.h>
11
#include <bake-client.h>
Philip Carns's avatar
Philip Carns committed
12
#include "uthash.h"
13
#include "bake-rpc.h"
Philip Carns's avatar
Philip Carns committed
14

15
#define BAKE_EAGER_LIMIT 2048
16

17
/* Refers to a single Margo initialization; for now this is shared by
 * all remote BAKE targets.  In the future we probably need to support
 * multiple in case we run atop more than one transport at a time.
 */
21
struct bake_client
22 23
{
    margo_instance_id mid;  
24

25 26 27 28 29 30 31
    hg_id_t bake_probe_id;
    hg_id_t bake_shutdown_id; 
    hg_id_t bake_create_id;
    hg_id_t bake_eager_write_id;
    hg_id_t bake_eager_read_id;
    hg_id_t bake_write_id;
    hg_id_t bake_persist_id;
32
    hg_id_t bake_create_write_persist_id;
33 34 35
    hg_id_t bake_get_size_id;
    hg_id_t bake_read_id;
    hg_id_t bake_noop_id;
36 37

    uint64_t num_provider_handles;
38 39
};

40
struct bake_provider_handle {
41
    struct bake_client* client;
42 43 44
    hg_addr_t           addr;
    uint8_t             mplex_id;
    uint64_t            refcount;
45
};
Philip Carns's avatar
Philip Carns committed
46

47
/*
 * Stores the Margo instance in the client and registers every BAKE RPC
 * with it, keeping the returned ids in the client structure.
 *
 * No handler is supplied (NULL) for any of the RPCs.
 * Always returns 0.
 */
static int bake_client_register(bake_client_t client, margo_instance_id mid)
{
    client->mid = mid;

    client->bake_probe_id = MARGO_REGISTER(
        mid, "bake_probe_rpc", bake_probe_in_t, bake_probe_out_t, NULL);

    client->bake_shutdown_id = MARGO_REGISTER(
        mid, "bake_shutdown_rpc", void, void, NULL);

    client->bake_create_id = MARGO_REGISTER(
        mid, "bake_create_rpc", bake_create_in_t, bake_create_out_t, NULL);

    client->bake_write_id = MARGO_REGISTER(
        mid, "bake_write_rpc", bake_write_in_t, bake_write_out_t, NULL);

    client->bake_eager_write_id = MARGO_REGISTER(
        mid, "bake_eager_write_rpc",
        bake_eager_write_in_t, bake_eager_write_out_t, NULL);

    client->bake_eager_read_id = MARGO_REGISTER(
        mid, "bake_eager_read_rpc",
        bake_eager_read_in_t, bake_eager_read_out_t, NULL);

    client->bake_persist_id = MARGO_REGISTER(
        mid, "bake_persist_rpc", bake_persist_in_t, bake_persist_out_t, NULL);

    client->bake_create_write_persist_id = MARGO_REGISTER(
        mid, "bake_create_write_persist_rpc",
        bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);

    client->bake_get_size_id = MARGO_REGISTER(
        mid, "bake_get_size_rpc",
        bake_get_size_in_t, bake_get_size_out_t, NULL);

    client->bake_read_id = MARGO_REGISTER(
        mid, "bake_read_rpc", bake_read_in_t, bake_read_out_t, NULL);

    client->bake_noop_id = MARGO_REGISTER(
        mid, "bake_noop_rpc", void, void, NULL);

    return 0;
}

89 90 91 92 93
/*
 * Allocates a BAKE client and registers the BAKE RPCs with `mid`.
 *
 * On success stores the new client in *client and returns 0.
 * Returns -1 if the allocation fails, or the non-zero result of
 * bake_client_register() if registration fails; in both cases *client
 * is left untouched and nothing is leaked.
 */
int bake_client_init(margo_instance_id mid, bake_client_t* client)
{
    bake_client_t c = (bake_client_t)calloc(1, sizeof(*c));
    if(!c) return -1;

    /* calloc already zeroed the structure; kept explicit for clarity */
    c->num_provider_handles = 0;

    int ret = bake_client_register(c, mid);
    if(ret != 0) {
        /* do not leak the client when registration fails */
        free(c);
        return ret;
    }

    *client = c;
    return 0;
}

/*
 * Frees a client created by bake_client_init().
 *
 * If provider handles created from this client are still outstanding, a
 * warning is printed to stderr; the client is freed regardless.
 * Always returns 0.
 */
int bake_client_finalize(bake_client_t client)
{
    if(client->num_provider_handles != 0) {
        /* num_provider_handles is a uint64_t: convert explicitly so the
         * argument type matches the conversion specifier */
        fprintf(stderr,
                "[BAKE] Warning: %llu provider handles not released before bake_client_finalize was called\n",
                (unsigned long long)client->num_provider_handles);
    }
    free(client);
    return 0;
}

114 115 116 117 118
/*
 * Sends the probe RPC to a provider to learn about its targets.
 *
 * provider:    provider to query.
 * max_targets: capacity of bti, in entries; forced to 0 when bti is NULL.
 * bti:         output array receiving target ids, or NULL to only query
 *              the count.
 * num_targets: output. With max_targets == 0 it receives the number of
 *              targets reported by the provider; otherwise it receives
 *              the number of ids actually copied into bti, which is the
 *              reported count capped at max_targets.
 *
 * Returns -1 on any Margo failure, otherwise the `ret` field of the
 * provider's response. Outputs are written only when that value is 0.
 */
int bake_probe(
    bake_provider_handle_t provider,
    uint64_t max_targets,
    bake_target_id_t *bti,
    uint64_t* num_targets)
{
    hg_return_t hret;
    int ret;
    bake_probe_in_t in;
    bake_probe_out_t out;
    hg_handle_t handle;

    if(bti == NULL) max_targets = 0;
    in.max_targets = max_targets;

    /* create handle */
    hret = margo_create(
                provider->client->mid,
                provider->addr,
                provider->client->bake_probe_id,
                &handle);
    if(hret != HG_SUCCESS) return -1;

    hret = margo_set_target_id(handle, provider->mplex_id);
    if(hret != HG_SUCCESS) {
        margo_destroy(handle);
        return -1;
    }

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS) {
        margo_destroy(handle);
        return -1;
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS) {
        margo_destroy(handle);
        return -1;
    }

    ret = out.ret;

    if(ret == 0) {
        if(max_targets == 0) {
            *num_targets = out.num_targets;
        } else {
            /* Bound the copy by what the provider returned, not by the
             * caller's (possibly uninitialized) *num_targets, and by
             * the capacity of bti. */
            uint64_t s = out.num_targets > max_targets
                       ? max_targets : out.num_targets;
            if(s > 0) {
                memcpy(bti, out.targets, sizeof(*bti)*s);
            }
            *num_targets = s;
        }
    }

    margo_free_output(handle, &out);
    margo_destroy(handle);

    return ret;
}
175 176 177 178 179 180

/*
 * Builds a provider handle for the provider identified by (addr, mplex_id).
 *
 * The address is duplicated with margo_addr_dup(), so the handle keeps
 * its own copy. The new handle starts with a reference count of 1 and is
 * counted in client->num_provider_handles.
 *
 * Returns 0 and stores the handle in *handle on success; returns -1 when
 * client is BAKE_CLIENT_NULL, allocation fails, or the address cannot be
 * duplicated.
 */
int bake_provider_handle_create(
        bake_client_t client,
        hg_addr_t addr,
        uint8_t mplex_id,
        bake_provider_handle_t* handle)
{
    bake_provider_handle_t ph;
    hg_return_t hret;

    if(client == BAKE_CLIENT_NULL)
        return -1;

    ph = (bake_provider_handle_t)calloc(1, sizeof(*ph));
    if(ph == NULL)
        return -1;

    hret = margo_addr_dup(client->mid, addr, &(ph->addr));
    if(hret != HG_SUCCESS) {
        free(ph);
        return -1;
    }

    ph->client   = client;
    ph->mplex_id = mplex_id;
    ph->refcount = 1;

    client->num_provider_handles += 1;

    *handle = ph;
    return 0;
}

/*
 * Adds one reference to a provider handle.
 * Returns 0, or -1 when handle is BAKE_PROVIDER_HANDLE_NULL.
 */
int bake_provider_handle_ref_incr(bake_provider_handle_t handle)
{
    if(handle != BAKE_PROVIDER_HANDLE_NULL) {
        handle->refcount++;
        return 0;
    }
    return -1;
}

212 213 214 215 216 217
/*
 * Drops one reference to a provider handle.
 *
 * When the count reaches zero the duplicated address is freed, the
 * owning client's handle count is decremented, and the handle itself is
 * freed.
 *
 * Returns 0, or -1 when handle is BAKE_PROVIDER_HANDLE_NULL.
 */
int bake_provider_handle_release(bake_provider_handle_t handle)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL)
        return -1;

    handle->refcount -= 1;
    if(handle->refcount != 0)
        return 0; /* still referenced elsewhere */

    margo_addr_free(handle->client->mid, handle->addr);
    handle->client->num_provider_handles -= 1;
    free(handle);
    return 0;
}
  
224
/*
 * Sends the shutdown RPC (no input) to the service at `addr`.
 *
 * No output is retrieved from the RPC.
 * Returns 0 when the RPC was created and forwarded successfully,
 * -1 otherwise.
 */
int bake_shutdown_service(bake_client_t client, hg_addr_t addr)
{
    hg_handle_t rpc;
    hg_return_t status;

    status = margo_create(client->mid, addr, client->bake_shutdown_id, &rpc);
    if(status != HG_SUCCESS)
        return -1;

    status = margo_forward(rpc, NULL);

    /* the handle is destroyed whether or not the forward succeeded */
    margo_destroy(rpc);

    return (status == HG_SUCCESS) ? 0 : -1;
}
Philip Carns's avatar
Philip Carns committed
245

246
static int bake_eager_write(
247
    bake_provider_handle_t provider,
248
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
249 250 251 252 253
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
254
    hg_handle_t handle;
255 256
    bake_eager_write_in_t in;
    bake_eager_write_out_t out;
Philip Carns's avatar
Philip Carns committed
257 258 259 260 261 262
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;
    in.buffer = (char*)buf;
263
  
264 265 266
    hret = margo_create(provider->client->mid, provider->addr, 
                provider->client->bake_eager_write_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
267

Shane Snyder's avatar
Shane Snyder committed
268 269
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
270

Shane Snyder's avatar
Shane Snyder committed
271
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
272 273
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
274
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
275 276 277
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
278
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
279 280
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
281
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
282 283 284 285 286
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
287 288
    margo_free_output(handle, &out);
    margo_destroy(handle);
289

Philip Carns's avatar
Philip Carns committed
290 291 292
    return(ret);
}

293
int bake_write(
294
    bake_provider_handle_t provider,
295
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
296 297 298 299
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
Philip Carns's avatar
Philip Carns committed
300
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
301
    hg_handle_t handle;
302 303
    bake_write_in_t in;
    bake_write_out_t out;
Philip Carns's avatar
Philip Carns committed
304
    int ret;
305

306
    if(buf_size <= BAKE_EAGER_LIMIT)
307
        return(bake_eager_write(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
308

Philip Carns's avatar
Philip Carns committed
309 310
    in.rid = rid;
    in.region_offset = region_offset;
311 312 313
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
Philip Carns's avatar
Philip Carns committed
314

315
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
Philip Carns's avatar
Philip Carns committed
316
        HG_BULK_READ_ONLY, &in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
317 318 319
    if(hret != HG_SUCCESS)
        return(-1);
   
320 321 322
    hret = margo_create(provider->client->mid, provider->addr, 
        provider->client->bake_write_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
323

Philip Carns's avatar
Philip Carns committed
324 325
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
326
        margo_bulk_free(in.bulk_handle);
Philip Carns's avatar
Philip Carns committed
327 328 329
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
330
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
331 332
    if(hret != HG_SUCCESS)
    {
333
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
334
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
335 336 337
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
338
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
339 340
    if(hret != HG_SUCCESS)
    {
341
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
342
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
343 344 345 346 347
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
348
    margo_free_output(handle, &out);
349
    margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
350
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
351
    return(ret);
Philip Carns's avatar
Philip Carns committed
352 353
}

354
int bake_proxy_write(
355
    bake_provider_handle_t provider,
356
    bake_region_id_t rid,
357 358 359
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
360
    const char* remote_addr,
361 362 363 364
    uint64_t size)
{
    hg_return_t hret;
    hg_handle_t handle;
365 366
    bake_write_in_t in;
    bake_write_out_t out;
367 368 369 370 371 372 373
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
374
    in.remote_addr_str = (char*)remote_addr;
Shane Snyder's avatar
Shane Snyder committed
375

376 377 378
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_write_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
379

380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }
    
    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

404
int bake_create(
405
    bake_provider_handle_t provider,
Philip Carns's avatar
Philip Carns committed
406 407
    bake_target_id_t bti,
    uint64_t region_size,
408
    bake_region_id_t *rid)
Philip Carns's avatar
Philip Carns committed
409
{
410
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
411
    hg_handle_t handle;
412 413
    bake_create_in_t in;
    bake_create_out_t out;
414
    int ret = 0;
415

416
    in.bti = bti;
Philip Carns's avatar
Philip Carns committed
417 418
    in.region_size = region_size;

419 420
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_id, &handle);
421 422

    if(hret != HG_SUCCESS) {
Shane Snyder's avatar
Shane Snyder committed
423
        return(-1);
424 425
    }

426
    margo_set_target_id(handle, provider->mplex_id);
427

Shane Snyder's avatar
Shane Snyder committed
428
    hret = margo_forward(handle, &in);
429 430
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
431
        margo_destroy(handle);
432 433
        return(-1);
    }
Philip Carns's avatar
Philip Carns committed
434

Shane Snyder's avatar
Shane Snyder committed
435
    hret = margo_get_output(handle, &out);
436 437
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
438
        margo_destroy(handle);
439 440 441 442 443 444
        return(-1);
    }

    ret = out.ret;
    *rid = out.rid;

Shane Snyder's avatar
Shane Snyder committed
445 446
    margo_free_output(handle, &out);
    margo_destroy(handle);
447
    return(ret);
Philip Carns's avatar
Philip Carns committed
448 449 450
}


451
int bake_persist(
452
    bake_provider_handle_t provider,
453
    bake_region_id_t rid)
Philip Carns's avatar
Philip Carns committed
454
{
Philip Carns's avatar
Philip Carns committed
455
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
456
    hg_handle_t handle;
457 458
    bake_persist_in_t in;
    bake_persist_out_t out;
Philip Carns's avatar
Philip Carns committed
459
    int ret;
460

Philip Carns's avatar
Philip Carns committed
461 462
    in.rid = rid;

463 464 465
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_persist_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
466

Shane Snyder's avatar
Shane Snyder committed
467 468
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
469

Shane Snyder's avatar
Shane Snyder committed
470
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
471 472
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
473
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
474 475 476
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
477
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
478 479
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
480
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
481 482 483 484 485
        return(-1);
    }

    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
486 487
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
488
    return(ret);
Philip Carns's avatar
Philip Carns committed
489
}
Philip Carns's avatar
Philip Carns committed
490

491
int bake_create_write_persist(
492
    bake_provider_handle_t provider,
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

    /* XXX eager path? */

508
    in.bti = bti;
509 510 511 512 513 514
    in.region_size = region_size;
    in.region_offset = region_offset;
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */

515
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
516 517 518 519
        HG_BULK_READ_ONLY, &in.bulk_handle);
    if(hret != HG_SUCCESS)
        return(-1);

520 521 522
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
523

524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        return(-1);
    }

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_bulk_free(in.bulk_handle);
    margo_destroy(handle);
    return(ret);
}

556
int bake_create_write_persist_proxy(
557
    bake_provider_handle_t provider,
558 559 560 561 562 563 564 565 566 567 568 569 570 571 572
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

573
    in.bti = bti;
574 575 576 577 578 579 580
    in.region_size = region_size;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
    in.remote_addr_str = (char*)remote_addr;

581 582 583
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
584

585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610
    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

611
int bake_get_size(
612
    bake_provider_handle_t provider,
613
    bake_region_id_t rid,
614 615 616
    uint64_t *region_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
617
    hg_handle_t handle;
618 619
    bake_get_size_in_t in;
    bake_get_size_out_t out;
620 621 622 623
    int ret;

    in.rid = rid;

624 625 626
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_get_size_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
627

Shane Snyder's avatar
Shane Snyder committed
628 629
    if(hret != HG_SUCCESS)
        return(-1);
630

Shane Snyder's avatar
Shane Snyder committed
631
    hret = margo_forward(handle, &in);
632 633
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
634
        margo_destroy(handle);
635 636 637
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
638
    hret = margo_get_output(handle, &out);
639 640
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
641
        margo_destroy(handle);
642 643 644 645 646 647
        return(-1);
    }

    ret = out.ret;
    *region_size = out.size;

Shane Snyder's avatar
Shane Snyder committed
648 649
    margo_free_output(handle, &out);
    margo_destroy(handle);
650 651 652
    return(ret);
}

653
/*
 * Sends the no-op RPC (no input) to the provider.
 *
 * No output is retrieved from the RPC.
 * Returns 0 when the RPC was created, targeted and forwarded
 * successfully, -1 otherwise.
 */
int bake_noop(bake_provider_handle_t provider)
{
    hg_return_t hret;
    hg_handle_t handle;

    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_noop_id, &handle);
    /* handle is only valid once margo_create has succeeded */
    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_set_target_id(handle, provider->mplex_id);
    if(hret == HG_SUCCESS)
        hret = margo_forward(handle, NULL);

    margo_destroy(handle);
    return (hret == HG_SUCCESS) ? 0 : -1;
}

676
static int bake_eager_read(
677
    bake_provider_handle_t provider,
678
    bake_region_id_t rid,
679 680 681 682 683 684
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
    hg_handle_t handle;
685 686
    bake_eager_read_in_t in;
    bake_eager_read_out_t out;
687 688 689 690 691 692
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;

693 694 695
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_eager_read_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
696

697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722
    if(hret != HG_SUCCESS)
        return(-1);
  
    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }
    
    ret = out.ret;
    if(ret == 0)
        memcpy(buf, out.buffer, out.size);

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

723
int bake_read(
724
    bake_provider_handle_t provider,
725
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
726 727 728 729
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
730
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
731
    hg_handle_t handle;
732 733
    bake_read_in_t in;
    bake_read_out_t out;
734
    int ret;
735

736
    if(buf_size <= BAKE_EAGER_LIMIT)
737
        return(bake_eager_read(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
738

739 740
    in.rid = rid;
    in.region_offset = region_offset;
741 742 743
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy read */
744

745
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
746
        HG_BULK_WRITE_ONLY, &in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
747 748 749
    if(hret != HG_SUCCESS)
        return(-1);
   
750 751 752
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
753

754 755
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
756
        margo_bulk_free(in.bulk_handle);
757 758 759
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
760
    hret = margo_forward(handle, &in);
761 762
    if(hret != HG_SUCCESS)
    {
763
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
764
        margo_destroy(handle);
765 766 767
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
768
    hret = margo_get_output(handle, &out);
769 770
    if(hret != HG_SUCCESS)
    {
771
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
772
        margo_destroy(handle);
773 774 775 776 777
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
778
    margo_free_output(handle, &out);
779
    margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
780
    margo_destroy(handle);
781
    return(ret);
Philip Carns's avatar
Philip Carns committed
782 783
}

784
int bake_proxy_read(
785
    bake_provider_handle_t provider,
786
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
787
    uint64_t region_offset,
788 789 790 791
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size)
Philip Carns's avatar
Philip Carns committed
792 793
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
794
    hg_handle_t handle;
795 796
    bake_read_in_t in;
    bake_read_out_t out;
797
    int ret;
Philip Carns's avatar
Philip Carns committed
798 799 800

    in.rid = rid;
    in.region_offset = region_offset;
801 802 803 804
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size; 
    in.remote_addr_str = (char*)remote_addr;
Philip Carns's avatar
Philip Carns committed
805

806 807 808
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
    margo_set_target_id(handle, provider->mplex_id);
809

Shane Snyder's avatar
Shane Snyder committed
810 811
    if(hret != HG_SUCCESS)
        return(-1);
812

Shane Snyder's avatar
Shane Snyder committed
813
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
814 815
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
816
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
817 818 819
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
820
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
821 822
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
823
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
824 825
        return(-1);
    }
826

Philip Carns's avatar
Philip Carns committed
827 828
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
829 830
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
831 832 833
    return(ret);
}