bake-bulk-server.c 12.4 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6 7
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
8
#include <bake-bulk-server.h>
9
#include <libpmemobj.h>
Philip Carns's avatar
Philip Carns committed
10 11
#include "bake-bulk-rpc.h"

12 13 14 15 16
/* definition of internal region_id_t identifier for libpmemobj back end */
typedef struct {
    PMEMoid oid;
    uint64_t size;
} pmemobj_region_id_t;
17

Philip Carns's avatar
Philip Carns committed
18 19 20
/* TODO: this should not be global in the long run; server may provide access
 * to multiple targets
 */
21 22 23
static PMEMobjpool *g_pmem_pool = NULL;
static struct bake_bulk_root *g_pmem_root = NULL;

Philip Carns's avatar
Philip Carns committed
24

25 26
void bake_server_register(margo_instance_id mid, PMEMobjpool *bb_pmem_pool,
    struct bake_bulk_root *bb_pmem_root)
Philip Carns's avatar
Philip Carns committed
27
{
28
    /* register RPCs */
29 30 31
    MARGO_REGISTER(mid, "bake_bulk_shutdown_rpc", void, void,
        bake_bulk_shutdown_ult);
    MARGO_REGISTER(mid, "bake_bulk_create_rpc", bake_bulk_create_in_t,
32
        bake_bulk_create_out_t,
33 34
        bake_bulk_create_ult);
    MARGO_REGISTER(mid, "bake_bulk_write_rpc", bake_bulk_write_in_t,
35
        bake_bulk_write_out_t,
36 37
        bake_bulk_write_ult);
    MARGO_REGISTER(mid, "bake_bulk_eager_write_rpc", bake_bulk_eager_write_in_t,
38
        bake_bulk_eager_write_out_t,
39 40
        bake_bulk_eager_write_ult);
    MARGO_REGISTER(mid, "bake_bulk_eager_read_rpc", bake_bulk_eager_read_in_t,
41
        bake_bulk_eager_read_out_t,
42 43
        bake_bulk_eager_read_ult);
    MARGO_REGISTER(mid, "bake_bulk_persist_rpc", bake_bulk_persist_in_t,
44
        bake_bulk_persist_out_t,
45 46
        bake_bulk_persist_ult);
    MARGO_REGISTER(mid, "bake_bulk_get_size_rpc", bake_bulk_get_size_in_t,
47
        bake_bulk_get_size_out_t,
48 49
        bake_bulk_get_size_ult);
    MARGO_REGISTER(mid, "bake_bulk_read_rpc", bake_bulk_read_in_t,
50
        bake_bulk_read_out_t,
51 52
        bake_bulk_read_ult);
    MARGO_REGISTER(mid, "bake_bulk_probe_rpc", void,
53
        bake_bulk_probe_out_t,
54 55
        bake_bulk_probe_ult);
    MARGO_REGISTER(mid, "bake_bulk_noop_rpc", void,
56
        void,
57
        bake_bulk_noop_ult);
58 59 60 61 62 63 64 65 66 67 68 69

    /* set global pmem variables needed by the bake server */
    g_pmem_pool = bb_pmem_pool;
    g_pmem_root = bb_pmem_root;

    return;
}

/* service a remote RPC that instructs the server daemon to shut down */
static void bake_bulk_shutdown_ult(hg_handle_t handle)
{
    hg_return_t hret;
Philip Carns's avatar
Philip Carns committed
70 71
    margo_instance_id mid;

72 73
    // printf("Got RPC request to shutdown.\n");

74
    mid = margo_hg_handle_get_instance(handle);
75

76
    hret = margo_respond(handle, NULL);
77 78
    assert(hret == HG_SUCCESS);

79
    margo_destroy(handle);
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104

    /* NOTE: we assume that the server daemon is using
     * margo_wait_for_finalize() to suspend until this RPC executes, so there
     * is no need to send any extra signal to notify it.
     */
    margo_finalize(mid);

    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_shutdown_ult)

/* service a remote RPC that creates a bulk region */
static void bake_bulk_create_ult(hg_handle_t handle)
{
    bake_bulk_create_out_t out;
    bake_bulk_create_in_t in;
    hg_return_t hret;
    pmemobj_region_id_t* prid;

    /* TODO: this check needs to be somewhere else */
    assert(sizeof(pmemobj_region_id_t) <= BAKE_BULK_REGION_ID_DATA_SIZE);
    // printf("Got RPC request to create bulk region.\n");
    
    memset(&out, 0, sizeof(out));

105
    hret = margo_get_input(handle, &in);
106 107 108
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
109 110
        margo_respond(handle, &out);
        margo_destroy(handle);
111 112 113 114 115 116 117
        return;
    }

    prid = (pmemobj_region_id_t*)out.rid.data;
    prid->size = in.region_size;
    out.ret = pmemobj_alloc(g_pmem_pool, &prid->oid, in.region_size, 0, NULL, NULL);

118 119 120
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
121 122 123 124 125 126 127 128 129 130 131 132 133
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_create_ult)

/* service a remote RPC that writes to a bulk region */
static void bake_bulk_write_ult(hg_handle_t handle)
{
    bake_bulk_write_out_t out;
    bake_bulk_write_in_t in;
    hg_return_t hret;
    char* buffer;
    hg_size_t size;
    hg_bulk_t bulk_handle;
134
    const struct hg_info *hgi;
135 136 137 138 139 140 141
    margo_instance_id mid;
    pmemobj_region_id_t* prid;

    // printf("Got RPC request to write bulk region.\n");
    
    memset(&out, 0, sizeof(out));

142
    hgi = margo_get_info(handle);
143
    assert(hgi);
144
    mid = margo_hg_info_get_instance(hgi);
145

146
    hret = margo_get_input(handle, &in);
147 148 149
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
150 151
        margo_respond(handle, &out);
        margo_destroy(handle);
152 153 154 155 156 157 158 159 160 161
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        out.ret = -1;
162 163 164
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
165 166 167
        return;
    }

168
    size = margo_bulk_get_size(in.bulk_handle);
169 170

    /* create bulk handle for local side of transfer */
171
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &size, 
172 173 174 175
        HG_BULK_WRITE_ONLY, &bulk_handle);
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
176 177 178
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
179 180 181 182 183 184
        return;
    }

    hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle,
        0, bulk_handle, 0, size);
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
185
    {
186
        out.ret = -1;
187 188 189 190
        margo_bulk_free(bulk_handle);
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
191
        return;
Philip Carns's avatar
Philip Carns committed
192 193
    }

194 195
    out.ret = 0;

196 197 198 199
    margo_bulk_free(bulk_handle);
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_write_ult)


/* service a remote RPC that writes to a bulk region in eager mode */
static void bake_bulk_eager_write_ult(hg_handle_t handle)
{
    bake_bulk_eager_write_out_t out;
    bake_bulk_eager_write_in_t in;
    hg_return_t hret;
    char* buffer;
    hg_bulk_t bulk_handle;
    pmemobj_region_id_t* prid;

    // printf("Got RPC request to write bulk region.\n");
    
    memset(&out, 0, sizeof(out));

219
    hret = margo_get_input(handle, &in);
220 221 222
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
223 224
        margo_respond(handle, &out);
        margo_destroy(handle);
225 226 227 228 229 230 231 232
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
233
    {
234
        out.ret = -1;
235 236 237
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
238
        return;
239
    }
240 241 242 243 244

    memcpy(buffer, in.buffer, in.size);

    out.ret = 0;

245 246 247
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
248 249 250 251 252 253 254 255 256 257 258 259 260 261
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_write_ult)

/* service a remote RPC that persists to a bulk region */
static void bake_bulk_persist_ult(hg_handle_t handle)
{
    bake_bulk_persist_out_t out;
    bake_bulk_persist_in_t in;
    hg_return_t hret;
    char* buffer;
    pmemobj_region_id_t* prid;

    // printf("Got RPC request to persist bulk region.\n");
262
    
263 264
    memset(&out, 0, sizeof(out));

265
    hret = margo_get_input(handle, &in);
266
    if(hret != HG_SUCCESS)
267
    {
268
        out.ret = -1;
269 270
        margo_respond(handle, &out);
        margo_destroy(handle);
271
        return;
272
    }
273

274 275 276 277 278
    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
Philip Carns's avatar
Philip Carns committed
279
    {
280
        out.ret = -1;
281 282 283
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
284
        return;
Philip Carns's avatar
Philip Carns committed
285
    }
286 287 288 289 290 291

    /* TODO: should this have an abt shim in case it blocks? */
    pmemobj_persist(g_pmem_pool, buffer, prid->size);

    out.ret = 0;

292 293 294
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_persist_ult)

/* service a remote RPC that retrieves the size of a bulk region */
static void bake_bulk_get_size_ult(hg_handle_t handle)
{
    bake_bulk_get_size_out_t out;
    bake_bulk_get_size_in_t in;
    hg_return_t hret;
    pmemobj_region_id_t* prid;

    // printf("Got RPC request to get_size bulk region.\n");
    
    memset(&out, 0, sizeof(out));

311
    hret = margo_get_input(handle, &in);
312
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
313
    {
314
        out.ret = -1;
315 316
        margo_respond(handle, &out);
        margo_destroy(handle);
317
        return;
Philip Carns's avatar
Philip Carns committed
318 319
    }

320 321 322 323 324 325
    prid = (pmemobj_region_id_t*)in.rid.data;

    /* kind of cheating here; the size is encoded in the RID */
    out.size = prid->size;
    out.ret = 0;

326 327 328
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
329 330 331 332 333 334 335 336 337
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult)

/* service a remote RPC for a no-op */
static void bake_bulk_noop_ult(hg_handle_t handle)
{
    // printf("Got RPC request to noop bulk region.\n");

338 339
    margo_respond(handle, NULL);
    margo_destroy(handle);
340 341 342 343 344 345 346 347 348 349 350 351 352 353
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_noop_ult)

/* TODO consolidate with write handler; read and write are nearly identical */
/* service a remote RPC that reads to a bulk region */
static void bake_bulk_read_ult(hg_handle_t handle)
{
    bake_bulk_read_out_t out;
    bake_bulk_read_in_t in;
    hg_return_t hret;
    char* buffer;
    hg_size_t size;
    hg_bulk_t bulk_handle;
354
    const struct hg_info *hgi;
355 356 357 358 359 360 361
    margo_instance_id mid;
    pmemobj_region_id_t* prid;

    // printf("Got RPC request to read bulk region.\n");
    
    memset(&out, 0, sizeof(out));

362
    hgi = margo_get_info(handle);
363
    assert(hgi);
364
    mid = margo_hg_info_get_instance(hgi);
365

366
    hret = margo_get_input(handle, &in);
367
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
368
    {
369
        out.ret = -1;
370 371
        margo_respond(handle, &out);
        margo_destroy(handle);
372
        return;
Philip Carns's avatar
Philip Carns committed
373 374
    }

375 376 377 378 379
    prid = (pmemobj_region_id_t*)in.rid.data;

    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
Philip Carns's avatar
Philip Carns committed
380
    {
381
        out.ret = -1;
382 383 384
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
385
        return;
Philip Carns's avatar
Philip Carns committed
386 387
    }

388
    size = margo_bulk_get_size(in.bulk_handle);
389 390

    /* create bulk handle for local side of transfer */
391
    hret = margo_bulk_create(mid, 1, (void**)(&buffer), &size, 
392 393
        HG_BULK_READ_ONLY, &bulk_handle);
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
394
    {
395
        out.ret = -1;
396 397 398
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
399
        return;
Philip Carns's avatar
Philip Carns committed
400
    }
401 402 403 404

    hret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle,
        0, bulk_handle, 0, size);
    if(hret != HG_SUCCESS)
Philip Carns's avatar
Philip Carns committed
405
    {
406
        out.ret = -1;
407 408 409 410
        margo_bulk_free(bulk_handle);
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
411
        return;
Philip Carns's avatar
Philip Carns committed
412 413
    }

414
    out.ret = 0;
Philip Carns's avatar
Philip Carns committed
415

416 417 418 419
    margo_bulk_free(bulk_handle);
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
420 421 422
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
Philip Carns's avatar
Philip Carns committed
423 424


425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
/* service a remote RPC that reads to a bulk region and eagerly sends
 * response */
static void bake_bulk_eager_read_ult(hg_handle_t handle)
{
    bake_bulk_eager_read_out_t out;
    bake_bulk_eager_read_in_t in;
    hg_return_t hret;
    char* buffer;
    hg_size_t size;
    pmemobj_region_id_t* prid;

    // printf("Got RPC request to read bulk region.\n");
    
    memset(&out, 0, sizeof(out));

440
    hret = margo_get_input(handle, &in);
441 442 443
    if(hret != HG_SUCCESS)
    {
        out.ret = -1;
444 445
        margo_respond(handle, &out);
        margo_destroy(handle);
446 447 448 449
        return;
    }

    prid = (pmemobj_region_id_t*)in.rid.data;
Philip Carns's avatar
Philip Carns committed
450

451 452 453 454 455
    /* find memory address for target object */
    buffer = pmemobj_direct(prid->oid);
    if(!buffer)
    {
        out.ret = -1;
456 457 458
        margo_free_input(handle, &in);
        margo_respond(handle, &out);
        margo_destroy(handle);
459 460
        return;
    }
461

462 463 464 465
    out.ret = 0;
    out.buffer = buffer;
    out.size = in.size;

466 467 468
    margo_free_input(handle, &in);
    margo_respond(handle, &out);
    margo_destroy(handle);
469
    return;
Philip Carns's avatar
Philip Carns committed
470
}
471 472 473 474 475 476 477 478 479 480 481 482 483 484
DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_read_ult)

/* service a remote RPC that probes for a target id */
static void bake_bulk_probe_ult(hg_handle_t handle)
{
    bake_bulk_probe_out_t out;

    // printf("Got RPC request to probe bulk region.\n");
    
    memset(&out, 0, sizeof(out));

    out.ret = 0;
    out.bti = g_pmem_root->target_id;

485 486
    margo_respond(handle, &out);
    margo_destroy(handle);
487 488 489 490
    return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_probe_ult)

Philip Carns's avatar
Philip Carns committed
491