bake-bulk.c 11.9 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6
/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
#include <assert.h>
Philip Carns's avatar
Philip Carns committed
8
#include <bake-bulk.h>
Philip Carns's avatar
Philip Carns committed
9
#include <margo.h>
Philip Carns's avatar
Philip Carns committed
10
#include "uthash.h"
11
#include "bake-bulk-rpc.h"
Philip Carns's avatar
Philip Carns committed
12

13 14 15 16 17 18 19 20 21 22 23
/* Refers to a single Mercury/Margo initialization, for now this is shared by
 * all remote targets.  In the future we probably need to support multiple in
 * case we run atop more than one transport at a time.
 */
struct hg_instance
{
    margo_instance_id mid;  
    hg_class_t *hg_class;
    hg_context_t *hg_context;
    int refct;
    
24
    hg_id_t bake_bulk_probe_id;
25 26 27 28 29 30 31 32 33
    hg_id_t bake_bulk_shutdown_id; 
    hg_id_t bake_bulk_create_id;
    hg_id_t bake_bulk_write_id;
    hg_id_t bake_bulk_persist_id;
    hg_id_t bake_bulk_get_size_id;
    hg_id_t bake_bulk_read_id;
};

/* Refers to an instance connected to a specific target */
Philip Carns's avatar
Philip Carns committed
34 35
struct bake_instance
{
36 37 38
    bake_target_id_t bti;   /* persistent identifier for this target */
    hg_addr_t dest;         /* resolved Mercury address */
    UT_hash_handle hh;
Philip Carns's avatar
Philip Carns committed
39 40
};

41
/* Head of the uthash table of known targets, keyed by bake_target_id_t.
 * NULL means the table is empty.
 */
struct bake_instance *instance_hash = NULL;
42

43 44 45 46 47 48
/* The shared Mercury/Margo instance.  It starts out unset with a reference
 * count of zero; members not named in the initializer (the RPC ids) are
 * zero-initialized.
 */
struct hg_instance g_hginst = {
    .refct      = 0,
    .mid        = MARGO_INSTANCE_NULL,
    .hg_class   = NULL,
    .hg_context = NULL,
};
Philip Carns's avatar
Philip Carns committed
49

50
static int hg_instance_init(const char *mercury_dest)
Philip Carns's avatar
Philip Carns committed
51
{
52 53 54
    /* have we already started a Mercury instance? */
    if(g_hginst.refct > 0)
        return(0);
Philip Carns's avatar
Philip Carns committed
55 56 57 58 59 60 61

    /* boilerplate HG initialization steps */
    /***************************************/
    /* NOTE: the listening address is not actually used in this case (the
     * na_listen flag is false); but we pass in the *target* server address
     * here to make sure that Mercury starts up the correct transport
     */
62 63
    g_hginst.hg_class = HG_Init(mercury_dest, HG_FALSE);
    if(!g_hginst.hg_class)
Philip Carns's avatar
Philip Carns committed
64 65 66
    {
        return(-1);
    }
67 68
    g_hginst.hg_context = HG_Context_create(g_hginst.hg_class);
    if(!g_hginst.hg_context)
Philip Carns's avatar
Philip Carns committed
69
    {
70
        HG_Finalize(g_hginst.hg_class);
Philip Carns's avatar
Philip Carns committed
71 72 73 74
        return(-1);
    }

    /* register RPCs */
75 76 77 78
    g_hginst.bake_bulk_probe_id = 
        MERCURY_REGISTER(g_hginst.hg_class, 
        "bake_bulk_probe_rpc", void, bake_bulk_probe_out_t, 
        NULL);
79 80
    g_hginst.bake_bulk_shutdown_id = 
        MERCURY_REGISTER(g_hginst.hg_class, 
Philip Carns's avatar
Philip Carns committed
81 82
        "bake_bulk_shutdown_rpc", void, void, 
        NULL);
83 84
    g_hginst.bake_bulk_create_id = 
        MERCURY_REGISTER(g_hginst.hg_class, 
85 86 87 88
        "bake_bulk_create_rpc", 
        bake_bulk_create_in_t,
        bake_bulk_create_out_t,
        NULL);
89 90
    g_hginst.bake_bulk_write_id = 
        MERCURY_REGISTER(g_hginst.hg_class, 
91 92 93 94
        "bake_bulk_write_rpc", 
        bake_bulk_write_in_t,
        bake_bulk_write_out_t,
        NULL);
95 96
    g_hginst.bake_bulk_persist_id = 
        MERCURY_REGISTER(g_hginst.hg_class, 
Philip Carns's avatar
Philip Carns committed
97 98 99 100
        "bake_bulk_persist_rpc", 
        bake_bulk_persist_in_t,
        bake_bulk_persist_out_t,
        NULL);
101 102
    g_hginst.bake_bulk_get_size_id = 
        MERCURY_REGISTER(g_hginst.hg_class, 
103 104 105 106
        "bake_bulk_get_size_rpc", 
        bake_bulk_get_size_in_t,
        bake_bulk_get_size_out_t,
        NULL);
107 108
    g_hginst.bake_bulk_read_id = 
        MERCURY_REGISTER(g_hginst.hg_class, 
109 110 111 112
        "bake_bulk_read_rpc", 
        bake_bulk_read_in_t,
        bake_bulk_read_out_t,
        NULL);
Philip Carns's avatar
Philip Carns committed
113

114 115
    g_hginst.mid = margo_init(0, 0, g_hginst.hg_context);
    if(!g_hginst.mid)
Philip Carns's avatar
Philip Carns committed
116
    {
117 118
        HG_Context_destroy(g_hginst.hg_context);
        HG_Finalize(g_hginst.hg_class);
Philip Carns's avatar
Philip Carns committed
119 120
        return(-1);
    }
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
    g_hginst.refct = 1;

    return(0);
}

/* Drop one reference on the shared Mercury/Margo instance.  When the count
 * reaches zero, Margo is finalized, then the Mercury context is destroyed,
 * then the Mercury class is finalized.  Dropping more references than were
 * taken trips the assertion.
 */
void hg_instance_finalize(void)
{
    g_hginst.refct -= 1;
    assert(g_hginst.refct >= 0);

    /* other users remain; leave everything running */
    if(g_hginst.refct != 0)
        return;

    margo_finalize(g_hginst.mid);
    HG_Context_destroy(g_hginst.hg_context);
    HG_Finalize(g_hginst.hg_class);
}

/* Contact the server at mercury_dest with the probe RPC and, on success,
 * register it in instance_hash so that later calls can address it by target
 * id.
 *
 * mercury_dest: Mercury address string of the server to probe
 * bti:          output; receives the target id from the probe response.  It
 *               is written whenever a response was decoded, including when
 *               the response carries a non-zero ret.
 *
 * Returns 0 on success.  Returns a negative value if the local Mercury/Margo
 * setup, the allocation, the address lookup or the RPC fails, and otherwise
 * the non-zero ret value from the response.  On any failure nothing is
 * registered and every resource acquired here (handle, resolved address,
 * instance record, Mercury reference) is released again.
 */
int bake_probe_instance(
    const char *mercury_dest,
    bake_target_id_t *bti)
{
    hg_return_t hret;
    int ret;
    bake_bulk_probe_out_t out;
    hg_handle_t handle;
    struct bake_instance *new_instance;

    ret = hg_instance_init(mercury_dest);
    if(ret < 0)
        return(ret);

    new_instance = calloc(1, sizeof(*new_instance));
    if(!new_instance)
    {
        ret = -1;
        goto err_hg;
    }

    hret = margo_addr_lookup(g_hginst.mid, mercury_dest, &new_instance->dest);
    if(hret != HG_SUCCESS)
    {
        ret = -1;
        goto err_instance;
    }

    /* create handle */
    hret = HG_Create(g_hginst.hg_context, new_instance->dest,
        g_hginst.bake_bulk_probe_id, &handle);
    if(hret != HG_SUCCESS)
    {
        ret = -1;
        goto err_addr;
    }

    hret = margo_forward(g_hginst.mid, handle, NULL);
    if(hret != HG_SUCCESS)
    {
        ret = -1;
        goto err_handle;
    }

    hret = HG_Get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        ret = -1;
        goto err_handle;
    }

    ret = out.ret;
    *bti = out.bti;
    new_instance->bti = out.bti;

    HG_Free_output(handle, &out);

    /* the server answered but reported a failure */
    if(ret != 0)
        goto err_handle;

    HG_Destroy(handle);

    /* TODO: safety check that it isn't already there.  Here or earlier? */
    HASH_ADD(hh, instance_hash, bti, sizeof(new_instance->bti), new_instance);

    return(0);

    /* Error unwinding, in reverse order of acquisition.  The resolved
     * address must be freed before the Mercury reference is dropped,
     * because dropping the last reference finalizes the class that owns it.
     */
err_handle:
    HG_Destroy(handle);
err_addr:
    HG_Addr_free(g_hginst.hg_class, new_instance->dest);
err_instance:
    free(new_instance);
err_hg:
    hg_instance_finalize();
    return(ret);
}
  
/* Forget a previously probed target: remove it from instance_hash, free its
 * resolved Mercury address and its record, and drop the reference it held on
 * the shared Mercury/Margo instance.  Unknown target ids are ignored.
 *
 * bti: identifier of the target to release
 */
void bake_release_instance(
    bake_target_id_t bti)
{
    struct bake_instance *entry = NULL;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), entry);
    if(entry != NULL)
    {
        HASH_DELETE(hh, instance_hash, entry);
        HG_Addr_free(g_hginst.hg_class, entry->dest);
        free(entry);

        /* may shut Mercury/Margo down if this was the last reference */
        hg_instance_finalize();
    }
}

/* Send the shutdown RPC to a previously probed target.  The RPC carries no
 * input and its output is not inspected.
 *
 * bti: identifier of the target to shut down
 *
 * Returns 0 if the RPC was forwarded successfully, or -1 if the target is
 * unknown or handle creation or forwarding fails.
 */
int bake_shutdown_service(bake_target_id_t bti)
{
    struct bake_instance *target = NULL;
    hg_handle_t rpc;
    int rc = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return(-1);

    /* create handle */
    if(HG_Create(g_hginst.hg_context, target->dest,
        g_hginst.bake_bulk_shutdown_id, &rpc) != HG_SUCCESS)
    {
        return(-1);
    }

    if(margo_forward(g_hginst.mid, rpc, NULL) == HG_SUCCESS)
        rc = 0;

    /* the handle is released whether or not forwarding worked */
    HG_Destroy(rpc);
    return(rc);
}
Philip Carns's avatar
Philip Carns committed
263

Philip Carns's avatar
Philip Carns committed
264 265 266 267 268 269 270
/* Issue the write RPC for a region on a previously probed target.
 *
 * bti:           identifier of the target
 * rid:           identifier of the region
 * region_offset: offset within the region, forwarded as-is to the server
 * buf:           local buffer holding the data; exposed to Mercury through a
 *                HG_BULK_READ_ONLY bulk handle (presumably so the server can
 *                pull from it -- confirm against the server side)
 * buf_size:      size of buf in bytes
 *
 * Returns -1 if the target is unknown or a local Mercury/Margo call fails,
 * otherwise the ret value carried in the RPC response.
 */
int bake_bulk_write(
    bake_target_id_t bti,
    bake_bulk_region_id_t rid,
    uint64_t region_offset,
    void const *buf,
    uint64_t buf_size)
{
    struct bake_instance *target = NULL;
    bake_bulk_write_in_t in;
    bake_bulk_write_out_t out;
    hg_handle_t rpc;
    int rc = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return(-1);

    in.bti = bti;
    in.rid = rid;
    in.region_offset = region_offset;

    /* register the caller's buffer as a single-segment bulk region */
    if(HG_Bulk_create(g_hginst.hg_class, 1, (void**)(&buf), &buf_size,
        HG_BULK_READ_ONLY, &in.bulk_handle) != HG_SUCCESS)
    {
        return(-1);
    }

    /* create handle */
    if(HG_Create(g_hginst.hg_context, target->dest,
        g_hginst.bake_bulk_write_id, &rpc) != HG_SUCCESS)
    {
        goto release_bulk;
    }

    if(margo_forward(g_hginst.mid, rpc, &in) != HG_SUCCESS)
        goto release_rpc;

    if(HG_Get_output(rpc, &out) != HG_SUCCESS)
        goto release_rpc;

    rc = out.ret;
    HG_Free_output(rpc, &out);

    /* success and failure share the same teardown order */
release_rpc:
    HG_Destroy(rpc);
release_bulk:
    HG_Bulk_free(in.bulk_handle);
    return(rc);
}

Philip Carns's avatar
Philip Carns committed
326 327 328 329 330
/* Issue the create RPC on a previously probed target.
 *
 * bti:         identifier of the target
 * region_size: requested region size, forwarded as-is to the server
 * rid:         output; receives the region id from the response.  It is
 *              written whenever a response was decoded, regardless of the
 *              ret value the response carries.
 *
 * Returns -1 if the target is unknown or a local Mercury/Margo call fails,
 * otherwise the ret value carried in the RPC response.
 */
int bake_bulk_create(
    bake_target_id_t bti,
    uint64_t region_size,
    bake_bulk_region_id_t *rid)
{
    struct bake_instance *target = NULL;
    bake_bulk_create_in_t in;
    bake_bulk_create_out_t out;
    hg_handle_t rpc;
    int rc = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return(-1);

    in.bti = bti;
    in.region_size = region_size;

    /* create handle */
    if(HG_Create(g_hginst.hg_context, target->dest,
        g_hginst.bake_bulk_create_id, &rpc) != HG_SUCCESS)
    {
        return(-1);
    }

    /* the output is only decoded if forwarding succeeded */
    if(margo_forward(g_hginst.mid, rpc, &in) == HG_SUCCESS &&
       HG_Get_output(rpc, &out) == HG_SUCCESS)
    {
        rc = out.ret;
        *rid = out.rid;
        HG_Free_output(rpc, &out);
    }

    HG_Destroy(rpc);
    return(rc);
}


Philip Carns's avatar
Philip Carns committed
376 377 378 379
/* Issue the persist RPC for a region on a previously probed target.
 *
 * bti: identifier of the target
 * rid: identifier of the region
 *
 * Returns -1 if the target is unknown or a local Mercury/Margo call fails,
 * otherwise the ret value carried in the RPC response.
 */
int bake_bulk_persist(
    bake_target_id_t bti,
    bake_bulk_region_id_t rid)
{
    struct bake_instance *target = NULL;
    bake_bulk_persist_in_t in;
    bake_bulk_persist_out_t out;
    hg_handle_t rpc;
    int rc = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return(-1);

    in.bti = bti;
    in.rid = rid;

    /* create handle */
    if(HG_Create(g_hginst.hg_context, target->dest,
        g_hginst.bake_bulk_persist_id, &rpc) != HG_SUCCESS)
    {
        return(-1);
    }

    /* the output is only decoded if forwarding succeeded */
    if(margo_forward(g_hginst.mid, rpc, &in) == HG_SUCCESS &&
       HG_Get_output(rpc, &out) == HG_SUCCESS)
    {
        rc = out.ret;
        HG_Free_output(rpc, &out);
    }

    HG_Destroy(rpc);
    return(rc);
}
Philip Carns's avatar
Philip Carns committed
422

423 424 425 426 427 428 429 430 431 432
/* Issue the get-size RPC for a region on a previously probed target.
 *
 * bti:         identifier of the target
 * rid:         identifier of the region
 * region_size: output; receives the size field of the response.  It is
 *              written whenever a response was decoded, regardless of the
 *              ret value the response carries.
 *
 * Returns -1 if the target is unknown or a local Mercury/Margo call fails,
 * otherwise the ret value carried in the RPC response.
 */
int bake_bulk_get_size(
    bake_target_id_t bti,
    bake_bulk_region_id_t rid,
    uint64_t *region_size)
{
    struct bake_instance *target = NULL;
    bake_bulk_get_size_in_t in;
    bake_bulk_get_size_out_t out;
    hg_handle_t rpc;
    int rc = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return(-1);

    in.bti = bti;
    in.rid = rid;

    /* create handle */
    if(HG_Create(g_hginst.hg_context, target->dest,
        g_hginst.bake_bulk_get_size_id, &rpc) != HG_SUCCESS)
    {
        return(-1);
    }

    /* the output is only decoded if forwarding succeeded */
    if(margo_forward(g_hginst.mid, rpc, &in) == HG_SUCCESS &&
       HG_Get_output(rpc, &out) == HG_SUCCESS)
    {
        rc = out.ret;
        *region_size = out.size;
        HG_Free_output(rpc, &out);
    }

    HG_Destroy(rpc);
    return(rc);
}

Philip Carns's avatar
Philip Carns committed
472 473 474 475 476 477 478
/* Issue the read RPC for a region on a previously probed target.
 *
 * bti:           identifier of the target
 * rid:           identifier of the region
 * region_offset: offset within the region, forwarded as-is to the server
 * buf:           local destination buffer; exposed to Mercury through a
 *                HG_BULK_WRITE_ONLY bulk handle (presumably so the server
 *                can push into it -- confirm against the server side)
 * buf_size:      size of buf in bytes
 *
 * Returns -1 if the target is unknown or a local Mercury/Margo call fails,
 * otherwise the ret value carried in the RPC response.
 */
int bake_bulk_read(
    bake_target_id_t bti,
    bake_bulk_region_id_t rid,
    uint64_t region_offset,
    void *buf,
    uint64_t buf_size)
{
    struct bake_instance *target = NULL;
    bake_bulk_read_in_t in;
    bake_bulk_read_out_t out;
    hg_handle_t rpc;
    int rc = -1;

    HASH_FIND(hh, instance_hash, &bti, sizeof(bti), target);
    if(target == NULL)
        return(-1);

    in.bti = bti;
    in.rid = rid;
    in.region_offset = region_offset;

    /* register the caller's buffer as a single-segment bulk region */
    if(HG_Bulk_create(g_hginst.hg_class, 1, (void**)(&buf), &buf_size,
        HG_BULK_WRITE_ONLY, &in.bulk_handle) != HG_SUCCESS)
    {
        return(-1);
    }

    /* create handle */
    if(HG_Create(g_hginst.hg_context, target->dest,
        g_hginst.bake_bulk_read_id, &rpc) != HG_SUCCESS)
    {
        goto release_bulk;
    }

    if(margo_forward(g_hginst.mid, rpc, &in) != HG_SUCCESS)
        goto release_rpc;

    if(HG_Get_output(rpc, &out) != HG_SUCCESS)
        goto release_rpc;

    rc = out.ret;
    HG_Free_output(rpc, &out);

    /* success and failure share the same teardown order */
release_rpc:
    HG_Destroy(rpc);
release_bulk:
    HG_Bulk_free(in.bulk_handle);
    return(rc);
}