/*
 * (C) 2016 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */

7
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <bake-bulk-client.h>
#include <margo.h>
#include "uthash.h"
#include "bake-bulk-rpc.h"
Philip Carns's avatar
Philip Carns committed
12

13
/* Refers to a single Margo initialization, for now this is shared by
14 15 16
 * all remote targets.  In the future we probably need to support multiple in
 * case we run atop more than one transport at a time.
 */
17
struct bake_margo_instance
18 19 20
{
    margo_instance_id mid;  
    int refct;
21

22
    hg_id_t bake_bulk_probe_id;
23 24
    hg_id_t bake_bulk_shutdown_id; 
    hg_id_t bake_bulk_create_id;
Philip Carns's avatar
Philip Carns committed
25
    hg_id_t bake_bulk_eager_write_id;
Philip Carns's avatar
Philip Carns committed
26
    hg_id_t bake_bulk_eager_read_id;
27 28 29 30
    hg_id_t bake_bulk_write_id;
    hg_id_t bake_bulk_persist_id;
    hg_id_t bake_bulk_get_size_id;
    hg_id_t bake_bulk_read_id;
Philip Carns's avatar
Philip Carns committed
31
    hg_id_t bake_bulk_noop_id;
32 33 34
};

/* Refers to an instance connected to a specific target */
Philip Carns's avatar
Philip Carns committed
35 36
struct bake_instance
{
37 38 39
    bake_target_id_t bti;   /* persistent identifier for this target */
    hg_addr_t dest;         /* resolved Mercury address */
    UT_hash_handle hh;
Philip Carns's avatar
Philip Carns committed
40 41
};

42 43
struct bake_instance *instance_hash = NULL;

Philip Carns's avatar
Philip Carns committed
44

45
struct bake_margo_instance g_margo_inst = {
46 47 48
    .mid = MARGO_INSTANCE_NULL,
    .refct = 0,
};
Philip Carns's avatar
Philip Carns committed
49

Philip Carns's avatar
Philip Carns committed
50 51 52 53 54 55
/* Forward declaration: bake_bulk_read() calls this helper for small
 * transfers, but its definition appears later in the file.
 */
static int bake_bulk_eager_read(
    bake_target_id_t bti,
    bake_bulk_region_id_t rid,
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size);
Shane Snyder's avatar
Shane Snyder committed
56

Philip Carns's avatar
Philip Carns committed
57

58
static int bake_margo_instance_init(const char *mercury_dest)
Philip Carns's avatar
Philip Carns committed
59
{
60 61 62
    char hg_na[64] = {0};
    int i;

63 64
    /* have we already started a Margo instance? */
    if(g_margo_inst.refct > 0)
Philip Carns's avatar
Philip Carns committed
65
    {
66
        g_margo_inst.refct++;
67
        return(0);
Philip Carns's avatar
Philip Carns committed
68
    }
Philip Carns's avatar
Philip Carns committed
69

70
    /* initialize Margo using the transport portion of the destination
71
     * address (i.e., the part before the first : character if present)
Philip Carns's avatar
Philip Carns committed
72
     */
73 74 75
    for(i=0; (i<63 && mercury_dest[i] != '\0' && mercury_dest[i] != ':'); i++)
        hg_na[i] = mercury_dest[i];

76 77
    g_margo_inst.mid = margo_init(hg_na, MARGO_CLIENT_MODE, 0, 0);
    if(g_margo_inst.mid == MARGO_INSTANCE_NULL)
Philip Carns's avatar
Philip Carns committed
78 79 80
    {
        return(-1);
    }
81
    g_margo_inst.refct = 1;
Philip Carns's avatar
Philip Carns committed
82 83

    /* register RPCs */
84 85
    g_margo_inst.bake_bulk_probe_id = 
        MARGO_REGISTER(g_margo_inst.mid, 
86 87
        "bake_bulk_probe_rpc", void, bake_bulk_probe_out_t, 
        NULL);
88 89
    g_margo_inst.bake_bulk_shutdown_id = 
        MARGO_REGISTER(g_margo_inst.mid, 
Philip Carns's avatar
Philip Carns committed
90 91
        "bake_bulk_shutdown_rpc", void, void, 
        NULL);
92 93
    g_margo_inst.bake_bulk_create_id = 
        MARGO_REGISTER(g_margo_inst.mid, 
94 95 96 97
        "bake_bulk_create_rpc", 
        bake_bulk_create_in_t,
        bake_bulk_create_out_t,
        NULL);
98 99
    g_margo_inst.bake_bulk_write_id = 
        MARGO_REGISTER(g_margo_inst.mid, 
100 101 102 103
        "bake_bulk_write_rpc", 
        bake_bulk_write_in_t,
        bake_bulk_write_out_t,
        NULL);
104 105
    g_margo_inst.bake_bulk_eager_write_id = 
        MARGO_REGISTER(g_margo_inst.mid, 
Philip Carns's avatar
Philip Carns committed
106 107 108 109
        "bake_bulk_eager_write_rpc", 
        bake_bulk_eager_write_in_t,
        bake_bulk_eager_write_out_t,
        NULL);
110 111
    g_margo_inst.bake_bulk_eager_read_id = 
        MARGO_REGISTER(g_margo_inst.mid, 
Philip Carns's avatar
Philip Carns committed
112 113 114 115
        "bake_bulk_eager_read_rpc", 
        bake_bulk_eager_read_in_t,
        bake_bulk_eager_read_out_t,
        NULL);
116 117
    g_margo_inst.bake_bulk_persist_id = 
        MARGO_REGISTER(g_margo_inst.mid, 
Philip Carns's avatar
Philip Carns committed
118 119 120 121
        "bake_bulk_persist_rpc", 
        bake_bulk_persist_in_t,
        bake_bulk_persist_out_t,
        NULL);
122 123
    g_margo_inst.bake_bulk_get_size_id = 
        MARGO_REGISTER(g_margo_inst.mid, 
124 125 126 127
        "bake_bulk_get_size_rpc", 
        bake_bulk_get_size_in_t,
        bake_bulk_get_size_out_t,
        NULL);
128 129
    g_margo_inst.bake_bulk_read_id = 
        MARGO_REGISTER(g_margo_inst.mid, 
130 131 132 133
        "bake_bulk_read_rpc", 
        bake_bulk_read_in_t,
        bake_bulk_read_out_t,
        NULL);
134 135
    g_margo_inst.bake_bulk_noop_id = 
        MARGO_REGISTER(g_margo_inst.mid, 
Philip Carns's avatar
Philip Carns committed
136 137 138 139
        "bake_bulk_noop_rpc", 
        void,
        void,
        NULL);
Philip Carns's avatar
Philip Carns committed
140

141 142 143
    return(0);
}

144
void bake_margo_instance_finalize(void)
145
{
146
    g_margo_inst.refct--;
147

148
    assert(g_margo_inst.refct > -1);
149

150
    if(g_margo_inst.refct == 0)
151
    {
152
        margo_finalize(g_margo_inst.mid);
153 154 155 156 157 158 159 160 161
    }
}

/* Contact the target at mercury_dest, learn its identifier, and record the
 * connection in instance_hash.
 *
 * mercury_dest: address string of the target
 * bti:          out parameter; receives the identifier reported by the
 *               target whenever a probe response was obtained
 *
 * Returns 0 on success.  On failure returns the negative value from
 * bake_margo_instance_init(), -1 for a local allocation/Mercury error, or
 * the nonzero status carried in the probe response.  Every failure path
 * releases whatever was acquired so far: the RPC handle, the resolved
 * address, the instance record, and the Margo reference.
 */
int bake_probe_instance(
    const char *mercury_dest,
    bake_target_id_t *bti)
{
    hg_return_t hret;
    int ret;
    bake_bulk_probe_out_t out;
    hg_handle_t handle;
    struct bake_instance *new_instance;

    ret = bake_margo_instance_init(mercury_dest);
    if(ret < 0)
        return(ret);

    /* default result for the local failure paths below */
    ret = -1;

    new_instance = calloc(1, sizeof(*new_instance));
    if(!new_instance)
        goto err_finalize;

    hret = margo_addr_lookup(g_margo_inst.mid, mercury_dest, &new_instance->dest);
    if(hret != HG_SUCCESS)
        goto err_free;

    /* create handle */
    hret = margo_create(g_margo_inst.mid, new_instance->dest,
        g_margo_inst.bake_bulk_probe_id, &handle);
    if(hret != HG_SUCCESS)
        goto err_addr;

    hret = margo_forward(handle, NULL);
    if(hret != HG_SUCCESS)
        goto err_handle;

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
        goto err_handle;

    ret = out.ret;
    *bti = out.bti;
    new_instance->bti = out.bti;

    margo_free_output(handle, &out);
    margo_destroy(handle);

    /* the target answered but reported an error: undo the lookup too */
    if(ret != 0)
        goto err_addr;

    /* TODO: safety check that it isn't already there.  Here or earlier? */
    HASH_ADD(hh, instance_hash, bti, sizeof(new_instance->bti), new_instance);

    return(0);

err_handle:
    margo_destroy(handle);
err_addr:
    /* must happen before the Margo reference is dropped */
    margo_addr_free(g_margo_inst.mid, new_instance->dest);
err_free:
    free(new_instance);
err_finalize:
    bake_margo_instance_finalize();
    return(ret);
}
  
/* Forget a previously probed target: remove it from instance_hash, free its
 * Mercury address and record, and drop its Margo reference.  An unknown
 * bti is silently ignored.
 */
void bake_release_instance(
    bake_target_id_t bti)
{
    struct bake_instance *target = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target != NULL)
    {
        HASH_DELETE(hh, instance_hash, target);
        margo_addr_free(g_margo_inst.mid, target->dest);
        free(target);
        bake_margo_instance_finalize();
    }
}

/* Send the shutdown RPC to the target identified by bti.
 *
 * Returns 0 once the RPC has been forwarded successfully, or -1 if bti is
 * unknown or handle creation / forwarding fails.
 */
int bake_shutdown_service(bake_target_id_t bti)
{
    struct bake_instance *target = NULL;
    hg_handle_t rpc;
    hg_return_t status;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return -1;

    status = margo_create(g_margo_inst.mid, target->dest,
        g_margo_inst.bake_bulk_shutdown_id, &rpc);
    if(status != HG_SUCCESS)
        return -1;

    /* no input is sent and no output is read for this RPC */
    status = margo_forward(rpc, NULL);
    margo_destroy(rpc);

    return (status == HG_SUCCESS) ? 0 : -1;
}
Philip Carns's avatar
Philip Carns committed
276

Philip Carns's avatar
Philip Carns committed
277
static int bake_bulk_eager_write(
Philip Carns's avatar
Philip Carns committed
278 279 280 281 282 283 284
    bake_target_id_t bti,
    bake_bulk_region_id_t rid,
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
Shane Snyder's avatar
Shane Snyder committed
285
    hg_handle_t handle;
Philip Carns's avatar
Philip Carns committed
286 287 288 289 290 291 292 293 294 295 296 297 298 299
    bake_bulk_eager_write_in_t in;
    bake_bulk_eager_write_out_t out;
    int ret;
    struct bake_instance *instance = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);

    in.bti = bti;
    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;
    in.buffer = (char*)buf;
300
  
Shane Snyder's avatar
Shane Snyder committed
301 302 303 304
    hret = margo_create(g_margo_inst.mid, instance->dest, 
        g_margo_inst.bake_bulk_eager_write_id, &handle);
    if(hret != HG_SUCCESS)
        return(-1);
Philip Carns's avatar
Philip Carns committed
305

Shane Snyder's avatar
Shane Snyder committed
306
    hret = margo_forward(handle, &in);
Philip Carns's avatar
Philip Carns committed
307 308
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
309
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
310 311 312
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
313
    hret = margo_get_output(handle, &out);
Philip Carns's avatar
Philip Carns committed
314 315
    if(hret != HG_SUCCESS)
    {
Shane Snyder's avatar
Shane Snyder committed
316
        margo_destroy(handle);
Philip Carns's avatar
Philip Carns committed
317 318 319 320 321
        return(-1);
    }
    
    ret = out.ret;

Shane Snyder's avatar
Shane Snyder committed
322 323
    margo_free_output(handle, &out);
    margo_destroy(handle);
324

Philip Carns's avatar
Philip Carns committed
325 326 327
    return(ret);
}

Philip Carns's avatar
Philip Carns committed
328
/* Largest buffer size (in bytes) that bake_bulk_write() and bake_bulk_read()
 * send through the eager RPCs; anything bigger goes through a bulk handle.
 */
#define BAKE_BULK_EAGER_LIMIT 2048
Philip Carns's avatar
Philip Carns committed
329

Philip Carns's avatar
Philip Carns committed
330 331 332 333 334 335 336
/* Write buf_size bytes from buf into region rid at region_offset.
 *
 * Buffers no larger than BAKE_BULK_EAGER_LIMIT are delegated to
 * bake_bulk_eager_write(); larger ones are exposed to the target through a
 * read-only bulk handle.
 *
 * Returns the status reported in the RPC output, or -1 if bti is unknown
 * or any Margo call fails.
 */
int bake_bulk_write(
    bake_target_id_t bti,
    bake_bulk_region_id_t rid,
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
    struct bake_instance *target = NULL;
    bake_bulk_write_in_t req;
    bake_bulk_write_out_t resp;
    hg_handle_t rpc;
    hg_return_t status;
    int result = -1;

    if(buf_size <= BAKE_BULK_EAGER_LIMIT)
        return bake_bulk_eager_write(bti, rid, region_offset, buf, buf_size);

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return -1;

    req.bti = bti;
    req.rid = rid;
    req.region_offset = region_offset;

    status = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
        HG_BULK_READ_ONLY, &req.bulk_handle);
    if(status != HG_SUCCESS)
        return -1;

    status = margo_create(g_margo_inst.mid, target->dest,
        g_margo_inst.bake_bulk_write_id, &rpc);
    if(status != HG_SUCCESS)
    {
        /* no RPC handle exists yet, so only the bulk handle is released */
        margo_bulk_free(req.bulk_handle);
        return -1;
    }

    status = margo_forward(rpc, &req);
    if(status == HG_SUCCESS)
        status = margo_get_output(rpc, &resp);

    if(status == HG_SUCCESS)
    {
        result = resp.ret;
        margo_free_output(rpc, &resp);
    }

    margo_bulk_free(req.bulk_handle);
    margo_destroy(rpc);
    return result;
}

Philip Carns's avatar
Philip Carns committed
392 393 394 395 396
/* Ask the target to create a region of region_size and store the resulting
 * region id in *rid.
 *
 * Returns the status reported in the RPC output, or -1 if bti is unknown
 * or any Margo call fails.  *rid is written only when an RPC output was
 * obtained.
 */
int bake_bulk_create(
    bake_target_id_t bti,
    uint64_t region_size,
    bake_bulk_region_id_t *rid)
{
    struct bake_instance *target = NULL;
    bake_bulk_create_in_t req;
    bake_bulk_create_out_t resp;
    hg_handle_t rpc;
    hg_return_t status;
    int result = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return -1;

    req.bti = bti;
    req.region_size = region_size;

    status = margo_create(g_margo_inst.mid, target->dest,
        g_margo_inst.bake_bulk_create_id, &rpc);
    if(status != HG_SUCCESS)
        return -1;

    status = margo_forward(rpc, &req);
    if(status == HG_SUCCESS)
        status = margo_get_output(rpc, &resp);

    if(status == HG_SUCCESS)
    {
        result = resp.ret;
        *rid = resp.rid;
        margo_free_output(rpc, &resp);
    }

    margo_destroy(rpc);
    return result;
}


Philip Carns's avatar
Philip Carns committed
439 440 441 442
/* Issue the persist RPC for region rid on the target identified by bti.
 *
 * Returns the status reported in the RPC output, or -1 if bti is unknown
 * or any Margo call fails.
 */
int bake_bulk_persist(
    bake_target_id_t bti,
    bake_bulk_region_id_t rid)
{
    struct bake_instance *target = NULL;
    bake_bulk_persist_in_t req;
    bake_bulk_persist_out_t resp;
    hg_handle_t rpc;
    hg_return_t status;
    int result = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return -1;

    req.bti = bti;
    req.rid = rid;

    status = margo_create(g_margo_inst.mid, target->dest,
        g_margo_inst.bake_bulk_persist_id, &rpc);
    if(status != HG_SUCCESS)
        return -1;

    status = margo_forward(rpc, &req);
    if(status == HG_SUCCESS)
        status = margo_get_output(rpc, &resp);

    if(status == HG_SUCCESS)
    {
        result = resp.ret;
        margo_free_output(rpc, &resp);
    }

    margo_destroy(rpc);
    return result;
}
Philip Carns's avatar
Philip Carns committed
482

483 484 485 486 487 488
/* Query the size of region rid and store it in *region_size.
 *
 * Returns the status reported in the RPC output, or -1 if bti is unknown
 * or any Margo call fails.  *region_size is written only when an RPC
 * output was obtained.
 */
int bake_bulk_get_size(
    bake_target_id_t bti,
    bake_bulk_region_id_t rid,
    uint64_t *region_size)
{
    struct bake_instance *target = NULL;
    bake_bulk_get_size_in_t req;
    bake_bulk_get_size_out_t resp;
    hg_handle_t rpc;
    hg_return_t status;
    int result = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return -1;

    req.bti = bti;
    req.rid = rid;

    status = margo_create(g_margo_inst.mid, target->dest,
        g_margo_inst.bake_bulk_get_size_id, &rpc);
    if(status != HG_SUCCESS)
        return -1;

    status = margo_forward(rpc, &req);
    if(status == HG_SUCCESS)
        status = margo_get_output(rpc, &resp);

    if(status == HG_SUCCESS)
    {
        result = resp.ret;
        *region_size = resp.size;
        margo_free_output(rpc, &resp);
    }

    margo_destroy(rpc);
    return result;
}

Philip Carns's avatar
Philip Carns committed
529 530 531 532
/* Send the no-op RPC to the target identified by bti.
 *
 * Returns 0 once the RPC has been forwarded successfully, or -1 if bti is
 * unknown or handle creation / forwarding fails.
 */
int bake_bulk_noop(
    bake_target_id_t bti)
{
    struct bake_instance *target = NULL;
    hg_handle_t rpc;
    hg_return_t status;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return -1;

    status = margo_create(g_margo_inst.mid, target->dest,
        g_margo_inst.bake_bulk_noop_id, &rpc);
    if(status != HG_SUCCESS)
        return -1;

    /* no input is sent and no output is read for this RPC */
    status = margo_forward(rpc, NULL);
    margo_destroy(rpc);

    return (status == HG_SUCCESS) ? 0 : -1;
}

Philip Carns's avatar
Philip Carns committed
556 557 558 559 560 561 562
/* Read buf_size bytes from region rid at region_offset into buf.
 *
 * Buffers no larger than BAKE_BULK_EAGER_LIMIT are delegated to
 * bake_bulk_eager_read(); larger ones are exposed to the target through a
 * write-only bulk handle.
 *
 * Returns the status reported in the RPC output, or -1 if bti is unknown
 * or any Margo call fails.
 */
int bake_bulk_read(
    bake_target_id_t bti,
    bake_bulk_region_id_t rid,
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
    struct bake_instance *target = NULL;
    bake_bulk_read_in_t req;
    bake_bulk_read_out_t resp;
    hg_handle_t rpc;
    hg_return_t status;
    int result = -1;

    if(buf_size <= BAKE_BULK_EAGER_LIMIT)
        return bake_bulk_eager_read(bti, rid, region_offset, buf, buf_size);

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return -1;

    req.bti = bti;
    req.rid = rid;
    req.region_offset = region_offset;

    status = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
        HG_BULK_WRITE_ONLY, &req.bulk_handle);
    if(status != HG_SUCCESS)
        return -1;

    status = margo_create(g_margo_inst.mid, target->dest,
        g_margo_inst.bake_bulk_read_id, &rpc);
    if(status != HG_SUCCESS)
    {
        /* no RPC handle exists yet, so only the bulk handle is released */
        margo_bulk_free(req.bulk_handle);
        return -1;
    }

    status = margo_forward(rpc, &req);
    if(status == HG_SUCCESS)
        status = margo_get_output(rpc, &resp);

    if(status == HG_SUCCESS)
    {
        result = resp.ret;
        margo_free_output(rpc, &resp);
    }

    margo_bulk_free(req.bulk_handle);
    margo_destroy(rpc);
    return result;
}


Philip Carns's avatar
Philip Carns committed
619 620 621 622 623 624 625 626
/* Read buf_size bytes from region rid at region_offset into buf, receiving
 * the data inline in the RPC output rather than through a bulk handle.
 *
 * Returns the status reported in the RPC output, or -1 if bti is unknown,
 * any Margo call fails, or the response carries more bytes than buf can
 * hold.  buf is written only when the reported status is 0 and the
 * payload fits.
 */
static int bake_bulk_eager_read(
    bake_target_id_t bti,
    bake_bulk_region_id_t rid,
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
    hg_return_t hret;
    hg_handle_t handle;
    bake_bulk_eager_read_in_t in;
    bake_bulk_eager_read_out_t out;
    int ret;
    struct bake_instance *instance = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), instance);
    if(!instance)
        return(-1);

    in.bti = bti;
    in.rid = rid;
    in.region_offset = region_offset;
    in.size = buf_size;

    hret = margo_create(g_margo_inst.mid, instance->dest,
        g_margo_inst.bake_bulk_eager_read_id, &handle);
    if(hret != HG_SUCCESS)
        return(-1);

    hret = margo_forward(handle, &in);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return(-1);
    }

    ret = out.ret;
    if(ret == 0)
    {
        /* out.size comes from the remote side; never copy more than the
         * caller's buffer can hold
         */
        if(out.size > buf_size)
            ret = -1;
        else
            memcpy(buf, out.buffer, out.size);
    }

    margo_free_output(handle, &out);
    margo_destroy(handle);
    return(ret);
}