bake-client.c 21.4 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

9
#include <assert.h>
Philip Carns's avatar
Philip Carns committed
10
#include <margo.h>
11
#include <bake-client.h>
Philip Carns's avatar
Philip Carns committed
12
#include "uthash.h"
13
#include "bake-rpc.h"
Philip Carns's avatar
Philip Carns committed
14

15
#define BAKE_DEFAULT_EAGER_LIMIT 2048
16

17
/* Refers to a single Margo initialization, for now this is shared by
18 19
 * all remote BAKE targets.  In the future we probably need to support
 * multiple in case we run atop more than one transport at a time.
20
 */
21
struct bake_client
22 23
{
    margo_instance_id mid;  
24

25 26 27 28 29 30
    hg_id_t bake_probe_id;
    hg_id_t bake_create_id;
    hg_id_t bake_eager_write_id;
    hg_id_t bake_eager_read_id;
    hg_id_t bake_write_id;
    hg_id_t bake_persist_id;
31
    hg_id_t bake_create_write_persist_id;
32 33 34
    hg_id_t bake_get_size_id;
    hg_id_t bake_read_id;
    hg_id_t bake_noop_id;
Shane Snyder's avatar
Shane Snyder committed
35
    hg_id_t bake_remove_id;
36 37

    uint64_t num_provider_handles;
38 39
};

40
struct bake_provider_handle {
41
    struct bake_client* client;
42
    hg_addr_t           addr;
43
    uint16_t            provider_id;
44
    uint64_t            refcount;
45
    uint64_t            eager_limit;
46
};
Philip Carns's avatar
Philip Carns committed
47

48
/* Binds the client to a Margo instance and fills in every BAKE RPC id.
 *
 * If "bake_probe_rpc" is already known to the Margo instance, the ids are
 * looked up by name; otherwise all BAKE RPCs are registered (client side,
 * no handler).  Always returns 0.
 */
static int bake_client_register(bake_client_t client, margo_instance_id mid)
{
    hg_bool_t already_registered;
    hg_id_t   probe_id;

    client->mid = mid;

    /* use the probe RPC as the indicator for the whole RPC set */
    margo_registered_name(mid, "bake_probe_rpc", &probe_id, &already_registered);

    if(already_registered != HG_TRUE) {
        /* first user of this Margo instance: register everything */
        client->bake_probe_id =
            MARGO_REGISTER(mid, "bake_probe_rpc",
                    bake_probe_in_t, bake_probe_out_t, NULL);
        client->bake_create_id =
            MARGO_REGISTER(mid, "bake_create_rpc",
                    bake_create_in_t, bake_create_out_t, NULL);
        client->bake_write_id =
            MARGO_REGISTER(mid, "bake_write_rpc",
                    bake_write_in_t, bake_write_out_t, NULL);
        client->bake_eager_write_id =
            MARGO_REGISTER(mid, "bake_eager_write_rpc",
                    bake_eager_write_in_t, bake_eager_write_out_t, NULL);
        client->bake_eager_read_id =
            MARGO_REGISTER(mid, "bake_eager_read_rpc",
                    bake_eager_read_in_t, bake_eager_read_out_t, NULL);
        client->bake_persist_id =
            MARGO_REGISTER(mid, "bake_persist_rpc",
                    bake_persist_in_t, bake_persist_out_t, NULL);
        client->bake_create_write_persist_id =
            MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
                    bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
        client->bake_get_size_id =
            MARGO_REGISTER(mid, "bake_get_size_rpc",
                    bake_get_size_in_t, bake_get_size_out_t, NULL);
        client->bake_read_id =
            MARGO_REGISTER(mid, "bake_read_rpc",
                    bake_read_in_t, bake_read_out_t, NULL);
        client->bake_noop_id =
            MARGO_REGISTER(mid, "bake_noop_rpc",
                    void, void, NULL);
        client->bake_remove_id =
            MARGO_REGISTER(mid, "bake_remove_rpc",
                    bake_remove_in_t, bake_remove_out_t, NULL);
        return(0);
    }

    /* RPCs already registered: just retrieve their ids by name */
    margo_registered_name(mid, "bake_probe_rpc",
            &client->bake_probe_id, &already_registered);
    margo_registered_name(mid, "bake_create_rpc",
            &client->bake_create_id, &already_registered);
    margo_registered_name(mid, "bake_write_rpc",
            &client->bake_write_id, &already_registered);
    margo_registered_name(mid, "bake_eager_write_rpc",
            &client->bake_eager_write_id, &already_registered);
    margo_registered_name(mid, "bake_eager_read_rpc",
            &client->bake_eager_read_id, &already_registered);
    margo_registered_name(mid, "bake_persist_rpc",
            &client->bake_persist_id, &already_registered);
    margo_registered_name(mid, "bake_create_write_persist_rpc",
            &client->bake_create_write_persist_id, &already_registered);
    margo_registered_name(mid, "bake_get_size_rpc",
            &client->bake_get_size_id, &already_registered);
    margo_registered_name(mid, "bake_read_rpc",
            &client->bake_read_id, &already_registered);
    margo_registered_name(mid, "bake_noop_rpc",
            &client->bake_noop_id, &already_registered);
    margo_registered_name(mid, "bake_remove_rpc",
            &client->bake_remove_id, &already_registered);

    return(0);
}

111 112 113 114 115
/* Allocates a BAKE client bound to the given Margo instance.
 *
 * mid:    Margo instance used for all RPCs issued through this client.
 * client: out parameter receiving the new client on success; it is left
 *         untouched on failure.
 *
 * Returns 0 on success, -1 if client is NULL or allocation fails, or the
 * non-zero code from bake_client_register() if RPC registration fails.
 */
int bake_client_init(margo_instance_id mid, bake_client_t* client)
{
    if(!client) return -1;

    /* calloc() zeroes the structure, so num_provider_handles starts at 0 */
    bake_client_t c = (bake_client_t)calloc(1, sizeof(*c));
    if(!c) return -1;

    int ret = bake_client_register(c, mid);
    if(ret != 0) {
        /* do not leak the client when registration fails */
        free(c);
        return ret;
    }

    *client = c;
    return 0;
}

/* Frees a client created by bake_client_init().
 *
 * A warning is printed to stderr if provider handles created from this
 * client are still outstanding; the client is freed regardless.
 *
 * Returns 0 on success, -1 if client is BAKE_CLIENT_NULL.
 */
int bake_client_finalize(bake_client_t client)
{
    if(client == BAKE_CLIENT_NULL) return -1;

    if(client->num_provider_handles != 0) {
        /* num_provider_handles is a uint64_t: print it with a matching
         * conversion rather than %d, which is undefined behavior */
        fprintf(stderr,
                "[BAKE] Warning: %llu provider handles not released before bake_client_finalize was called\n",
                (unsigned long long)client->num_provider_handles);
    }
    free(client);
    return 0;
}

136 137 138 139 140
/* Asks a provider for the BAKE targets it manages.
 *
 * provider:    handle of the provider to query.
 * max_targets: capacity of bti; treated as 0 when bti is NULL.
 * bti:         array receiving up to max_targets target ids (may be NULL).
 * num_targets: receives the number of targets reported by the provider when
 *              max_targets is 0, otherwise the number copied into bti.
 *
 * Returns -1 if the RPC could not be created, forwarded or decoded,
 * otherwise the return code sent back by the provider.
 */
int bake_probe(
    bake_provider_handle_t provider,
    uint64_t max_targets,
    bake_target_id_t *bti,
    uint64_t* num_targets)
{
    bake_probe_in_t  in;
    bake_probe_out_t out;
    hg_handle_t      h;
    hg_return_t      status;
    int              rc = -1;

    /* without an output array nothing can be copied back */
    if(!bti) max_targets = 0;
    in.max_targets = max_targets;

    status = margo_create(
                provider->client->mid,
                provider->addr,
                provider->client->bake_probe_id,
                &h);
    if(status != HG_SUCCESS) return -1;

    status = margo_provider_forward(provider->provider_id, h, &in);
    if(status != HG_SUCCESS) goto finish;

    status = margo_get_output(h, &out);
    if(status != HG_SUCCESS) goto finish;

    rc = out.ret;
    if(rc == HG_SUCCESS) {
        uint64_t count = out.num_targets;
        if(max_targets != 0) {
            /* never copy more than the caller's array can hold */
            if(count > max_targets) count = max_targets;
            if(count > 0) memcpy(bti, out.targets, sizeof(*bti)*count);
        }
        *num_targets = count;
    }

    margo_free_output(h, &out);

finish:
    margo_destroy(h);
    return rc;
}
191 192 193 194

int bake_provider_handle_create(
        bake_client_t client,
        hg_addr_t addr,
195
        uint16_t provider_id,
196
        bake_provider_handle_t* handle)
Philip Carns's avatar
Philip Carns committed
197
{
198 199 200 201 202 203 204 205 206 207 208 209 210 211
    if(client == BAKE_CLIENT_NULL) return -1;

    bake_provider_handle_t provider = 
        (bake_provider_handle_t)calloc(1, sizeof(*provider));

    if(!provider) return -1;

    hg_return_t ret = margo_addr_dup(client->mid, addr, &(provider->addr));
    if(ret != HG_SUCCESS) {
        free(provider);
        return -1;
    }
    
    provider->client   = client;
212
    provider->provider_id = provider_id;
213
    provider->refcount = 1;
214
    provider->eager_limit = BAKE_DEFAULT_EAGER_LIMIT;
215

216 217
    client->num_provider_handles += 1;

218 219 220 221
    *handle = provider;
    return 0;
}

222 223 224 225 226 227 228 229 230 231 232 233 234 235
/* Stores the handle's current eager limit in *limit.
 * Returns 0 on success, -1 if handle is BAKE_PROVIDER_HANDLE_NULL.
 */
int bake_provider_handle_get_eager_limit(bake_provider_handle_t handle, uint64_t* limit)
{
    if(handle != BAKE_PROVIDER_HANDLE_NULL) {
        *limit = handle->eager_limit;
        return 0;
    }
    return -1;
}

/* Replaces the handle's eager limit with the given value.
 * Returns 0 on success, -1 if handle is BAKE_PROVIDER_HANDLE_NULL.
 */
int bake_provider_handle_set_eager_limit(bake_provider_handle_t handle, uint64_t limit)
{
    if(handle != BAKE_PROVIDER_HANDLE_NULL) {
        handle->eager_limit = limit;
        return 0;
    }
    return -1;
}

236 237 238 239 240
/* Adds one reference to the provider handle.
 * Returns 0 on success, -1 if handle is BAKE_PROVIDER_HANDLE_NULL.
 */
int bake_provider_handle_ref_incr(bake_provider_handle_t handle)
{
    if(handle != BAKE_PROVIDER_HANDLE_NULL) {
        handle->refcount++;
        return 0;
    }
    return -1;
}

243 244 245 246 247 248
/* Drops one reference to the provider handle.  When the count reaches
 * zero the duplicated address is freed, the owning client's handle count
 * is decremented and the handle itself is freed.
 *
 * Returns 0 on success, -1 if handle is BAKE_PROVIDER_HANDLE_NULL.
 */
int bake_provider_handle_release(bake_provider_handle_t handle)
{
    if(handle == BAKE_PROVIDER_HANDLE_NULL)
        return -1;

    handle->refcount--;
    if(handle->refcount != 0)
        return 0; /* other references are still alive */

    margo_addr_free(handle->client->mid, handle->addr);
    handle->client->num_provider_handles -= 1;
    free(handle);
    return 0;
}
  
255
/* Requests shutdown of the remote Margo instance at addr, using the
 * client's Margo instance.  Returns whatever
 * margo_shutdown_remote_instance() returns.
 */
int bake_shutdown_service(bake_client_t client, hg_addr_t addr)
{
    margo_instance_id mid = client->mid;

    return margo_shutdown_remote_instance(mid, addr);
}
Philip Carns's avatar
Philip Carns committed
259

260
static int bake_eager_write(
261
    bake_provider_handle_t provider,
262
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
263 264 265 266 267
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
268
    hg_handle_t handle;
269 270
    bake_eager_write_in_t in;
    bake_eager_write_out_t out;
Philip Carns's avatar
Philip Carns committed
271 272 273 274 275 276
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;
    in.buffer = (char*)buf;
277
  
278 279
    hret = margo_create(provider->client->mid, provider->addr, 
                provider->client->bake_eager_write_id, &handle);
280

Shane Snyder's avatar
Shane Snyder committed
281 282
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
283

284
    hret = margo_provider_forward(provider->provider_id, handle, &in);
Philip Carns's avatar
Philip Carns committed
285 286
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
287
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
288 289 290
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
291
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
292 293
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
294
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
295 296 297 298 299
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
300 301
    margo_free_output(handle, &out);
    margo_destroy(handle);
302

Philip Carns's avatar
Philip Carns committed
303 304 305
    return(ret);
}

306
int bake_write(
307
    bake_provider_handle_t provider,
308
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
309 310 311 312
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
Philip Carns's avatar
Philip Carns committed
313
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
314
    hg_handle_t handle;
315 316
    bake_write_in_t in;
    bake_write_out_t out;
Philip Carns's avatar
Philip Carns committed
317
    int ret;
318

319
    if(buf_size <= provider->eager_limit)
320
        return(bake_eager_write(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
321

Philip Carns's avatar
Philip Carns committed
322 323
    in.rid = rid;
    in.region_offset = region_offset;
324 325 326
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
Philip Carns's avatar
Philip Carns committed
327

328
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
Philip Carns's avatar
Philip Carns committed
329
        HG_BULK_READ_ONLY, &in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
330 331 332
    if(hret != HG_SUCCESS)
        return(-1);
   
333 334
    hret = margo_create(provider->client->mid, provider->addr, 
        provider->client->bake_write_id, &handle);
335

Philip Carns's avatar
Philip Carns committed
336 337
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
338
        margo_bulk_free(in.bulk_handle);
Philip Carns's avatar
Philip Carns committed
339 340 341
        return(-1);
    }

342
    hret = margo_provider_forward(provider->provider_id, handle, &in);
Philip Carns's avatar
Philip Carns committed
343 344
    if(hret != HG_SUCCESS)
    {
345
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
346
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
347 348 349
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
350
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
351 352
    if(hret != HG_SUCCESS)
    {
353
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
354
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
355 356 357 358 359
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
360
    margo_free_output(handle, &out);
361
    margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
362
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
363
    return(ret);
Philip Carns's avatar
Philip Carns committed
364 365
}

366
int bake_proxy_write(
367
    bake_provider_handle_t provider,
368
    bake_region_id_t rid,
369 370 371
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
372
    const char* remote_addr,
373 374 375 376
    uint64_t size)
{
    hg_return_t hret;
    hg_handle_t handle;
377 378
    bake_write_in_t in;
    bake_write_out_t out;
379 380 381 382 383 384 385
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
386
    in.remote_addr_str = (char*)remote_addr;
Shane Snyder's avatar
Shane Snyder committed
387

388 389
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_write_id, &handle);
390

391 392 393
    if(hret != HG_SUCCESS)
        return(-1);

394
    hret = margo_provider_forward(provider->provider_id, handle, &in);
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }
    
    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

415
int bake_create(
416
    bake_provider_handle_t provider,
Philip Carns's avatar
Philip Carns committed
417 418
    bake_target_id_t bti,
    uint64_t region_size,
419
    bake_region_id_t *rid)
Philip Carns's avatar
Philip Carns committed
420
{
421
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
422
    hg_handle_t handle;
423 424
    bake_create_in_t in;
    bake_create_out_t out;
425
    int ret = 0;
426

427
    in.bti = bti;
Philip Carns's avatar
Philip Carns committed
428 429
    in.region_size = region_size;

430 431
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_id, &handle);
432 433

    if(hret != HG_SUCCESS) {
Shane Snyder's avatar
Shane Snyder committed
434
        return(-1);
435 436
    }

437
    hret = margo_provider_forward(provider->provider_id, handle, &in);
438 439
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
440
        margo_destroy(handle);
441 442
        return(-1);
    }
Philip Carns's avatar
Philip Carns committed
443

Shane Snyder's avatar
Shane Snyder committed
444
    hret = margo_get_output(handle, &out);
445 446
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
447
        margo_destroy(handle);
448 449 450 451
        return(-1);
    }

    ret = out.ret;
452 453
    if(ret == 0) 
        *rid = out.rid;
454

Shane Snyder's avatar
Shane Snyder committed
455 456
    margo_free_output(handle, &out);
    margo_destroy(handle);
457
    return(ret);
Philip Carns's avatar
Philip Carns committed
458 459 460
}


461
int bake_persist(
462
    bake_provider_handle_t provider,
463
    bake_region_id_t rid)
Philip Carns's avatar
Philip Carns committed
464
{
Philip Carns's avatar
Philip Carns committed
465
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
466
    hg_handle_t handle;
467 468
    bake_persist_in_t in;
    bake_persist_out_t out;
Philip Carns's avatar
Philip Carns committed
469
    int ret;
470

Philip Carns's avatar
Philip Carns committed
471 472
    in.rid = rid;

473 474
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_persist_id, &handle);
475

Shane Snyder's avatar
Shane Snyder committed
476 477
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
478

479
    hret = margo_provider_forward(provider->provider_id, handle, &in);
Philip Carns's avatar
Philip Carns committed
480 481
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
482
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
483 484 485
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
486
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
487 488
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
489
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
490 491 492 493 494
        return(-1);
    }

    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
495 496
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
497
    return(ret);
Philip Carns's avatar
Philip Carns committed
498
}
Philip Carns's avatar
Philip Carns committed
499

500
int bake_create_write_persist(
501
    bake_provider_handle_t provider,
502 503 504 505 506 507 508 509 510 511 512 513 514 515 516
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

    /* XXX eager path? */

517
    in.bti = bti;
518 519 520 521 522 523
    in.region_size = region_size;
    in.region_offset = region_offset;
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */

524
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
525 526 527 528
        HG_BULK_READ_ONLY, &in.bulk_handle);
    if(hret != HG_SUCCESS)
        return(-1);

529 530
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
531

532 533 534 535 536 537
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        return(-1);
    }

538
    hret = margo_provider_forward(provider->provider_id, handle, &in);
539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_bulk_free(in.bulk_handle);
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_bulk_free(in.bulk_handle);
    margo_destroy(handle);
    return(ret);
}

564
int bake_create_write_persist_proxy(
565
    bake_provider_handle_t provider,
566 567 568 569 570 571 572 573 574 575 576 577 578 579 580
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size,
    bake_region_id_t *rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_create_write_persist_in_t in;
    bake_create_write_persist_out_t out;
    int ret;

581
    in.bti = bti;
582 583 584 585 586 587 588
    in.region_size = region_size;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
    in.remote_addr_str = (char*)remote_addr;

589 590
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_create_write_persist_id, &handle);
591

592 593 594
    if(hret != HG_SUCCESS)
        return(-1);

595
    hret = margo_provider_forward(provider->provider_id, handle, &in);
596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
        *rid = out.rid;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

618
int bake_get_size(
619
    bake_provider_handle_t provider,
620
    bake_region_id_t rid,
621 622 623
    uint64_t *region_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
624
    hg_handle_t handle;
625 626
    bake_get_size_in_t in;
    bake_get_size_out_t out;
627 628 629 630
    int ret;

    in.rid = rid;

631 632
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_get_size_id, &handle);
633

Shane Snyder's avatar
Shane Snyder committed
634 635
    if(hret != HG_SUCCESS)
        return(-1);
636

637
    hret = margo_provider_forward(provider->provider_id, handle, &in);
638 639
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
640
        margo_destroy(handle);
641 642 643
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
644
    hret = margo_get_output(handle, &out);
645 646
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
647
        margo_destroy(handle);
648 649 650 651 652 653
        return(-1);
    }

    ret = out.ret;
    *region_size = out.size;

Shane Snyder's avatar
Shane Snyder committed
654 655
    margo_free_output(handle, &out);
    margo_destroy(handle);
656 657 658
    return(ret);
}

659
/* Sends the no-op RPC (no input, no output) to the provider.
 * Returns 0 if the RPC was created and forwarded successfully, -1
 * otherwise.
 */
int bake_noop(bake_provider_handle_t provider)
{
    hg_handle_t h;
    hg_return_t status;
    int         rc = 0;

    status = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_noop_id, &h);
    if(status != HG_SUCCESS)
        return(-1);

    status = margo_provider_forward(provider->provider_id, h, NULL);
    if(status != HG_SUCCESS)
        rc = -1;

    margo_destroy(h);
    return(rc);
}

681
static int bake_eager_read(
682
    bake_provider_handle_t provider,
683
    bake_region_id_t rid,
684 685 686 687 688 689
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
    hg_handle_t handle;
690 691
    bake_eager_read_in_t in;
    bake_eager_read_out_t out;
692 693 694 695 696 697
    int ret;

    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;

698 699
    hret = margo_create(provider->client->mid, provider->addr,
        provider->client->bake_eager_read_id, &handle);
700

701 702 703
    if(hret != HG_SUCCESS)
        return(-1);
  
704
    hret = margo_provider_forward(provider->provider_id, handle, &in);
705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }
    
    ret = out.ret;
    if(ret == 0)
        memcpy(buf, out.buffer, out.size);

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

727
int bake_read(
728
    bake_provider_handle_t provider,
729
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
730 731 732 733
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
734
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
735
    hg_handle_t handle;
736 737
    bake_read_in_t in;
    bake_read_out_t out;
738
    int ret;
739

740
    if(buf_size <= provider->eager_limit)
741
        return(bake_eager_read(provider, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
742

743 744
    in.rid = rid;
    in.region_offset = region_offset;
745 746 747
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy read */
748

749
    hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size, 
750
        HG_BULK_WRITE_ONLY, &in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
751 752 753
    if(hret != HG_SUCCESS)
        return(-1);
   
754 755
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
756

757 758
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
759
        margo_bulk_free(in.bulk_handle);
760 761 762
        return(-1);
    }

763
    hret = margo_provider_forward(provider->provider_id, handle, &in);
764 765
    if(hret != HG_SUCCESS)
    {
766
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
767
        margo_destroy(handle);
768 769 770
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
771
    hret = margo_get_output(handle, &out);
772 773
    if(hret != HG_SUCCESS)
    {
774
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
775
        margo_destroy(handle);
776 777 778 779 780
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
781
    margo_free_output(handle, &out);
782
    margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
783
    margo_destroy(handle);
784
    return(ret);
Philip Carns's avatar
Philip Carns committed
785 786
}

787
int bake_proxy_read(
788
    bake_provider_handle_t provider,
789
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
790
    uint64_t region_offset,
791 792 793 794
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size)
Philip Carns's avatar
Philip Carns committed
795 796
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
797
    hg_handle_t handle;
798 799
    bake_read_in_t in;
    bake_read_out_t out;
800
    int ret;
Philip Carns's avatar
Philip Carns committed
801 802 803

    in.rid = rid;
    in.region_offset = region_offset;
804 805 806 807
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size; 
    in.remote_addr_str = (char*)remote_addr;
Philip Carns's avatar
Philip Carns committed
808

809 810
    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_read_id, &handle);
811

Shane Snyder's avatar
Shane Snyder committed
812 813
    if(hret != HG_SUCCESS)
        return(-1);
814

815
    hret = margo_provider_forward(provider->provider_id, handle, &in);
Philip Carns's avatar
Philip Carns committed
816 817
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
818
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
819 820 821
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
822
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
823 824
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
825
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
826 827
        return(-1);
    }
828

Philip Carns's avatar
Philip Carns committed
829 830
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
831 832
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
833 834 835
    return(ret);
}

Shane Snyder's avatar
Shane Snyder committed
836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873
/* Sends the remove RPC for region rid to the provider.
 *
 * The request is addressed with the handle's provider_id through
 * margo_provider_forward(), like every other RPC in this file, and the
 * decoded response is released with margo_free_output() before the
 * handle is destroyed.
 *
 * Returns -1 if the RPC could not be created, forwarded or decoded,
 * otherwise the return code sent back by the provider.
 */
int bake_remove(
    bake_provider_handle_t provider,
    bake_region_id_t rid)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_remove_in_t in;
    bake_remove_out_t out;
    int ret;

    in.rid = rid;

    hret = margo_create(provider->client->mid, provider->addr,
            provider->client->bake_remove_id, &handle);

    /* the handle is only valid once margo_create() has succeeded */
    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_provider_forward(provider->provider_id, handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;

    /* release the decoded output; the original leaked it */
    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}