bake-bulk-server.c 14 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6 7
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
8
#include <bake-bulk-server.h>
9
#include <libpmemobj.h>
Philip Carns's avatar
Philip Carns committed
10 11
#include "bake-bulk-rpc.h"

12 13 14 15 16
/* definition of internal region_id_t identifier for libpmemobj back end
 *
 * The handlers in this file overlay this structure on the opaque data area
 * of a region id (the `rid.data` member of the RPC input/output structs), so
 * it must fit within BAKE_BULK_REGION_ID_DATA_SIZE bytes (asserted in the
 * create handler).
 */
typedef struct {
    PMEMoid oid;    /* libpmemobj object handle filled in by pmemobj_alloc() */
    uint64_t size;  /* region size in bytes as requested at creation time */
} pmemobj_region_id_t;
17

Philip Carns's avatar
Philip Carns committed
18 19 20
/* TODO: this should not be global in the long run; server may provide access
 * to multiple targets
 */
21 22 23
/* pool handle used by the RPC handlers for allocation and persist calls;
 * assigned in bake_server_register()
 */
static PMEMobjpool *g_pmem_pool = NULL;
/* root object of the pool (holds the target id reported by the probe RPC);
 * assigned in bake_server_register()
 */
static struct bake_bulk_root *g_pmem_root = NULL;

24
struct bake_pool_info * bake_server_makepool(
Rob Latham's avatar
Rob Latham committed
25
	const char *poolname)
26 27 28
{
    PMEMoid root_oid;
    char target_string[64];
29 30 31
    struct bake_pool_info *pool_info;

    pool_info = malloc(sizeof(*pool_info));
32 33

    /* open pmem pool */
34 35
    pool_info->bb_pmem_pool = pmemobj_open(poolname, NULL);
    if(!pool_info->bb_pmem_pool)
36 37
    {
        fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
38
        return(NULL);
39 40 41
    }

    /* find root */
42 43 44 45
    root_oid = pmemobj_root(pool_info->bb_pmem_pool,
	    sizeof(*(pool_info->bb_pmem_root)) );
    pool_info->bb_pmem_root = pmemobj_direct(root_oid);
    if(uuid_is_null(pool_info->bb_pmem_root->target_id.id))
46
    {
47 48 49
        uuid_generate(pool_info->bb_pmem_root->target_id.id);
        pmemobj_persist(pool_info->bb_pmem_pool,
		pool_info->bb_pmem_root, sizeof(*(pool_info->bb_pmem_root)) );
50
    }
51
    uuid_unparse(pool_info->bb_pmem_root->target_id.id, target_string);
52 53
    fprintf(stderr, "BAKE target ID: %s\n", target_string);

54
    return pool_info;
55 56
}

Philip Carns's avatar
Philip Carns committed
57

58 59
/**
 * Register all BAKE bulk RPC handlers with a margo instance and bind the
 * server to a pool.
 *
 * The file-scope pool/root pointers are published BEFORE any RPC is
 * registered, because the handlers read them (e.g. the probe handler
 * dereferences g_pmem_root); registering first would leave a window in which
 * a handler could observe NULL.
 *
 * @param mid       margo instance to register the RPCs with
 * @param pool_info pool description whose bb_pmem_pool / bb_pmem_root
 *                  members are copied into the file-scope globals; must not
 *                  be NULL
 */
void bake_server_register(margo_instance_id mid,
	struct bake_pool_info *pool_info)
{
    /* set global pmem variables needed by the bake server */
    g_pmem_pool = pool_info->bb_pmem_pool;
    g_pmem_root = pool_info->bb_pmem_root;

    /* register RPCs */
    MARGO_REGISTER(mid, "bake_bulk_shutdown_rpc", void, void,
        bake_bulk_shutdown_ult);
    MARGO_REGISTER(mid, "bake_bulk_create_rpc", bake_bulk_create_in_t,
        bake_bulk_create_out_t,
        bake_bulk_create_ult);
    MARGO_REGISTER(mid, "bake_bulk_write_rpc", bake_bulk_write_in_t,
        bake_bulk_write_out_t,
        bake_bulk_write_ult);
    MARGO_REGISTER(mid, "bake_bulk_eager_write_rpc", bake_bulk_eager_write_in_t,
        bake_bulk_eager_write_out_t,
        bake_bulk_eager_write_ult);
    MARGO_REGISTER(mid, "bake_bulk_eager_read_rpc", bake_bulk_eager_read_in_t,
        bake_bulk_eager_read_out_t,
        bake_bulk_eager_read_ult);
    MARGO_REGISTER(mid, "bake_bulk_persist_rpc", bake_bulk_persist_in_t,
        bake_bulk_persist_out_t,
        bake_bulk_persist_ult);
    MARGO_REGISTER(mid, "bake_bulk_get_size_rpc", bake_bulk_get_size_in_t,
        bake_bulk_get_size_out_t,
        bake_bulk_get_size_ult);
    MARGO_REGISTER(mid, "bake_bulk_read_rpc", bake_bulk_read_in_t,
        bake_bulk_read_out_t,
        bake_bulk_read_ult);
    MARGO_REGISTER(mid, "bake_bulk_probe_rpc", void,
        bake_bulk_probe_out_t,
        bake_bulk_probe_ult);
    MARGO_REGISTER(mid, "bake_bulk_noop_rpc", void,
        void,
        bake_bulk_noop_ult);

    return;
}

/* service a remote RPC that instructs the server daemon to shut down
 *
 * An (empty) response is sent first, then the margo instance that delivered
 * the request is finalized.  A failure to deliver the response is reported
 * on stderr but does not prevent the shutdown: the request was received, and
 * aborting the daemon because the acknowledgement could not be sent would
 * defeat its purpose.
 */
static void bake_bulk_shutdown_ult(hg_handle_t handle)
{
    hg_return_t hret;
    margo_instance_id mid;

    /* look up the instance before the handle is destroyed */
    mid = margo_hg_handle_get_instance(handle);

    hret = margo_respond(handle, NULL);
    if(hret != HG_SUCCESS)
    {
        fprintf(stderr, "bake_bulk_shutdown_ult: margo_respond failed (%d)\n",
            (int)hret);
    }

    margo_destroy(handle);

    /* NOTE: we assume that the server daemon is using
     * margo_wait_for_finalize() to suspend until this RPC executes, so there
     * is no need to send any extra signal to notify it.
     */
    margo_finalize(mid);

    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_shutdown_ult)

/* service a remote RPC that creates a bulk region
 *
 * Allocates `region_size` bytes from the global pmem pool and hands the
 * resulting object handle plus the size back to the caller inside the
 * opaque region id.  The return code of pmemobj_alloc() is forwarded as the
 * RPC result; -1 is reported if the request cannot be decoded.
 */
static void bake_bulk_create_ult(hg_handle_t handle)
{
    bake_bulk_create_in_t req;
    bake_bulk_create_out_t resp;
    pmemobj_region_id_t* region;

    /* TODO: this check needs to be somewhere else */
    assert(sizeof(pmemobj_region_id_t) <= BAKE_BULK_REGION_ID_DATA_SIZE);

    memset(&resp, 0, sizeof(resp));

    if(margo_get_input(handle, &req) != HG_SUCCESS)
    {
        resp.ret = -1;
        goto respond;
    }

    /* the region id is built in place inside the response */
    region = (pmemobj_region_id_t*)resp.rid.data;
    region->size = req.region_size;
    resp.ret = pmemobj_alloc(g_pmem_pool, &region->oid, req.region_size,
        0, NULL, NULL);

    margo_free_input(handle, &req);

respond:
    margo_respond(handle, &resp);
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_create_ult)

/* service a remote RPC that writes to a bulk region
 *
 * Pulls the contents of the client-supplied bulk handle into the pmem object
 * named by the region id.  If the request carries a remote address string,
 * the data is pulled from that address instead of from the sender.
 *
 * The transfer length is taken from the client's bulk handle; it is rejected
 * if it exceeds the size recorded in the region id, since a longer transfer
 * would write past the end of the region.
 *
 * Responds with ret == 0 on success and ret == -1 on any failure.  All
 * resources are released through a single cleanup path.
 */
static void bake_bulk_write_ult(hg_handle_t handle)
{
    bake_bulk_write_out_t out;
    bake_bulk_write_in_t in;
    hg_return_t hret;
    hg_addr_t src_addr;
    char* buffer;
    hg_size_t size;
    hg_bulk_t bulk_handle;
    const struct hg_info *hgi;
    margo_instance_id mid;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));
    /* assume failure until the transfer has completed */
    out.ret = -1;

    hgi = margo_get_info(handle);
    assert(hgi);
    mid = margo_hg_info_get_instance(hgi);

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
    {
        goto respond;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        goto free_input;
    }

    size = margo_bulk_get_size(in.bulk_handle);
    if(size > prid->size)
    {
        /* refuse to write beyond the end of the region */
        goto free_input;
    }

    /* create bulk handle for local side of transfer */
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &size,
        HG_BULK_WRITE_ONLY, &bulk_handle);
    if(hret != HG_SUCCESS)
    {
        goto free_input;
    }

    if(in.remote_addr_str)
    {
        /* a proxy address was provided to send write data to */
        hret = margo_addr_lookup(mid, in.remote_addr_str, &src_addr);
        if(hret != HG_SUCCESS)
        {
            /* nothing was looked up, so skip the address release */
            goto free_bulk;
        }
    }
    else
    {
        /* no proxy write, use the source of this request */
        src_addr = hgi->addr;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PULL, src_addr, in.bulk_handle,
        0, bulk_handle, 0, size);
    if(hret == HG_SUCCESS)
    {
        out.ret = 0;
    }

    /* only an address obtained via lookup is ours to release */
    if(in.remote_addr_str)
    {
        margo_addr_free(mid, src_addr);
    }

free_bulk:
    margo_bulk_free(bulk_handle);
free_input:
    margo_free_input(handle, &in);
respond:
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_write_ult)


/* service a remote RPC that writes to a bulk region in eager mode
 *
 * The data travels inside the request itself (in.buffer / in.size) and is
 * copied straight into the pmem object named by the region id.  Requests
 * whose payload is larger than the size recorded in the region id are
 * rejected, since copying them would write past the end of the region.
 *
 * Responds with ret == 0 on success and ret == -1 on any failure.
 */
static void bake_bulk_eager_write_ult(hg_handle_t handle)
{
    bake_bulk_eager_write_out_t out;
    bake_bulk_eager_write_in_t in;
    hg_return_t hret;
    char* buffer;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));
    /* assume failure until the copy has been made */
    out.ret = -1;

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
    {
        goto respond;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(buffer && in.size <= prid->size)
    {
        memcpy(buffer, in.buffer, in.size);
        out.ret = 0;
    }

    margo_free_input(handle, &in);
respond:
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_write_ult)

/* service a remote RPC that persists a bulk region
 *
 * Resolves the region id to its pmem address and calls pmemobj_persist()
 * over the size recorded in the region id.  Responds with ret == 0 on
 * success, or ret == -1 if the request cannot be decoded or the object
 * cannot be resolved.
 */
static void bake_bulk_persist_ult(hg_handle_t handle)
{
    bake_bulk_persist_in_t req;
    bake_bulk_persist_out_t resp;
    pmemobj_region_id_t* region;
    char* region_base;

    memset(&resp, 0, sizeof(resp));
    /* overwritten with 0 once the persist call has been issued */
    resp.ret = -1;

    if(margo_get_input(handle, &req) != HG_SUCCESS)
    {
        goto respond;
    }

    region = (pmemobj_region_id_t*)req.rid.data;

    /* find memory address for target object */
    region_base = pmemobj_direct(region->oid);
    if(region_base)
    {
        /* TODO: should this have an abt shim in case it blocks? */
        pmemobj_persist(g_pmem_pool, region_base, region->size);
        resp.ret = 0;
    }

    margo_free_input(handle, &req);

respond:
    margo_respond(handle, &resp);
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_persist_ult)

/* service a remote RPC that retrieves the size of a bulk region
 *
 * No pmem access is needed: the size is read back out of the region id that
 * the client supplies.  Responds with ret == -1 only if the request cannot
 * be decoded.
 */
static void bake_bulk_get_size_ult(hg_handle_t handle)
{
    bake_bulk_get_size_in_t req;
    bake_bulk_get_size_out_t resp;

    memset(&resp, 0, sizeof(resp));

    if(margo_get_input(handle, &req) == HG_SUCCESS)
    {
        const pmemobj_region_id_t* region =
            (const pmemobj_region_id_t*)req.rid.data;

        /* kind of cheating here; the size is encoded in the RID */
        resp.size = region->size;
        resp.ret = 0;

        margo_free_input(handle, &req);
    }
    else
    {
        resp.ret = -1;
    }

    margo_respond(handle, &resp);
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult)

/* service a remote RPC for a no-op
 *
 * Sends an empty response and releases the handle; no other work is done.
 */
static void bake_bulk_noop_ult(hg_handle_t handle)
{
    /* empty acknowledgement: this RPC has no output payload */
    margo_respond(handle, NULL);

    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_noop_ult)

/* TODO consolidate with write handler; read and write are nearly identical */
/* service a remote RPC that reads from a bulk region
 *
 * Pushes the contents of the pmem object named by the region id into the
 * client-supplied bulk handle, addressed to the sender of the request.
 *
 * The transfer length is taken from the client's bulk handle; it is rejected
 * if it exceeds the size recorded in the region id, since a longer transfer
 * would expose memory beyond the end of the region.
 *
 * Responds with ret == 0 on success and ret == -1 on any failure.  All
 * resources are released through a single cleanup path.
 */
static void bake_bulk_read_ult(hg_handle_t handle)
{
    bake_bulk_read_out_t out;
    bake_bulk_read_in_t in;
    hg_return_t hret;
    char* buffer;
    hg_size_t size;
    hg_bulk_t bulk_handle;
    const struct hg_info *hgi;
    margo_instance_id mid;
    pmemobj_region_id_t* prid;

    memset(&out, 0, sizeof(out));
    /* assume failure until the transfer has completed */
    out.ret = -1;

    hgi = margo_get_info(handle);
    assert(hgi);
    mid = margo_hg_info_get_instance(hgi);

    hret = margo_get_input(handle, &in);
    if(hret != HG_SUCCESS)
    {
        goto respond;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        goto free_input;
    }

    size = margo_bulk_get_size(in.bulk_handle);
    if(size > prid->size)
    {
        /* refuse to read beyond the end of the region */
        goto free_input;
    }

    /* create bulk handle for local side of transfer */
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &size,
        HG_BULK_READ_ONLY, &bulk_handle);
    if(hret != HG_SUCCESS)
    {
        goto free_input;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle,
        0, bulk_handle, 0, size);
    if(hret == HG_SUCCESS)
    {
        out.ret = 0;
    }

    margo_bulk_free(bulk_handle);
free_input:
    margo_free_input(handle, &in);
respond:
    margo_respond(handle, &out);
    margo_destroy(handle);
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
Philip Carns's avatar
Philip Carns committed
481 482


483 484 485 486 487 488 489 490 491 492 493 494 495 496 497
/* service a remote RPC that reads to a bulk region and eagerly sends
 * response */
static void bake_bulk_eager_read_ult(hg_handle_t handle)
{
    bake_bulk_eager_read_out_t out;
    bake_bulk_eager_read_in_t in;
    hg_return_t hret;
    char* buffer;
    hg_size_t size;
    pmemobj_region_id_t* prid;

    // printf("Got RPC request to read bulk region.\n");
    
    memset(&out, 0, sizeof(out));

498
    hret = margo_get_input(handle, &in);
499 500 501
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
502 503
        margo_respond(handle, &out);
        margo_destroy(handle);
504 505 506 507
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;
Philip Carns's avatar
Philip Carns committed
508

509 510 511 512 513
    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        out.ret = -1;
514 515 516
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
517 518
        return;
    }
519

520 521 522 523
    out.ret = 0;
    out.buffer = buffer;
    out.size = in.size;

524 525 526
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
527
    return;
Philip Carns's avatar
Philip Carns committed
528
}
529 530 531 532 533 534 535 536 537 538 539 540 541 542
DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_read_ult)

/* service a remote RPC that probes for a target id
 *
 * Replies with the target id stored in the pool's root object; the request
 * carries no input, so this handler always reports ret == 0.
 */
static void bake_bulk_probe_ult(hg_handle_t handle)
{
    bake_bulk_probe_out_t reply;

    memset(&reply, 0, sizeof(reply));

    /* the id is copied by value out of the root object */
    reply.bti = g_pmem_root->target_id;
    reply.ret = 0;

    margo_respond(handle, &reply);
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_probe_ult)

Philip Carns's avatar
Philip Carns committed
549