/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include "bake-config.h"

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <libpmemobj.h>
#include <bake-server.h>
#include "uthash.h"
#include "bake-rpc.h"

15
/* definition of BAKE root data structure (just a uuid for now) */
16
typedef struct
17
{   
18
    bake_target_id_t pool_id;
19
} bake_root_t;
20

21
/* definition of internal BAKE region_id_t identifier for libpmemobj back end */
22 23
typedef struct
{
24 25 26
    PMEMoid oid;
    uint64_t size;
} pmemobj_region_id_t;
27

28 29 30 31 32 33 34 35
/* One attached storage target: an open libpmemobj pool, tracked in the
 * provider's uthash table and keyed by target_id. */
typedef struct
{
    PMEMobjpool* pmem_pool;     /* pool handle returned by pmemobj_open() */
    bake_root_t* pmem_root;     /* direct pointer to the pool's root object */
    bake_target_id_t target_id; /* hash key; copy of pmem_root->pool_id */
    UT_hash_handle hh;          /* uthash bookkeeping */
} bake_pmem_entry_t;

36
typedef struct bake_server_context_t
37
{
38 39
    uint64_t num_targets;
    bake_pmem_entry_t* targets;
40
} bake_server_context_t;
41

42
/* finalize callback installed by bake_provider_register(); defined at the
 * bottom of this file */
static void bake_server_finalize_cb(void *data);
43

44
int bake_makepool(
45 46 47
        const char *pool_name,
        size_t pool_size,
        mode_t pool_mode)
48
{
49
    PMEMobjpool *pool;
50
    PMEMoid root_oid;
51
    bake_root_t *root;
52

53 54
    pool = pmemobj_create(pool_name, NULL, pool_size, pool_mode);
    if(!pool)
55
    {
56 57
        fprintf(stderr, "pmemobj_create: %s\n", pmemobj_errormsg());
        return(-1);
58 59 60
    }

    /* find root */
61
    root_oid = pmemobj_root(pool, sizeof(bake_root_t));
62 63 64
    root = pmemobj_direct(root_oid);

    /* store the target id for this bake pool at the root */
65
    uuid_generate(root->pool_id.id);
66
    pmemobj_persist(pool, root, sizeof(bake_root_t));
67
#if 0
68
    char target_string[64];
69
    uuid_unparse(root->id, target_string);
70
    fprintf(stderr, "created BAKE target ID: %s\n", target_string);
71
#endif
72 73 74 75 76 77

    pmemobj_close(pool);

    return(0);
}

78
int bake_provider_register(
79
        margo_instance_id mid,
80
        uint16_t provider_id,
81 82
        ABT_pool abt_pool,
        bake_provider_t* provider)
83
{
84
    bake_server_context_t *tmp_svr_ctx;
85

86
    /* check if a provider with the same multiplex id already exists */
87
    {
88 89
        hg_id_t id;
        hg_bool_t flag;
90
        margo_provider_registered_name(mid, "bake_probe_rpc", provider_id, &id, &flag);
91
        if(flag == HG_TRUE) {
92
            fprintf(stderr, "bake_provider_register(): a provider with the same id (%d) already exists\n", provider_id);
93 94
            return -1;
        }
95
    }
96

97 98 99
    /* allocate the resulting structure */    
    tmp_svr_ctx = calloc(1,sizeof(*tmp_svr_ctx));
    if(!tmp_svr_ctx)
100
        return(-1);
Philip Carns's avatar
Philip Carns committed
101

102
    /* register RPCs */
103
    hg_id_t rpc_id;
104
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_create_rpc",
105
            bake_create_in_t, bake_create_out_t, 
106 107 108
            bake_create_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_write_rpc",
109
            bake_write_in_t, bake_write_out_t, 
110 111 112
            bake_write_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_write_rpc",
113
            bake_eager_write_in_t, bake_eager_write_out_t, 
114 115 116
            bake_eager_write_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_read_rpc",
117
            bake_eager_read_in_t, bake_eager_read_out_t, 
118 119 120
            bake_eager_read_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_persist_rpc",
121
            bake_persist_in_t, bake_persist_out_t, 
122 123 124
            bake_persist_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_create_write_persist_rpc",
125
            bake_create_write_persist_in_t, bake_create_write_persist_out_t,
126 127 128
            bake_create_write_persist_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_get_size_rpc",
129
            bake_get_size_in_t, bake_get_size_out_t, 
130 131 132
            bake_get_size_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_read_rpc",
133
            bake_read_in_t, bake_read_out_t, 
134 135 136
            bake_read_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_probe_rpc",
137
            bake_probe_in_t, bake_probe_out_t, bake_probe_ult, 
138 139 140 141 142
            provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_noop_rpc",
            void, void, bake_noop_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
143

144 145 146
    /* install the bake server finalize callback */
    margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);

147 148
    if(provider != BAKE_PROVIDER_IGNORE)
        *provider = tmp_svr_ctx;
149

150
    return(0);
151 152
}

153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
/**
 * Opens an existing BAKE pool and attaches it to a provider.
 *
 * provider:    provider to attach the pool to
 * target_name: path handed to pmemobj_open()
 * target_id:   out parameter receiving the pool's target id; may be NULL
 *
 * Returns 0 on success.  Returns -1 (leaving the provider unchanged and the
 * pool closed) if memory cannot be allocated, the pool cannot be opened, its
 * root carries no target id, or a target with the same id is already
 * attached to this provider.
 */
int bake_provider_add_storage_target(
        bake_provider_t provider,
        const char *target_name,
        bake_target_id_t* target_id)
{
    bake_pmem_entry_t* new_entry = calloc(1, sizeof(*new_entry));
    if(!new_entry)
        return -1;

    new_entry->pmem_pool = pmemobj_open(target_name, NULL);
    if(!(new_entry->pmem_pool)) {
        fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
        free(new_entry);
        return -1;
    }

    /* check to make sure the root is properly set */
    PMEMoid root_oid = pmemobj_root(new_entry->pmem_pool, sizeof(bake_root_t));
    new_entry->pmem_root = pmemobj_direct(root_oid);
    if(!new_entry->pmem_root || uuid_is_null(new_entry->pmem_root->pool_id.id))
    {
        fprintf(stderr, "Error: BAKE pool %s is not properly initialized\n", target_name);
        pmemobj_close(new_entry->pmem_pool);
        free(new_entry);
        return(-1);
    }

    bake_target_id_t key = new_entry->pmem_root->pool_id;
    new_entry->target_id = key;

    /* uthash does not enforce key uniqueness, so look the key up BEFORE
     * inserting; adding first and freeing afterwards would leave a freed
     * entry linked into the table */
    bake_pmem_entry_t* existing = NULL;
    HASH_FIND(hh, provider->targets, &key, sizeof(bake_target_id_t), existing);
    if(existing) {
        fprintf(stderr, "Error: BAKE could not insert new pmem pool into the hash\n");
        pmemobj_close(new_entry->pmem_pool);
        free(new_entry);
        return -1;
    }

    /* insert in the provider's hash */
    HASH_ADD(hh, provider->targets, target_id, sizeof(bake_target_id_t), new_entry);

    provider->num_targets += 1;
    if(target_id)
        *target_id = key;
    return 0;
}

/* Looks up the attached storage target whose id equals target_id.
 * Returns the matching entry, or NULL when the provider has no such target. */
static bake_pmem_entry_t* find_pmem_entry(
            bake_provider_t provider,
            bake_target_id_t target_id)
{
    bake_pmem_entry_t* match = NULL;

    HASH_FIND(hh, provider->targets, &target_id, sizeof(target_id), match);

    return match;
}

/**
 * Detaches one storage target from a provider and closes its pool.
 *
 * provider:  provider that owns the target
 * target_id: id of the target to detach
 *
 * Returns 0 on success, -1 if no target with that id is attached.
 */
int bake_provider_remove_storage_target(
        bake_provider_t provider,
        bake_target_id_t target_id)
{
    bake_pmem_entry_t* entry = NULL;
    HASH_FIND(hh, provider->targets, &target_id, sizeof(bake_target_id_t), entry);
    if(!entry) return -1;

    /* unlink first so the table never references a closed pool */
    HASH_DEL(provider->targets, entry);
    pmemobj_close(entry->pmem_pool);
    free(entry);

    /* keep the count in step with the table; it sizes the probe reply */
    provider->num_targets -= 1;
    return 0;
}

/**
 * Detaches every storage target from a provider, closing each pool and
 * freeing its table entry, then resets the target count to zero.
 *
 * Always returns 0.
 */
int bake_provider_remove_all_storage_targets(
        bake_provider_t provider)
{
    bake_pmem_entry_t *p, *tmp;
    /* HASH_ITER keeps the next element in tmp, so deleting p is safe */
    HASH_ITER(hh, provider->targets, p, tmp) {
        HASH_DEL(provider->targets, p);
        pmemobj_close(p->pmem_pool);
        free(p);
    }
    provider->num_targets = 0;
    return 0;
}

/**
 * Reports how many storage targets the provider currently counts.
 *
 * provider:    provider to query
 * num_targets: out parameter receiving provider->num_targets
 *
 * Always returns 0.
 */
int bake_provider_count_storage_targets(
        bake_provider_t provider,
        uint64_t* num_targets)
{
    const uint64_t count = provider->num_targets;

    *num_targets = count;

    return 0;
}

/**
 * Copies the id of every attached storage target into a caller-supplied
 * array, in hash-table iteration order.
 *
 * provider: provider to query
 * targets:  output array; the caller must size it for every attached target
 *           (no bound is checked here)
 *
 * Always returns 0.
 */
int bake_provider_list_storage_targets(
        bake_provider_t provider,
        bake_target_id_t* targets)
{
    bake_pmem_entry_t *cur;
    uint64_t n = 0;

    /* walk the uthash chain directly through the hash handle */
    for(cur = provider->targets; cur != NULL; cur = cur->hh.next)
    {
        targets[n++] = cur->target_id;
    }

    return 0;
}

254
/* service a remote RPC that creates a BAKE region */
255
/* Handler for the create RPC: allocates a region of in.region_size in the
 * pool identified by in.bti and returns its region id in out.rid.
 * out.ret is the pmemobj_alloc() result on the normal path and -1 when the
 * provider, the input, or the storage target cannot be obtained.
 */
static void bake_create_ult(hg_handle_t handle)
{
    bake_create_out_t out;
    bake_create_in_t in;
    hg_return_t hret;
    pmemobj_region_id_t* prid;
    bake_provider_t svr_ctx;
    bake_pmem_entry_t* entry;
    int have_input = 0; /* set once margo_get_input() has succeeded */

    /* zero the whole response up front so that error replies never carry
     * uninitialized stack bytes in out.rid */
    memset(&out, 0, sizeof(out));
    out.ret = -1;

    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
    svr_ctx = margo_registered_data(mid, info->id);
    if(!svr_ctx) {
        fprintf(stderr, "Error: BAKE create could not find provider\n");
        goto respond;
    }

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
        goto respond;
    have_input = 1;

    /* find the pmem pool */
    entry = find_pmem_entry(svr_ctx, in.bti);
    if(entry == NULL) {
        fprintf(stderr, "Error: BAKE create could not find storage target\n");
        goto respond;
    }

    /* TODO: this check needs to be somewhere else */
    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);

    /* the region id handed back to the client is the pmemobj oid plus size */
    prid = (pmemobj_region_id_t*)out.rid.data;
    prid->size = in.region_size;
    out.ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
            in.region_size, 0, NULL, NULL);

respond:
    /* release the decoded input on every path that obtained it */
    if(have_input)
        margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
306
DEFINE_MARGO_RPC_HANDLER(bake_create_ult)
307

308
    /* service a remote RPC that writes to a BAKE region */
309
/* Handler for the write RPC: pulls in.bulk_size bytes from the client's (or
 * proxy's) bulk handle into the region named by in.rid.
 * out.ret is 0 on success and -1 on any failure.
 */
static void bake_write_ult(hg_handle_t handle)
{
    bake_write_out_t out;
    bake_write_in_t in;
    hg_return_t hret;
    hg_addr_t src_addr;
    char* buffer;
    hg_bulk_t bulk_handle;
    const struct hg_info *hgi;
    margo_instance_id mid;
    pmemobj_region_id_t* prid;
    bake_provider_t svr_ctx;
    int have_input = 0;  /* margo_get_input() succeeded */
    int have_bulk = 0;   /* bulk_handle was created */
    int have_lookup = 0; /* src_addr came from margo_addr_lookup() */

    memset(&out, 0, sizeof(out));
    out.ret = -1;

    mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    hgi = margo_get_info(handle);
    svr_ctx = margo_registered_data(mid, hgi->id);
    if(!svr_ctx)
        goto respond;

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
        goto respond;
    have_input = 1;

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
        goto respond;

    /* create bulk handle for local side of transfer */
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size,
            HG_BULK_WRITE_ONLY, &bulk_handle);
    if(hret != HG_SUCCESS)
        goto respond;
    have_bulk = 1;

    if(in.remote_addr_str)
    {
        /* a proxy address was provided to pull write data from */
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
            goto respond;
        have_lookup = 1;
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
            in.bulk_offset, bulk_handle, 0, in.bulk_size);
    if(hret != HG_SUCCESS)
        goto respond;

    out.ret = 0;

respond:
    /* release resources in reverse order of acquisition, then reply; only a
     * looked-up address is ours to free (hgi->addr is not) */
    if(have_lookup)
        margo_addr_free(mid, src_addr);
    if(have_bulk)
        margo_bulk_free(bulk_handle);
    if(have_input)
        margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
412
DEFINE_MARGO_RPC_HANDLER(bake_write_ult)
413

414
    /* service a remote RPC that writes to a BAKE region in eager mode */
415
/* Handler for the eager write RPC: the payload travels inside the request,
 * so in.size bytes are copied from in.buffer into the region named by
 * in.rid.  out.ret is 0 on success and -1 on any failure.
 */
static void bake_eager_write_ult(hg_handle_t handle)
{
    bake_eager_write_out_t out;
    bake_eager_write_in_t in;
    hg_return_t hret;
    char* buffer;
    pmemobj_region_id_t* prid;
    bake_provider_t svr_ctx;
    int have_input = 0; /* margo_get_input() succeeded */

    memset(&out, 0, sizeof(out));
    out.ret = -1;

    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
    svr_ctx = margo_registered_data(mid, info->id);
    if(!svr_ctx)
        goto respond;

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
        goto respond;
    have_input = 1;

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
        goto respond;

    memcpy(buffer, in.buffer, in.size);

    out.ret = 0;

respond:
    if(have_input)
        margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
468
DEFINE_MARGO_RPC_HANDLER(bake_eager_write_ult)
469

470
    /* service a remote RPC that persists to a BAKE region */
471
/* Handler for the persist RPC: flushes the whole region named by in.rid
 * (prid->size bytes) with pmemobj_persist().
 * out.ret is 0 on success and -1 on any failure.
 */
static void bake_persist_ult(hg_handle_t handle)
{
    bake_persist_out_t out;
    bake_persist_in_t in;
    hg_return_t hret;
    char* buffer;
    pmemobj_region_id_t* prid;
    bake_provider_t svr_ctx;
    PMEMobjpool* pmem_pool;
    int have_input = 0; /* margo_get_input() succeeded */

    memset(&out, 0, sizeof(out));
    out.ret = -1;

    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
    svr_ctx = margo_registered_data(mid, info->id);
    if(!svr_ctx)
        goto respond;

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
        goto respond;
    have_input = 1;

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
        goto respond;

    /* TODO: should this have an abt shim in case it blocks? */
    /* the pool is derived from the oid itself rather than from a target id */
    pmem_pool = pmemobj_pool_by_oid(prid->oid);
    pmemobj_persist(pmem_pool, buffer, prid->size);

    out.ret = 0;

respond:
    if(have_input)
        margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
525
DEFINE_MARGO_RPC_HANDLER(bake_persist_ult)
526

527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
/* Handler for the combined create+write+persist RPC: allocates a region of
 * in.region_size in the pool identified by in.bti, pulls in.bulk_size bytes
 * into it from the client's (or proxy's) bulk handle, persists it, and
 * returns the new region id in out.rid.
 * out.ret is 0 on success and -1 on any failure; on failure the region
 * allocated by this call (if any) is released again.
 */
static void bake_create_write_persist_ult(hg_handle_t handle)
{
    bake_create_write_persist_out_t out;
    bake_create_write_persist_in_t in;
    hg_addr_t src_addr;
    char* buffer;
    hg_bulk_t bulk_handle;
    const struct hg_info *hgi;
    margo_instance_id mid;
    hg_return_t hret;
    int ret;
    pmemobj_region_id_t* prid = NULL;
    bake_provider_t svr_ctx;
    bake_pmem_entry_t* entry;
    int have_input = 0;  /* margo_get_input() succeeded */
    int have_region = 0; /* pmemobj_alloc() succeeded */
    int have_bulk = 0;   /* bulk_handle was created */
    int have_lookup = 0; /* src_addr came from margo_addr_lookup() */

    memset(&out, 0, sizeof(out));
    out.ret = -1;

    mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    hgi = margo_get_info(handle);
    svr_ctx = margo_registered_data(mid, hgi->id);
    if(!svr_ctx) {
        fprintf(stderr, "Error: BAKE create_write_persist could not find provider\n");
        goto respond;
    }

    /* TODO: this check needs to be somewhere else */
    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
        goto respond;
    have_input = 1;

    /* find the pmem pool */
    entry = find_pmem_entry(svr_ctx, in.bti);
    if(entry == NULL) {
        fprintf(stderr, "Error: BAKE create_write_persist could not find storage target\n");
        goto respond;
    }

    prid = (pmemobj_region_id_t*)out.rid.data;
    prid->size = in.region_size;
    ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
            in.region_size, 0, NULL, NULL);
    if(ret != 0)
        goto respond;
    have_region = 1;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
        goto respond;

    /* create bulk handle for local side of transfer */
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size,
            HG_BULK_WRITE_ONLY, &bulk_handle);
    if(hret != HG_SUCCESS)
        goto respond;
    have_bulk = 1;

    if(in.remote_addr_str)
    {
        /* a proxy address was provided to pull write data from */
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
            goto respond;
        have_lookup = 1;
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
            in.bulk_offset, bulk_handle, 0, in.bulk_size);
    if(hret != HG_SUCCESS)
        goto respond;

    /* TODO: should this have an abt shim in case it blocks? */
    pmemobj_persist(entry->pmem_pool, buffer, prid->size);

    out.ret = 0;

respond:
    if(have_lookup)
        margo_addr_free(mid, src_addr);
    if(have_bulk)
        margo_bulk_free(bulk_handle);
    if(out.ret != 0 && have_region)
    {
        /* the client never learns about a region from a failed request, so
         * give the space back instead of leaking it in the pool, and do not
         * return a half-filled region id */
        pmemobj_free(&prid->oid);
        memset(&out, 0, sizeof(out));
        out.ret = -1;
    }
    if(have_input)
        margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)

662
    /* service a remote RPC that retrieves the size of a BAKE region */
663
/* Handler for the get_size RPC: reports the size recorded inside the region
 * id supplied by the client.  out.ret is 0 on success and -1 if the provider
 * or the input cannot be obtained.
 */
static void bake_get_size_ult(hg_handle_t handle)
{
    bake_get_size_out_t out;
    bake_get_size_in_t in;
    hg_return_t hret;
    pmemobj_region_id_t* prid;
    bake_provider_t svr_ctx;
    int have_input = 0; /* margo_get_input() succeeded */

    memset(&out, 0, sizeof(out));
    out.ret = -1;

    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    svr_ctx = margo_registered_data(mid, hgi->id);
    if(!svr_ctx)
        goto respond;

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
        goto respond;
    have_input = 1;

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* kind of cheating here; the size is encoded in the RID */
    out.size = prid->size;
    out.ret = 0;

respond:
    if(have_input)
        margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
703
DEFINE_MARGO_RPC_HANDLER(bake_get_size_ult)
704

705
    /* service a remote RPC for a BAKE no-op */
706
/* Handler for the no-op RPC: sends an empty response and releases the
 * handle.  The provider context is not needed, so it is not looked up.
 */
static void bake_noop_ult(hg_handle_t handle)
{
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    (void)mid; /* only used by the assert above */

    margo_respond(handle, NULL);
    margo_destroy(handle);
    return;
}
717
DEFINE_MARGO_RPC_HANDLER(bake_noop_ult)
718

719 720
    /* TODO consolidate with write handler; read and write are nearly identical */
    /* service a remote RPC that reads from a BAKE region */
721
/* Handler for the read RPC: pushes in.bulk_size bytes from the region named
 * by in.rid into the client's (or proxy's) bulk handle.
 * out.ret is 0 on success and -1 on any failure.
 */
static void bake_read_ult(hg_handle_t handle)
{
    bake_read_out_t out;
    bake_read_in_t in;
    hg_return_t hret;
    hg_addr_t src_addr;
    char* buffer;
    hg_bulk_t bulk_handle;
    const struct hg_info *hgi;
    margo_instance_id mid;
    pmemobj_region_id_t* prid;
    bake_provider_t svr_ctx;
    int have_input = 0;  /* margo_get_input() succeeded */
    int have_bulk = 0;   /* bulk_handle was created */
    int have_lookup = 0; /* src_addr came from margo_addr_lookup() */

    memset(&out, 0, sizeof(out));
    out.ret = -1;

    mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    hgi = margo_get_info(handle);
    svr_ctx = margo_registered_data(mid, hgi->id);
    if(!svr_ctx)
        goto respond;

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
        goto respond;
    have_input = 1;

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
        goto respond;

    /* create bulk handle for local side of transfer */
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size,
            HG_BULK_READ_ONLY, &bulk_handle);
    if(hret != HG_SUCCESS)
        goto respond;
    have_bulk = 1;

    if(in.remote_addr_str)
    {
        /* a proxy address was provided to push read data to */
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
            goto respond;
        have_lookup = 1;
    }
    else
    {
        /* no proxy read, use the source of this request */
        src_addr = hgi->addr;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PUSH, src_addr, in.bulk_handle,
            in.bulk_offset, bulk_handle, 0, in.bulk_size);
    if(hret != HG_SUCCESS)
        goto respond;

    out.ret = 0;

respond:
    /* release resources in reverse order of acquisition, then reply; only a
     * looked-up address is ours to free (hgi->addr is not) */
    if(have_lookup)
        margo_addr_free(mid, src_addr);
    if(have_bulk)
        margo_bulk_free(bulk_handle);
    if(have_input)
        margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
825
DEFINE_MARGO_RPC_HANDLER(bake_read_ult)
Philip Carns's avatar
Philip Carns committed
826

827 828
    /* service a remote RPC that reads from a BAKE region and eagerly sends
     * response */
829
/* Handler for the eager read RPC: the payload travels inside the response,
 * so out.buffer points straight at the region named by in.rid and out.size
 * is the byte count requested in in.size.
 * out.ret is 0 on success and -1 on any failure.
 */
static void bake_eager_read_ult(hg_handle_t handle)
{
    bake_eager_read_out_t out;
    bake_eager_read_in_t in;
    hg_return_t hret;
    char* buffer;
    pmemobj_region_id_t* prid;
    bake_provider_t svr_ctx;
    int have_input = 0; /* margo_get_input() succeeded */

    memset(&out, 0, sizeof(out));
    out.ret = -1;

    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    svr_ctx = margo_registered_data(mid, hgi->id);
    if(!svr_ctx)
        goto respond;

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
        goto respond;
    have_input = 1;

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
        goto respond;

    out.ret = 0;
    out.buffer = buffer;
    /* copied by value, so freeing the input below does not affect it */
    out.size = in.size;

respond:
    if(have_input)
        margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
883
DEFINE_MARGO_RPC_HANDLER(bake_eager_read_ult)
884

885
    /* service a remote RPC that probes for a BAKE target id */
886
/* Handler for the probe RPC: returns the ids of the storage targets attached
 * to this provider in out.targets / out.num_targets.
 * out.ret is 0 on success and -1 if the provider cannot be found or the
 * reply array cannot be allocated.
 */
static void bake_probe_ult(hg_handle_t handle)
{
    bake_probe_out_t out;
    bake_target_id_t* targets = NULL;
    uint64_t targets_count = 0;
    bake_provider_t svr_ctx;

    memset(&out, 0, sizeof(out));
    out.ret = -1;

    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    svr_ctx = margo_registered_data(mid, hgi->id);
    if(!svr_ctx)
        goto respond;

    // XXX this is where we should handle multiple targets
    bake_provider_count_storage_targets(svr_ctx, &targets_count);
    if(targets_count > 0)
    {
        /* heap rather than a variable-length array: a VLA of length 0 is
         * undefined and a large one could exhaust the stack.  calloc() also
         * leaves any slot the listing does not fill zeroed. */
        targets = calloc(targets_count, sizeof(*targets));
        if(!targets)
            goto respond;
        bake_provider_list_storage_targets(svr_ctx, targets);
    }

    out.ret = 0;
    out.targets = targets;
    out.num_targets = targets_count;

respond:
    margo_respond(handle, &out);
    /* the array is only referenced by `out`, which is dead after the reply */
    free(targets);
    margo_destroy(handle);
    return;
}
918
DEFINE_MARGO_RPC_HANDLER(bake_probe_ult)
919

920
static void bake_server_finalize_cb(void *data)
921
{
922 923 924
    bake_server_context_t *svr_ctx = (bake_server_context_t *)data;
    assert(svr_ctx);

925 926
    bake_provider_remove_all_storage_targets(svr_ctx);

927 928 929 930
    free(svr_ctx);

    return;
}
Philip Carns's avatar
Philip Carns committed
931