/*
 * (C) 2016 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

9
#include <assert.h>
Philip Carns's avatar
Philip Carns committed
10
#include <margo.h>
11
#include <bake-client.h>
Philip Carns's avatar
Philip Carns committed
12
#include "uthash.h"
13
#include "bake-rpc.h"
Philip Carns's avatar
Philip Carns committed
14

15
#define BAKE_EAGER_LIMIT 2048
16

17
/* Refers to a single Margo initialization, for now this is shared by
18 19
 * all remote BAKE targets.  In the future we probably need to support
 * multiple in case we run atop more than one transport at a time.
20
 */
21
struct bake_margo_instance
22 23
{
    margo_instance_id mid;  
24

25 26 27 28 29 30 31
    hg_id_t bake_probe_id;
    hg_id_t bake_shutdown_id; 
    hg_id_t bake_create_id;
    hg_id_t bake_eager_write_id;
    hg_id_t bake_eager_read_id;
    hg_id_t bake_write_id;
    hg_id_t bake_persist_id;
32
    hg_id_t bake_create_write_persist_id;
33 34 35
    hg_id_t bake_get_size_id;
    hg_id_t bake_read_id;
    hg_id_t bake_noop_id;
36 37
};

38
/* Refers to an instance connected to a specific BAKE target */
Philip Carns's avatar
Philip Carns committed
39 40
struct bake_instance
{
41 42 43
    bake_target_id_t bti;   /* persistent identifier for this target */
    hg_addr_t dest;         /* resolved Mercury address */
    UT_hash_handle hh;
Philip Carns's avatar
Philip Carns committed
44 45
};

46 47
/* Head of the uthash table of probed targets, keyed by bake_target_id_t.
 * Entries are added by bake_probe_instance() and removed by
 * bake_release_instance(); NULL means the table is empty.
 */
struct bake_instance *instance_hash = NULL;

Philip Carns's avatar
Philip Carns committed
48

49
struct bake_margo_instance g_margo_inst = {
50 51
    .mid = MARGO_INSTANCE_NULL,
};
Philip Carns's avatar
Philip Carns committed
52

Shane Snyder's avatar
Shane Snyder committed
53

54 55 56 57
/* Stores `mid` in the global g_margo_inst and registers every BAKE client
 * RPC against it, recording the returned ids in g_margo_inst.
 *
 * XXX calling this function again just overwrites the previous global mid...
 * need to be smarter if we truly want to support multiple client-side mids
 *
 * Always returns 0.
 */
static int bake_margo_instance_init(margo_instance_id mid)
{
    struct bake_margo_instance *inst = &g_margo_inst;

    inst->mid = mid;

    /* register RPCs (client side only, hence the NULL handlers) */
    inst->bake_probe_id =
        MARGO_REGISTER(mid, "bake_probe_rpc",
        void, bake_probe_out_t, NULL);
    inst->bake_shutdown_id =
        MARGO_REGISTER(mid, "bake_shutdown_rpc",
        void, void, NULL);
    inst->bake_create_id =
        MARGO_REGISTER(mid, "bake_create_rpc",
        bake_create_in_t, bake_create_out_t, NULL);
    inst->bake_write_id =
        MARGO_REGISTER(mid, "bake_write_rpc",
        bake_write_in_t, bake_write_out_t, NULL);
    inst->bake_eager_write_id =
        MARGO_REGISTER(mid, "bake_eager_write_rpc",
        bake_eager_write_in_t, bake_eager_write_out_t, NULL);
    inst->bake_eager_read_id =
        MARGO_REGISTER(mid, "bake_eager_read_rpc",
        bake_eager_read_in_t, bake_eager_read_out_t, NULL);
    inst->bake_persist_id =
        MARGO_REGISTER(mid, "bake_persist_rpc",
        bake_persist_in_t, bake_persist_out_t, NULL);
    inst->bake_create_write_persist_id =
        MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
        bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
    inst->bake_get_size_id =
        MARGO_REGISTER(mid, "bake_get_size_rpc",
        bake_get_size_in_t, bake_get_size_out_t, NULL);
    inst->bake_read_id =
        MARGO_REGISTER(mid, "bake_read_rpc",
        bake_read_in_t, bake_read_out_t, NULL);
    inst->bake_noop_id =
        MARGO_REGISTER(mid, "bake_noop_rpc",
        void, void, NULL);

    return(0);
}

int bake_probe_instance(
100 101
    margo_instance_id mid,
    hg_addr_t dest_addr,
102 103 104 105
    bake_target_id_t *bti)
{
    hg_return_t hret;
    int ret;
106
    bake_probe_out_t out;
107
    hg_handle_t handle;
108
    struct bake_instance *new_instance;
109

110
    ret = bake_margo_instance_init(mid);
111 112
    if(ret < 0)
        return(ret);
Philip Carns's avatar
Philip Carns committed
113

114 115 116 117
    new_instance = calloc(1, sizeof(*new_instance));
    if(!new_instance)
        return(-1);

118
    hret = margo_addr_dup(g_margo_inst.mid, dest_addr, &new_instance->dest);
Philip Carns's avatar
Philip Carns committed
119 120
    if(hret != HG_SUCCESS)
    {
121
        free(new_instance);
Philip Carns's avatar
Philip Carns committed
122 123 124
        return(-1);
    }

125
    /* create handle */
126
    hret = margo_create(g_margo_inst.mid, new_instance->dest, 
127
        g_margo_inst.bake_probe_id, &handle);
128 129
    if(hret != HG_SUCCESS)
    {
130
        margo_addr_free(g_margo_inst.mid, new_instance->dest);
131
        free(new_instance);
132 133 134
        return(-1);
    }

135
    hret = margo_forward(handle, NULL);
136 137
    if(hret != HG_SUCCESS)
    {
138
        margo_destroy(handle);
139 140
        margo_addr_free(g_margo_inst.mid, new_instance->dest);
        free(new_instance);
141 142 143
        return(-1);
    }

144
    hret = margo_get_output(handle, &out);
145 146
    if(hret != HG_SUCCESS)
    {
147
        margo_destroy(handle);
148 149
        margo_addr_free(g_margo_inst.mid, new_instance->dest);
        free(new_instance);
150 151 152 153 154
        return(-1);
    }

    ret = out.ret;
    *bti = out.bti;
155
    new_instance->bti = out.bti;
156

157 158
    margo_free_output(handle, &out);
    margo_destroy(handle);
159

160 161
    if(ret != 0)
    {
162
        margo_addr_free(g_margo_inst.mid, new_instance->dest);
163 164 165 166 167 168 169 170
        free(new_instance);
    }
    else
    {
        /* TODO: safety check that it isn't already there.  Here or earlier? */
        HASH_ADD(hh, instance_hash, bti, sizeof(new_instance->bti), new_instance);
    }

171
    return(ret);
Philip Carns's avatar
Philip Carns committed
172 173 174 175 176
}
  
/* Forgets a target previously registered by bake_probe_instance(): removes
 * its entry from instance_hash, frees the duplicated address and the entry
 * itself.  Does nothing if `bti` is not in the table.
 */
void bake_release_instance(
    bake_target_id_t bti)
{
    struct bake_instance *entry = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), entry);
    if(entry != NULL)
    {
        HASH_DELETE(hh, instance_hash, entry);
        margo_addr_free(g_margo_inst.mid, entry->dest);
        free(entry);
    }
}

/* Issues the "bake_shutdown_rpc" RPC (no input, no output) to the target
 * identified by `bti`.
 *
 * Returns 0 once the RPC has been forwarded successfully, or -1 if the
 * target is unknown or a Margo call fails.
 */
int bake_shutdown_service(bake_target_id_t bti)
{
    struct bake_instance *target = NULL;
    hg_handle_t h;
    int rc = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return(-1);

    if(margo_create(g_margo_inst.mid, target->dest,
        g_margo_inst.bake_shutdown_id, &h) != HG_SUCCESS)
        return(-1);

    if(margo_forward(h, NULL) == HG_SUCCESS)
        rc = 0;

    /* the handle is released on both the success and the failure path */
    margo_destroy(h);
    return(rc);
}
Philip Carns's avatar
Philip Carns committed
215

216
static int bake_eager_write(
Philip Carns's avatar
Philip Carns committed
217
    bake_target_id_t bti,
218
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
219 220 221 222 223
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
224
    hg_handle_t handle;
225 226
    bake_eager_write_in_t in;
    bake_eager_write_out_t out;
Philip Carns's avatar
Philip Carns committed
227 228 229 230 231 232 233 234 235 236 237 238
    int ret;
    struct bake_instance *instance = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);

    in.bti = bti;
    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;
    in.buffer = (char*)buf;
239
  
Shane Snyder's avatar
Shane Snyder committed
240
    hret = margo_create(g_margo_inst.mid, instance->dest, 
241
        g_margo_inst.bake_eager_write_id, &handle);
Shane Snyder's avatar
Shane Snyder committed
242 243
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
244

Shane Snyder's avatar
Shane Snyder committed
245
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
246 247
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
248
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
249 250 251
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
252
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
253 254
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
255
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
256 257 258 259 260
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
261 262
    margo_free_output(handle, &out);
    margo_destroy(handle);
263

Philip Carns's avatar
Philip Carns committed
264 265 266
    return(ret);
}

267
int bake_write(
Philip Carns's avatar
Philip Carns committed
268
    bake_target_id_t bti,
269
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
270 271 272 273
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
Philip Carns's avatar
Philip Carns committed
274
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
275
    hg_handle_t handle;
276 277
    bake_write_in_t in;
    bake_write_out_t out;
Philip Carns's avatar
Philip Carns committed
278
    int ret;
279 280
    struct bake_instance *instance = NULL;

281 282
    if(buf_size <= BAKE_EAGER_LIMIT)
        return(bake_eager_write(bti, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
283

284 285 286
    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);
Philip Carns's avatar
Philip Carns committed
287 288 289 290

    in.bti = bti;
    in.rid = rid;
    in.region_offset = region_offset;
291 292 293
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
Philip Carns's avatar
Philip Carns committed
294

295
    hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size, 
Philip Carns's avatar
Philip Carns committed
296
        HG_BULK_READ_ONLY, &in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
297 298 299 300
    if(hret != HG_SUCCESS)
        return(-1);
   
    hret = margo_create(g_margo_inst.mid, instance->dest, 
301
        g_margo_inst.bake_write_id, &handle);
Philip Carns's avatar
Philip Carns committed
302 303
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
304
        margo_bulk_free(in.bulk_handle);
Philip Carns's avatar
Philip Carns committed
305 306 307
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
308
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
309 310
    if(hret != HG_SUCCESS)
    {
311
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
312
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
313 314 315
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
316
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
317 318
    if(hret != HG_SUCCESS)
    {
319
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
320
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
321 322 323 324 325
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
326
    margo_free_output(handle, &out);
327
    margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
328
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
329
    return(ret);
Philip Carns's avatar
Philip Carns committed
330 331
}

332
int bake_proxy_write(
333
    bake_target_id_t bti,
334
    bake_region_id_t rid,
335 336 337
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
338
    const char* remote_addr,
339 340 341 342
    uint64_t size)
{
    hg_return_t hret;
    hg_handle_t handle;
343 344
    bake_write_in_t in;
    bake_write_out_t out;
345 346 347 348 349 350 351 352 353 354 355 356 357
    struct bake_instance *instance = NULL;
    int ret;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);

    in.bti = bti;
    in.rid = rid;
    in.region_offset = region_offset;
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size;
358
    in.remote_addr_str = (char*)remote_addr;
Shane Snyder's avatar
Shane Snyder committed
359

360
    hret = margo_create(g_margo_inst.mid, instance->dest,
361
        g_margo_inst.bake_write_id, &handle);
362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }
    
    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {   
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

386
int bake_create(
Philip Carns's avatar
Philip Carns committed
387 388
    bake_target_id_t bti,
    uint64_t region_size,
389
    bake_region_id_t *rid)
Philip Carns's avatar
Philip Carns committed
390
{
391
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
392
    hg_handle_t handle;
393 394
    bake_create_in_t in;
    bake_create_out_t out;
395
    int ret;
396 397 398 399 400
    struct bake_instance *instance = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);
401

Philip Carns's avatar
Philip Carns committed
402 403 404
    in.bti = bti;
    in.region_size = region_size;

Shane Snyder's avatar
Shane Snyder committed
405
    hret = margo_create(g_margo_inst.mid, instance->dest,
406
        g_margo_inst.bake_create_id, &handle);
Shane Snyder's avatar
Shane Snyder committed
407 408
    if(hret != HG_SUCCESS)
        return(-1);
409

Shane Snyder's avatar
Shane Snyder committed
410
    hret = margo_forward(handle, &in);
411 412
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
413
        margo_destroy(handle);
414 415
        return(-1);
    }
Philip Carns's avatar
Philip Carns committed
416

Shane Snyder's avatar
Shane Snyder committed
417
    hret = margo_get_output(handle, &out);
418 419
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
420
        margo_destroy(handle);
421 422 423 424 425 426
        return(-1);
    }

    ret = out.ret;
    *rid = out.rid;

Shane Snyder's avatar
Shane Snyder committed
427 428
    margo_free_output(handle, &out);
    margo_destroy(handle);
429
    return(ret);
Philip Carns's avatar
Philip Carns committed
430 431 432
}


433
int bake_persist(
Philip Carns's avatar
Philip Carns committed
434
    bake_target_id_t bti,
435
    bake_region_id_t rid)
Philip Carns's avatar
Philip Carns committed
436
{
Philip Carns's avatar
Philip Carns committed
437
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
438
    hg_handle_t handle;
439 440
    bake_persist_in_t in;
    bake_persist_out_t out;
Philip Carns's avatar
Philip Carns committed
441
    int ret;
442 443 444 445 446
    struct bake_instance *instance = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);
Philip Carns's avatar
Philip Carns committed
447 448 449 450

    in.bti = bti;
    in.rid = rid;

Shane Snyder's avatar
Shane Snyder committed
451
    hret = margo_create(g_margo_inst.mid, instance->dest,
452
        g_margo_inst.bake_persist_id, &handle);
Shane Snyder's avatar
Shane Snyder committed
453 454
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
455

Shane Snyder's avatar
Shane Snyder committed
456
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
457 458
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
459
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
460 461 462
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
463
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
464 465
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
466
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
467 468 469 470 471
        return(-1);
    }

    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
472 473
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
474
    return(ret);
Philip Carns's avatar
Philip Carns committed
475
}
Philip Carns's avatar
Philip Carns committed
476

477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
/* Issues the single "bake_create_write_persist_rpc" RPC, which carries a
 * region size, a region offset and the data to write.  `buf` is exposed to
 * the target through a read-only bulk handle created (and freed) here.
 *
 * rid: out; receives the region id from the response, written only when
 *      the response reports success (ret == 0)
 *
 * Returns the `ret` field of the RPC response, or -1 if the target is
 * unknown or a Margo call fails.
 */
int bake_create_write_persist(
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size,
    bake_region_id_t *rid)
{
    struct bake_instance *target = NULL;
    bake_create_write_persist_in_t req;
    bake_create_write_persist_out_t resp;
    hg_handle_t h;
    int rc = -1;

    /* XXX eager path? */

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return(-1);

    req.bti = bti;
    req.region_size = region_size;
    req.region_offset = region_offset;
    req.bulk_offset = 0;
    req.bulk_size = buf_size;
    req.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */

    if(margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
        HG_BULK_READ_ONLY, &req.bulk_handle) != HG_SUCCESS)
        return(-1);

    if(margo_create(g_margo_inst.mid, target->dest,
        g_margo_inst.bake_create_write_persist_id, &h) != HG_SUCCESS)
    {
        margo_bulk_free(req.bulk_handle);
        return(-1);
    }

    if(margo_forward(h, &req) != HG_SUCCESS)
        goto cleanup;

    if(margo_get_output(h, &resp) != HG_SUCCESS)
        goto cleanup;

    rc = resp.ret;
    if(rc == 0)
        *rid = resp.rid;

    margo_free_output(h, &resp);

cleanup:
    /* bulk handle first, then the RPC handle, on every exit past this point */
    margo_bulk_free(req.bulk_handle);
    margo_destroy(h);
    return(rc);
}

544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600
/* Variant of bake_create_write_persist() in which the data is described by
 * a caller-supplied bulk handle.  `remote_bulk`, `remote_offset`, `size`
 * and `remote_addr` are passed through unchanged in the RPC input; the bulk
 * handle is not freed here.
 *
 * rid: out; receives the region id from the response, written only when
 *      the response reports success (ret == 0)
 *
 * Returns the `ret` field of the RPC response, or -1 if the target is
 * unknown or a Margo call fails.
 */
int bake_create_write_persist_proxy(
    bake_target_id_t bti,
    uint64_t region_size,
    uint64_t region_offset,
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size,
    bake_region_id_t *rid)
{
    struct bake_instance *target = NULL;
    bake_create_write_persist_in_t req;
    bake_create_write_persist_out_t resp;
    hg_handle_t h;
    int rc = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return(-1);

    req.bti = bti;
    req.region_size = region_size;
    req.region_offset = region_offset;
    req.bulk_handle = remote_bulk;
    req.bulk_offset = remote_offset;
    req.bulk_size = size;
    req.remote_addr_str = (char*)remote_addr;

    if(margo_create(g_margo_inst.mid, target->dest,
        g_margo_inst.bake_create_write_persist_id, &h) != HG_SUCCESS)
        return(-1);

    if(margo_forward(h, &req) != HG_SUCCESS)
        goto done;

    if(margo_get_output(h, &resp) != HG_SUCCESS)
        goto done;

    rc = resp.ret;
    if(rc == 0)
        *rid = resp.rid;

    margo_free_output(h, &resp);

done:
    margo_destroy(h);
    return(rc);
}

601
int bake_get_size(
602
    bake_target_id_t bti,
603
    bake_region_id_t rid,
604 605 606
    uint64_t *region_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
607
    hg_handle_t handle;
608 609
    bake_get_size_in_t in;
    bake_get_size_out_t out;
610
    int ret;
611 612 613 614 615
    struct bake_instance *instance = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);
616 617 618 619

    in.bti = bti;
    in.rid = rid;

Shane Snyder's avatar
Shane Snyder committed
620
    hret = margo_create(g_margo_inst.mid, instance->dest,
621
        g_margo_inst.bake_get_size_id, &handle);
Shane Snyder's avatar
Shane Snyder committed
622 623
    if(hret != HG_SUCCESS)
        return(-1);
624

Shane Snyder's avatar
Shane Snyder committed
625
    hret = margo_forward(handle, &in);
626 627
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
628
        margo_destroy(handle);
629 630 631
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
632
    hret = margo_get_output(handle, &out);
633 634
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
635
        margo_destroy(handle);
636 637 638 639 640 641
        return(-1);
    }

    ret = out.ret;
    *region_size = out.size;

Shane Snyder's avatar
Shane Snyder committed
642 643
    margo_free_output(handle, &out);
    margo_destroy(handle);
644 645 646
    return(ret);
}

647
int bake_noop(
Philip Carns's avatar
Philip Carns committed
648 649 650
    bake_target_id_t bti)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
651
    hg_handle_t handle;
Philip Carns's avatar
Philip Carns committed
652 653 654 655 656 657
    struct bake_instance *instance = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);

Shane Snyder's avatar
Shane Snyder committed
658
    hret = margo_create(g_margo_inst.mid, instance->dest,
659
        g_margo_inst.bake_noop_id, &handle);
Shane Snyder's avatar
Shane Snyder committed
660 661
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
662

Shane Snyder's avatar
Shane Snyder committed
663
    hret = margo_forward(handle, NULL);
Philip Carns's avatar
Philip Carns committed
664 665
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
666
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
667 668 669
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
670
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
671 672 673
    return(0);
}

674
static int bake_eager_read(
675
    bake_target_id_t bti,
676
    bake_region_id_t rid,
677 678 679 680 681 682
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
    hg_handle_t handle;
683 684
    bake_eager_read_in_t in;
    bake_eager_read_out_t out;
685 686 687 688 689 690 691 692 693 694 695 696 697
    int ret;
    struct bake_instance *instance = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);

    in.bti = bti;
    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;

    hret = margo_create(g_margo_inst.mid, instance->dest,
698
        g_margo_inst.bake_eager_read_id, &handle);
699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724
    if(hret != HG_SUCCESS)
        return(-1);
  
    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }
    
    ret = out.ret;
    if(ret == 0)
        memcpy(buf, out.buffer, out.size);

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}

725
int bake_read(
Philip Carns's avatar
Philip Carns committed
726
    bake_target_id_t bti,
727
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
728 729 730 731
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
732
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
733
    hg_handle_t handle;
734 735
    bake_read_in_t in;
    bake_read_out_t out;
736
    int ret;
737 738
    struct bake_instance *instance = NULL;

739 740
    if(buf_size <= BAKE_EAGER_LIMIT)
        return(bake_eager_read(bti, rid, region_offset, buf, buf_size));
Philip Carns's avatar
Philip Carns committed
741

742 743 744
    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);
745 746 747 748

    in.bti = bti;
    in.rid = rid;
    in.region_offset = region_offset;
749 750 751
    in.bulk_offset = 0;
    in.bulk_size = buf_size;
    in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy read */
752

753
    hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size, 
754
        HG_BULK_WRITE_ONLY, &in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
755 756 757 758
    if(hret != HG_SUCCESS)
        return(-1);
   
    hret = margo_create(g_margo_inst.mid, instance->dest,
759
        g_margo_inst.bake_read_id, &handle);
760 761
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
762
        margo_bulk_free(in.bulk_handle);
763 764 765
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
766
    hret = margo_forward(handle, &in);
767 768
    if(hret != HG_SUCCESS)
    {
769
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
770
        margo_destroy(handle);
771 772 773
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
774
    hret = margo_get_output(handle, &out);
775 776
    if(hret != HG_SUCCESS)
    {
777
        margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
778
        margo_destroy(handle);
779 780 781 782 783
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
784
    margo_free_output(handle, &out);
785
    margo_bulk_free(in.bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
786
    margo_destroy(handle);
787
    return(ret);
Philip Carns's avatar
Philip Carns committed
788 789
}

790
int bake_proxy_read(
Philip Carns's avatar
Philip Carns committed
791
    bake_target_id_t bti,
792
    bake_region_id_t rid,
Philip Carns's avatar
Philip Carns committed
793
    uint64_t region_offset,
794 795 796 797
    hg_bulk_t remote_bulk,
    uint64_t remote_offset,
    const char* remote_addr,
    uint64_t size)
Philip Carns's avatar
Philip Carns committed
798 799
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
800
    hg_handle_t handle;
801 802
    bake_read_in_t in;
    bake_read_out_t out;
Philip Carns's avatar
Philip Carns committed
803
    struct bake_instance *instance = NULL;
804
    int ret;
Philip Carns's avatar
Philip Carns committed
805 806 807 808 809 810 811 812

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);

    in.bti = bti;
    in.rid = rid;
    in.region_offset = region_offset;
813 814 815 816
    in.bulk_handle = remote_bulk;
    in.bulk_offset = remote_offset;
    in.bulk_size = size; 
    in.remote_addr_str = (char*)remote_addr;
Philip Carns's avatar
Philip Carns committed
817

Shane Snyder's avatar
Shane Snyder committed
818
    hret = margo_create(g_margo_inst.mid, instance->dest,
819
        g_margo_inst.bake_read_id, &handle);
Shane Snyder's avatar
Shane Snyder committed
820 821
    if(hret != HG_SUCCESS)
        return(-1);
822

Shane Snyder's avatar
Shane Snyder committed
823
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
824 825
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
826
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
827 828 829
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
830
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
831 832
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
833
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
834 835
        return(-1);
    }
836

Philip Carns's avatar
Philip Carns committed
837 838
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
839 840
    margo_free_output(handle, &out);
    margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
841 842 843
    return(ret);
}