bake-server.c 35.8 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

Philip Carns's avatar
Philip Carns committed
9
#include <assert.h>
10
#include <libpmemobj.h>
11
#include <bake-server.h>
12
#include "uthash.h"
13
#include "bake-rpc.h"
Philip Carns's avatar
Philip Carns committed
14

15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
DECLARE_MARGO_RPC_HANDLER(bake_shutdown_ult)
DECLARE_MARGO_RPC_HANDLER(bake_create_ult)
DECLARE_MARGO_RPC_HANDLER(bake_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_get_size_ult)
DECLARE_MARGO_RPC_HANDLER(bake_get_data_ult)
DECLARE_MARGO_RPC_HANDLER(bake_read_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_read_ult)
DECLARE_MARGO_RPC_HANDLER(bake_probe_ult)
DECLARE_MARGO_RPC_HANDLER(bake_noop_ult)
DECLARE_MARGO_RPC_HANDLER(bake_remove_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_ult)

30
/* definition of BAKE root data structure (just a uuid for now) */
31
typedef struct
32
{   
33
    bake_target_id_t pool_id;
34
} bake_root_t;
35

36
/* definition of internal BAKE region_id_t identifier for libpmemobj back end */
37 38
typedef struct
{
39 40
    PMEMoid oid;
} pmemobj_region_id_t;
41

42 43 44 45 46
typedef struct {
    uint64_t size;
    char data[1];
} region_content_t;

47 48 49 50 51 52 53 54
typedef struct
{
    PMEMobjpool* pmem_pool;
    bake_root_t* pmem_root;
    bake_target_id_t target_id;
    UT_hash_handle hh;
} bake_pmem_entry_t;

55
typedef struct bake_server_context_t
56
{
57 58
    uint64_t num_targets;
    bake_pmem_entry_t* targets;
59
    hg_id_t bake_create_write_persist_id;
60
} bake_server_context_t;
61

62
static void bake_server_finalize_cb(void *data);
63

64
int bake_makepool(
65 66 67
        const char *pool_name,
        size_t pool_size,
        mode_t pool_mode)
68
{
69
    PMEMobjpool *pool;
70
    PMEMoid root_oid;
71
    bake_root_t *root;
72

73 74
    pool = pmemobj_create(pool_name, NULL, pool_size, pool_mode);
    if(!pool)
75
    {
76
        fprintf(stderr, "pmemobj_create: %s\n", pmemobj_errormsg());
Matthieu Dorier's avatar
Matthieu Dorier committed
77
        return BAKE_ERR_PMEM;
78 79 80
    }

    /* find root */
81
    root_oid = pmemobj_root(pool, sizeof(bake_root_t));
82 83 84
    root = pmemobj_direct(root_oid);

    /* store the target id for this bake pool at the root */
85
    uuid_generate(root->pool_id.id);
86
    pmemobj_persist(pool, root, sizeof(bake_root_t));
87 88 89

    pmemobj_close(pool);

Matthieu Dorier's avatar
Matthieu Dorier committed
90
    return BAKE_SUCCESS;
91 92
}

93
int bake_provider_register(
94
        margo_instance_id mid,
95
        uint16_t provider_id,
96 97
        ABT_pool abt_pool,
        bake_provider_t* provider)
98
{
99
    bake_server_context_t *tmp_svr_ctx;
100

101
    /* check if a provider with the same multiplex id already exists */
102
    {
103 104
        hg_id_t id;
        hg_bool_t flag;
105
        margo_provider_registered_name(mid, "bake_probe_rpc", provider_id, &id, &flag);
106
        if(flag == HG_TRUE) {
107
            fprintf(stderr, "bake_provider_register(): a provider with the same id (%d) already exists\n", provider_id);
Matthieu Dorier's avatar
Matthieu Dorier committed
108
            return BAKE_ERR_MERCURY;
109
        }
110
    }
111

112 113 114
    /* allocate the resulting structure */    
    tmp_svr_ctx = calloc(1,sizeof(*tmp_svr_ctx));
    if(!tmp_svr_ctx)
Matthieu Dorier's avatar
Matthieu Dorier committed
115
        return BAKE_ERR_ALLOCATION;
Philip Carns's avatar
Philip Carns committed
116

117
    /* register RPCs */
118
    hg_id_t rpc_id;
119
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_create_rpc",
120
            bake_create_in_t, bake_create_out_t, 
121 122 123
            bake_create_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_write_rpc",
124
            bake_write_in_t, bake_write_out_t, 
125 126 127
            bake_write_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_write_rpc",
128
            bake_eager_write_in_t, bake_eager_write_out_t, 
129 130 131
            bake_eager_write_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_read_rpc",
132
            bake_eager_read_in_t, bake_eager_read_out_t, 
133 134 135
            bake_eager_read_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_persist_rpc",
136
            bake_persist_in_t, bake_persist_out_t, 
137 138 139
            bake_persist_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_create_write_persist_rpc",
140
            bake_create_write_persist_in_t, bake_create_write_persist_out_t,
141 142
            bake_create_write_persist_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
143
    tmp_svr_ctx->bake_create_write_persist_id = rpc_id;
144
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_get_size_rpc",
145
            bake_get_size_in_t, bake_get_size_out_t, 
146 147
            bake_get_size_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
148 149 150 151
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_get_data_rpc",
            bake_get_data_in_t, bake_get_data_out_t, 
            bake_get_data_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
152
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_read_rpc",
153
            bake_read_in_t, bake_read_out_t, 
154 155 156
            bake_read_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_probe_rpc",
157
            bake_probe_in_t, bake_probe_out_t, bake_probe_ult, 
158 159 160 161 162
            provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_noop_rpc",
            void, void, bake_noop_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
Shane Snyder's avatar
Shane Snyder committed
163 164 165
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_remove_rpc",
            bake_remove_in_t, bake_remove_out_t, bake_remove_ult,
            provider_id, abt_pool);
166
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
167 168 169 170
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_migrate_rpc",
            bake_migrate_in_t, bake_migrate_out_t, bake_migrate_ult,
            provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
171

172 173 174
    /* install the bake server finalize callback */
    margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);

175 176
    if(provider != BAKE_PROVIDER_IGNORE)
        *provider = tmp_svr_ctx;
177

Matthieu Dorier's avatar
Matthieu Dorier committed
178
    return BAKE_SUCCESS;
179 180
}

181 182 183 184 185 186 187 188 189 190 191
int bake_provider_add_storage_target(
        bake_provider_t provider,
        const char *target_name,
        bake_target_id_t* target_id)
{
    bake_pmem_entry_t* new_entry = calloc(1, sizeof(*new_entry));

    new_entry->pmem_pool = pmemobj_open(target_name, NULL);
    if(!(new_entry->pmem_pool)) {
        fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
        free(new_entry);
Matthieu Dorier's avatar
Matthieu Dorier committed
192
        return BAKE_ERR_PMEM;
193 194 195 196 197 198 199 200 201 202 203 204 205
    }

    /* check to make sure the root is properly set */
    PMEMoid root_oid = pmemobj_root(new_entry->pmem_pool, sizeof(bake_root_t));
    new_entry->pmem_root = pmemobj_direct(root_oid);
    bake_target_id_t key = new_entry->pmem_root->pool_id;
    new_entry->target_id = key;

    if(uuid_is_null(key.id))
    {
        fprintf(stderr, "Error: BAKE pool %s is not properly initialized\n", target_name);
        pmemobj_close(new_entry->pmem_pool);
        free(new_entry);
Matthieu Dorier's avatar
Matthieu Dorier committed
206
        return BAKE_ERR_UNKNOWN_TARGET;
207 208 209 210 211 212 213 214 215 216 217
    }

    /* insert in the provider's hash */
    HASH_ADD(hh, provider->targets, target_id, sizeof(bake_target_id_t), new_entry);
    /* check that it was inserted */
    bake_pmem_entry_t* check_entry = NULL;
    HASH_FIND(hh, provider->targets, &key, sizeof(bake_target_id_t), check_entry);
    if(check_entry != new_entry) {
        fprintf(stderr, "Error: BAKE could not insert new pmem pool into the hash\n");
        pmemobj_close(new_entry->pmem_pool);
        free(new_entry);
Matthieu Dorier's avatar
Matthieu Dorier committed
218
        return BAKE_ERR_ALLOCATION;
219 220 221 222
    }

    provider->num_targets += 1;
    *target_id = key;
Matthieu Dorier's avatar
Matthieu Dorier committed
223
    return BAKE_SUCCESS;
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
}

static bake_pmem_entry_t* find_pmem_entry(
            bake_provider_t provider,
            bake_target_id_t target_id)
{
    bake_pmem_entry_t* entry = NULL;
    HASH_FIND(hh, provider->targets, &target_id, sizeof(bake_target_id_t), entry);
    return entry;
}

int bake_provider_remove_storage_target(
        bake_provider_t provider,
        bake_target_id_t target_id)
{
    bake_pmem_entry_t* entry = NULL;
    HASH_FIND(hh, provider->targets, &target_id, sizeof(bake_target_id_t), entry);
Matthieu Dorier's avatar
Matthieu Dorier committed
241
    if(!entry) return BAKE_ERR_UNKNOWN_TARGET;
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
    pmemobj_close(entry->pmem_pool);
    HASH_DEL(provider->targets, entry);
    free(entry);
    return 0;
}

int bake_provider_remove_all_storage_targets(
        bake_provider_t provider)
{
    bake_pmem_entry_t *p, *tmp;
    HASH_ITER(hh, provider->targets, p, tmp) {
        HASH_DEL(provider->targets, p);
        pmemobj_close(p->pmem_pool);
        free(p);
    }
257
    provider->num_targets = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
258
    return BAKE_SUCCESS;
259 260 261 262 263 264 265
}

int bake_provider_count_storage_targets(
        bake_provider_t provider,
        uint64_t* num_targets)
{
    *num_targets = provider->num_targets;
Matthieu Dorier's avatar
Matthieu Dorier committed
266
    return BAKE_SUCCESS;
267 268 269 270 271 272 273 274 275 276 277 278
}

int bake_provider_list_storage_targets(
        bake_provider_t provider,
        bake_target_id_t* targets)
{
    bake_pmem_entry_t *p, *tmp;
    uint64_t i = 0;
    HASH_ITER(hh, provider->targets, p, tmp) {
        targets[i] = p->target_id;
        i += 1;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
279
    return BAKE_SUCCESS;
280 281
}

282
/* service a remote RPC that creates a BAKE region */
283
static void bake_create_ult(hg_handle_t handle)
284
{
285 286
    bake_create_out_t out;
    bake_create_in_t in;
287 288 289
    hg_return_t hret;
    pmemobj_region_id_t* prid;

290 291 292 293
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
    bake_provider_t svr_ctx = 
294
        margo_registered_data(mid, info->id);
295
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
296
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
297
        goto respond_with_error;
298 299
    }

300
    hret = margo_get_input(handle, &in);
Matthieu Dorier's avatar
Matthieu Dorier committed
301 302
    if(hret != HG_SUCCESS) {
        out.ret = BAKE_ERR_MERCURY;
303
        goto respond_with_error;
Matthieu Dorier's avatar
Matthieu Dorier committed
304
    }
305

306 307 308
    /* find the pmem pool */
    bake_pmem_entry_t* entry = find_pmem_entry(svr_ctx, in.bti);
    if(entry == NULL) {
Matthieu Dorier's avatar
Matthieu Dorier committed
309
        out.ret = BAKE_ERR_UNKNOWN_TARGET;
310 311 312
        goto respond_with_error;
    }

313
    /* TODO: this check needs to be somewhere else */
314
    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);
315

316 317 318
    memset(&out, 0, sizeof(out));

    prid = (pmemobj_region_id_t*)out.rid.data;
Matthieu Dorier's avatar
Matthieu Dorier committed
319

320
    size_t content_size = in.region_size + sizeof(uint64_t);
Matthieu Dorier's avatar
Matthieu Dorier committed
321
    int ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
322 323
            content_size, 0, NULL, NULL);
    if(ret != 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
324
        out.ret = BAKE_ERR_PMEM;
325 326 327 328 329 330 331 332 333 334 335
        goto respond_with_error;
    }

    region_content_t* region = (region_content_t*)pmemobj_direct(prid->oid);
    if(!region) {
        out.ret = BAKE_ERR_PMEM;
        goto respond_with_error;
    }
    region->size = in.region_size;
    PMEMobjpool* pmem_pool = pmemobj_pool_by_oid(prid->oid);
    pmemobj_persist(pmem_pool, &(region->size), sizeof(region->size));
336

Matthieu Dorier's avatar
Matthieu Dorier committed
337
    out.ret = BAKE_SUCCESS;
338
    margo_respond(handle, &out);
339 340

finish:
Matthieu Dorier's avatar
Matthieu Dorier committed
341
    margo_free_input(handle, &in);
342
    margo_destroy(handle);
343
    return;
344 345 346 347

respond_with_error:
    margo_respond(handle, &out);
    goto finish;
348
}
349
DEFINE_MARGO_RPC_HANDLER(bake_create_ult)
350

351
    /* service a remote RPC that writes to a BAKE region */
352
static void bake_write_ult(hg_handle_t handle)
353
{
354 355
    bake_write_out_t out;
    bake_write_in_t in;
356
    hg_return_t hret;
Matthieu Dorier's avatar
Matthieu Dorier committed
357
    hg_addr_t src_addr = HG_ADDR_NULL;
358
    char* buffer;
Matthieu Dorier's avatar
Matthieu Dorier committed
359
    hg_bulk_t bulk_handle = HG_BULK_NULL;
360
    const struct hg_info *hgi;
361 362 363 364 365
    margo_instance_id mid;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

366 367
    mid = margo_hg_handle_get_instance(handle);
    assert(mid);
368
    hgi = margo_get_info(handle);
369
    bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
370
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
371
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
372 373 374 375
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }
376

377
    hret = margo_get_input(handle, &in);
378 379
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
380
        out.ret = BAKE_ERR_MERCURY;
381 382
        margo_respond(handle, &out);
        margo_destroy(handle);
383 384 385 386 387 388
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
389 390 391
    region_content_t* region = pmemobj_direct(prid->oid);

    if(!region)
392
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
393
        out.ret = BAKE_ERR_UNKNOWN_REGION;
394 395 396
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
397 398
        return;
    }
399 400 401 402 403 404 405 406 407 408

    if(in.region_offset + in.bulk_size > region->size) {
        out.ret = BAKE_ERR_OUT_OF_BOUNDS;
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

    buffer = region->data + in.region_offset;
409 410

    /* create bulk handle for local side of transfer */
411
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size,
412
            HG_BULK_WRITE_ONLY, &bulk_handle);
413 414
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
415
        out.ret = BAKE_ERR_MERCURY;
416 417 418
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
419 420 421
        return;
    }

422 423
    if(in.remote_addr_str)
    {
424
        /* a proxy address was provided to pull write data from */
425 426 427
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
        {
Matthieu Dorier's avatar
Matthieu Dorier committed
428
            out.ret = BAKE_ERR_MERCURY;
429 430 431 432 433 434 435 436 437 438 439 440 441 442
            margo_bulk_free(bulk_handle);
            margo_free_input(handle, &in);
            margo_respond(handle, &out);
            margo_destroy(handle);
            return;
        }
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
443
            in.bulk_offset, bulk_handle, 0, in.bulk_size);
444
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
445
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
446
        out.ret = BAKE_ERR_MERCURY;
447 448
        if(in.remote_addr_str)
            margo_addr_free(mid, src_addr);
449 450 451 452
        margo_bulk_free(bulk_handle);
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
453
        return;
Philip Carns's avatar
Philip Carns committed
454 455
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
456
    out.ret = BAKE_SUCCESS;
457

458 459
    if(in.remote_addr_str)
        margo_addr_free(mid, src_addr);
460 461 462 463
    margo_bulk_free(bulk_handle);
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
464 465
    return;
}
466
DEFINE_MARGO_RPC_HANDLER(bake_write_ult)
467

468
    /* service a remote RPC that writes to a BAKE region in eager mode */
469
static void bake_eager_write_ult(hg_handle_t handle)
470
{
471 472
    bake_eager_write_out_t out;
    bake_eager_write_in_t in;
473 474 475 476 477 478 479
    hg_return_t hret;
    char* buffer;
    hg_bulk_t bulk_handle;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

480 481 482
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
483
    bake_provider_t svr_ctx = margo_registered_data(mid, info->id);
484
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
485
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
486 487 488 489 490
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

491
    hret = margo_get_input(handle, &in);
492 493
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
494
        out.ret = BAKE_ERR_MERCURY;
495 496
        margo_respond(handle, &out);
        margo_destroy(handle);
497 498 499 500 501 502
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
503 504
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
505
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
506
        out.ret = BAKE_ERR_PMEM;
507 508 509
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
510
        return;
511
    }
512 513 514 515 516 517 518 519 520 521

    if(in.size + in.region_offset > region->size) {
        out.ret = BAKE_ERR_OUT_OF_BOUNDS;
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

    buffer = region->data + in.region_offset;
522 523 524

    memcpy(buffer, in.buffer, in.size);

Matthieu Dorier's avatar
Matthieu Dorier committed
525
    out.ret = BAKE_SUCCESS;
526

527 528 529
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
530 531
    return;
}
532
DEFINE_MARGO_RPC_HANDLER(bake_eager_write_ult)
533

534
    /* service a remote RPC that persists to a BAKE region */
535
static void bake_persist_ult(hg_handle_t handle)
536
{
537 538
    bake_persist_out_t out;
    bake_persist_in_t in;
539 540 541
    hg_return_t hret;
    char* buffer;
    pmemobj_region_id_t* prid;
542

543 544
    memset(&out, 0, sizeof(out));

545 546 547
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
548
    bake_provider_t svr_ctx = margo_registered_data(mid, info->id);
549
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
550
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
551 552 553 554 555
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

556
    hret = margo_get_input(handle, &in);
557
    if(hret != HG_SUCCESS)
558
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
559
        out.ret = BAKE_ERR_MERCURY;
560 561
        margo_respond(handle, &out);
        margo_destroy(handle);
562
        return;
563
    }
564

565 566 567
    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
568 569
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
Philip Carns's avatar
Philip Carns committed
570
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
571
        out.ret = BAKE_ERR_PMEM;
572 573 574
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
575
        return;
Philip Carns's avatar
Philip Carns committed
576
    }
577
    buffer = region->data;
578 579

    /* TODO: should this have an abt shim in case it blocks? */
580
    PMEMobjpool* pmem_pool = pmemobj_pool_by_oid(prid->oid);
581
    pmemobj_persist(pmem_pool, buffer, region->size);
582

Matthieu Dorier's avatar
Matthieu Dorier committed
583
    out.ret = BAKE_SUCCESS;
584

585 586 587
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
588 589
    return;
}
590
DEFINE_MARGO_RPC_HANDLER(bake_persist_ult)
591

592 593 594 595 596 597 598 599 600 601 602 603 604 605 606
static void bake_create_write_persist_ult(hg_handle_t handle)
{
    bake_create_write_persist_out_t out;
    bake_create_write_persist_in_t in;
    hg_addr_t src_addr;
    char* buffer;
    hg_bulk_t bulk_handle;
    const struct hg_info *hgi;
    margo_instance_id mid;
    hg_return_t hret;
    int ret;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

607 608
    mid = margo_hg_handle_get_instance(handle);
    assert(mid);
609
    hgi = margo_get_info(handle);
610
    bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
611
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
612
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
613 614 615 616 617 618 619
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

    /* TODO: this check needs to be somewhere else */
    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);
620 621 622 623

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
624
        out.ret = BAKE_ERR_MERCURY;
625 626 627 628 629
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

630 631 632
    /* find the pmem pool */
    bake_pmem_entry_t* entry = find_pmem_entry(svr_ctx, in.bti);
    if(entry == NULL) {
Matthieu Dorier's avatar
Matthieu Dorier committed
633
        out.ret = BAKE_ERR_UNKNOWN_TARGET;
634 635 636 637 638 639
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

640
    size_t content_size = in.bulk_size + sizeof(uint64_t);
641
    prid = (pmemobj_region_id_t*)out.rid.data;
642
#if 0
643
    prid->size = in.bulk_size;
644
#endif
645
    ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
646
            content_size, 0, NULL, NULL);
647 648
    if(ret != 0)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
649
        out.ret = BAKE_ERR_PMEM;
650 651 652 653 654 655 656
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

    /* find memory address for target object */
657 658
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
659
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
660
        out.ret = BAKE_ERR_PMEM;
661 662 663 664 665
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }
666 667
    region->size = in.bulk_size;
    buffer = region->data;
668 669 670

    /* create bulk handle for local side of transfer */
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &in.bulk_size,
671
            HG_BULK_WRITE_ONLY, &bulk_handle);
672 673
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
674
        out.ret = BAKE_ERR_MERCURY;
675 676 677 678 679 680 681 682 683 684 685 686
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

    if(in.remote_addr_str)
    {
        /* a proxy address was provided to pull write data from */
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
        {
Matthieu Dorier's avatar
Matthieu Dorier committed
687
            out.ret = BAKE_ERR_MERCURY;
688 689 690 691 692 693 694 695 696 697 698 699 700 701
            margo_bulk_free(bulk_handle);
            margo_free_input(handle, &in);
            margo_respond(handle, &out);
            margo_destroy(handle);
            return;
        }
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
702
            in.bulk_offset, bulk_handle, 0, in.bulk_size);
703 704
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
705
        out.ret = BAKE_ERR_MERCURY;
706 707 708 709 710 711 712 713 714 715
        if(in.remote_addr_str)
            margo_addr_free(mid, src_addr);
        margo_bulk_free(bulk_handle);
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

    /* TODO: should this have an abt shim in case it blocks? */
716
    pmemobj_persist(entry->pmem_pool, region, content_size);
717

Matthieu Dorier's avatar
Matthieu Dorier committed
718
    out.ret = BAKE_SUCCESS;
719 720 721 722 723 724 725 726 727 728 729

    if(in.remote_addr_str)
        margo_addr_free(mid, src_addr);
    margo_bulk_free(bulk_handle);
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)

730
    /* service a remote RPC that retrieves the size of a BAKE region */
731
static void bake_get_size_ult(hg_handle_t handle)
732
{
733 734
    bake_get_size_out_t out;
    bake_get_size_in_t in;
735 736 737 738 739
    hg_return_t hret;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

740 741 742
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
743
    bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
744
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
745
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
746 747 748 749
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }
750

751
    hret = margo_get_input(handle, &in);
752
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
753
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
754
        out.ret = BAKE_ERR_MERCURY;
755 756
        margo_respond(handle, &out);
        margo_destroy(handle);
757
        return;
Philip Carns's avatar
Philip Carns committed
758 759
    }

760 761
    prid = (pmemobj_region_id_t*)in.rid.data;

762 763 764 765 766 767 768 769 770 771
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
    {
        out.ret = BAKE_ERR_PMEM;
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }
    out.size = region->size;
Matthieu Dorier's avatar
Matthieu Dorier committed
772
    out.ret = BAKE_SUCCESS;
773

774 775 776
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
777 778
    return;
}
779
DEFINE_MARGO_RPC_HANDLER(bake_get_size_ult)
780

781 782 783 784 785 786 787 788 789 790 791 792 793 794 795
    /* Get the raw pointer of a region */
static void bake_get_data_ult(hg_handle_t handle)
{
    bake_get_data_out_t out;
    bake_get_data_in_t in;
    hg_return_t hret;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
796
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
797 798 799 800 801 802 803 804
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
805
        out.ret = BAKE_ERR_MERCURY;
806 807 808 809 810 811 812 813
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
814 815
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
816
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
817
        out.ret = BAKE_ERR_UNKNOWN_REGION;
818 819 820 821 822 823
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

824
    out.ptr = (uint64_t)(region->data);
Matthieu Dorier's avatar
Matthieu Dorier committed
825
    out.ret = BAKE_SUCCESS;
826 827 828 829 830 831 832 833

    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_get_data_ult)

834
    /* service a remote RPC for a BAKE no-op */
835
static void bake_noop_ult(hg_handle_t handle)
836
{
837 838 839
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
840
    bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
841

842 843
    margo_respond(handle, NULL);
    margo_destroy(handle);
844 845
    return;
}
846
DEFINE_MARGO_RPC_HANDLER(bake_noop_ult)
847

Matthieu Dorier's avatar
Matthieu Dorier committed
848 849
/* TODO consolidate with write handler; read and write are nearly identical */
/* service a remote RPC that reads from a BAKE region */
850
static void bake_read_ult(hg_handle_t handle)
851
{
852 853
    bake_read_out_t out;
    bake_read_in_t in;
854
    hg_return_t hret;
855
    hg_addr_t src_addr;
856 857
    char* buffer;
    hg_bulk_t bulk_handle;
858
    const struct hg_info *hgi;
859 860
    margo_instance_id mid;
    pmemobj_region_id_t* prid;
861
    hg_size_t size_to_read;
862 863 864

    memset(&out, 0, sizeof(out));

865 866
    mid = margo_hg_handle_get_instance(handle);
    assert(mid);
867
    hgi = margo_get_info(handle);
868
    bake_provider_t svr_ctx = 
869
        margo_registered_data(mid, hgi->id);
870
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
871
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
872 873 874 875
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }
876

877
    hret = margo_get_input(handle, &in);
878
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
879
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
880
        out.ret = BAKE_ERR_MERCURY;
881 882
        margo_respond(handle, &out);
        margo_destroy(handle);
883
        return;
Philip Carns's avatar
Philip Carns committed
884 885
    }

886 887 888
    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
889 890
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
Philip Carns's avatar
Philip Carns committed
891
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
892
        out.ret = BAKE_ERR_UNKNOWN_REGION;
893 894 895
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
896
        return;
Philip Carns's avatar
Philip Carns committed
897
    }
898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914

    if(in.region_offset > region->size)
    {
        out.ret = BAKE_ERR_OUT_OF_BOUNDS;
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

    if(in.region_offset + in.bulk_size > region->size) {
        size_to_read = region->size - in.region_offset;
    } else {
        size_to_read = in.bulk_size;
    }

    buffer = region->data + in.region_offset;
Philip Carns's avatar
Philip Carns committed
915

916
    /* create bulk handle for local side of transfer */
917
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &size_to_read,
918
            HG_BULK_READ_ONLY, &bulk_handle);
919
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
920
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
921
        out.ret = BAKE_ERR_MERCURY;
922 923 924
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
925
        return;
Philip Carns's avatar
Philip Carns committed
926
    }
927

928 929 930 931 932 933
    if(in.remote_addr_str)
    {
        /* a proxy address was provided to push read data to */
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
        {
Matthieu Dorier's avatar
Matthieu Dorier committed
934
            out.ret = BAKE_ERR_MERCURY;
935 936 937 938 939 940 941 942 943 944 945 946 947 948
            margo_bulk_free(bulk_handle);
            margo_free_input(handle, &in);
            margo_respond(handle, &out);
            margo_destroy(handle);
            return;
        }
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PUSH, src_addr, in.bulk_handle,
949
            in.bulk_offset, bulk_handle, 0, size_to_read);
950
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
951
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
952
        out.ret = BAKE_ERR_MERCURY;
953 954
        if(in.remote_addr_str)
            margo_addr_free(mid, src_addr);
955 956 957 958
        margo_bulk_free(bulk_handle);
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
959
        return;
Philip Carns's avatar
Philip Carns committed
960 961
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
962
    out.ret = BAKE_SUCCESS;
963
    out.size = size_to_read;
Philip Carns's avatar
Philip Carns committed
964

965 966
    if(in.remote_addr_str)
        margo_addr_free(mid, src_addr);
967 968 969 970
    margo_bulk_free(bulk_handle);
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
971 972
    return;
}
973
DEFINE_MARGO_RPC_HANDLER(bake_read_ult)
Philip Carns's avatar
Philip Carns committed
974

975 976
    /* service a remote RPC that reads from a BAKE region and eagerly sends
     * response */
977
static void bake_eager_read_ult(hg_handle_t handle)
978
{
979 980
    bake_eager_read_out_t out;
    bake_eager_read_in_t in;
981 982
    hg_return_t hret;
    char* buffer;
983
    hg_size_t size_to_read;
984 985 986 987
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));

988 989 990 991
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    bake_provider_t svr_ctx = 
992
        margo_registered_data(mid, hgi->id);
993
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
994
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
995 996 997 998 999
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

1000
    hret = margo_get_input(handle, &in);
1001 1002
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
1003
        out.ret = BAKE_ERR_MERCURY;
1004 1005
        margo_respond(handle, &out);
        margo_destroy(handle);
1006 1007 1008 1009
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;
Philip Carns's avatar
Philip Carns committed
1010

1011
    /* find memory address for target object */
1012 1013
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
1014
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
1015
        out.ret = BAKE_ERR_UNKNOWN_REGION;
1016 1017 1018
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
1019 1020
        return;
    }
1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037

    if(in.region_offset > region->size)
    {
        out.ret = BAKE_ERR_OUT_OF_BOUNDS;
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

    if(in.region_offset + in.size > region->size) {
        size_to_read = region->size - in.region_offset;
    } else {
        size_to_read = in.size;
    }

    buffer = region->data + in.region_offset;
1038

Matthieu Dorier's avatar
Matthieu Dorier committed
1039
    out.ret = BAKE_SUCCESS;
1040
    out.buffer = buffer;
1041
    out.size = size_to_read;
1042

1043 1044 1045
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
1046
    return;
Philip Carns's avatar
Philip Carns committed
1047
}
1048
DEFINE_MARGO_RPC_HANDLER(bake_eager_read_ult)
1049

1050
    /* service a remote RPC that probes for a BAKE target id */
1051
static void bake_probe_ult(hg_handle_t handle)
1052
{
1053
    bake_probe_out_t out;
1054

1055 1056
    memset(&out, 0, sizeof(out));

1057 1058 1059 1060
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    bake_provider_t svr_ctx = 
1061
        margo_registered_data(mid, hgi->id);
1062
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
1063
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
1064 1065 1066 1067 1068
        margo_respond(handle, &out);
        margo_destroy(handle);
        return;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
1069 1070
    out.ret = BAKE_SUCCESS;

1071 1072 1073 1074 1075
    uint64_t targets_count;