bake-server.c 56.9 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
#include "bake-config.h"

Philip Carns's avatar
Philip Carns committed
9
#include <assert.h>
10
#include <libpmemobj.h>
11 12
#include <unistd.h>
#include <fcntl.h>
13 14
#include <margo.h>
#include <margo-bulk-pool.h>
15 16 17
#include <remi/remi-client.h>
#include <remi/remi-server.h>
#include "bake-server.h"
18
#include "uthash.h"
19
#include "bake-rpc.h"
20
#include "bake-timing.h"
Philip Carns's avatar
Philip Carns committed
21

22 23 24 25 26 27
/* Margo RPC handler declarations */

/* miscellaneous / provider-level operations */
DECLARE_MARGO_RPC_HANDLER(bake_shutdown_ult)
DECLARE_MARGO_RPC_HANDLER(bake_probe_ult)
DECLARE_MARGO_RPC_HANDLER(bake_noop_ult)

/* region creation, writes and persistence */
DECLARE_MARGO_RPC_HANDLER(bake_create_ult)
DECLARE_MARGO_RPC_HANDLER(bake_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_create_write_persist_ult)

/* region queries and reads */
DECLARE_MARGO_RPC_HANDLER(bake_get_size_ult)
DECLARE_MARGO_RPC_HANDLER(bake_get_data_ult)
DECLARE_MARGO_RPC_HANDLER(bake_read_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_read_ult)

/* region removal and migration */
DECLARE_MARGO_RPC_HANDLER(bake_remove_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_region_ult)
DECLARE_MARGO_RPC_HANDLER(bake_migrate_target_ult)
38

39
/* definition of BAKE root data structure (just a uuid for now) */
40
typedef struct
41
{   
42
    bake_target_id_t pool_id;
43
} bake_root_t;
44

45
/* definition of internal BAKE region_id_t identifier for libpmemobj back end */
46 47
typedef struct
{
48 49
    PMEMoid oid;
} pmemobj_region_id_t;
50

51
/*
 * Layout of a region inside its pmemobj object.  With USE_SIZECHECK_HEADERS
 * the region's requested size is stored in a header in front of the payload
 * so accesses can be bounds-checked.  `data` marks the start of the payload;
 * the object is allocated larger than this struct.  (A flexible array member
 * is not usable here: without the header the struct would otherwise be empty.)
 * NOTE: this layout is stored in persistent memory.
 */
typedef struct {
#ifdef USE_SIZECHECK_HEADERS
    uint64_t size;  /* size requested when the region was created */
#endif
    char data[1];   /* first byte of the region's payload */
} region_content_t;

58 59 60 61 62
typedef struct
{
    PMEMobjpool* pmem_pool;
    bake_root_t* pmem_root;
    bake_target_id_t target_id;
63 64
    char* root;
    char* filename;
65
    size_t xfer_buffer_size;
66 67 68
    size_t xfer_buffer_count;
    uint32_t xfer_concurrency;
    margo_bulk_pool_t xfer_bulk_pool;
69 70 71
    UT_hash_handle hh;
} bake_pmem_entry_t;

72
typedef struct bake_server_context_t
73
{
74
    margo_instance_id mid;
75 76 77
    ABT_rwlock lock; // write-locked during migration, read-locked by all other
    // operations. There should be something better to avoid locking everything
    // but we are going with that for simplicity for now.
78 79
    uint64_t num_targets;
    bake_pmem_entry_t* targets;
80
    hg_id_t bake_create_write_persist_id;
81 82
    remi_client_t remi_client;
    remi_provider_t remi_provider;
83
} bake_server_context_t;
84

85 86 87 88 89 90 91 92 93 94 95 96 97
/* Arguments of one xfer_ult, which moves a slice of data through the buffer pool. */
typedef struct xfer_args {
    margo_instance_id mid;           /* margo instance */
    size_t            size;          /* size of data to transfer */
    char*             target;        /* start address where data should land in local process */
    size_t            buf_size;      /* size of buffers in the pool of buffers */
    margo_bulk_pool_t buf_pool;      /* pool of buffers */
    hg_addr_t         remote_addr;   /* remote address */
    hg_bulk_t         remote_bulk;   /* remote bulk handle for transfers */
    size_t            remote_offset; /* remote offset at which to take the data */
    int32_t           op_type;       /* type of operation (PUSH or PULL) */
    int32_t           ret;           /* return value of the xfer_ult function */
} xfer_args;

98
static void bake_server_finalize_cb(void *data);
99

100
static int bake_target_post_migration_callback(remi_fileset_t fileset, void* provider);
101

102 103
static void xfer_ult(xfer_args* args);

104
int bake_makepool(
105 106 107
        const char *pool_name,
        size_t pool_size,
        mode_t pool_mode)
108
{
109
    PMEMobjpool *pool;
110
    PMEMoid root_oid;
111
    bake_root_t *root;
112

113
    pool = pmemobj_create(pool_name, NULL, pool_size, pool_mode);
114
    if(!pool)
115
    {
116
        fprintf(stderr, "pmemobj_create: %s\n", pmemobj_errormsg());
Matthieu Dorier's avatar
Matthieu Dorier committed
117
        return BAKE_ERR_PMEM;
118 119 120
    }

    /* find root */
121
    root_oid = pmemobj_root(pool, sizeof(bake_root_t));
122 123 124
    root = pmemobj_direct(root_oid);

    /* store the target id for this bake pool at the root */
125
    uuid_generate(root->pool_id.id);
126
    pmemobj_persist(pool, root, sizeof(bake_root_t));
127 128 129

    pmemobj_close(pool);

Matthieu Dorier's avatar
Matthieu Dorier committed
130
    return BAKE_SUCCESS;
131 132
}

133
int bake_provider_register(
134
        margo_instance_id mid,
135
        uint16_t provider_id,
136 137
        ABT_pool abt_pool,
        bake_provider_t* provider)
138
{
139
    bake_server_context_t *tmp_svr_ctx;
140 141
    int ret;
    /* check if a provider with the same provider id already exists */
142
    {
143 144
        hg_id_t id;
        hg_bool_t flag;
145
        margo_provider_registered_name(mid, "bake_probe_rpc", provider_id, &id, &flag);
146
        if(flag == HG_TRUE) {
147
            fprintf(stderr, "bake_provider_register(): a BAKE provider with the same id (%d) already exists\n", provider_id);
Matthieu Dorier's avatar
Matthieu Dorier committed
148
            return BAKE_ERR_MERCURY;
149
        }
150
    }
151 152 153
    /* check if a REMI provider exists with the same provider id */
    {
        int flag;
154 155
        // TODO pass an actual ABT-IO instance
        remi_provider_registered(mid, provider_id, &flag, NULL, NULL, NULL);
156 157 158 159 160
        if(flag) {
            fprintf(stderr, "bake_provider_register(): a REMI provider with the same (%d) already exists\n", provider_id);
            return BAKE_ERR_REMI;
        }
    }
161

162 163 164
    /* allocate the resulting structure */    
    tmp_svr_ctx = calloc(1,sizeof(*tmp_svr_ctx));
    if(!tmp_svr_ctx)
Matthieu Dorier's avatar
Matthieu Dorier committed
165
        return BAKE_ERR_ALLOCATION;
Philip Carns's avatar
Philip Carns committed
166

167 168
    tmp_svr_ctx->mid = mid;

169 170 171 172 173 174 175
    /* Create rwlock */
    ret = ABT_rwlock_create(&(tmp_svr_ctx->lock));
    if(ret != ABT_SUCCESS) {
        free(tmp_svr_ctx);
        return BAKE_ERR_ARGOBOTS;
    }

176
    /* register RPCs */
177
    hg_id_t rpc_id;
178
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_create_rpc",
179
            bake_create_in_t, bake_create_out_t, 
180 181 182
            bake_create_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_write_rpc",
183
            bake_write_in_t, bake_write_out_t, 
184 185 186
            bake_write_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_write_rpc",
187
            bake_eager_write_in_t, bake_eager_write_out_t, 
188 189 190
            bake_eager_write_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_read_rpc",
191
            bake_eager_read_in_t, bake_eager_read_out_t, 
192 193 194
            bake_eager_read_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_persist_rpc",
195
            bake_persist_in_t, bake_persist_out_t, 
196 197 198
            bake_persist_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_create_write_persist_rpc",
199
            bake_create_write_persist_in_t, bake_create_write_persist_out_t,
200 201
            bake_create_write_persist_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
202 203 204 205
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_create_write_persist_rpc",
            bake_eager_create_write_persist_in_t, bake_eager_create_write_persist_out_t,
            bake_eager_create_write_persist_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
206
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_get_size_rpc",
207
            bake_get_size_in_t, bake_get_size_out_t, 
208 209
            bake_get_size_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
210 211 212 213
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_get_data_rpc",
            bake_get_data_in_t, bake_get_data_out_t, 
            bake_get_data_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
214
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_read_rpc",
215
            bake_read_in_t, bake_read_out_t, 
216 217 218
            bake_read_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_probe_rpc",
219
            bake_probe_in_t, bake_probe_out_t, bake_probe_ult, 
220 221 222 223 224
            provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_noop_rpc",
            void, void, bake_noop_ult, provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
225 226 227
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_remove_rpc",
            bake_remove_in_t, bake_remove_out_t, bake_remove_ult,
            provider_id, abt_pool);
228
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
229 230
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_migrate_region_rpc",
            bake_migrate_region_in_t, bake_migrate_region_out_t, bake_migrate_region_ult,
231 232
            provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
233 234 235 236
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_migrate_target_rpc",
            bake_migrate_target_in_t, bake_migrate_target_out_t, bake_migrate_target_ult,
            provider_id, abt_pool);
    margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
237

Matthieu Dorier's avatar
Matthieu Dorier committed
238 239 240 241 242 243 244 245 246 247 248
    /* get a client-side version of the bake_create_write_persist RPC */
    hg_bool_t flag;
    margo_registered_name(mid, "bake_create_write_persist_rpc", &rpc_id, &flag);
    if(flag) {
        tmp_svr_ctx->bake_create_write_persist_id = rpc_id;
    } else {
        tmp_svr_ctx->bake_create_write_persist_id =
        MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
                bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
    }

249
    /* register a REMI client */
250 251
    // TODO actually use an ABT-IO instance
    ret = remi_client_init(mid, ABT_IO_INSTANCE_NULL, &(tmp_svr_ctx->remi_client));
252
    if(ret != REMI_SUCCESS) {
253
        // XXX unregister RPCs, cleanup tmp_svr_ctx before returning
254 255
        return BAKE_ERR_REMI;
    }
256 257

    /* register a REMI provider */
258 259
    // TODO actually use an ABT-IO instance
    ret = remi_provider_register(mid, ABT_IO_INSTANCE_NULL, provider_id, abt_pool, &(tmp_svr_ctx->remi_provider));
260
    if(ret != REMI_SUCCESS) {
261
        // XXX unregister RPCs, cleanup tmp_svr_ctx before returning
262 263 264
        return BAKE_ERR_REMI;
    }
    ret = remi_provider_register_migration_class(tmp_svr_ctx->remi_provider,
265 266
            "bake", NULL,
            bake_target_post_migration_callback, NULL, tmp_svr_ctx);
267
    if(ret != REMI_SUCCESS) {
268
        // XXX unregister RPCs, cleanup tmp_svr_ctx before returning
269 270
        return BAKE_ERR_REMI;
    }
271

272 273 274
    /* install the bake server finalize callback */
    margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);

275 276
    if(provider != BAKE_PROVIDER_IGNORE)
        *provider = tmp_svr_ctx;
277

Matthieu Dorier's avatar
Matthieu Dorier committed
278
    return BAKE_SUCCESS;
279 280
}

281 282 283 284 285
/*
 * Open the BAKE pool `target_name` and add it as a storage target of the provider.
 *
 * The path is split into `root` (directory part) and `filename` (last path
 * component, including its leading '/').  A bare file name without any '/'
 * is treated as relative to the current directory (root ".").
 *
 * On success stores the pool's target id in *target_id and returns
 * BAKE_SUCCESS.  Errors: BAKE_ERR_ALLOCATION (out of memory, target already
 * registered, or insertion failure), BAKE_ERR_PMEM (pool cannot be opened or
 * has no root object), BAKE_ERR_UNKNOWN_TARGET (pool not initialized by
 * bake_makepool()).
 */
int bake_provider_add_storage_target(
        bake_provider_t provider,
        const char *target_name,
        bake_target_id_t* target_id)
{
    int ret = BAKE_SUCCESS;
    const char* slash;
    PMEMoid root_oid;
    bake_target_id_t key;
    bake_pmem_entry_t* check_entry = NULL;

    bake_pmem_entry_t* new_entry = calloc(1, sizeof(*new_entry));
    if(!new_entry)
        return BAKE_ERR_ALLOCATION;

    /* split target_name into directory and file name */
    slash = strrchr(target_name, '/');
    if(slash) {
        new_entry->filename = strdup(slash);
        new_entry->root = strndup(target_name, (size_t)(slash - target_name));
    } else {
        /* no directory component: keep the invariant that filename starts with '/' */
        size_t len = strlen(target_name) + 2;
        new_entry->filename = malloc(len);
        if(new_entry->filename)
            snprintf(new_entry->filename, len, "/%s", target_name);
        new_entry->root = strdup(".");
    }
    if(!new_entry->filename || !new_entry->root) {
        ret = BAKE_ERR_ALLOCATION;
        goto error_free;
    }

    new_entry->pmem_pool = pmemobj_open(target_name, NULL);
    if(!(new_entry->pmem_pool)) {
        fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
        ret = BAKE_ERR_PMEM;
        goto error_free;
    }

    /* check to make sure the root is properly set */
    root_oid = pmemobj_root(new_entry->pmem_pool, sizeof(bake_root_t));
    new_entry->pmem_root = pmemobj_direct(root_oid);
    if(!new_entry->pmem_root) {
        fprintf(stderr, "pmemobj_root: %s\n", pmemobj_errormsg());
        ret = BAKE_ERR_PMEM;
        goto error_close;
    }
    key = new_entry->pmem_root->pool_id;
    new_entry->target_id = key;

    if(uuid_is_null(key.id))
    {
        fprintf(stderr, "Error: BAKE pool %s is not properly initialized\n", target_name);
        ret = BAKE_ERR_UNKNOWN_TARGET;
        goto error_close;
    }

    /* write-lock the provider */
    ABT_rwlock_wrlock(provider->lock);
    /* uthash does not reject duplicate keys, so refuse a target that is already
     * registered (e.g. a copy of a pool file carrying the same id) */
    HASH_FIND(hh, provider->targets, &key, sizeof(bake_target_id_t), check_entry);
    if(check_entry != NULL) {
        fprintf(stderr, "Error: BAKE target %s is already registered with this provider\n", target_name);
        ret = BAKE_ERR_ALLOCATION;
    } else {
        /* insert in the provider's hash and check that it was inserted */
        HASH_ADD(hh, provider->targets, target_id, sizeof(bake_target_id_t), new_entry);
        HASH_FIND(hh, provider->targets, &key, sizeof(bake_target_id_t), check_entry);
        if(check_entry != new_entry) {
            fprintf(stderr, "Error: BAKE could not insert new pmem pool into the hash\n");
            ret = BAKE_ERR_ALLOCATION;
        } else {
            provider->num_targets += 1;
            *target_id = key;
            ret = BAKE_SUCCESS;
        }
    }
    /* unlock provider */
    ABT_rwlock_unlock(provider->lock);

    if(ret == BAKE_SUCCESS)
        return ret;

error_close:
    pmemobj_close(new_entry->pmem_pool);
error_free:
    free(new_entry->filename);
    free(new_entry->root);
    free(new_entry);
    return ret;
}

/*
 * Return the entry registered under `target_id` in the provider's hash of
 * targets, or NULL if there is none.  The callers shown in this file hold
 * provider->lock while calling this.
 */
static bake_pmem_entry_t* find_pmem_entry(
            bake_provider_t provider,
            bake_target_id_t target_id)
{
    bake_pmem_entry_t* found = NULL;
    bake_target_id_t* lookup_key = &target_id;

    HASH_FIND(hh, provider->targets, lookup_key, sizeof(*lookup_key), found);

    return found;
}

/*
 * Remove the storage target `target_id` from the provider: unlink it from the
 * hash, close its pool, destroy its transfer-buffer pool (if any) and free it.
 *
 * Returns BAKE_SUCCESS, or BAKE_ERR_UNKNOWN_TARGET if no such target exists.
 */
int bake_provider_remove_storage_target(
        bake_provider_t provider,
        bake_target_id_t target_id)
{
    int ret;
    ABT_rwlock_wrlock(provider->lock);
    bake_pmem_entry_t* entry = NULL;
    HASH_FIND(hh, provider->targets, &target_id, sizeof(bake_target_id_t), entry);
    if(!entry) {
        ret = BAKE_ERR_UNKNOWN_TARGET;
    } else {
        HASH_DEL(provider->targets, entry);
        /* keep the count consistent with the hash, as the count/list API relies on it */
        provider->num_targets -= 1;
        pmemobj_close(entry->pmem_pool);
        /* NULL (from calloc) unless a transfer-buffer pool was configured */
        if(entry->xfer_bulk_pool)
            margo_bulk_pool_destroy(entry->xfer_bulk_pool);
        free(entry->filename);
        free(entry->root);
        free(entry);
        ret = BAKE_SUCCESS;
    }
    ABT_rwlock_unlock(provider->lock);
    return ret;
}

/*
 * Remove every storage target from the provider, closing their pools and
 * releasing their transfer-buffer pools.  Always returns BAKE_SUCCESS.
 */
int bake_provider_remove_all_storage_targets(
        bake_provider_t provider)
{
    ABT_rwlock_wrlock(provider->lock);
    bake_pmem_entry_t *p, *tmp;
    HASH_ITER(hh, provider->targets, p, tmp) {
        HASH_DEL(provider->targets, p);
        pmemobj_close(p->pmem_pool);
        /* NULL (from calloc) unless a transfer-buffer pool was configured */
        if(p->xfer_bulk_pool)
            margo_bulk_pool_destroy(p->xfer_bulk_pool);
        free(p->filename);
        free(p->root);
        free(p);
    }
    provider->num_targets = 0;
    ABT_rwlock_unlock(provider->lock);
    return BAKE_SUCCESS;
}

/*
 * Report in *num_targets how many storage targets the provider currently
 * has.  Always returns BAKE_SUCCESS.
 */
int bake_provider_count_storage_targets(
        bake_provider_t provider,
        uint64_t* num_targets)
{
    uint64_t snapshot;

    ABT_rwlock_rdlock(provider->lock);
    snapshot = provider->num_targets;
    ABT_rwlock_unlock(provider->lock);

    *num_targets = snapshot;
    return BAKE_SUCCESS;
}

/*
 * Copy the id of every storage target of the provider into `targets`.
 * No bounds check is performed: `targets` must have room for every target
 * (presumably sized with bake_provider_count_storage_targets() — TODO confirm
 * callers; the count can change between the two calls).
 * Always returns BAKE_SUCCESS.
 */
int bake_provider_list_storage_targets(
        bake_provider_t provider,
        bake_target_id_t* targets)
{
    bake_pmem_entry_t* entry;
    bake_target_id_t* cursor = targets;

    ABT_rwlock_rdlock(provider->lock);
    for(entry = provider->targets; entry != NULL; entry = entry->hh.next)
        *cursor++ = entry->target_id;
    ABT_rwlock_unlock(provider->lock);

    return BAKE_SUCCESS;
}

420
int bake_provider_set_target_xfer_buffer(
421 422
        bake_provider_t provider,
        bake_target_id_t target_id,
423
        size_t count,
424 425 426 427 428 429 430 431 432 433
        size_t size)
{
    int ret = BAKE_SUCCESS;
    ABT_rwlock_rdlock(provider->lock);
    bake_pmem_entry_t* entry = find_pmem_entry(provider, target_id);
    if(entry == NULL) {
        ret = -1;
        goto finish;
    }
    entry->xfer_buffer_size = size;
434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467
    entry->xfer_buffer_count = count;
    if(entry->xfer_bulk_pool) {
        margo_bulk_pool_destroy(entry->xfer_bulk_pool);
    }
    if(size && count) {
       hg_return_t hret = margo_bulk_pool_create(
                provider->mid, 
                count,
                size,
                HG_BULK_READWRITE,
                &(entry->xfer_bulk_pool));
       if(hret != HG_SUCCESS) {
           ret = -1;
           goto finish;
        }
    }
finish:
    ABT_rwlock_unlock(provider->lock);
    return ret;
}

/*
 * Set the maximum number of ULTs a buffered write to this target may use.
 * Returns BAKE_SUCCESS, or -1 if the target is unknown.
 */
int bake_provider_set_target_xfer_concurrency(
        bake_provider_t provider,
        bake_target_id_t target_id,
        uint32_t num_threads)
{
    int ret = BAKE_SUCCESS;
    /* write lock: the entry is modified, and bake_write_ult reads it under a read lock */
    ABT_rwlock_wrlock(provider->lock);
    bake_pmem_entry_t* entry = find_pmem_entry(provider, target_id);
    if(entry == NULL) {
        ret = -1;
        goto finish;
    }
    entry->xfer_concurrency = num_threads;
finish:
    ABT_rwlock_unlock(provider->lock);
    return ret;
}

473
/* service a remote RPC that creates a BAKE region */
474
static void bake_create_ult(hg_handle_t handle)
475
{
476
    TIMERS_INITIALIZE("start","alloc","persist","respond");
477 478
    bake_create_out_t out;
    bake_create_in_t in;
479 480
    hg_return_t hret;
    pmemobj_region_id_t* prid;
481
    ABT_rwlock lock = ABT_RWLOCK_NULL;
482

483 484 485 486
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
    bake_provider_t svr_ctx = 
487
        margo_registered_data(mid, info->id);
488
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
489
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
490
        goto finish;
491 492
    }

493
    hret = margo_get_input(handle, &in);
Matthieu Dorier's avatar
Matthieu Dorier committed
494 495
    if(hret != HG_SUCCESS) {
        out.ret = BAKE_ERR_MERCURY;
496
        goto finish;
Matthieu Dorier's avatar
Matthieu Dorier committed
497
    }
498 499 500
    /* lock provider */
    lock = svr_ctx->lock;
    ABT_rwlock_rdlock(lock);
501

502 503 504
    /* find the pmem pool */
    bake_pmem_entry_t* entry = find_pmem_entry(svr_ctx, in.bti);
    if(entry == NULL) {
Matthieu Dorier's avatar
Matthieu Dorier committed
505
        out.ret = BAKE_ERR_UNKNOWN_TARGET;
506
        goto finish;
507 508
    }

509
    /* TODO: this check needs to be somewhere else */
510
    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);
511

512 513 514
    memset(&out, 0, sizeof(out));

    prid = (pmemobj_region_id_t*)out.rid.data;
Matthieu Dorier's avatar
Matthieu Dorier committed
515

516
#ifdef USE_SIZECHECK_HEADERS
517
    size_t content_size = in.region_size + sizeof(uint64_t);
518 519 520 521
#else
    size_t content_size = in.region_size;
#endif

522 523
    TIMERS_END_STEP(0);

Matthieu Dorier's avatar
Matthieu Dorier committed
524
    int ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
525 526
            content_size, 0, NULL, NULL);
    if(ret != 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
527
        out.ret = BAKE_ERR_PMEM;
528
        goto finish;
529 530
    }

531 532
    TIMERS_END_STEP(1);

533 534 535
    region_content_t* region = (region_content_t*)pmemobj_direct(prid->oid);
    if(!region) {
        out.ret = BAKE_ERR_PMEM;
536
        goto finish;
537
    }
538
#ifdef USE_SIZECHECK_HEADERS
539
    region->size = in.region_size;
540
#endif
541
    PMEMobjpool* pmem_pool = pmemobj_pool_by_oid(prid->oid);
542 543 544
#ifdef USE_SIZECHECK_HEADERS
    pmemobj_persist(pmem_pool, region, sizeof(region->size));
#endif
545

546 547
    TIMERS_END_STEP(2);

Matthieu Dorier's avatar
Matthieu Dorier committed
548
    out.ret = BAKE_SUCCESS;
549 550

finish:
551 552 553
    if(lock != ABT_RWLOCK_NULL)
        ABT_rwlock_unlock(lock);
    margo_respond(handle, &out);
554 555
    TIMERS_END_STEP(3);
    TIMERS_FINALIZE();
Matthieu Dorier's avatar
Matthieu Dorier committed
556
    margo_free_input(handle, &in);
557
    margo_destroy(handle);
558 559
    return;
}
560
DEFINE_MARGO_RPC_HANDLER(bake_create_ult)
561

562
    /* service a remote RPC that writes to a BAKE region */
563
static void bake_write_ult(hg_handle_t handle)
564
{
565
    TIMERS_INITIALIZE("start","bulk_create","bulk_xfer","respond");
566 567
    bake_write_out_t out;
    bake_write_in_t in;
568
    in.bulk_handle = HG_BULK_NULL;
569
    hg_return_t hret;
Matthieu Dorier's avatar
Matthieu Dorier committed
570
    hg_addr_t src_addr = HG_ADDR_NULL;
571 572 573
    char* memory;
    char* buffer = NULL;
    size_t xfer_buf_size = 0;
574 575
    size_t xfer_buf_count = 0;
    uint32_t max_num_threads = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
576
    hg_bulk_t bulk_handle = HG_BULK_NULL;
577
    const struct hg_info *hgi;
578 579
    margo_instance_id mid;
    pmemobj_region_id_t* prid;
580
    ABT_rwlock lock = ABT_RWLOCK_NULL;
581 582 583

    memset(&out, 0, sizeof(out));

584 585
    mid = margo_hg_handle_get_instance(handle);
    assert(mid);
586
    ABT_pool handler_pool = margo_hg_handle_get_handler_pool(handle);
587
    hgi = margo_get_info(handle);
588
    bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
589
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
590
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
591
        goto finish;
592
    }
593 594 595
    /* read-lock the provider */
    lock = svr_ctx->lock;
    ABT_rwlock_rdlock(lock);
596

597
    hret = margo_get_input(handle, &in);
598 599
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
600
        out.ret = BAKE_ERR_MERCURY;
601
        goto finish;
602 603 604 605 606
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
607 608 609
    region_content_t* region = pmemobj_direct(prid->oid);

    if(!region)
610
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
611
        out.ret = BAKE_ERR_UNKNOWN_REGION;
612
        goto finish;
613
    }
614

615
#ifdef USE_SIZECHECK_HEADERS
616 617
    if(in.region_offset + in.bulk_size > region->size) {
        out.ret = BAKE_ERR_OUT_OF_BOUNDS;
618
        goto finish;
619
    }
620
#endif
621

622 623 624 625
    /* find enclosing pool and target id */
    PMEMobjpool *pool = pmemobj_pool_by_oid(prid->oid);
    PMEMoid root_oid = pmemobj_root(pool, 0);
    bake_root_t* root = pmemobj_direct(root_oid);
626

627 628 629 630
    /* find the pmem entry */
    bake_pmem_entry_t* entry = find_pmem_entry(svr_ctx, root->pool_id);
    if(entry == NULL) {
        out.ret = BAKE_ERR_UNKNOWN_TARGET;
631
        goto finish;
632
    }
633 634 635
    xfer_buf_size   = entry->xfer_buffer_size;
    xfer_buf_count  = entry->xfer_buffer_count;
    max_num_threads = entry->xfer_concurrency;
636

637
    memory = region->data + in.region_offset;
638

639 640
    if(in.remote_addr_str)
    {
641
        /* a proxy address was provided to pull write data from */
642 643 644
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
        {
Matthieu Dorier's avatar
Matthieu Dorier committed
645
            out.ret = BAKE_ERR_MERCURY;
646
            goto finish;
647 648 649 650 651 652 653 654
        }
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }

655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678
    TIMERS_END_STEP(0);

    if(xfer_buf_size == 0 || xfer_buf_size > in.bulk_size) { // direct transfer to  device in one go

        /* create bulk handle for local side of transfer */
        hret = margo_bulk_create(mid, 1, (void**)(&memory), &in.bulk_size,
                HG_BULK_WRITE_ONLY, &bulk_handle);
        if(hret != HG_SUCCESS)
        {
            out.ret = BAKE_ERR_MERCURY;
            goto finish;
        }

        TIMERS_END_STEP(1);

        hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
                in.bulk_offset, bulk_handle, 0, in.bulk_size);
        if(hret != HG_SUCCESS)
        {
            out.ret = BAKE_ERR_MERCURY;
            goto finish;
        }

    } else { // multiple transfers using intermediate buffer
679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724

        // (1) compute the maximum number of ULTs that can handle this transfer
        // as well as the number of individual transfers needed given the buffer sizes

        // number of xfers of up to xfer_buf_size needed in total
        size_t num_xfers_needed = in.bulk_size / xfer_buf_size;
        if(num_xfers_needed * xfer_buf_size < in.bulk_size) num_xfers_needed += 1;
        // number of threads that will be spawned
        uint32_t num_threads = num_xfers_needed;
        num_threads = num_threads < max_num_threads ? num_threads : max_num_threads;
        // maximum number of xfers per thread
        size_t xfer_per_thread = num_xfers_needed / num_threads;
        if(xfer_per_thread * num_threads < num_xfers_needed) xfer_per_thread += 1;

        // (2) create the array of arguments and ULTs
        xfer_args* args  = alloca(sizeof(*args)*num_threads);
        ABT_thread* ults = alloca(sizeof(*ults)*num_threads);
        unsigned int i;
        size_t current_offset = 0;
        size_t remaining_size = in.bulk_size;
        size_t current_size = xfer_per_thread * xfer_buf_size;

        for(i=0; i < num_threads; i++) {

            current_size = current_size > remaining_size ? remaining_size : current_size;

            args[i].mid           = mid;
            args[i].size          = current_size;
            args[i].target        = memory + current_offset;
            args[i].buf_size      = xfer_buf_size;
            args[i].buf_pool      = entry->xfer_bulk_pool;
            args[i].remote_addr   = src_addr;
            args[i].remote_bulk   = in.bulk_handle;
            args[i].remote_offset = current_offset;
            args[i].op_type       = HG_BULK_PULL;
            args[i].ret           = 0;

            ABT_thread_create(handler_pool, (void (*)(void*))xfer_ult, args+i, ABT_THREAD_ATTR_NULL, ults+i);

            current_offset += current_size;
            remaining_size -= current_size;
        }

        // (3) join and free the ULTs
        ABT_thread_join_many(num_threads, ults);
        ABT_thread_free_many(num_threads, ults);
725

Philip Carns's avatar
Philip Carns committed
726 727
    }

728 729
    TIMERS_END_STEP(2);

Matthieu Dorier's avatar
Matthieu Dorier committed
730
    out.ret = BAKE_SUCCESS;
731

732 733 734 735
finish:
    if(lock != ABT_RWLOCK_NULL)
        ABT_rwlock_unlock(lock);
    margo_respond(handle, &out);
736 737
    TIMERS_END_STEP(3);
    TIMERS_FINALIZE();
738
    free(buffer);
739 740
    if(in.remote_addr_str)
        margo_addr_free(mid, src_addr);
741 742 743
    margo_bulk_free(bulk_handle);
    margo_free_input(handle, &in);
    margo_destroy(handle);
744 745
    return;
}
746
DEFINE_MARGO_RPC_HANDLER(bake_write_ult)
747

748
    /* service a remote RPC that writes to a BAKE region in eager mode */
749
static void bake_eager_write_ult(hg_handle_t handle)
750
{
751
    TIMERS_INITIALIZE("start","memcpy","respond");
752 753
    bake_eager_write_out_t out;
    bake_eager_write_in_t in;
754 755
    in.buffer = NULL;
    in.size = 0;
756
    hg_return_t hret;
757 758 759 760
    char* buffer = NULL;
    hg_bulk_t bulk_handle = HG_BULK_NULL;
    pmemobj_region_id_t* prid = NULL;
    ABT_rwlock lock = ABT_RWLOCK_NULL;
761 762 763

    memset(&out, 0, sizeof(out));

764 765 766
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
767
    bake_provider_t svr_ctx = margo_registered_data(mid, info->id);
768
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
769
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
770
        goto finish;
771 772
    }

773
    hret = margo_get_input(handle, &in);
774 775
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
776
        out.ret = BAKE_ERR_MERCURY;
777
        goto finish;
778 779 780 781
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

782 783 784 785
    /* lock provider */
    lock = svr_ctx->lock;
    ABT_rwlock_rdlock(lock);

786
    /* find memory address for target object */
787 788
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
789
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
790
        out.ret = BAKE_ERR_PMEM;
791
        goto finish;
792
    }
793

794
#ifdef USE_SIZECHECK_HEADERS
795 796
    if(in.size + in.region_offset > region->size) {
        out.ret = BAKE_ERR_OUT_OF_BOUNDS;
797
        goto finish;
798
    }
799
#endif
800

801 802
    TIMERS_END_STEP(0);

803
    buffer = region->data + in.region_offset;
804 805 806

    memcpy(buffer, in.buffer, in.size);

807 808
    TIMERS_END_STEP(1);

Matthieu Dorier's avatar
Matthieu Dorier committed
809
    out.ret = BAKE_SUCCESS;
810

811 812 813
finish:
    if(lock != ABT_RWLOCK_NULL)
        ABT_rwlock_unlock(lock);
814
    margo_respond(handle, &out);
815 816
    TIMERS_END_STEP(2);
    TIMERS_FINALIZE();
817
    margo_free_input(handle, &in);
818
    margo_destroy(handle);
819 820
    return;
}
821
DEFINE_MARGO_RPC_HANDLER(bake_eager_write_ult)
822

823
    /* service a remote RPC that persists to a BAKE region */
824
static void bake_persist_ult(hg_handle_t handle)
825
{
826
    TIMERS_INITIALIZE("start","persist","respond");
827 828
    bake_persist_out_t out;
    bake_persist_in_t in;
829
    hg_return_t hret;
830
    char* buffer = NULL;
831
    pmemobj_region_id_t* prid;
832
    ABT_rwlock lock = ABT_RWLOCK_NULL;
833

834 835
    memset(&out, 0, sizeof(out));

836 837 838
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* info = margo_get_info(handle);
839
    bake_provider_t svr_ctx = margo_registered_data(mid, info->id);
840
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
841
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
842
        goto finish;
843 844
    }

845
    hret = margo_get_input(handle, &in);
846
    if(hret != HG_SUCCESS)
847
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
848
        out.ret = BAKE_ERR_MERCURY;
849
        goto finish;
850
    }
851

852 853
    prid = (pmemobj_region_id_t*)in.rid.data;

854 855 856
    /* lock provider */
    lock = svr_ctx->lock;
    ABT_rwlock_rdlock(lock);
857
    /* find memory address for target object */
858 859
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
Philip Carns's avatar
Philip Carns committed
860
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
861
        out.ret = BAKE_ERR_PMEM;
862
        goto finish;
Philip Carns's avatar
Philip Carns committed
863
    }
864
    buffer = region->data;
865

866 867
    TIMERS_END_STEP(0);

868
    /* TODO: should this have an abt shim in case it blocks? */
869
    PMEMobjpool* pmem_pool = pmemobj_pool_by_oid(prid->oid);
870
    pmemobj_persist(pmem_pool, buffer + in.offset, in.size);
871

872 873
    TIMERS_END_STEP(1);

Matthieu Dorier's avatar
Matthieu Dorier committed
874
    out.ret = BAKE_SUCCESS;
875

876 877 878
finish:
    if(lock != ABT_RWLOCK_NULL)
        ABT_rwlock_unlock(lock);
879
    margo_respond(handle, &out);
880
    TIMERS_END_STEP(2);
881
    margo_free_input(handle, &in);
882
    margo_destroy(handle);
883 884
    return;
}
885
DEFINE_MARGO_RPC_HANDLER(bake_persist_ult)
886

887 888
static void bake_create_write_persist_ult(hg_handle_t handle)
{
889
    TIMERS_INITIALIZE("start","alloc","bulk_create","bulk_xfer","persist","respond");
890 891
    bake_create_write_persist_out_t out;
    bake_create_write_persist_in_t in;
892 893 894
    in.bulk_handle = HG_BULK_NULL;
    in.remote_addr_str = NULL;
    hg_addr_t src_addr = HG_ADDR_NULL;
895
    ABT_pool handler_pool;
896
    char* buffer = NULL;
897 898
    char* memory = NULL;
    size_t xfer_buf_size = 0;
899 900
    size_t xfer_buf_count = 0;
    uint32_t max_num_threads = 0;
901 902
    hg_bulk_t bulk_handle = HG_BULK_NULL;
    const struct hg_info *hgi = NULL;
903 904 905 906
    margo_instance_id mid;
    hg_return_t hret;
    int ret;
    pmemobj_region_id_t* prid;
907
    ABT_rwlock lock = ABT_RWLOCK_NULL;
908 909 910

    memset(&out, 0, sizeof(out));

911
    mid = margo_hg_handle_get_instance(handle);
912
    handler_pool = margo_hg_handle_get_handler_pool(handle);
913
    assert(mid);
914
    hgi = margo_get_info(handle);
915
    bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
916
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
917
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
918
        goto finish;
919 920 921 922
    }

    /* TODO: this check needs to be somewhere else */
    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);
923 924 925 926

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
927
        out.ret = BAKE_ERR_MERCURY;
928
        goto finish;
929 930
    }

931 932 933
    /* lock provider */
    lock = svr_ctx->lock;
    ABT_rwlock_rdlock(lock);
934 935 936
    /* find the pmem pool */
    bake_pmem_entry_t* entry = find_pmem_entry(svr_ctx, in.bti);
    if(entry == NULL) {
Matthieu Dorier's avatar
Matthieu Dorier committed
937
        out.ret = BAKE_ERR_UNKNOWN_TARGET;
938
        goto finish;
939
    }
940

941 942 943
    xfer_buf_size   = entry->xfer_buffer_size;
    xfer_buf_count  = entry->xfer_buffer_count;
    max_num_threads = entry->xfer_concurrency;
944

945
#ifdef USE_SIZECHECK_HEADERS
946
    size_t content_size = in.bulk_size + sizeof(uint64_t);
947 948 949 950
#else
    size_t content_size = in.bulk_size;
#endif

951 952
    TIMERS_END_STEP(0);

953
    prid = (pmemobj_region_id_t*)out.rid.data;
954

955
    ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
956
            content_size, 0, NULL, NULL);
957 958
    if(ret != 0)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
959
        out.ret = BAKE_ERR_PMEM;
960
        goto finish;
961 962
    }

963 964
    TIMERS_END_STEP(1);

965
    /* find memory address for target object */
966 967
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
968
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
969
        out.ret = BAKE_ERR_PMEM;
970
        goto finish;
971
    }
972
#ifdef USE_SIZECHECK_HEADERS
973
    region->size = in.bulk_size;
974
#endif
975
    memory = region->data;
976

977 978 979 980 981 982
    if(in.remote_addr_str)
    {
        /* a proxy address was provided to pull write data from */
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
        {
Matthieu Dorier's avatar
Matthieu Dorier committed
983
            out.ret = BAKE_ERR_MERCURY;
984
            goto finish;
985 986 987 988 989 990 991
        }
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }
992

993 994 995
    if(xfer_buf_size == 0 
    || xfer_buf_count == 0
    || xfer_buf_size > in.bulk_size) { // don't use an intermediate buffer
996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017

        /* create bulk handle for local side of transfer */
        hret = margo_bulk_create(mid, 1, (void**)(&memory), &in.bulk_size,
            HG_BULK_WRITE_ONLY, &bulk_handle);
        if(hret != HG_SUCCESS)
        {
            out.ret = BAKE_ERR_MERCURY;
            goto finish;
        }

        TIMERS_END_STEP(2);

        hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
                in.bulk_offset, bulk_handle, 0, in.bulk_size);
        if(hret != HG_SUCCESS)
        {
            out.ret = BAKE_ERR_MERCURY;
            goto finish;
        }

    } else {

1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034
        // (1) compute the maximum number of ULTs that can handle this transfer
        // as well as the number of individual transfers needed given the buffer sizes

        // number of xfers of up to xfer_buf_size needed in total
        size_t num_xfers_needed = in.bulk_size / xfer_buf_size;
        if(num_xfers_needed * xfer_buf_size < in.bulk_size) num_xfers_needed += 1;
        // number of threads that will be spawned
        uint32_t num_threads = num_xfers_needed;
        num_threads = num_threads < max_num_threads ? num_threads : max_num_threads;
        // maximum number of xfers per thread
        size_t xfer_per_thread = num_xfers_needed / num_threads;
        if(xfer_per_thread * num_threads < num_xfers_needed) xfer_per_thread += 1;

        // (2) create the array of arguments and ULTs
        xfer_args* args  = alloca(sizeof(*args)*num_threads);
        ABT_thread* ults = alloca(sizeof(*ults)*num_threads);
        unsigned int i;
1035
        size_t current_offset = 0;
1036 1037
        size_t remaining_size = in.bulk_size;
        size_t current_size = xfer_per_thread * xfer_buf_size;
1038

1039
        for(i=0; i < num_threads; i++) {
1040

1041
            current_size = current_size > remaining_size ? remaining_size : current_size;
1042

1043 1044 1045 1046 1047 1048 1049 1050 1051 1052
            args[i].mid           = mid;
            args[i].size          = current_size;
            args[i].target        = memory + current_offset;
            args[i].buf_size      = xfer_buf_size;
            args[i].buf_pool      = entry->xfer_bulk_pool;
            args[i].remote_addr   = src_addr;
            args[i].remote_bulk   = in.bulk_handle;
            args[i].remote_offset = current_offset;
            args[i].op_type       = HG_BULK_PULL;
            args[i].ret           = 0;
1053

1054
            ABT_thread_create(handler_pool, (void (*)(void*))xfer_ult, args+i, ABT_THREAD_ATTR_NULL, ults+i);
1055 1056 1057 1058 1059

            current_offset += current_size;
            remaining_size -= current_size;
        }

1060 1061 1062 1063
        // (3) join and free the ULTs
        ABT_thread_join_many(num_threads, ults);
        ABT_thread_free_many(num_threads, ults);

1064 1065
    }

1066 1067
    TIMERS_END_STEP(3);

1068
    /* TODO: should this have an abt shim in case it blocks? */
1069
    pmemobj_persist(entry->pmem_pool, region, content_size);
1070

Matthieu Dorier's avatar
Matthieu Dorier committed
1071
    out.ret = BAKE_SUCCESS;
1072

1073 1074
    TIMERS_END_STEP(4);

1075 1076 1077 1078
finish:
    if(lock != ABT_RWLOCK_NULL)
        ABT_rwlock_unlock(lock);
    margo_respond(handle, &out);
1079 1080
    TIMERS_END_STEP(5);
    TIMERS_FINALIZE();
1081
    if(in.remote_addr_str) {
1082
        margo_addr_free(mid, src_addr);
1083
    }
1084
    free(buffer);
1085 1086 1087 1088 1089 1090 1091
    margo_bulk_free(bulk_handle);
    margo_free_input(handle, &in);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)

1092 1093
/*
 * Service a remote RPC that creates a region from data carried inline in the
 * request ("eager" mode, with no bulk transfer).
 *
 * A region large enough for the in.size bytes in in.buffer is allocated in
 * target in.bti, plus a size header in USE_SIZECHECK_HEADERS builds. The
 * bytes are copied in and the whole region is persisted. The new region id
 * is returned in out.rid. out.ret holds BAKE_SUCCESS or the first error
 * encountered.
 */
static void bake_eager_create_write_persist_ult(hg_handle_t handle)
{
    TIMERS_INITIALIZE("start","alloc","memcpy","persist","respond");
    bake_eager_create_write_persist_out_t out;
    bake_eager_create_write_persist_in_t in;
    in.buffer = NULL;
    in.size = 0;
    ABT_rwlock held_lock = ABT_RWLOCK_NULL;

    memset(&out, 0, sizeof(out));

    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
    bake_provider_t provider = margo_registered_data(mid, hgi->id);

    /* Single-pass block: each failure records its code in out.ret and
     * breaks out to the shared unlock / respond / cleanup sequence below. */
    do {
        if(provider == NULL) {
            out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
            break;
        }

        /* TODO: this check needs to be somewhere else */
        assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);

        if(margo_get_input(handle, &in) != HG_SUCCESS) {
            out.ret = BAKE_ERR_MERCURY;
            break;
        }

        /* hold the provider's lock in read mode while using its target */
        held_lock = provider->lock;
        ABT_rwlock_rdlock(held_lock);

        bake_pmem_entry_t* target = find_pmem_entry(provider, in.bti);
        if(target == NULL) {
            out.ret = BAKE_ERR_UNKNOWN_TARGET;
            break;
        }

        size_t alloc_size = in.size;
#ifdef USE_SIZECHECK_HEADERS
        alloc_size += sizeof(uint64_t);   /* room for the stored size header */
#endif
        pmemobj_region_id_t* new_rid = (pmemobj_region_id_t*)out.rid.data;

        TIMERS_END_STEP(0);

        if(pmemobj_alloc(target->pmem_pool, &new_rid->oid,
                    alloc_size, 0, NULL, NULL) != 0) {
            out.ret = BAKE_ERR_PMEM;
            break;
        }

        TIMERS_END_STEP(1);

        region_content_t* region = pmemobj_direct(new_rid->oid);
        if(region == NULL) {
            out.ret = BAKE_ERR_PMEM;
            break;
        }
#ifdef USE_SIZECHECK_HEADERS
        region->size = in.size;
#endif
        memcpy(region->data, in.buffer, in.size);

        TIMERS_END_STEP(2);

        /* TODO: should this have an abt shim in case it blocks? */
        pmemobj_persist(target->pmem_pool, region, alloc_size);

        TIMERS_END_STEP(3);

        out.ret = BAKE_SUCCESS;
    } while(0);

    if(held_lock != ABT_RWLOCK_NULL)
        ABT_rwlock_unlock(held_lock);
    margo_respond(handle, &out);
    TIMERS_END_STEP(4);
    TIMERS_FINALIZE();
    margo_free_input(handle, &in);
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_eager_create_write_persist_ult)

/* service a remote RPC that retrieves the size of a BAKE region */
1194
static void bake_get_size_ult(hg_handle_t handle)
1195
{
1196
    TIMERS_INITIALIZE("start","respond");
1197 1198
    bake_get_size_out_t out;
    bake_get_size_in_t in;
1199 1200
    hg_return_t hret;
    pmemobj_region_id_t* prid;
1201
    ABT_rwlock lock = ABT_RWLOCK_NULL;
1202 1203 1204

    memset(&out, 0, sizeof(out));

1205 1206 1207
    margo_instance_id mid = margo_hg_handle_get_instance(handle);
    assert(mid);
    const struct hg_info* hgi = margo_get_info(handle);
1208
    bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
1209
    if(!svr_ctx) {
Matthieu Dorier's avatar
Matthieu Dorier committed
1210
        out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
1211
        goto finish;
1212
    }
1213

1214
    hret = margo_get_input(handle, &in);
1215
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
1216
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
1217
        out.ret = BAKE_ERR_MERCURY;
1218
        goto finish;
Philip Carns's avatar
Philip Carns committed
1219 1220
    }

1221
#ifdef USE_SIZECHECK_HEADERS
1222
    prid = (pmemobj_region_id_t*)in.rid.data;
1223 1224 1225
    /* lock provider */
    lock = svr_ctx->lock;
    ABT_rwlock_rdlock(lock);
1226 1227 1228 1229
    region_content_t* region = pmemobj_direct(prid->oid);
    if(!region)
    {
        out.ret = BAKE_ERR_PMEM;
1230
        goto finish;
1231 1232
    }
    out.size = region->size;
Matthieu Dorier's avatar
Matthieu Dorier committed
1233
    out.ret = BAKE_SUCCESS;
1234 1235 1236
#else
    out.ret = BAKE_ERR_OP_UNSUPPORTED;
#endif
1237

1238
    TIMERS_END_STEP(0);