bake-pmem-backend.c 27 KB
Newer Older
1
#include <assert.h>
2
#include <json-c/json.h>
3
4
5
6
7
8
#include "bake-config.h"
#include "bake.h"
#include "bake-rpc.h"
#include "bake-server.h"
#include "bake-provider.h"
#include "bake-backend.h"
9
#include "bake-macros.h"
10
11

/* definition of BAKE root data structure (just a uuid for now) */
Philip Carns's avatar
Philip Carns committed
12
typedef struct {
13
14
15
16
    bake_target_id_t pool_id;
} bake_root_t;

/* definition of internal BAKE region_id_t identifier for libpmemobj back end */
Philip Carns's avatar
Philip Carns committed
17
typedef struct {
18
19
20
21
22
23
24
25
26
27
28
29
    PMEMoid oid;
} pmemobj_region_id_t;

/* In-pool layout of a region: an optional size header followed by the
 * payload bytes. */
typedef struct {
#ifdef USE_SIZECHECK_HEADERS
    /* payload size requested at creation time; only present when size
     * checking is compiled in, and used for the bounds checks on access */
    uint64_t size;
#endif
    /* first byte of the payload; regions are allocated with the full payload
     * size, so the data extends past this one declared element */
    char data[1];
} region_content_t;

typedef struct {
    bake_provider_t provider;
Philip Carns's avatar
Philip Carns committed
30
31
32
33
    PMEMobjpool*    pmem_pool;
    bake_root_t*    pmem_root;
    char*           root;
    char*           filename;
34
35
36
} bake_pmem_entry_t;

typedef struct xfer_args {
Philip Carns's avatar
Philip Carns committed
37
38
39
40
41
42
43
44
    margo_instance_id mid;           // margo instance
    hg_addr_t         remote_addr;   // remote address
    hg_bulk_t         remote_bulk;   // remote bulk handle for transfers
    size_t            remote_offset; // remote offset at which to take the data
    size_t            bulk_size;
    char*             local_ptr;
    size_t            bytes_issued;
    size_t            bytes_retired;
45
    margo_bulk_poolset_t poolset;
Philip Carns's avatar
Philip Carns committed
46
47
48
49
50
51
    size_t               poolset_max_size;
    int32_t              ret; // return value of the xfer_ult function
    int                  done;
    int                  ults_active;
    ABT_mutex            mutex;
    ABT_eventual         eventual;
52
53
} xfer_args;

Philip Carns's avatar
Philip Carns committed
54
/* Worker ULT for pipelined writes; defined at the end of this file.  `_args`
 * points to the struct xfer_args shared with write_transfer_data(). */
static void xfer_ult(void* _args);
55

56
static int bake_pmem_makepool(const char* pool_name, size_t pool_size)
57
{
Philip Carns's avatar
Philip Carns committed
58
59
60
    PMEMobjpool* pool;
    PMEMoid      root_oid;
    bake_root_t* root;
61

62
    pool = pmemobj_create(pool_name, NULL, pool_size, 0644);
Philip Carns's avatar
Philip Carns committed
63
    if (!pool) {
64
65
66
67
68
69
        fprintf(stderr, "pmemobj_create: %s\n", pmemobj_errormsg());
        return BAKE_ERR_PMEM;
    }

    /* find root */
    root_oid = pmemobj_root(pool, sizeof(bake_root_t));
Philip Carns's avatar
Philip Carns committed
70
    root     = pmemobj_direct(root_oid);
71
72
73
74
75
76
77
78
79
80

    /* store the target id for this bake pool at the root */
    uuid_generate(root->pool_id.id);
    pmemobj_persist(pool, root, sizeof(bake_root_t));

    pmemobj_close(pool);

    return BAKE_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////
Philip Carns's avatar
Philip Carns committed
81
82
83
84
/**
 * Open an existing pmem pool and register it as a target of `provider`.
 *
 * On success the pool path is appended to the "pmem_backend.targets" array
 * of the provider's JSON configuration, the pool's target id is stored in
 * `*target` and a newly allocated backend context in `*context` (released by
 * bake_pmem_backend_finalize()).
 *
 * The path is split at its last '/': everything before it becomes the
 * context's `root`, the remainder (including the '/') its `filename`.  A
 * path without any '/' yields an empty `root` and the whole path as
 * `filename`.
 *
 * @return 0 on success; BAKE_ERR_ALLOCATION if memory runs out;
 *         BAKE_ERR_NOENT if the pool cannot be opened;
 *         BAKE_ERR_UNKNOWN_TARGET if the pool has no valid root/target id.
 */
static int bake_pmem_backend_initialize(bake_provider_t    provider,
                                        const char*        path,
                                        bake_target_id_t*  target,
                                        backend_context_t* context)
{
    struct json_object* pmem_backend_json = NULL;
    struct json_object* target_array      = NULL;
    bake_pmem_entry_t*  new_context       = NULL;
    const char*         last_slash;
    PMEMoid             root_oid;
    bake_target_id_t    tid;
    int                 ret;

    /* NOTE(review): these macros live in bake-macros.h; they are run before
     * anything is allocated so that, should they bail out of this function
     * on a malformed configuration, nothing is leaked -- confirm against
     * the macro definitions. */
    CONFIG_HAS_OR_CREATE_OBJECT(provider->json_cfg, "pmem_backend",
                                "pmem_backend", pmem_backend_json);
    CONFIG_HAS_OR_CREATE_ARRAY(pmem_backend_json, "targets",
                               "pmem_backend.targets", target_array);

    /* TODO: populate tuning parameters specific to this backend */

    new_context = calloc(1, sizeof(*new_context));
    if (!new_context) return BAKE_ERR_ALLOCATION;
    new_context->provider = provider;

    /* split the path into directory and file name; strrchr() returns NULL
     * for a bare file name, which must not reach strdup() */
    last_slash = strrchr(path, '/');
    if (!last_slash) last_slash = path;
    new_context->filename = strdup(last_slash);
    new_context->root     = strndup(path, (size_t)(last_slash - path));
    if (!new_context->filename || !new_context->root) {
        ret = BAKE_ERR_ALLOCATION;
        goto error;
    }

    new_context->pmem_pool = pmemobj_open(path, NULL);
    if (!(new_context->pmem_pool)) {
        BAKE_ERROR(provider->mid, "pmemobj_open: %s", pmemobj_errormsg());
        ret = BAKE_ERR_NOENT;
        goto error;
    }

    /* check to make sure the root is properly set */
    root_oid = pmemobj_root(new_context->pmem_pool, sizeof(bake_root_t));
    new_context->pmem_root = pmemobj_direct(root_oid);
    if (!new_context->pmem_root
        || uuid_is_null(new_context->pmem_root->pool_id.id)) {
        BAKE_ERROR(provider->mid, "pool %s is not properly initialized", path);
        ret = BAKE_ERR_UNKNOWN_TARGET;
        goto error;
    }
    tid = new_context->pmem_root->pool_id;

    /* target successfully added; inject it into the json in array of
     * targets for this backend
     */
    json_object_array_add(target_array, json_object_new_string(path));

    *target  = tid;
    *context = new_context;
    return 0;

error:
    /* single exit path for every failure after the context was allocated */
    if (new_context->pmem_pool) pmemobj_close(new_context->pmem_pool);
    free(new_context->filename);
    free(new_context->root);
    free(new_context);
    return ret;
}
////////////////////////////////////////////////////////////////////////////////////////////
/* Tear down a target context created by bake_pmem_backend_initialize():
 * closes the pool and releases the context together with its two path
 * strings.  Always returns 0. */
static int bake_pmem_backend_finalize(backend_context_t context)
{
    bake_pmem_entry_t* pmem_entry = (bake_pmem_entry_t*)context;

    pmemobj_close(pmem_entry->pmem_pool);

    /* the strings are owned by the context, so they go before it */
    free(pmem_entry->root);
    free(pmem_entry->filename);
    free(pmem_entry);

    return 0;
}

////////////////////////////////////////////////////////////////////////////////////////////
Philip Carns's avatar
Philip Carns committed
148
149
/**
 * Allocate a new region of `size` payload bytes in the target's pool.
 *
 * The libpmemobj object id of the region is written into `rid->data`.  When
 * USE_SIZECHECK_HEADERS is defined, the allocation is enlarged by a 64-bit
 * header that records `size`, and that header is persisted.
 *
 * @return BAKE_SUCCESS; BAKE_ERR_INVALID_ARG if `size` plus the header does
 *         not fit in a size_t; BAKE_ERR_PMEM if the allocation fails.
 */
static int
bake_pmem_create(backend_context_t context, size_t size, bake_region_id_t* rid)
{
    bake_pmem_entry_t*   entry = (bake_pmem_entry_t*)context;
    pmemobj_region_id_t* prid  = (pmemobj_region_id_t*)rid->data;
    size_t               content_size;

    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);

#ifdef USE_SIZECHECK_HEADERS
    content_size = size + sizeof(uint64_t);
    /* unsigned wrap-around would silently allocate a tiny region */
    if (content_size < size) return BAKE_ERR_INVALID_ARG;
#else
    content_size = size;
#endif

    int ret = pmemobj_alloc(entry->pmem_pool, &prid->oid, content_size, 0, NULL,
                            NULL);
    if (ret != 0) return BAKE_ERR_PMEM;

#ifdef USE_SIZECHECK_HEADERS
    region_content_t* region = (region_content_t*)pmemobj_direct(prid->oid);
    if (!region) {
        /* do not leave an unreachable object behind in the pool */
        pmemobj_free(&prid->oid);
        return BAKE_ERR_PMEM;
    }

    region->size           = size;
    PMEMobjpool* pmem_pool = pmemobj_pool_by_oid(prid->oid);
    pmemobj_persist(pmem_pool, region, sizeof(region->size));
#endif

    return BAKE_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////
Philip Carns's avatar
Philip Carns committed
179
180
181
182
183
184
185
186
187
/**
 * Pull `bulk_size` bytes from a remote bulk handle into a region.
 *
 * Two modes, selected by the boolean "pipeline_enable" in the provider's
 * JSON configuration:
 *  - disabled: one bulk transfer directly into the region memory;
 *  - enabled: the transfer is cut into chunks of at most the poolset's
 *    maximum buffer size, handled by xfer_ult() workers created in
 *    `target_pool`; this function blocks until the last worker exits.
 *
 * @param mid                margo instance used for the transfers
 * @param provider           provider supplying configuration and poolset
 * @param pmoid              object id of the destination region
 * @param region_offset      offset into the region payload to write at
 * @param remote_bulk        bulk handle describing the remote data
 * @param remote_bulk_offset offset into the remote bulk handle
 * @param bulk_size          number of bytes to transfer
 * @param src_addr           address that owns `remote_bulk`
 * @param target_pool        Argobots pool the workers are created in
 * @return 0 on success; BAKE_ERR_UNKNOWN_REGION, BAKE_ERR_OUT_OF_BOUNDS
 *         (size-check builds only), BAKE_ERR_MERCURY, BAKE_ERR_ARGOBOTS,
 *         BAKE_ERR_INVALID_ARG (poolset with zero-sized buffers), or, in
 *         pipeline mode, the first non-zero code recorded by a worker.
 */
static int write_transfer_data(margo_instance_id mid,
                               bake_provider_t   provider,
                               PMEMoid           pmoid,
                               uint64_t          region_offset,
                               hg_bulk_t         remote_bulk,
                               uint64_t          remote_bulk_offset,
                               uint64_t          bulk_size,
                               hg_addr_t         src_addr,
                               ABT_pool          target_pool)
{
    region_content_t* region;
    char*             memory;
    hg_return_t       hret;
    hg_bulk_t         bulk_handle = HG_BULK_NULL;
    int               ret         = 0;
    struct xfer_args  x_args      = {0};

    /* find memory address for target object */
    region = pmemobj_direct(pmoid);
    if (!region) return (BAKE_ERR_UNKNOWN_REGION);

#ifdef USE_SIZECHECK_HEADERS
    /* written so that offset + size cannot wrap around */
    if (region_offset > region->size
        || bulk_size > region->size - region_offset)
        return (BAKE_ERR_OUT_OF_BOUNDS);
#endif

    memory = region->data + region_offset;

    /* resolve addr, could be addr of rpc sender (normal case) or a third
     * party (proxy write)
     */
    if (!json_object_get_boolean(
            json_object_object_get(provider->json_cfg, "pipeline_enable"))) {
        /* normal path; no pipeline or intermediate buffers */

        /* create bulk handle for local side of transfer */
        hret = margo_bulk_create(mid, 1, (void**)(&memory), &bulk_size,
                                 HG_BULK_WRITE_ONLY, &bulk_handle);
        if (hret != HG_SUCCESS) {
            ret = BAKE_ERR_MERCURY;
            goto finish;
        }
        hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, remote_bulk,
                                   remote_bulk_offset, bulk_handle, 0,
                                   bulk_size);
        if (hret != HG_SUCCESS) {
            ret = BAKE_ERR_MERCURY;
            goto finish;
        }

    } else {
        /* pipelining mode, with intermediate buffers */
        uint64_t planned_ults;
        uint64_t created_ults;
        int      nobody_left = 0;

        x_args.mid           = mid;
        x_args.remote_addr   = src_addr;
        x_args.remote_bulk   = remote_bulk;
        x_args.remote_offset = remote_bulk_offset;
        x_args.bulk_size     = bulk_size;
        x_args.local_ptr     = memory;
        x_args.bytes_issued  = 0;
        x_args.bytes_retired = 0;
        x_args.poolset       = provider->poolset;
        margo_bulk_poolset_get_max(provider->poolset, &x_args.poolset_max_size);
        x_args.ret = 0;

        /* with nothing to move no worker would ever be created, and nobody
         * would set the eventual we wait on below */
        if (bulk_size == 0) goto finish;

        /* a zero chunk size would make the chunk count unbounded */
        if (x_args.poolset_max_size == 0) {
            ret = BAKE_ERR_INVALID_ARG;
            goto finish;
        }

        if (ABT_mutex_create(&x_args.mutex) != ABT_SUCCESS) {
            ret = BAKE_ERR_ARGOBOTS;
            goto finish;
        }
        if (ABT_eventual_create(0, &x_args.eventual) != ABT_SUCCESS) {
            ABT_mutex_free(&x_args.mutex);
            ret = BAKE_ERR_ARGOBOTS;
            goto finish;
        }

        /* one ult per pipeline chunk (rounded up); the counter is set
         * before any worker exists so that an early finisher cannot
         * mistake itself for the last one */
        planned_ults = bulk_size / x_args.poolset_max_size
                     + (bulk_size % x_args.poolset_max_size != 0);
        x_args.ults_active = (int)planned_ults;

        for (created_ults = 0; created_ults < planned_ults; created_ults++) {
            /* note: setting output tid to NULL to ignore; we will let
             * threads clean up themselves, with the last one setting an
             * eventual to signal completion.
             */
            if (ABT_thread_create(target_pool, xfer_ult, &x_args,
                                  ABT_THREAD_ATTR_NULL, NULL)
                != ABT_SUCCESS)
                break;
        }

        if (created_ults < planned_ults) {
            /* some workers never came to life: take them out of the count,
             * and record an error so the running ones stop picking up new
             * chunks */
            ABT_mutex_lock(x_args.mutex);
            if (x_args.ret == 0) x_args.ret = BAKE_ERR_ARGOBOTS;
            x_args.ults_active -= (int)(planned_ults - created_ults);
            nobody_left = (x_args.ults_active == 0);
            ABT_mutex_unlock(x_args.mutex);
        }

        if (nobody_left) {
            /* no worker remains to free the mutex or set the eventual */
            ABT_mutex_free(&x_args.mutex);
        } else {
            /* the last worker to exit frees the mutex and wakes us up */
            ABT_eventual_wait(x_args.eventual, NULL);
        }
        ABT_eventual_free(&x_args.eventual);

        /* consolidated error code (0 if all successful, otherwise first
         * non-zero error code)
         */
        ret = x_args.ret;
    }

finish:
    margo_bulk_free(bulk_handle);

    return (ret);
}

////////////////////////////////////////////////////////////////////////////////////////////
static int bake_pmem_write_raw(backend_context_t context,
Philip Carns's avatar
Philip Carns committed
278
279
280
281
                               bake_region_id_t  rid,
                               size_t            offset,
                               size_t            size,
                               const void*       data)
282
{
Philip Carns's avatar
Philip Carns committed
283
    char*                ptr  = NULL;
284
285
286
    pmemobj_region_id_t* prid = (pmemobj_region_id_t*)rid.data;
    /* find memory address for target object */
    region_content_t* region = pmemobj_direct(prid->oid);
Philip Carns's avatar
Philip Carns committed
287
    if (!region) return BAKE_ERR_PMEM;
288
289

#ifdef USE_SIZECHECK_HEADERS
Philip Carns's avatar
Philip Carns committed
290
    if (size + offset > region->size) return BAKE_ERR_OUT_OF_BOUNDS;
291
#endif
Philip Carns's avatar
Philip Carns committed
292

293
294
    ptr = region->data + offset;
    memcpy(ptr, data, size);
Philip Carns's avatar
Philip Carns committed
295

296
297
298
299
300
    return BAKE_SUCCESS;
}

////////////////////////////////////////////////////////////////////////////////////////////
static int bake_pmem_write_bulk(backend_context_t context,
Philip Carns's avatar
Philip Carns committed
301
302
303
304
305
306
                                bake_region_id_t  rid,
                                size_t            region_offset,
                                size_t            size,
                                hg_bulk_t         bulk,
                                hg_addr_t         source,
                                size_t            bulk_offset)
307
{
Philip Carns's avatar
Philip Carns committed
308
    bake_pmem_entry_t*   entry = (bake_pmem_entry_t*)context;
309
310
311
312
313
314
315
316
317
318
319
320
321
    pmemobj_region_id_t* prid;

    prid = (pmemobj_region_id_t*)rid.data;

    ABT_pool handler_pool = entry->provider->handler_pool;

    int ret = write_transfer_data(entry->provider->mid, entry->provider,
                                  prid->oid, region_offset, bulk, bulk_offset,
                                  size, source, handler_pool);
    return ret;
}

static int bake_pmem_read_raw(backend_context_t context,
Philip Carns's avatar
Philip Carns committed
322
323
324
325
326
327
                              bake_region_id_t  rid,
                              size_t            offset,
                              size_t            size,
                              void**            data,
                              uint64_t*         data_size,
                              free_fn*          free_data)
328
329
{
    *free_data = NULL;
Philip Carns's avatar
Philip Carns committed
330
    *data      = NULL;
331
332
    *data_size = 0;

Philip Carns's avatar
Philip Carns committed
333
334
    char*                buffer = NULL;
    hg_size_t            size_to_read;
335
336
337
338
    pmemobj_region_id_t* prid = (pmemobj_region_id_t*)rid.data;

    /* find memory address for target object */
    region_content_t* region = pmemobj_direct(prid->oid);
Philip Carns's avatar
Philip Carns committed
339
    if (!region) return BAKE_ERR_UNKNOWN_REGION;
340
341
342
343

    size_to_read = size;

#ifdef USE_SIZECHECK_HEADERS
Philip Carns's avatar
Philip Carns committed
344
345
    if (offset > region->size) return BAKE_ERR_OUT_OF_BOUNDS;
    if (offset + size > region->size) { size_to_read = region->size - offset; }
346
347
348
349
#endif

    buffer = region->data + offset;

Philip Carns's avatar
Philip Carns committed
350
    *data      = buffer;
351
352
353
354
355
356
    *data_size = size_to_read;

    return BAKE_SUCCESS;
}

static int bake_pmem_read_bulk(backend_context_t context,
Philip Carns's avatar
Philip Carns committed
357
358
359
360
361
362
363
                               bake_region_id_t  rid,
                               size_t            region_offset,
                               size_t            size,
                               hg_bulk_t         bulk,
                               hg_addr_t         source,
                               size_t            bulk_offset,
                               size_t*           bytes_read)
364
{
Philip Carns's avatar
Philip Carns committed
365
366
367
368
    int                  ret         = BAKE_SUCCESS;
    bake_pmem_entry_t*   entry       = (bake_pmem_entry_t*)context;
    char*                buffer      = NULL;
    hg_bulk_t            bulk_handle = HG_BULK_NULL;
369
    pmemobj_region_id_t* prid;
Philip Carns's avatar
Philip Carns committed
370
    hg_size_t            size_to_read;
371
372
373
374
375
376
    *bytes_read = 0;

    prid = (pmemobj_region_id_t*)rid.data;

    /* find memory address for target object */
    region_content_t* region = pmemobj_direct(prid->oid);
Philip Carns's avatar
Philip Carns committed
377
    if (!region) {
378
379
380
381
382
383
384
        ret = BAKE_ERR_UNKNOWN_REGION;
        goto finish;
    }

    size_to_read = size;

#ifdef USE_SIZECHECK_HEADERS
Philip Carns's avatar
Philip Carns committed
385
    if (region_offset > region->size) {
386
387
388
        ret = BAKE_ERR_OUT_OF_BOUNDS;
        goto finish;
    }
Philip Carns's avatar
Philip Carns committed
389
    if (region_offset + size > region->size) {
390
391
392
393
394
395
396
397
398
        size_to_read = region->size - region_offset;
    } else {
        size_to_read = size;
    }
#endif

    buffer = region->data + region_offset;

    /* create bulk handle for local side of transfer */
Philip Carns's avatar
Philip Carns committed
399
400
401
402
    hg_return_t hret
        = margo_bulk_create(entry->provider->mid, 1, (void**)(&buffer),
                            &size_to_read, HG_BULK_READ_ONLY, &bulk_handle);
    if (hret != HG_SUCCESS) {
403
404
405
406
        ret = BAKE_ERR_MERCURY;
        goto finish;
    }

Philip Carns's avatar
Philip Carns committed
407
408
    hret = margo_bulk_transfer(entry->provider->mid, HG_BULK_PUSH, source, bulk,
                               bulk_offset, bulk_handle, 0, size_to_read);
409

Philip Carns's avatar
Philip Carns committed
410
    if (hret != HG_SUCCESS) {
411
412
413
414
415
416
417
418
419
420
421
422
        ret = BAKE_ERR_MERCURY;
        goto finish;
    }

    *bytes_read = size_to_read;

finish:
    margo_bulk_free(bulk_handle);
    return ret;
}

static int bake_pmem_persist(backend_context_t context,
Philip Carns's avatar
Philip Carns committed
423
424
425
                             bake_region_id_t  rid,
                             size_t            offset,
                             size_t            size)
426
{
Philip Carns's avatar
Philip Carns committed
427
    char*                ptr  = NULL;
428
429
430
    pmemobj_region_id_t* prid = (pmemobj_region_id_t*)rid.data;
    /* find memory address for target object */
    region_content_t* region = pmemobj_direct(prid->oid);
Philip Carns's avatar
Philip Carns committed
431
    if (!region) return BAKE_ERR_PMEM;
432
433
434
435
436
437
438
439
440
441
    ptr = region->data;

    /* TODO: should this have an abt shim in case it blocks? */
    PMEMobjpool* pmem_pool = pmemobj_pool_by_oid(prid->oid);
    pmemobj_persist(pmem_pool, ptr + offset, size);

    return BAKE_SUCCESS;
}

/**
 * Create a region, fill it from local memory and persist it in one call.
 *
 * Allocates `size` payload bytes (plus the size header in size-check
 * builds), copies `data` into the region and persists header and payload
 * together.  The new region's object id is written into `rid->data`.
 *
 * @return BAKE_SUCCESS; BAKE_ERR_INVALID_ARG if `size` plus the header does
 *         not fit in a size_t; BAKE_ERR_PMEM if the allocation fails.
 */
static int bake_pmem_create_write_persist_raw(backend_context_t context,
                                              const void*       data,
                                              size_t            size,
                                              bake_region_id_t* rid)
{
    bake_pmem_entry_t*   entry  = (bake_pmem_entry_t*)context;
    char*                buffer = NULL;
    pmemobj_region_id_t* prid;
    size_t               content_size;

    /* TODO: this check needs to be somewhere else */
    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);

#ifdef USE_SIZECHECK_HEADERS
    content_size = size + sizeof(uint64_t);
    /* unsigned wrap-around would make the memcpy below overrun a tiny
     * allocation */
    if (content_size < size) return BAKE_ERR_INVALID_ARG;
#else
    content_size = size;
#endif
    prid = (pmemobj_region_id_t*)rid->data;

    int ret = pmemobj_alloc(entry->pmem_pool, &prid->oid, content_size, 0, NULL,
                            NULL);
    if (ret != 0) return BAKE_ERR_PMEM;

    /* find memory address for target object */
    region_content_t* region = pmemobj_direct(prid->oid);
    if (!region) {
        /* do not leave an unreachable object behind in the pool */
        pmemobj_free(&prid->oid);
        return BAKE_ERR_PMEM;
    }
#ifdef USE_SIZECHECK_HEADERS
    region->size = size;
#endif
    buffer = region->data;

    memcpy(buffer, data, size);

    /* TODO: should this have an abt shim in case it blocks? */
    pmemobj_persist(entry->pmem_pool, region, content_size);

    return BAKE_SUCCESS;
}

static int bake_pmem_create_write_persist_bulk(backend_context_t context,
Philip Carns's avatar
Philip Carns committed
481
482
483
484
485
                                               hg_bulk_t         bulk,
                                               hg_addr_t         source,
                                               size_t            bulk_offset,
                                               size_t            size,
                                               bake_region_id_t* rid)
486
{
Philip Carns's avatar
Philip Carns committed
487
    bake_pmem_entry_t*   entry = (bake_pmem_entry_t*)context;
488
    pmemobj_region_id_t* prid;
Philip Carns's avatar
Philip Carns committed
489
    ABT_pool             handler_pool = entry->provider->handler_pool;
490
491
492
493
494
495
496
497
498
499
500
501

    /* TODO: this check needs to be somewhere else */
    assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);

#ifdef USE_SIZECHECK_HEADERS
    size_t content_size = size + sizeof(uint64_t);
#else
    size_t content_size = size;
#endif

    prid = (pmemobj_region_id_t*)rid->data;

Philip Carns's avatar
Philip Carns committed
502
503
504
    int ret = pmemobj_alloc(entry->pmem_pool, &prid->oid, content_size, 0, NULL,
                            NULL);
    if (ret != 0) return BAKE_ERR_PMEM;
505

Philip Carns's avatar
Philip Carns committed
506
507
    ret = write_transfer_data(entry->provider->mid, entry->provider, prid->oid,
                              0, bulk, bulk_offset, size, source, handler_pool);
508

Philip Carns's avatar
Philip Carns committed
509
    if (ret == BAKE_SUCCESS) {
510
511
        /* find memory address for target object */
        region_content_t* region = pmemobj_direct(prid->oid);
Philip Carns's avatar
Philip Carns committed
512
        if (!region) return BAKE_ERR_PMEM;
513
514
515
516
517
518
519
520
521
522
#ifdef USE_SIZECHECK_HEADERS
        region->size = size;
#endif
        pmemobj_persist(entry->pmem_pool, region, content_size);
    }

    return BAKE_SUCCESS;
}

static int bake_pmem_get_region_size(backend_context_t context,
Philip Carns's avatar
Philip Carns committed
523
524
                                     bake_region_id_t  rid,
                                     size_t*           size)
525
526
527
528
529
{
#ifdef USE_SIZECHECK_HEADERS
    pmemobj_region_id_t* prid = (pmemobj_region_id_t*)rid.data;
    /* lock provider */
    region_content_t* region = pmemobj_direct(prid->oid);
Philip Carns's avatar
Philip Carns committed
530
    if (!region) return BAKE_ERR_PMEM;
531
532
533
534
535
536
537
538
    *size = region->size;
    return BAKE_SUCCESS;
#else
    return BAKE_ERR_OP_UNSUPPORTED;
#endif
}

static int bake_pmem_get_region_data(backend_context_t context,
Philip Carns's avatar
Philip Carns committed
539
540
                                     bake_region_id_t  rid,
                                     void**            data)
541
542
543
544
{
    pmemobj_region_id_t* prid = (pmemobj_region_id_t*)rid.data;
    /* find memory address for target object */
    region_content_t* region = pmemobj_direct(prid->oid);
Philip Carns's avatar
Philip Carns committed
545
546
    if (!region) return BAKE_ERR_UNKNOWN_REGION;

547
548
549
550
    *data = region->data;
    return BAKE_SUCCESS;
}

Philip Carns's avatar
Philip Carns committed
551
/* Free the pool object behind a region id.  The id is received by value, so
 * only this function's copy of the object id is modified by the free.
 * Always returns BAKE_SUCCESS. */
static int bake_pmem_remove(backend_context_t context, bake_region_id_t rid)
{
    pmemobj_region_id_t* pmem_rid = (pmemobj_region_id_t*)rid.data;

    pmemobj_free(&pmem_rid->oid);

    return BAKE_SUCCESS;
}

/**
 * Copy a region to a target on another provider and optionally delete the
 * local copy.
 *
 * The region memory is exposed through a read-only bulk handle and a
 * create-write-persist RPC is forwarded to `dest_provider_id` at
 * `dest_addr_str`; the id of the region created there is stored in
 * `*dest_rid`.  The source region is freed only if the whole sequence
 * succeeded and `remove_source` is non-zero.
 *
 * @return BAKE_SUCCESS; BAKE_ERR_UNKNOWN_REGION if the source id does not
 *         resolve to memory; BAKE_ERR_INVALID_ARG (size-check builds only)
 *         if `region_size` differs from the recorded size; BAKE_ERR_MERCURY
 *         on any address, bulk or RPC failure; otherwise the error code
 *         returned by the destination.
 */
static int bake_pmem_migrate_region(backend_context_t context,
                                    bake_region_id_t  source_rid,
                                    size_t            region_size,
                                    int               remove_source,
                                    const char*       dest_addr_str,
                                    uint16_t          dest_provider_id,
                                    bake_target_id_t  dest_target_id,
                                    bake_region_id_t* dest_rid)
{
    bake_pmem_entry_t*   entry = (bake_pmem_entry_t*)context;
    pmemobj_region_id_t* prid  = (pmemobj_region_id_t*)source_rid.data;
    region_content_t*    region;
    char*                region_data;
    hg_return_t          hret;
    hg_addr_t            dest_addr   = HG_ADDR_NULL;
    hg_handle_t          cwp_handle  = HG_HANDLE_NULL;
    int                  have_output = 0;
    int                  ret         = BAKE_SUCCESS;
    bake_create_write_persist_in_t  cwp_in;
    bake_create_write_persist_out_t cwp_out;

    /* lets the cleanup code tell whether the bulk handle was ever created */
    cwp_in.bulk_handle = HG_BULK_NULL;

    /* find memory address for target object */
    region = pmemobj_direct(prid->oid);
    if (!region) {
        ret = BAKE_ERR_UNKNOWN_REGION;
        goto finish;
    }
    region_data = region->data;

#ifdef USE_SIZECHECK_HEADERS
    /* check region size */
    if (region_size != region->size) {
        ret = BAKE_ERR_INVALID_ARG;
        goto finish;
    }
#endif

    /* lookup the address of the destination provider */
    hret = margo_addr_lookup(entry->provider->mid, dest_addr_str, &dest_addr);
    if (hret != HG_SUCCESS) {
        ret = BAKE_ERR_MERCURY;
        goto finish;
    }

    /* issue a create_write_persist to the destination */
    cwp_in.bti             = dest_target_id;
    cwp_in.bulk_offset     = 0;
    cwp_in.bulk_size       = region_size;
    cwp_in.remote_addr_str = NULL;

    hret = margo_bulk_create(entry->provider->mid, 1, (void**)(&region_data),
                             &region_size, HG_BULK_READ_ONLY,
                             &cwp_in.bulk_handle);
    if (hret != HG_SUCCESS) {
        ret = BAKE_ERR_MERCURY;
        goto finish;
    }

    hret = margo_create(entry->provider->mid, dest_addr,
                        entry->provider->bake_create_write_persist_id,
                        &cwp_handle);
    if (hret != HG_SUCCESS) {
        ret = BAKE_ERR_MERCURY;
        goto finish;
    }

    hret = margo_provider_forward(dest_provider_id, cwp_handle, &cwp_in);
    if (hret != HG_SUCCESS) {
        ret = BAKE_ERR_MERCURY;
        goto finish;
    }

    hret = margo_get_output(cwp_handle, &cwp_out);
    if (hret != HG_SUCCESS) {
        ret = BAKE_ERR_MERCURY;
        goto finish;
    }
    have_output = 1;

    if (cwp_out.ret != BAKE_SUCCESS) {
        ret = cwp_out.ret;
        goto finish;
    }

    *dest_rid = cwp_out.rid;

    /* the copy is safely at the destination; drop the local one on request */
    if (remove_source) { pmemobj_free(&prid->oid); }

finish:
    /* release only what was actually acquired on the way here */
    if (have_output) margo_free_output(cwp_handle, &cwp_out);
    if (cwp_in.bulk_handle != HG_BULK_NULL)
        margo_bulk_free(cwp_in.bulk_handle);
    if (cwp_handle != HG_HANDLE_NULL) margo_destroy(cwp_handle);
    if (dest_addr != HG_ADDR_NULL)
        margo_addr_free(entry->provider->mid, dest_addr);
    return ret;
}

#ifdef USE_REMI
static int bake_pmem_create_fileset(backend_context_t context,
Philip Carns's avatar
Philip Carns committed
663
                                    remi_fileset_t*   fileset)
664
{
Philip Carns's avatar
Philip Carns committed
665
666
    bake_pmem_entry_t* entry = (bake_pmem_entry_t*)context;
    int                ret;
667
668
    /* create a fileset */
    ret = remi_fileset_create("bake", entry->root, fileset);
Philip Carns's avatar
Philip Carns committed
669
    if (ret != REMI_SUCCESS) {
670
671
672
673
674
675
        ret = BAKE_ERR_REMI;
        goto error;
    }

    /* fill the fileset */
    ret = remi_fileset_register_file(*fileset, entry->filename);
Philip Carns's avatar
Philip Carns committed
676
    if (ret != REMI_SUCCESS) {
677
678
679
680
681
682
683
684
685
686
687
688
689
        ret = BAKE_ERR_REMI;
        goto error;
    }

finish:
    return ret;
error:
    remi_fileset_free(*fileset);
    *fileset = NULL;
    goto finish;
}
#endif

690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
bake_backend g_bake_pmem_backend = {
    .name                       = "pmem",
    ._initialize                = bake_pmem_backend_initialize,
    ._finalize                  = bake_pmem_backend_finalize,
    ._create                    = bake_pmem_create,
    ._write_raw                 = bake_pmem_write_raw,
    ._write_bulk                = bake_pmem_write_bulk,
    ._read_raw                  = bake_pmem_read_raw,
    ._read_bulk                 = bake_pmem_read_bulk,
    ._persist                   = bake_pmem_persist,
    ._create_write_persist_raw  = bake_pmem_create_write_persist_raw,
    ._create_write_persist_bulk = bake_pmem_create_write_persist_bulk,
    ._get_region_size           = bake_pmem_get_region_size,
    ._get_region_data           = bake_pmem_get_region_data,
    ._remove                    = bake_pmem_remove,
    ._migrate_region            = bake_pmem_migrate_region,
706
    ._create_raw_target         = bake_pmem_makepool,
707
#ifdef USE_REMI
708
    ._create_fileset = bake_pmem_create_fileset,
709
#endif
710
};
711

Philip Carns's avatar
Philip Carns committed
712
static void xfer_ult(void* _args)
713
{
Philip Carns's avatar
Philip Carns committed
714
715
716
717
718
719
720
721
722
723
    struct xfer_args* args       = _args;
    hg_bulk_t         local_bulk = HG_BULK_NULL;
    size_t            this_size;
    char*             this_local_ptr;
    void*             local_bulk_ptr;
    size_t            this_remote_offset;
    size_t            tmp_buf_size;
    hg_uint32_t       tmp_count;
    int               turn_out_the_lights = 0;
    int               ret;
724
725
726
727
728
729

    /* Set up a loop here.  It may or may not get used; just depends on
     * timing of whether this ULT gets through a cycle before other ULTs
     * start running.  We don't care which ULT does the next chunk.
     */
    ABT_mutex_lock(args->mutex);
Philip Carns's avatar
Philip Carns committed
730
    while (args->bytes_issued < args->bulk_size && !args->ret) {
731
        /* calculate what work we will do in this cycle */
Philip Carns's avatar
Philip Carns committed
732
        if ((args->bulk_size - args->bytes_issued) > args->poolset_max_size)
733
734
735
            this_size = args->poolset_max_size;
        else
            this_size = args->bulk_size - args->bytes_issued;
Philip Carns's avatar
Philip Carns committed
736
        this_local_ptr     = args->local_ptr + args->bytes_issued;
737
738
739
740
741
742
743
744
745
746
        this_remote_offset = args->remote_offset + args->bytes_issued;

        /* update state */
        args->bytes_issued += this_size;

        /* drop mutex while we work on our local piece */
        ABT_mutex_unlock(args->mutex);

        /* get buffer */
        ret = margo_bulk_poolset_get(args->poolset, this_size, &local_bulk);
Philip Carns's avatar
Philip Carns committed
747
        if (ret != 0 && args->ret == 0) {
748
749
750
751
752
            args->ret = ret;
            goto finished;
        }

        /* find pointer of memory in buffer */
Philip Carns's avatar
Philip Carns committed
753
754
        ret = margo_bulk_access(local_bulk, 0, this_size, HG_BULK_READWRITE, 1,
                                &local_bulk_ptr, &tmp_buf_size, &tmp_count);
755
756
757
758
        /* shouldn't ever fail in this use case */
        assert(ret == 0);

        /* do the rdma transfer */
Philip Carns's avatar
Philip Carns committed
759
760
761
762
        ret = margo_bulk_transfer(args->mid, HG_BULK_PULL, args->remote_addr,
                                  args->remote_bulk, this_remote_offset,
                                  local_bulk, 0, this_size);
        if (ret != 0 && args->ret == 0) {
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
            args->ret = ret;
            goto finished;
        }

        /* copy to real destination */
        memcpy(this_local_ptr, local_bulk_ptr, this_size);

        /* let go of bulk handle */
        margo_bulk_poolset_release(args->poolset, local_bulk);
        local_bulk = HG_BULK_NULL;

        ABT_mutex_lock(args->mutex);
        args->bytes_retired += this_size;
    }
    /* TODO: think about this.  It is tempting to signal caller before all
     * of the threads have cleaned up, but if we do that then we need to get
     * args struct off of the caller's stack and free it here, otherwise
     * will go out of scope.
     */
#if 0
    if(args->bytes_retired == args->bulk_size && !args->done)
    {
        /* this is the first ULT to notice completion; signal caller */
        args->done = 1;
        ABT_eventual_set(args->eventual, NULL, 0);
    }
#endif

    ABT_mutex_unlock(args->mutex);

Philip Carns's avatar
Philip Carns committed
793
794
finished:
    if (local_bulk != HG_BULK_NULL)
795
796
797
        margo_bulk_poolset_release(args->poolset, local_bulk);
    ABT_mutex_lock(args->mutex);
    args->ults_active--;
Philip Carns's avatar
Philip Carns committed
798
    if (!args->ults_active) turn_out_the_lights = 1;
799
800
801
802
803
    ABT_mutex_unlock(args->mutex);

    /* last ULT to exit needs to clean up some resources, one else around
     * to touch mutex at this point
     */
Philip Carns's avatar
Philip Carns committed
804
    if (turn_out_the_lights) {
805
806
807
808
809
810
        ABT_mutex_free(&args->mutex);
        ABT_eventual_set(args->eventual, NULL, 0);
    }

    return;
}