bake-client.c 20.4 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

9
#include <assert.h>
Philip Carns's avatar
Philip Carns committed
10
#include <margo.h>
11
#include <bake-client.h>
Philip Carns's avatar
Philip Carns committed
12
#include "uthash.h"
13
#include "bake-rpc.h"
Philip Carns's avatar
Philip Carns committed
14

15
#define BAKE_DEFAULT_EAGER_LIMIT 2048
16

17
/* Refers to a single Margo initialization, for now this is shared by
18 19
 * all remote BAKE targets.  In the future we probably need to support
 * multiple in case we run atop more than one transport at a time.
20
 */
21
struct bake_client
22 23
{
    margo_instance_id mid;  
24

25 26 27 28 29 30
    hg_id_t bake_probe_id;
    hg_id_t bake_create_id;
    hg_id_t bake_eager_write_id;
    hg_id_t bake_eager_read_id;
    hg_id_t bake_write_id;
    hg_id_t bake_persist_id;
31
    hg_id_t bake_create_write_persist_id;
32 33 34
    hg_id_t bake_get_size_id;
    hg_id_t bake_read_id;
    hg_id_t bake_noop_id;
35 36

    uint64_t num_provider_handles;
37 38
};

39
struct bake_provider_handle {
40
    struct bake_client* client;
41
    hg_addr_t           addr;
42
    uint16_t            provider_id;
43
    uint64_t            refcount;
44
    uint64_t            eager_limit;
45
};
Philip Carns's avatar
Philip Carns committed
46

47
static int bake_client_register(bake_client_t client, margo_instance_id mid)
Philip Carns's avatar
Philip Carns committed
48
{
49
    client->mid = mid;
Philip Carns's avatar
Philip Carns committed
50

51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
    /* check if RPCs have already been registered */
    hg_bool_t flag;
    hg_id_t id;
    margo_registered_name(mid, "bake_probe_rpc", &id, &flag);

    if(flag == HG_TRUE) { /* RPCs already registered */

        margo_registered_name(mid, "bake_probe_rpc",                &client->bake_probe_id,                &flag);
        margo_registered_name(mid, "bake_create_rpc",               &client->bake_create_id,               &flag);
        margo_registered_name(mid, "bake_write_rpc",                &client->bake_write_id,                &flag);
        margo_registered_name(mid, "bake_eager_write_rpc",          &client->bake_eager_write_id,          &flag);
        margo_registered_name(mid, "bake_eager_read_rpc",           &client->bake_eager_read_id,           &flag);
        margo_registered_name(mid, "bake_persist_rpc",              &client->bake_persist_id,              &flag);
        margo_registered_name(mid, "bake_create_write_persist_rpc", &client->bake_create_write_persist_id, &flag);
        margo_registered_name(mid, "bake_get_size_rpc",             &client->bake_get_size_id,             &flag);
        margo_registered_name(mid, "bake_read_rpc",                 &client->bake_read_id,                 &flag);
        margo_registered_name(mid, "bake_noop_rpc",                 &client->bake_noop_id,                 &flag);

    } else { /* RPCs not already registered */

        client->bake_probe_id = 
            MARGO_REGISTER(mid, "bake_probe_rpc",
                    bake_probe_in_t, bake_probe_out_t, NULL);
        client->bake_create_id = 
            MARGO_REGISTER(mid, "bake_create_rpc",
                    bake_create_in_t, bake_create_out_t, NULL);
        client->bake_write_id = 
            MARGO_REGISTER(mid, "bake_write_rpc",
                    bake_write_in_t, bake_write_out_t, NULL);
        client->bake_eager_write_id = 
            MARGO_REGISTER(mid, "bake_eager_write_rpc",
                    bake_eager_write_in_t, bake_eager_write_out_t, NULL);
        client->bake_eager_read_id = 
            MARGO_REGISTER(mid, "bake_eager_read_rpc",
                    bake_eager_read_in_t, bake_eager_read_out_t, NULL);
        client->bake_persist_id = 
            MARGO_REGISTER(mid, "bake_persist_rpc",
                    bake_persist_in_t, bake_persist_out_t, NULL);
        client->bake_create_write_persist_id =
            MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
                    bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
        client->bake_get_size_id = 
            MARGO_REGISTER(mid, "bake_get_size_rpc",
                    bake_get_size_in_t, bake_get_size_out_t, NULL);
        client->bake_read_id = 
            MARGO_REGISTER(mid, "bake_read_rpc",
                    bake_read_in_t, bake_read_out_t, NULL);
        client->bake_noop_id = 
            MARGO_REGISTER(mid, "bake_noop_rpc",
                    void, void, NULL);
    }
Philip Carns's avatar
Philip Carns committed
102

103 104 105
    return(0);
}

106 107 108 109 110
int bake_client_init(margo_instance_id mid, bake_client_t* client)
{
    bake_client_t c = (bake_client_t)calloc(1, sizeof(*c));
    if(!c) return -1;

111 112
    c->num_provider_handles = 0;

113 114 115 116 117 118 119 120 121
    int ret = bake_client_register(c, mid);
    if(ret != 0) return ret;

    *client = c;
    return 0;
}

int bake_client_finalize(bake_client_t client)
{
122 123 124 125 126
    if(client->num_provider_handles != 0) {
        fprintf(stderr, 
                "[BAKE] Warning: %d provider handles not released before bake_client_finalize was called\n",
                client->num_provider_handles);
    }
127 128 129 130
    free(client);
    return 0;
}

131 132 133 134 135
int bake_probe(
    bake_provider_handle_t provider,
    uint64_t max_targets,
    bake_target_id_t *bti,
    uint64_t* num_targets)
136 137 138
{
    hg_return_t hret;
    int ret;
139
    bake_probe_in_t in;
140
    bake_probe_out_t out;
141
    hg_handle_t handle;
Philip Carns's avatar
Philip Carns committed
142

143 144 145
    if(bti == NULL) max_targets = 0;
    in.max_targets = max_targets;

146
    /* create handle */
147 148 149 150 151
    hret = margo_create(
                provider->client->mid, 
                provider->addr, 
                provider->client->bake_probe_id, 
                &handle);
152

153 154
    if(hret != HG_SUCCESS) return -1;

155
    hret = margo_provider_forward(provider->provider_id, handle, &in);
156
    if(hret != HG_SUCCESS) {
157
        margo_destroy(handle);
158
        return -1;
159 160
    }

161
    hret = margo_get_output(handle, &out);
162
    if(hret != HG_SUCCESS) {
163
        margo_destroy(handle);
164
        return -1;
165 166 167 168
    }

    ret = out.ret;

169
    if(ret == HG_SUCCESS) {
170 171 172
        if(max_targets == 0) {
            *num_targets = out.num_targets;
        } else {
173
            uint64_t s = out.num_targets > max_targets ? max_targets : out.num_targets;
174 175 176
            if(s > 0) {
                memcpy(bti, out.targets, sizeof(*bti)*s);
            }
177
            *num_targets = s;
178
        }
179 180
    }

181 182 183
    margo_free_output(handle, &out);
    margo_destroy(handle);

184
    return ret;
Philip Carns's avatar
Philip Carns committed
185
}
186 187 188 189

int bake_provider_handle_create(
        bake_client_t client,
        hg_addr_t addr,
190
        uint16_t provider_id,
191
        bake_provider_handle_t* handle)
Philip Carns's avatar
Philip Carns committed
192
{
193 194 195 196 197 198 199 200 201 202 203 204 205 206
    if(client == BAKE_CLIENT_NULL) return -1;

    bake_provider_handle_t provider = 
        (bake_provider_handle_t)calloc(1, sizeof(*provider));

    if(!provider) return -1;

    hg_return_t ret = margo_addr_dup(client->mid, addr, &(provider->addr));
    if(ret != HG_SUCCESS) {
        free(provider);
        return -1;
    }
    
    provider->client   = client;
207
    provider->provider_id = provider_id;
208
    provider->refcount = 1;
209
    provider->eager_limit = BAKE_DEFAULT_EAGER_LIMIT;
210

211 212
    client->num_provider_handles += 1;

213 214 215 216
    *handle = provider;
    return 0;
}

217 218 219 220 221 222 223 224 225 226 227 228 229 230
int bake_provider_handle_get_eager_limit(bake_provider_handle_t handle, uint64_t* limit)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    *limit = handle->eager_limit;
    return 0;
}

int bake_provider_handle_set_eager_limit(bake_provider_handle_t handle, uint64_t limit)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    handle->eager_limit = limit;
    return 0;
}

231 232 233 234 235
int bake_provider_handle_ref_incr(bake_provider_handle_t handle)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    handle->refcount += 1;
    return 0;
Philip Carns's avatar
Philip Carns committed
236 237
}

238 239 240 241 242 243
int bake_provider_handle_release(bake_provider_handle_t handle)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
    handle->refcount -= 1;
    if(handle->refcount == 0) {
        margo_addr_free(handle->client->mid, handle->addr);
244
        handle->client->num_provider_handles -= 1;
245 246 247 248 249
        free(handle);
    }
    return 0;
}
  
250
int bake_shutdown_service(bake_client_t client, hg_addr_t addr)
Philip Carns's avatar
Philip Carns committed
251
{
252
    return margo_shutdown_remote_instance(client->mid, addr);
Philip Carns's avatar
Philip Carns committed
253
}
Philip Carns's avatar
Philip Carns committed
254

255
static int bake_eager_write(
256
    bake_provider_handle_t provider,
257
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
258 259 260 261 262
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
263
    hg_handle_t handle;
264 265
    bake_eager_write_in_t in;
    bake_eager_write_out_t out;
Philip Carns's avatar
Philip Carns committed
266 267 268 269 270 271
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;
    in.buffer = (char*)buf;
272
  
273 274
    hret = margo_create(provider->client->mid, provider->addr, 
                provider->client->bake_eager_write_id, &handle);
275

Shane Snyder's avatar
Shane Snyder committed
276 277
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
278

279
    hret = margo_provider_forward(provider->provider_id, handle, &in);
Philip Carns's avatar
Philip Carns committed
280 281
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
282
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
283 284 285
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
286
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
287 288
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
289
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
290 291 292 293 294
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
295 296
    margo_free_output(handle, &out);
    margo_destroy(handle);
297

Philip Carns's avatar
Philip Carns committed
298 299 300
    return(ret);
}

301
int bake_write(
302
    bake_provider_handle_t provider,
303
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
304 305 306 307
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
Philip Carns's avatar
Philip Carns committed
308
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
309
    hg_handle_t handle;
310 311
    bake_write_in_t in;
    bake_write_out_t out;
Philip Carns's avatar
Philip Carns committed
312
    int ret;
313

314
    if(buf_size <= provider->eager_limit)
315
        return(bake_eager_write(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
316

Philip Carns's avatar
Philip Carns committed
317 318
    in.rid = rid;
    in.region_offset = region_offset;
319 320 321
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
Philip Carns's avatar
Philip Carns committed
322

323
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
Philip Carns's avatar
Philip Carns committed
324
        HG_BULK_READ_ONLY, &in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
325 326 327
    if(hret != HG_SUCCESS)
        return(-1);
   
328 329
    hret = margo_create(provider->client->mid, provider->addr, 
        provider->client->bake_write_id, &handle);
330

Philip Carns's avatar
Philip Carns committed
331 332
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
333
        margo_bulk_free(in.bulk_handle);
Philip Carns's avatar
Philip Carns committed
334 335 336
        return(-1);
    }

337
    hret = margo_provider_forward(provider->provider_id, handle, &in);
Philip Carns's avatar
Philip Carns committed
338 339
    if(hret != HG_SUCCESS)
    {
340
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
341
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
342 343 344
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
345
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
346 347
    if(hret != HG_SUCCESS)
    {
348
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
349
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
350 351 352 353 354
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
355
    margo_free_output(handle, &out);
356
    margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
357
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
358
    return(ret);
Philip Carns's avatar
Philip Carns committed
359 360
}

361
int bake_proxy_write(
362
    bake_provider_handle_t provider,
363
    bake_region_id_t rid,
364 365 366
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
367
    const char* remote_addr,
368 369 370 371
    uint64_t size)
{
    hg_return_t hret;
    hg_handle_t handle;
372 373
    bake_write_in_t in;
    bake_write_out_t out;
374 375 376 377 378 379 380
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
381
    in.remote_addr_str = (char*)remote_addr;
Shane Snyder's avatar
Shane Snyder committed
382

383 384
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_write_id, &handle);
385

386 387 388
    if(hret != HG_SUCCESS)
        return(-1);

389
    hret = margo_provider_forward(provider->provider_id, handle, &in);
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }
    
    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

410
int bake_create(
411
    bake_provider_handle_t provider,
Philip Carns's avatar
Philip Carns committed
412 413
    bake_target_id_t bti,
    uint64_t region_size,
414
    bake_region_id_t *rid)
Philip Carns's avatar
Philip Carns committed
415
{
416
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
417
    hg_handle_t handle;
418 419
    bake_create_in_t in;
    bake_create_out_t out;
420
    int ret = 0;
421

422
    in.bti = bti;
Philip Carns's avatar
Philip Carns committed
423 424
    in.region_size = region_size;

425 426
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_id, &handle);
427 428

    if(hret != HG_SUCCESS) {
Shane Snyder's avatar
Shane Snyder committed
429
        return(-1);
430 431
    }

432
    hret = margo_provider_forward(provider->provider_id, handle, &in);
433 434
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
435
        margo_destroy(handle);
436 437
        return(-1);
    }
Philip Carns's avatar
Philip Carns committed
438

Shane Snyder's avatar
Shane Snyder committed
439
    hret = margo_get_output(handle, &out);
440 441
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
442
        margo_destroy(handle);
443 444 445 446
        return(-1);
    }

    ret = out.ret;
447 448
    if(ret == 0) 
        *rid = out.rid;
449

Shane Snyder's avatar
Shane Snyder committed
450 451
    margo_free_output(handle, &out);
    margo_destroy(handle);
452
    return(ret);
Philip Carns's avatar
Philip Carns committed
453 454 455
}


456
int bake_persist(
457
    bake_provider_handle_t provider,
458
    bake_region_id_t rid)
Philip Carns's avatar
Philip Carns committed
459
{
Philip Carns's avatar
Philip Carns committed
460
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
461
    hg_handle_t handle;
462 463
    bake_persist_in_t in;
    bake_persist_out_t out;
Philip Carns's avatar
Philip Carns committed
464
    int ret;
465

Philip Carns's avatar
Philip Carns committed
466 467
    in.rid = rid;

468 469
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_persist_id, &handle);
470

Shane Snyder's avatar
Shane Snyder committed
471 472
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
473

474
    hret = margo_provider_forward(provider->provider_id, handle, &in);
Philip Carns's avatar
Philip Carns committed
475 476
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
477
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
478 479 480
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
481
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
482 483
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
484
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
485 486 487 488 489
        return(-1);
    }

    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
490 491
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
492
    return(ret);
Philip Carns's avatar
Philip Carns committed
493
}
Philip Carns's avatar
Philip Carns committed
494

495
int bake_create_write_persist(
496
    bake_provider_handle_t provider,
497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

    /* XXX eager path? */

512
    in.bti = bti;
513 514 515 516 517 518
    in.region_size = region_size;
    in.region_offset = region_offset;
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */

519
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
520 521 522 523
        HG_BULK_READ_ONLY, &in.bulk_handle);
    if(hret != HG_SUCCESS)
        return(-1);

524 525
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
526

527 528 529 530 531 532
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        return(-1);
    }

533
    hret = margo_provider_forward(provider->provider_id, handle, &in);
534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_bulk_free(in.bulk_handle);
    margo_destroy(handle);
    return(ret);
}

559
int bake_create_write_persist_proxy(
560
    bake_provider_handle_t provider,
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

576
    in.bti = bti;
577 578 579 580 581 582 583
    in.region_size = region_size;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
    in.remote_addr_str = (char*)remote_addr;

584 585
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
586

587 588 589
    if(hret != HG_SUCCESS)
        return(-1);

590
    hret = margo_provider_forward(provider->provider_id, handle, &in);
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

613
int bake_get_size(
614
    bake_provider_handle_t provider,
615
    bake_region_id_t rid,
616 617 618
    uint64_t *region_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
619
    hg_handle_t handle;
620 621
    bake_get_size_in_t in;
    bake_get_size_out_t out;
622 623 624 625
    int ret;

    in.rid = rid;

626 627
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_get_size_id, &handle);
628

Shane Snyder's avatar
Shane Snyder committed
629 630
    if(hret != HG_SUCCESS)
        return(-1);
631

632
    hret = margo_provider_forward(provider->provider_id, handle, &in);
633 634
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
635
        margo_destroy(handle);
636 637 638
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
639
    hret = margo_get_output(handle, &out);
640 641
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
642
        margo_destroy(handle);
643 644 645 646 647 648
        return(-1);
    }

    ret = out.ret;
    *region_size = out.size;

Shane Snyder's avatar
Shane Snyder committed
649 650
    margo_free_output(handle, &out);
    margo_destroy(handle);
651 652 653
    return(ret);
}

654
int bake_noop(bake_provider_handle_t provider)
Philip Carns's avatar
Philip Carns committed
655 656
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
657
    hg_handle_t handle;
Philip Carns's avatar
Philip Carns committed
658

659 660
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_noop_id, &handle);
Philip Carns's avatar
Philip Carns committed
661

Shane Snyder's avatar
Shane Snyder committed
662 663
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
664

665
    hret = margo_provider_forward(provider->provider_id, handle, NULL);
Philip Carns's avatar
Philip Carns committed
666 667
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
668
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
669 670 671
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
672
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
673 674 675
    return(0);
}

676
static int bake_eager_read(
677
    bake_provider_handle_t provider,
678
    bake_region_id_t rid,
679 680 681 682 683 684
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
    hg_handle_t handle;
685 686
    bake_eager_read_in_t in;
    bake_eager_read_out_t out;
687 688 689 690 691 692
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;

693 694
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_eager_read_id, &handle);
695

696 697 698
    if(hret != HG_SUCCESS)
        return(-1);
  
699
    hret = margo_provider_forward(provider->provider_id, handle, &in);
700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }
    
    ret = out.ret;
    if(ret == 0)
        memcpy(buf, out.buffer, out.size);

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

722
int bake_read(
723
    bake_provider_handle_t provider,
724
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
725 726 727 728
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
729
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
730
    hg_handle_t handle;
731 732
    bake_read_in_t in;
    bake_read_out_t out;
733
    int ret;
734

735
    if(buf_size <= provider->eager_limit)
736
        return(bake_eager_read(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
737

738 739
    in.rid = rid;
    in.region_offset = region_offset;
740 741 742
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy read */
743

744
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
745
        HG_BULK_WRITE_ONLY, &in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
746 747 748
    if(hret != HG_SUCCESS)
        return(-1);
   
749 750
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
751

752 753
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
754
        margo_bulk_free(in.bulk_handle);
755 756 757
        return(-1);
    }

758
    hret = margo_provider_forward(provider->provider_id, handle, &in);
759 760
    if(hret != HG_SUCCESS)
    {
761
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
762
        margo_destroy(handle);
763 764 765
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
766
    hret = margo_get_output(handle, &out);
767 768
    if(hret != HG_SUCCESS)
    {
769
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
770
        margo_destroy(handle);
771 772 773 774 775
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
776
    margo_free_output(handle, &out);
777
    margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
778
    margo_destroy(handle);
779
    return(ret);
Philip Carns's avatar
Philip Carns committed
780 781
}

782
int bake_proxy_read(
783
    bake_provider_handle_t provider,
784
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
785
    uint64_t region_offset,
786 787 788 789
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size)
Philip Carns's avatar
Philip Carns committed
790 791
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
792
    hg_handle_t handle;
793 794
    bake_read_in_t in;
    bake_read_out_t out;
795
    int ret;
Philip Carns's avatar
Philip Carns committed
796 797 798

    in.rid = rid;
    in.region_offset = region_offset;
799 800 801 802
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size; 
    in.remote_addr_str = (char*)remote_addr;
Philip Carns's avatar
Philip Carns committed
803

804 805
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
806

Shane Snyder's avatar
Shane Snyder committed
807 808
    if(hret != HG_SUCCESS)
        return(-1);
809

810
    hret = margo_provider_forward(provider->provider_id, handle, &in);
Philip Carns's avatar
Philip Carns committed
811 812
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
813
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
814 815 816
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
817
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
818 819
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
820
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
821 822
        return(-1);
    }
823

Philip Carns's avatar
Philip Carns committed
824 825
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
826 827
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
828 829 830
    return(ret);
}