bake-server.c 25.6 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

Philip Carns's avatar
Philip Carns committed
9
#include <assert.h>
10
#include <libpmemobj.h>
11
#include <bake-server.h>
12
#include "uthash.h"
13
#include "bake-rpc.h"
Philip Carns's avatar
Philip Carns committed
14

15
/* definition of BAKE root data structure (just a uuid for now) */
16
typedef struct
17
{   
18
    bake_target_id_t pool_id;
19
} bake_root_t;
20

21
/* definition of internal BAKE region_id_t identifier for libpmemobj back end */
22 23
typedef struct
{
24 25 26
    PMEMoid oid;
    uint64_t size;
} pmemobj_region_id_t;
27

28 29 30 31 32 33 34 35
/* one open pmem pool attached to a provider; entries are kept in a uthash
 * table keyed on target_id
 */
typedef struct
{
    PMEMobjpool* pmem_pool;     /* handle returned by pmemobj_open() */
    bake_root_t* pmem_root;     /* direct pointer to the pool's root object */
    bake_target_id_t target_id; /* copy of pmem_root->pool_id; the hash key */
    UT_hash_handle hh;          /* uthash bookkeeping */
} bake_pmem_entry_t;

36
typedef struct bake_server_context_t
37
{
38 39
    uint64_t num_targets;
    bake_pmem_entry_t* targets;
40
} bake_server_context_t;
41

42
static void bake_server_finalize_cb(void *data);
43

44
int bake_makepool(
45 46 47
        const char *pool_name,
        size_t pool_size,
        mode_t pool_mode)
48
{
49
    PMEMobjpool *pool;
50
    PMEMoid root_oid;
51
    bake_root_t *root;
52

53 54
    pool = pmemobj_create(pool_name, NULL, pool_size, pool_mode);
    if(!pool)
55
    {
56 57
        fprintf(stderr, "pmemobj_create: %s\n", pmemobj_errormsg());
        return(-1);
58 59 60
    }

    /* find root */
61
    root_oid = pmemobj_root(pool, sizeof(bake_root_t));
62 63 64
    root = pmemobj_direct(root_oid);

    /* store the target id for this bake pool at the root */
65
    uuid_generate(root->pool_id.id);
66
    pmemobj_persist(pool, root, sizeof(bake_root_t));
67
#if 0
68
    char target_string[64];
69
    uuid_unparse(root->id, target_string);
70
    fprintf(stderr, "created BAKE target ID: %s\n", target_string);
71
#endif
72 73 74 75 76 77

    pmemobj_close(pool);

    return(0);
}

78
int bake_provider_register(
79 80 81 82
        margo_instance_id mid,
        uint8_t mplex_id,
        ABT_pool abt_pool,
        bake_provider_t* provider)
83
{
84
    bake_server_context_t *tmp_svr_ctx;
85

86
    /* check if a provider with the same multiplex id already exists */
87
    {
88 89 90 91 92 93 94
        hg_id_t id;
        hg_bool_t flag;
        margo_registered_name_mplex(mid, "bake_probe_rpc", mplex_id, &id, &flag);
        if(flag == HG_TRUE) {
            fprintf(stderr, "bake_provider_register(): a provider with the same mplex id (%d) already exists\n", mplex_id);
            return -1;
        }
95
    }
96

97 98 99
    /* allocate the resulting structure */    
    tmp_svr_ctx = calloc(1,sizeof(*tmp_svr_ctx));
    if(!tmp_svr_ctx)
100
        return(-1);
Philip Carns's avatar
Philip Carns committed
101

102
    /* register RPCs */
103 104
    hg_id_t rpc_id;
    rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_create_rpc",
105 106
            bake_create_in_t, bake_create_out_t, 
            bake_create_ult, mplex_id, abt_pool);
107 108
    margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_write_rpc",
109 110
            bake_write_in_t, bake_write_out_t, 
            bake_write_ult, mplex_id, abt_pool);
111 112
    margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_eager_write_rpc",
113 114
            bake_eager_write_in_t, bake_eager_write_out_t, 
            bake_eager_write_ult, mplex_id, abt_pool);
115 116
    margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_eager_read_rpc",
117 118
            bake_eager_read_in_t, bake_eager_read_out_t, 
            bake_eager_read_ult, mplex_id, abt_pool);
119 120
    margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_persist_rpc",
121 122
            bake_persist_in_t, bake_persist_out_t, 
            bake_persist_ult, mplex_id, abt_pool);
123 124
    margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_create_write_persist_rpc",
125 126
            bake_create_write_persist_in_t, bake_create_write_persist_out_t,
            bake_create_write_persist_ult, mplex_id, abt_pool);
127 128
    margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_get_size_rpc",
129 130
            bake_get_size_in_t, bake_get_size_out_t, 
            bake_get_size_ult, mplex_id, abt_pool);
131 132
    margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_read_rpc",
133 134
            bake_read_in_t, bake_read_out_t, 
            bake_read_ult, mplex_id, abt_pool);
135 136
    margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_probe_rpc",
137 138
            bake_probe_in_t, bake_probe_out_t, bake_probe_ult, 
            mplex_id, abt_pool);
139 140
    margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_noop_rpc",
141
            void, void, bake_noop_ult, mplex_id, abt_pool);
142
    margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
143

144 145 146
    /* install the bake server finalize callback */
    margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);

147 148
    if(provider != BAKE_PROVIDER_IGNORE)
        *provider = tmp_svr_ctx;
149

150
    return(0);
151 152
}

153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
/*
 * Opens an existing BAKE pool and attaches it to a provider.
 *
 * provider:    provider to attach the pool to
 * target_name: path of the pool file, passed to pmemobj_open()
 * target_id:   if non-NULL, receives the target id read from the pool root
 *
 * Returns 0 on success.  Returns -1 (with the pool closed again and nothing
 * added) if allocation fails, the pool cannot be opened, its root is missing
 * or carries a null id, or a pool with the same id is already attached.
 */
int bake_provider_add_storage_target(
        bake_provider_t provider,
        const char *target_name,
        bake_target_id_t* target_id)
{
    bake_pmem_entry_t* new_entry;
    bake_pmem_entry_t* existing = NULL;
    PMEMoid root_oid;
    bake_target_id_t key;

    new_entry = calloc(1, sizeof(*new_entry));
    if(!new_entry)
        return -1;

    new_entry->pmem_pool = pmemobj_open(target_name, NULL);
    if(!(new_entry->pmem_pool)) {
        fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
        free(new_entry);
        return -1;
    }

    /* check to make sure the root is properly set */
    root_oid = pmemobj_root(new_entry->pmem_pool, sizeof(bake_root_t));
    new_entry->pmem_root = pmemobj_direct(root_oid);
    if(!(new_entry->pmem_root)) {
        fprintf(stderr, "pmemobj_root: %s\n", pmemobj_errormsg());
        goto error;
    }
    key = new_entry->pmem_root->pool_id;
    new_entry->target_id = key;

    if(uuid_is_null(key.id))
    {
        fprintf(stderr, "Error: BAKE pool %s is not properly initialized\n", target_name);
        goto error;
    }

    /* uthash requires unique keys: look for a clash BEFORE inserting, so a
     * rejected entry is never left linked in the table after it is freed
     */
    HASH_FIND(hh, provider->targets, &key, sizeof(bake_target_id_t), existing);
    if(existing) {
        fprintf(stderr, "Error: BAKE could not insert new pmem pool into the hash\n");
        goto error;
    }

    /* insert in the provider's hash */
    HASH_ADD(hh, provider->targets, target_id, sizeof(bake_target_id_t), new_entry);

    provider->num_targets += 1;
    if(target_id)
        *target_id = key;
    return 0;

error:
    pmemobj_close(new_entry->pmem_pool);
    free(new_entry);
    return -1;
}

/* Looks up the pool entry attached to a provider under the given target id.
 * Returns the entry, or NULL when no such target is attached.
 */
static bake_pmem_entry_t* find_pmem_entry(
            bake_provider_t provider,
            bake_target_id_t target_id)
{
    bake_pmem_entry_t* match = NULL;

    HASH_FIND(hh, provider->targets, &target_id, sizeof(target_id), match);

    return match;
}

/*
 * Detaches one storage target from a provider and closes its pool.
 *
 * Returns 0 on success, -1 if no target with that id is attached.
 */
int bake_provider_remove_storage_target(
        bake_provider_t provider,
        bake_target_id_t target_id)
{
    bake_pmem_entry_t* entry = NULL;

    HASH_FIND(hh, provider->targets, &target_id, sizeof(bake_target_id_t), entry);
    if(!entry)
        return -1;

    /* unlink first so the table never refers to a closed pool */
    HASH_DEL(provider->targets, entry);
    pmemobj_close(entry->pmem_pool);
    free(entry);

    /* keep the count in step with the table; the probe handler sizes its
     * reply from num_targets
     */
    provider->num_targets -= 1;
    return 0;
}

/*
 * Detaches every storage target from a provider, closing each pool and
 * freeing its entry, then resets the target count.  Always returns 0.
 */
int bake_provider_remove_all_storage_targets(
        bake_provider_t provider)
{
    bake_pmem_entry_t *p, *tmp;

    /* HASH_ITER keeps the next pointer in tmp, so deleting p is safe */
    HASH_ITER(hh, provider->targets, p, tmp) {
        HASH_DEL(provider->targets, p);
        pmemobj_close(p->pmem_pool);
        free(p);
    }
    provider->num_targets = 0;
    return 0;
}

/* Reports the provider's recorded number of attached storage targets through
 * num_targets.  Always returns 0.
 */
int bake_provider_count_storage_targets(
        bake_provider_t provider,
        uint64_t* num_targets)
{
    const uint64_t attached = provider->num_targets;

    *num_targets = attached;

    return 0;
}

/* Copies the id of every attached storage target into targets, in hash
 * iteration order.  No capacity is passed in, so the caller must supply room
 * for every attached target.  Always returns 0.
 */
int bake_provider_list_storage_targets(
        bake_provider_t provider,
        bake_target_id_t* targets)
{
    bake_pmem_entry_t *cur, *next;
    bake_target_id_t *dst = targets;

    HASH_ITER(hh, provider->targets, cur, next) {
        *dst++ = cur->target_id;
    }

    return 0;
}

254
/* service a remote RPC that creates a BAKE region */
255
static void bake_create_ult(hg_handle_t handle)
256
{
257 258
    bake_create_out_t out;
    bake_create_in_t in;
259 260 261
    hg_return_t hret;
    pmemobj_region_id_t* prid;

262 263 264 265 266
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
    bake_provider_t svr_ctx = 
        margo_registered_data_mplex(mid, info->id, info->target_id);
267 268
    if(!svr_ctx) {
        fprintf(stderr, "Error: BAKE create could not find provider\n"); 
269
        goto respond_with_error;
270 271
    }

272 273 274 275
    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
        goto respond_with_error;

276 277 278 279 280 281 282
    /* find the pmem pool */
    bake_pmem_entry_t* entry = find_pmem_entry(svr_ctx, in.bti);
    if(entry == NULL) {
        fprintf(stderr, "Error: BAKE create could not find storage target\n");
        goto respond_with_error;
    }

283
    /* TODO: this check needs to be somewhere else */
284
    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);
285

286 287 288 289
    memset(&out, 0, sizeof(out));

    prid = (pmemobj_region_id_t*)out.rid.data;
    prid->size = in.region_size;
290 291 292
    /* find the pmem pool */
    out.ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
            in.region_size, 0, NULL, NULL);
293

294 295
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
296 297

finish:
298
    margo_destroy(handle);
299
    return;
300 301 302 303 304

respond_with_error:
    out.ret = -1;
    margo_respond(handle, &out);
    goto finish;
305
}
306
DEFINE_MARGO_RPC_HANDLER(bake_create_ult)
307

308
    /* service a remote RPC that writes to a BAKE region */
309
static void bake_write_ult(hg_handle_t handle)
310
{
311 312
    bake_write_out_t out;
    bake_write_in_t in;
313
    hg_return_t hret;
314
    hg_addr_t src_addr;
315 316
    char* buffer;
    hg_bulk_t bulk_handle;
317
    const struct hg_info *hgi;
318 319 320 321 322
    margo_instance_id mid;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

323 324
    mid = margo_hg_handle_get_instance(handle);
    assert(mid);
325
    hgi = margo_get_info(handle);
326 327 328 329 330 331 332 333
    bake_provider_t svr_ctx = 
        margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
    if(!svr_ctx) {
        out.ret = -1;
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }
334

335
    hret = margo_get_input(handle, &in);
336 337 338
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
339 340
        margo_respond(handle, &out);
        margo_destroy(handle);
341 342 343 344 345 346 347 348 349 350
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        out.ret = -1;
351 352 353
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
354 355 356 357
        return;
    }

    /* create bulk handle for local side of transfer */
358
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size, 
359
            HG_BULK_WRITE_ONLY, &bulk_handle);
360 361 362
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
363 364 365
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
366 367 368
        return;
    }

369 370
    if(in.remote_addr_str)
    {
371
        /* a proxy address was provided to pull write data from */
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
        {
            out.ret = -1;
            margo_bulk_free(bulk_handle);
            margo_free_input(handle, &in);
            margo_respond(handle, &out);
            margo_destroy(handle);
            return;
        }
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
390
            in.bulk_offset, bulk_handle, 0, in.bulk_size);
391
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
392
    {
393
        out.ret = -1;
394 395
        if(in.remote_addr_str)
            margo_addr_free(mid, src_addr);
396 397 398 399
        margo_bulk_free(bulk_handle);
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
400
        return;
Philip Carns's avatar
Philip Carns committed
401 402
    }

403 404
    out.ret = 0;

405 406
    if(in.remote_addr_str)
        margo_addr_free(mid, src_addr);
407 408 409 410
    margo_bulk_free(bulk_handle);
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
411 412
    return;
}
413
DEFINE_MARGO_RPC_HANDLER(bake_write_ult)
414

415
    /* service a remote RPC that writes to a BAKE region in eager mode */
416
static void bake_eager_write_ult(hg_handle_t handle)
417
{
418 419
    bake_eager_write_out_t out;
    bake_eager_write_in_t in;
420 421 422 423 424 425 426
    hg_return_t hret;
    char* buffer;
    hg_bulk_t bulk_handle;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

427 428 429 430 431 432 433 434 435 436 437 438
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
    bake_provider_t svr_ctx = 
        margo_registered_data_mplex(mid, info->id, info->target_id);
    if(!svr_ctx) {
        out.ret = -1;
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

439
    hret = margo_get_input(handle, &in);
440 441 442
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
443 444
        margo_respond(handle, &out);
        margo_destroy(handle);
445 446 447 448 449 450 451 452
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
453
    {
454
        out.ret = -1;
455 456 457
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
458
        return;
459
    }
460 461 462 463 464

    memcpy(buffer, in.buffer, in.size);

    out.ret = 0;

465 466 467
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
468 469
    return;
}
470
DEFINE_MARGO_RPC_HANDLER(bake_eager_write_ult)
471

472
    /* service a remote RPC that persists to a BAKE region */
473
static void bake_persist_ult(hg_handle_t handle)
474
{
475 476
    bake_persist_out_t out;
    bake_persist_in_t in;
477 478 479
    hg_return_t hret;
    char* buffer;
    pmemobj_region_id_t* prid;
480

481 482
    memset(&out, 0, sizeof(out));

483 484 485 486 487 488 489 490 491 492 493 494
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
    bake_provider_t svr_ctx = 
        margo_registered_data_mplex(mid, info->id, info->target_id);
    if(!svr_ctx) {
        out.ret = -1;
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

495
    hret = margo_get_input(handle, &in);
496
    if(hret != HG_SUCCESS)
497
    {
498
        out.ret = -1;
499 500
        margo_respond(handle, &out);
        margo_destroy(handle);
501
        return;
502
    }
503

504 505 506 507 508
    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
Philip Carns's avatar
Philip Carns committed
509
    {
510
        out.ret = -1;
511 512 513
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
514
        return;
Philip Carns's avatar
Philip Carns committed
515
    }
516 517

    /* TODO: should this have an abt shim in case it blocks? */
518 519
    PMEMobjpool* pmem_pool = pmemobj_pool_by_oid(prid->oid);
    pmemobj_persist(pmem_pool, buffer, prid->size);
520 521 522

    out.ret = 0;

523 524 525
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
526 527
    return;
}
528
DEFINE_MARGO_RPC_HANDLER(bake_persist_ult)
529

530 531 532 533 534 535 536 537 538 539 540 541 542 543 544
/*
 * Handler for the combined create/write/persist RPC: allocates a region of
 * in.region_size in the pool named by in.bti, pulls in.bulk_size bytes into
 * it from the client (or from the proxy address in in.remote_addr_str),
 * persists the region and returns its id in out.rid.
 *
 * out.ret is 0 on success and -1 on any failure.  If a step fails after the
 * region was allocated, the region is freed again and out.rid is cleared so
 * that no unreachable allocation is left behind in the pool.
 */
static void bake_create_write_persist_ult(hg_handle_t handle)
{
    bake_create_write_persist_out_t out;
    bake_create_write_persist_in_t in;
    hg_addr_t src_addr;
    char* buffer;
    hg_bulk_t bulk_handle;
    const struct hg_info *hgi;
    margo_instance_id mid;
    hg_return_t hret;
    int ret;
    pmemobj_region_id_t* prid;
    bake_provider_t svr_ctx;
    bake_pmem_entry_t* entry;
    int proxy_addr = 0; /* set once src_addr came from margo_addr_lookup() */

    memset(&out, 0, sizeof(out));
    out.ret = -1;

    mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    hgi = margo_get_info(handle);
    svr_ctx = margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
    if(!svr_ctx) {
        fprintf(stderr, "Error: BAKE create_write_persist could not find provider\n");
        goto respond;
    }

    /* TODO: this check needs to be somewhere else */
    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
        goto respond;

    /* find the pmem pool */
    entry = find_pmem_entry(svr_ctx, in.bti);
    if(entry == NULL) {
        fprintf(stderr, "Error: BAKE create_write_persist could not find storage target\n");
        goto free_input;
    }

    /* the region id is built in place inside the reply's opaque rid */
    prid = (pmemobj_region_id_t*)out.rid.data;
    prid->size = in.region_size;
    ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
            in.region_size, 0, NULL, NULL);
    if(ret != 0)
        goto free_input;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
        goto free_region;

    /* the payload must fit in the region that was just allocated */
    if(in.bulk_size > in.region_size)
        goto free_region;

    /* create bulk handle for local side of transfer */
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size,
            HG_BULK_WRITE_ONLY, &bulk_handle);
    if(hret != HG_SUCCESS)
        goto free_region;

    if(in.remote_addr_str)
    {
        /* a proxy address was provided to pull write data from */
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
            goto free_bulk;
        proxy_addr = 1;
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
            in.bulk_offset, bulk_handle, 0, in.bulk_size);
    if(hret == HG_SUCCESS)
    {
        /* TODO: should this have an abt shim in case it blocks? */
        pmemobj_persist(entry->pmem_pool, buffer, prid->size);
        out.ret = 0;
    }

    /* only an address we looked up ourselves is ours to free */
    if(proxy_addr)
        margo_addr_free(mid, src_addr);
free_bulk:
    margo_bulk_free(bulk_handle);
free_region:
    if(out.ret != 0)
    {
        /* undo the allocation and do not hand back a stale region id */
        pmemobj_free(&prid->oid);
        memset(&out.rid, 0, sizeof(out.rid));
    }
free_input:
    margo_free_input(handle, &in);
respond:
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)

666
    /* service a remote RPC that retrieves the size of a BAKE region */
667
static void bake_get_size_ult(hg_handle_t handle)
668
{
669 670
    bake_get_size_out_t out;
    bake_get_size_in_t in;
671 672 673 674 675
    hg_return_t hret;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

676 677 678 679 680 681 682 683 684 685 686
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    bake_provider_t svr_ctx = 
        margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
    if(!svr_ctx) {
        out.ret = -1;
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }
687

688
    hret = margo_get_input(handle, &in);
689
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
690
    {
691
        out.ret = -1;
692 693
        margo_respond(handle, &out);
        margo_destroy(handle);
694
        return;
Philip Carns's avatar
Philip Carns committed
695 696
    }

697 698 699 700 701 702
    prid = (pmemobj_region_id_t*)in.rid.data;

    /* kind of cheating here; the size is encoded in the RID */
    out.size = prid->size;
    out.ret = 0;

703 704 705
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
706 707
    return;
}
708
DEFINE_MARGO_RPC_HANDLER(bake_get_size_ult)
709

710
    /* service a remote RPC for a BAKE no-op */
711
static void bake_noop_ult(hg_handle_t handle)
712
{
713 714 715 716 717
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    bake_provider_t svr_ctx = 
        margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
718

719 720
    margo_respond(handle, NULL);
    margo_destroy(handle);
721 722
    return;
}
723
DEFINE_MARGO_RPC_HANDLER(bake_noop_ult)
724

725 726
    /* TODO consolidate with write handler; read and write are nearly identical */
    /* service a remote RPC that reads from a BAKE region */
727
static void bake_read_ult(hg_handle_t handle)
728
{
729 730
    bake_read_out_t out;
    bake_read_in_t in;
731
    hg_return_t hret;
732
    hg_addr_t src_addr;
733 734
    char* buffer;
    hg_bulk_t bulk_handle;
735
    const struct hg_info *hgi;
736 737 738 739 740
    margo_instance_id mid;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

741 742
    mid = margo_hg_handle_get_instance(handle);
    assert(mid);
743
    hgi = margo_get_info(handle);
744 745 746 747 748 749 750 751
    bake_provider_t svr_ctx = 
        margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
    if(!svr_ctx) {
        out.ret = -1;
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }
752

753
    hret = margo_get_input(handle, &in);
754
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
755
    {
756
        out.ret = -1;
757 758
        margo_respond(handle, &out);
        margo_destroy(handle);
759
        return;
Philip Carns's avatar
Philip Carns committed
760 761
    }

762 763 764 765 766
    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
Philip Carns's avatar
Philip Carns committed
767
    {
768
        out.ret = -1;
769 770 771
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
772
        return;
Philip Carns's avatar
Philip Carns committed
773 774
    }

775
    /* create bulk handle for local side of transfer */
776
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size, 
777
            HG_BULK_READ_ONLY, &bulk_handle);
778
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
779
    {
780
        out.ret = -1;
781 782 783
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
784
        return;
Philip Carns's avatar
Philip Carns committed
785
    }
786

787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807
    if(in.remote_addr_str)
    {
        /* a proxy address was provided to push read data to */
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
        {
            out.ret = -1;
            margo_bulk_free(bulk_handle);
            margo_free_input(handle, &in);
            margo_respond(handle, &out);
            margo_destroy(handle);
            return;
        }
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PUSH, src_addr, in.bulk_handle,
808
            in.bulk_offset, bulk_handle, 0, in.bulk_size);
809
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
810
    {
811
        out.ret = -1;
812 813
        if(in.remote_addr_str)
            margo_addr_free(mid, src_addr);
814 815 816 817
        margo_bulk_free(bulk_handle);
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
818
        return;
Philip Carns's avatar
Philip Carns committed
819 820
    }

821
    out.ret = 0;
Philip Carns's avatar
Philip Carns committed
822

823 824
    if(in.remote_addr_str)
        margo_addr_free(mid, src_addr);
825 826 827 828
    margo_bulk_free(bulk_handle);
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
829 830
    return;
}
831
DEFINE_MARGO_RPC_HANDLER(bake_read_ult)
Philip Carns's avatar
Philip Carns committed
832

833 834
    /* service a remote RPC that reads from a BAKE region and eagerly sends
     * response */
835
static void bake_eager_read_ult(hg_handle_t handle)
836
{
837 838
    bake_eager_read_out_t out;
    bake_eager_read_in_t in;
839 840 841 842 843 844 845
    hg_return_t hret;
    char* buffer;
    hg_size_t size;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

846 847 848 849 850 851 852 853 854 855 856 857
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    bake_provider_t svr_ctx = 
        margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
    if(!svr_ctx) {
        out.ret = -1;
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

858
    hret = margo_get_input(handle, &in);
859 860 861
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
862 863
        margo_respond(handle, &out);
        margo_destroy(handle);
864 865 866 867
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;
Philip Carns's avatar
Philip Carns committed
868

869 870 871 872 873
    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        out.ret = -1;
874 875 876
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
877 878
        return;
    }
879

880 881 882 883
    out.ret = 0;
    out.buffer = buffer;
    out.size = in.size;

884 885 886
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
887
    return;
Philip Carns's avatar
Philip Carns committed
888
}
889
DEFINE_MARGO_RPC_HANDLER(bake_eager_read_ult)
890

891
    /* service a remote RPC that probes for a BAKE target id */
892
static void bake_probe_ult(hg_handle_t handle)
893
{
894
    bake_probe_out_t out;
895

896 897
    memset(&out, 0, sizeof(out));

898 899 900 901 902 903 904 905 906 907 908 909
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    bake_provider_t svr_ctx = 
        margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
    if(!svr_ctx) {
        out.ret = -1;
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

910
    out.ret = 0;
911
    // XXX this is where we should handle multiple targets
912 913 914 915 916
    uint64_t targets_count;
    bake_provider_count_storage_targets(svr_ctx, &targets_count);
    bake_target_id_t targets[targets_count];
    bake_provider_list_storage_targets(svr_ctx, targets);

917
    out.targets = targets;
918
    out.num_targets = targets_count;
919

920 921
    margo_respond(handle, &out);
    margo_destroy(handle);
922 923
    return;
}
924
DEFINE_MARGO_RPC_HANDLER(bake_probe_ult)
925

926
static void bake_server_finalize_cb(void *data)
927
{
928 929 930
    bake_server_context_t *svr_ctx = (bake_server_context_t *)data;
    assert(svr_ctx);

931 932
    bake_provider_remove_all_storage_targets(svr_ctx);

933 934 935 936
    free(svr_ctx);

    return;
}
Philip Carns's avatar
Philip Carns committed
937