bake-bulk-server.c 13.3 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6 7
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <bake-bulk-server.h>
#include <libpmemobj.h>
#include "bake-bulk-rpc.h"

12 13 14 15 16
/* definition of internal region_id_t identifier for libpmemobj back end
 *
 * The RPC handlers in this file overlay this structure on the opaque
 * rid.data bytes of a region id, so it must fit within
 * BAKE_BULK_REGION_ID_DATA_SIZE (asserted in the create handler).
 */
typedef struct {
    /* libpmemobj object id; filled in by pmemobj_alloc() in the create
     * handler and resolved to an address with pmemobj_direct() elsewhere
     */
    PMEMoid oid;
    /* region size as requested at create time; used as the length for
     * pmemobj_persist() and reported by the get_size handler
     */
    uint64_t size;
} pmemobj_region_id_t;
17

Philip Carns's avatar
Philip Carns committed
18 19 20
/* TODO: this should not be global in the long run; server may provide access
 * to multiple targets
 */
21 22 23
/* pool handle used by the create and persist handlers; assigned in
 * bake_server_register()
 */
static PMEMobjpool *g_pmem_pool = NULL;
/* root object of the pool; the probe handler reports its target_id;
 * assigned in bake_server_register()
 */
static struct bake_bulk_root *g_pmem_root = NULL;

24 25
struct bake_pool_info * bake_server_makepool(
	char *poolname)
26 27 28
{
    PMEMoid root_oid;
    char target_string[64];
29 30 31
    struct bake_pool_info *pool_info;

    pool_info = malloc(sizeof(*pool_info));
32 33

    /* open pmem pool */
34 35
    pool_info->bb_pmem_pool = pmemobj_open(poolname, NULL);
    if(!pool_info->bb_pmem_pool)
36 37
    {
        fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
38
        return(NULL);
39 40 41
    }

    /* find root */
42 43 44 45
    root_oid = pmemobj_root(pool_info->bb_pmem_pool,
	    sizeof(*(pool_info->bb_pmem_root)) );
    pool_info->bb_pmem_root = pmemobj_direct(root_oid);
    if(uuid_is_null(pool_info->bb_pmem_root->target_id.id))
46
    {
47 48 49
        uuid_generate(pool_info->bb_pmem_root->target_id.id);
        pmemobj_persist(pool_info->bb_pmem_pool,
		pool_info->bb_pmem_root, sizeof(*(pool_info->bb_pmem_root)) );
50
    }
51
    uuid_unparse(pool_info->bb_pmem_root->target_id.id, target_string);
52 53
    fprintf(stderr, "BAKE target ID: %s\n", target_string);

54
    return pool_info;
55 56
}

Philip Carns's avatar
Philip Carns committed
57

58 59
/* Register the BAKE bulk RPCs with a margo instance and bind the server to
 * a pool.
 *
 * mid:       margo instance to register the RPC handlers with.
 * pool_info: pool description (as returned by bake_server_makepool()); its
 *            pool handle and root pointer are copied into the file-level
 *            globals used by the handlers.  Must not be NULL.
 */
void bake_server_register(margo_instance_id mid,
	struct bake_pool_info *pool_info)
{
    assert(pool_info);

    /* set global pmem variables needed by the bake server; this is done
     * before any handler is registered so that a handler can never observe
     * them unset.
     * NOTE(review): only matters if mid is already making progress while
     * this function runs -- confirm against the daemon's startup sequence.
     */
    g_pmem_pool = pool_info->bb_pmem_pool;
    g_pmem_root = pool_info->bb_pmem_root;

    /* register RPCs */
    MARGO_REGISTER(mid, "bake_bulk_shutdown_rpc", void, void,
        bake_bulk_shutdown_ult);
    MARGO_REGISTER(mid, "bake_bulk_create_rpc", bake_bulk_create_in_t,
        bake_bulk_create_out_t,
        bake_bulk_create_ult);
    MARGO_REGISTER(mid, "bake_bulk_write_rpc", bake_bulk_write_in_t,
        bake_bulk_write_out_t,
        bake_bulk_write_ult);
    MARGO_REGISTER(mid, "bake_bulk_eager_write_rpc", bake_bulk_eager_write_in_t,
        bake_bulk_eager_write_out_t,
        bake_bulk_eager_write_ult);
    MARGO_REGISTER(mid, "bake_bulk_eager_read_rpc", bake_bulk_eager_read_in_t,
        bake_bulk_eager_read_out_t,
        bake_bulk_eager_read_ult);
    MARGO_REGISTER(mid, "bake_bulk_persist_rpc", bake_bulk_persist_in_t,
        bake_bulk_persist_out_t,
        bake_bulk_persist_ult);
    MARGO_REGISTER(mid, "bake_bulk_get_size_rpc", bake_bulk_get_size_in_t,
        bake_bulk_get_size_out_t,
        bake_bulk_get_size_ult);
    MARGO_REGISTER(mid, "bake_bulk_read_rpc", bake_bulk_read_in_t,
        bake_bulk_read_out_t,
        bake_bulk_read_ult);
    MARGO_REGISTER(mid, "bake_bulk_probe_rpc", void,
        bake_bulk_probe_out_t,
        bake_bulk_probe_ult);
    MARGO_REGISTER(mid, "bake_bulk_noop_rpc", void,
        void,
        bake_bulk_noop_ult);

    return;
}

/* RPC handler: shut the server daemon down.
 *
 * Sends an empty response to the caller, releases the handle, and then
 * finalizes the margo instance the request arrived on.
 */
static void bake_bulk_shutdown_ult(hg_handle_t handle)
{
    margo_instance_id instance = margo_hg_handle_get_instance(handle);
    hg_return_t rc = margo_respond(handle, NULL);

    assert(rc == HG_SUCCESS);
    margo_destroy(handle);

    /* NOTE: the server daemon is assumed to be parked in
     * margo_wait_for_finalize() until this RPC runs, so finalizing here is
     * all the notification it needs.
     */
    margo_finalize(instance);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_shutdown_ult)

/* RPC handler: create a bulk region.
 *
 * Allocates in.region_size bytes from the global pmem pool and returns a
 * region id whose data bytes hold a pmemobj_region_id_t (object id plus
 * requested size).  out.ret carries the pmemobj_alloc() result, or -1 if
 * the request could not be decoded.
 */
static void bake_bulk_create_ult(hg_handle_t handle)
{
    bake_bulk_create_in_t req;
    bake_bulk_create_out_t resp;
    pmemobj_region_id_t *region;

    /* TODO: this check needs to be somewhere else */
    assert(sizeof(pmemobj_region_id_t) <= BAKE_BULK_REGION_ID_DATA_SIZE);

    memset(&resp, 0, sizeof(resp));

    if(margo_get_input(handle, &req) == HG_SUCCESS)
    {
        /* the region id is built in place inside the response */
        region = (pmemobj_region_id_t*)resp.rid.data;
        region->size = req.region_size;
        resp.ret = pmemobj_alloc(g_pmem_pool, &region->oid,
            req.region_size, 0, NULL, NULL);

        margo_free_input(handle, &req);
    }
    else
    {
        resp.ret = -1;
    }

    margo_respond(handle, &resp);
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_create_ult)

/* service a remote RPC that writes to a bulk region
 *
 * Pulls the contents of the caller's bulk handle into the pmem object named
 * by in.rid, starting at offset 0 of the region.  out.ret is 0 on success
 * and -1 on any failure: undecodable input, unresolvable object id, a
 * transfer larger than the region, or a bulk create/transfer error.
 */
static void bake_bulk_write_ult(hg_handle_t handle)
{
    bake_bulk_write_out_t out;
    bake_bulk_write_in_t in;
    hg_return_t hret;
    char* buffer;
    hg_size_t size;
    hg_bulk_t bulk_handle;
    const struct hg_info *hgi;
    margo_instance_id mid;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));
    /* assume failure until the transfer has completed */
    out.ret = -1;

    hgi = margo_get_info(handle);
    assert(hgi);
    mid = margo_hg_info_get_instance(hgi);

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
    {
        goto respond;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        goto free_input;
    }

    size = margo_bulk_get_size(in.bulk_handle);

    /* refuse transfers that would run past the end of the region; the size
     * recorded in the region id is the only bound available here
     */
    if(size > prid->size)
    {
        goto free_input;
    }

    /* create bulk handle for local side of transfer */
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &size,
        HG_BULK_WRITE_ONLY, &bulk_handle);
    if(hret != HG_SUCCESS)
    {
        goto free_input;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle,
        0, bulk_handle, 0, size);
    if(hret == HG_SUCCESS)
    {
        out.ret = 0;
    }

    margo_bulk_free(bulk_handle);
free_input:
    margo_free_input(handle, &in);
respond:
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_write_ult)


/* service a remote RPC that writes to a bulk region in eager mode
 *
 * The payload travels inside the request itself (in.buffer / in.size) and
 * is copied to the start of the pmem object named by in.rid.  out.ret is 0
 * on success and -1 on any failure: undecodable input, unresolvable object
 * id, or a payload larger than the region.
 */
static void bake_bulk_eager_write_ult(hg_handle_t handle)
{
    bake_bulk_eager_write_out_t out;
    bake_bulk_eager_write_in_t in;
    hg_return_t hret;
    char* buffer;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));
    /* assume failure until the copy has been made */
    out.ret = -1;

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
    {
        goto respond;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        goto free_input;
    }

    /* refuse payloads that would run past the end of the region; the size
     * recorded in the region id is the only bound available here
     */
    if(in.size > prid->size)
    {
        goto free_input;
    }

    memcpy(buffer, in.buffer, in.size);
    out.ret = 0;

free_input:
    margo_free_input(handle, &in);
respond:
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_write_ult)

/* RPC handler: persist a bulk region.
 *
 * Resolves the object named by the request's region id and calls
 * pmemobj_persist() over the size recorded in that id.  out.ret is 0 on
 * success, -1 if the request could not be decoded or the object id did not
 * resolve to an address.
 */
static void bake_bulk_persist_ult(hg_handle_t handle)
{
    bake_bulk_persist_in_t req;
    bake_bulk_persist_out_t resp;
    pmemobj_region_id_t *region;
    char *addr;

    memset(&resp, 0, sizeof(resp));
    resp.ret = -1;

    if(margo_get_input(handle, &req) == HG_SUCCESS)
    {
        region = (pmemobj_region_id_t*)req.rid.data;

        /* look up where the target object lives in memory */
        addr = pmemobj_direct(region->oid);
        if(addr)
        {
            /* TODO: should this have an abt shim in case it blocks? */
            pmemobj_persist(g_pmem_pool, addr, region->size);
            resp.ret = 0;
        }

        margo_free_input(handle, &req);
    }

    margo_respond(handle, &resp);
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_persist_ult)

/* RPC handler: report the size of a bulk region.
 *
 * No pmem access is needed: the size is read straight out of the region id
 * supplied by the caller.  out.ret is 0 on success, -1 if the request could
 * not be decoded.
 */
static void bake_bulk_get_size_ult(hg_handle_t handle)
{
    bake_bulk_get_size_in_t req;
    bake_bulk_get_size_out_t resp;
    pmemobj_region_id_t *region;

    memset(&resp, 0, sizeof(resp));

    if(margo_get_input(handle, &req) != HG_SUCCESS)
    {
        resp.ret = -1;
    }
    else
    {
        /* kind of cheating here; the size is encoded in the RID */
        region = (pmemobj_region_id_t*)req.rid.data;
        resp.size = region->size;
        resp.ret = 0;

        margo_free_input(handle, &req);
    }

    margo_respond(handle, &resp);
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult)

/* RPC handler: no-op.
 *
 * Sends an empty response and releases the handle; nothing else is done.
 */
static void bake_bulk_noop_ult(hg_handle_t handle)
{
    margo_respond(handle, NULL);
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_noop_ult)

/* TODO consolidate with write handler; read and write are nearly identical */
/* service a remote RPC that reads from a bulk region
 *
 * Pushes the contents of the pmem object named by in.rid, starting at
 * offset 0 of the region, into the caller's bulk handle.  out.ret is 0 on
 * success and -1 on any failure: undecodable input, unresolvable object id,
 * a transfer larger than the region, or a bulk create/transfer error.
 */
static void bake_bulk_read_ult(hg_handle_t handle)
{
    bake_bulk_read_out_t out;
    bake_bulk_read_in_t in;
    hg_return_t hret;
    char* buffer;
    hg_size_t size;
    hg_bulk_t bulk_handle;
    const struct hg_info *hgi;
    margo_instance_id mid;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));
    /* assume failure until the transfer has completed */
    out.ret = -1;

    hgi = margo_get_info(handle);
    assert(hgi);
    mid = margo_hg_info_get_instance(hgi);

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
    {
        goto respond;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        goto free_input;
    }

    size = margo_bulk_get_size(in.bulk_handle);

    /* refuse transfers that would read past the end of the region; the size
     * recorded in the region id is the only bound available here
     */
    if(size > prid->size)
    {
        goto free_input;
    }

    /* create bulk handle for local side of transfer */
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &size,
        HG_BULK_READ_ONLY, &bulk_handle);
    if(hret != HG_SUCCESS)
    {
        goto free_input;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle,
        0, bulk_handle, 0, size);
    if(hret == HG_SUCCESS)
    {
        out.ret = 0;
    }

    margo_bulk_free(bulk_handle);
free_input:
    margo_free_input(handle, &in);
respond:
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
Philip Carns's avatar
Philip Carns committed
456 457


458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
/* service a remote RPC that reads to a bulk region and eagerly sends
 * response */
static void bake_bulk_eager_read_ult(hg_handle_t handle)
{
    bake_bulk_eager_read_out_t out;
    bake_bulk_eager_read_in_t in;
    hg_return_t hret;
    char* buffer;
    hg_size_t size;
    pmemobj_region_id_t* prid;

    // printf("Got RPC request to read bulk region.\n");
    
    memset(&out, 0, sizeof(out));

473
    hret = margo_get_input(handle, &in);
474 475 476
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
477 478
        margo_respond(handle, &out);
        margo_destroy(handle);
479 480 481 482
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;
Philip Carns's avatar
Philip Carns committed
483

484 485 486 487 488
    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        out.ret = -1;
489 490 491
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
492 493
        return;
    }
494

495 496 497 498
    out.ret = 0;
    out.buffer = buffer;
    out.size = in.size;

499 500 501
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
502
    return;
Philip Carns's avatar
Philip Carns committed
503
}
504 505 506 507 508 509 510 511 512 513 514 515 516 517
DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_read_ult)

/* RPC handler: probe for the target id.
 *
 * Replies with the target_id stored in the global pmem root object and a
 * return code of 0.
 */
static void bake_bulk_probe_ult(hg_handle_t handle)
{
    bake_bulk_probe_out_t resp;

    memset(&resp, 0, sizeof(resp));

    resp.bti = g_pmem_root->target_id;
    resp.ret = 0;

    margo_respond(handle, &resp);
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_probe_ult)

Philip Carns's avatar
Philip Carns committed
524