margo.c 39 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11

/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <abt.h>

#include <margo-config.h>
#ifdef HAVE_ABT_SNOOZER
#include <abt-snoozer.h>
#endif
#include <time.h>
#include <math.h>

#include "margo.h"
#include "margo-timer.h"
#include "utlist.h"
#include "uthash.h"
24

25
/* Value assigned to hg_progress_timeout_ub when an instance is created. */
#define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */
/* NOTE(review): presumably the number of handles managed by the handle
 * cache; its use is not visible in this part of the file -- confirm. */
#define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32
27

Philip Carns's avatar
Philip Carns committed
28 29 30
/* Hash key for the multiplexing table: an RPC id paired with a mplex id.
 * Users memset() the key to zero before filling it in because the hash
 * macros are given sizeof(key), i.e. the whole struct including padding. */
struct mplex_key
{
    hg_id_t id;
    uint8_t mplex_id;
};

struct mplex_element
{
    struct mplex_key key;
    ABT_pool pool;
38 39
    void* user_data;
    void(*user_free_callback)(void*);
Philip Carns's avatar
Philip Carns committed
40 41 42
    UT_hash_handle hh;
};

Philip Carns's avatar
Philip Carns committed
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
/* Running statistics for one diagnostic measurement. */
struct diag_data
{
    double min;         /* smallest sample recorded */
    double max;         /* largest sample recorded */
    double cumulative;  /* sum of all samples */
    int count;          /* number of samples */
};

/* Folds one sample (__time) into a struct diag_data lvalue (__data).
 * The sample expression is evaluated exactly once, and __data is
 * parenthesized so that expressions such as *ptr can be passed safely.
 */
#define __DIAG_UPDATE(__data, __time)\
do {\
    double __diag_sample = (__time); \
    (__data).count++; \
    (__data).cumulative += __diag_sample; \
    if(__diag_sample > (__data).max) (__data).max = __diag_sample; \
    if(__diag_sample < (__data).min) (__data).min = __diag_sample; \
} while(0)

Shane Snyder's avatar
Shane Snyder committed
59 60 61 62 63 64 65
/* Element of the Mercury handle cache.  The same element type is linked
 * either into the free list or into the in-use hash of a margo instance.
 */
struct margo_handle_cache_el
{
    hg_handle_t handle;
    /* in-use hash link */
    UT_hash_handle hh;
    /* free list link */
    struct margo_handle_cache_el *next;
};

66 67 68 69 70 71 72
/* Node of the singly linked list of callbacks that margo_cleanup() runs,
 * head first, before tearing an instance down.
 */
struct margo_finalize_cb
{
    /* function to invoke */
    void (*callback)(void*);
    /* argument handed to callback */
    void *uargs;
    /* next node, or NULL at the end of the list */
    struct margo_finalize_cb *next;
};

73 74
struct margo_timer_list; /* opaque here; defined in margo-timer.c */

75 76
struct margo_instance
{
Shane Snyder's avatar
Shane Snyder committed
77
    /* mercury/argobots state */
78 79
    hg_context_t *hg_context;
    hg_class_t *hg_class;
80 81 82
    ABT_pool handler_pool;
    ABT_pool progress_pool;

83
    /* internal to margo for this particular instance */
Shane Snyder's avatar
Shane Snyder committed
84
    int margo_init;
85
    int abt_init;
86 87
    ABT_thread hg_progress_tid;
    int hg_progress_shutdown_flag;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
88
    ABT_xstream progress_xstream;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
89 90 91
    int owns_progress_pool;
    ABT_xstream *rpc_xstreams;
    int num_handler_pool_threads;
92
    unsigned int hg_progress_timeout_ub;
93 94 95

    /* control logic for callers waiting on margo to be finalized */
    int finalize_flag;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
96
    int refcount;
97 98
    ABT_mutex finalize_mutex;
    ABT_cond finalize_cond;
99
    struct margo_finalize_cb* finalize_cb;
100

Matthieu Dorier's avatar
Matthieu Dorier committed
101 102 103 104
    /* control logic for shutting down */
    hg_id_t shutdown_rpc_id;
    int enable_remote_shutdown;

105 106 107
    /* timer data */
    struct margo_timer_list* timer_list;

Philip Carns's avatar
Philip Carns committed
108 109
    /* hash table to track multiplexed rpcs registered with margo */
    struct mplex_element *mplex_table;
Philip Carns's avatar
Philip Carns committed
110

Shane Snyder's avatar
Shane Snyder committed
111 112 113
    /* linked list of free hg handles and a hash of in-use handles */
    struct margo_handle_cache_el *free_handle_list;
    struct margo_handle_cache_el *used_handle_hash;
114
    ABT_mutex handle_cache_mtx; /* mutex protecting access to above caches */
Shane Snyder's avatar
Shane Snyder committed
115

Philip Carns's avatar
Philip Carns committed
116 117 118 119 120 121 122 123 124 125 126
    /* optional diagnostics data tracking */
    /* NOTE: technically the following fields are subject to races if they
     * are updated from more than one thread at a time.  We will be careful
     * to only update the counters from the progress_fn,
     * which will serialize access.
     */
    int diag_enabled;
    struct diag_data diag_trigger_elapsed;
    struct diag_data diag_progress_elapsed_zero_timeout;
    struct diag_data diag_progress_elapsed_nonzero_timeout;
    struct diag_data diag_progress_timeout_value;
127 128
};

129 130 131 132 133 134
/* Per-RPC data that margo attaches to each id it registers with Mercury
 * (see margo_register_name).
 */
struct margo_rpc_data
{
    /* instance the RPC was registered with */
    margo_instance_id mid;
    /* optional payload installed through margo_register_data() */
    void *user_data;
    /* optional destructor for user_data */
    void (*user_free_callback)(void *);
};
135

Matthieu Dorier's avatar
Matthieu Dorier committed
136 137
/* output type of the internal "__shutdown__" RPC: a single int32_t `ret` */
MERCURY_GEN_PROC(margo_shutdown_out_t, ((int32_t)(ret)))

/* helpers defined later in this file */
static void hg_progress_fn(void* foo);
static void margo_rpc_data_free(void* ptr);
static void remote_shutdown_ult(hg_handle_t handle);
DECLARE_MARGO_RPC_HANDLER(remote_shutdown_ult);

/* handle cache */
static hg_return_t margo_handle_cache_init(margo_instance_id mid);
static void margo_handle_cache_destroy(margo_instance_id mid);
static hg_return_t margo_handle_cache_get(margo_instance_id mid,
    hg_addr_t addr, hg_id_t id, hg_handle_t *handle);
static hg_return_t margo_handle_cache_put(margo_instance_id mid,
    hg_handle_t handle);

/* multiplexing table */
static void delete_multiplexing_hash(margo_instance_id mid);

Shane Snyder's avatar
Shane Snyder committed
151

Shane Snyder's avatar
Shane Snyder committed
152
margo_instance_id margo_init(const char *addr_str, int mode,
Shane Snyder's avatar
Shane Snyder committed
153
    int use_progress_thread, int rpc_thread_count)
154
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
155 156 157 158 159
    ABT_xstream progress_xstream = ABT_XSTREAM_NULL;
    ABT_pool progress_pool = ABT_POOL_NULL;
    ABT_xstream *rpc_xstreams = NULL;
    ABT_xstream rpc_xstream = ABT_XSTREAM_NULL;
    ABT_pool rpc_pool = ABT_POOL_NULL;
Shane Snyder's avatar
Shane Snyder committed
160 161
    hg_class_t *hg_class = NULL;
    hg_context_t *hg_context = NULL;
Shane Snyder's avatar
Shane Snyder committed
162
    int listen_flag = (mode == MARGO_CLIENT_MODE) ? HG_FALSE : HG_TRUE;
163
    int abt_init = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
164
    int i;
Shane Snyder's avatar
Shane Snyder committed
165 166 167
    int ret;
    struct margo_instance *mid = MARGO_INSTANCE_NULL;

Shane Snyder's avatar
Shane Snyder committed
168
    if(mode != MARGO_CLIENT_MODE && mode != MARGO_SERVER_MODE) goto err;
Shane Snyder's avatar
Shane Snyder committed
169

170 171 172 173 174 175
    if (ABT_initialized() == ABT_ERR_UNINITIALIZED)
    {
        ret = ABT_init(0, NULL); /* XXX: argc/argv not currently used by ABT ... */
        if(ret != 0) goto err;
        abt_init = 1;
    }
Shane Snyder's avatar
Shane Snyder committed
176

177
    /* set caller (self) ES to idle without polling */
Matthieu Dorier's avatar
Matthieu Dorier committed
178
#ifdef HAVE_ABT_SNOOZER
Shane Snyder's avatar
Shane Snyder committed
179 180
    ret = ABT_snoozer_xstream_self_set();
    if(ret != 0) goto err;
Matthieu Dorier's avatar
Matthieu Dorier committed
181
#endif
Jonathan Jenkins's avatar
Jonathan Jenkins committed
182 183 184

    if (use_progress_thread)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
185
#ifdef HAVE_ABT_SNOOZER
Jonathan Jenkins's avatar
Jonathan Jenkins committed
186
        ret = ABT_snoozer_xstream_create(1, &progress_pool, &progress_xstream);
Matthieu Dorier's avatar
Matthieu Dorier committed
187 188 189 190 191 192 193
		if (ret != ABT_SUCCESS) goto err;
#else
		ret = ABT_xstream_create(ABT_SCHED_NULL, &progress_xstream);
		if (ret != ABT_SUCCESS) goto err;
		ret = ABT_xstream_get_main_pools(progress_xstream, 1, &progress_pool);
		if (ret != ABT_SUCCESS) goto err;
#endif
Jonathan Jenkins's avatar
Jonathan Jenkins committed
194 195 196 197 198 199 200 201 202
    }
    else
    {
        ret = ABT_xstream_self(&progress_xstream);
        if (ret != ABT_SUCCESS) goto err;
        ret = ABT_xstream_get_main_pools(progress_xstream, 1, &progress_pool);
        if (ret != ABT_SUCCESS) goto err;
    }

203
    if (rpc_thread_count > 0)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
204
    {
205 206
        rpc_xstreams = calloc(rpc_thread_count, sizeof(*rpc_xstreams));
        if (rpc_xstreams == NULL) goto err;
Matthieu Dorier's avatar
Matthieu Dorier committed
207
#ifdef HAVE_ABT_SNOOZER
208 209 210
        ret = ABT_snoozer_xstream_create(rpc_thread_count, &rpc_pool,
                rpc_xstreams);
        if (ret != ABT_SUCCESS) goto err;
Matthieu Dorier's avatar
Matthieu Dorier committed
211
#else
212 213 214 215 216
        int j;
        ret = ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, ABT_TRUE, &rpc_pool);
        if (ret != ABT_SUCCESS) goto err;
        for(j=0; j<rpc_thread_count; j++) {
            ret = ABT_xstream_create(ABT_SCHED_NULL, rpc_xstreams+j);
Shane Snyder's avatar
Shane Snyder committed
217 218
            if (ret != ABT_SUCCESS) goto err;
        }
219 220 221 222 223 224 225 226 227 228 229 230
#endif
    }
    else if (rpc_thread_count == 0)
    {
        ret = ABT_xstream_self(&rpc_xstream);
        if (ret != ABT_SUCCESS) goto err;
        ret = ABT_xstream_get_main_pools(rpc_xstream, 1, &rpc_pool);
        if (ret != ABT_SUCCESS) goto err;
    }
    else
    {
        rpc_pool = progress_pool;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
231 232
    }

Shane Snyder's avatar
Shane Snyder committed
233 234 235 236 237 238
    hg_class = HG_Init(addr_str, listen_flag);
    if(!hg_class) goto err;

    hg_context = HG_Context_create(hg_class);
    if(!hg_context) goto err;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
239 240 241
    mid = margo_init_pool(progress_pool, rpc_pool, hg_context);
    if (mid == MARGO_INSTANCE_NULL) goto err;

Shane Snyder's avatar
Shane Snyder committed
242
    mid->margo_init = 1;
243
    mid->abt_init = abt_init;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
244 245 246
    mid->owns_progress_pool = use_progress_thread;
    mid->progress_xstream = progress_xstream;
    mid->num_handler_pool_threads = rpc_thread_count < 0 ? 0 : rpc_thread_count;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
247
    mid->rpc_xstreams = rpc_xstreams;
248

Jonathan Jenkins's avatar
Jonathan Jenkins committed
249 250 251
    return mid;

err:
Shane Snyder's avatar
Shane Snyder committed
252 253
    if(mid)
    {
254
        margo_timer_list_free(mid->timer_list);
Shane Snyder's avatar
Shane Snyder committed
255 256 257 258
        ABT_mutex_free(&mid->finalize_mutex);
        ABT_cond_free(&mid->finalize_cond);
        free(mid);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
259 260 261 262 263 264 265 266 267 268 269 270 271 272
    if (use_progress_thread && progress_xstream != ABT_XSTREAM_NULL)
    {
        ABT_xstream_join(progress_xstream);
        ABT_xstream_free(&progress_xstream);
    }
    if (rpc_thread_count > 0 && rpc_xstreams != NULL)
    {
        for (i = 0; i < rpc_thread_count; i++)
        {
            ABT_xstream_join(rpc_xstreams[i]);
            ABT_xstream_free(&rpc_xstreams[i]);
        }
        free(rpc_xstreams);
    }
Shane Snyder's avatar
Shane Snyder committed
273 274 275 276
    if(hg_context)
        HG_Context_destroy(hg_context);
    if(hg_class)
        HG_Finalize(hg_class);
277 278
    if(abt_init)
        ABT_finalize();
Jonathan Jenkins's avatar
Jonathan Jenkins committed
279 280 281 282
    return MARGO_INSTANCE_NULL;
}

margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
283
    hg_context_t *hg_context)
284 285
{
    int ret;
Shane Snyder's avatar
Shane Snyder committed
286
    hg_return_t hret;
287 288
    struct margo_instance *mid;

Matthieu Dorier's avatar
Matthieu Dorier committed
289
    mid = calloc(1,sizeof(*mid));
Shane Snyder's avatar
Shane Snyder committed
290
    if(!mid) goto err;
291
    memset(mid, 0, sizeof(*mid));
292

293 294 295
    ABT_mutex_create(&mid->finalize_mutex);
    ABT_cond_create(&mid->finalize_cond);

296 297
    mid->progress_pool = progress_pool;
    mid->handler_pool = handler_pool;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
298
    mid->hg_class = HG_Context_get_class(hg_context);
299
    mid->hg_context = hg_context;
300
    mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB;
301
    mid->mplex_table = NULL;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
302
    mid->refcount = 1;
303
    mid->finalize_cb = NULL;
Matthieu Dorier's avatar
Matthieu Dorier committed
304
    mid->enable_remote_shutdown = 0;
305

306 307
    mid->timer_list = margo_timer_list_create();
    if(mid->timer_list == NULL) goto err;
308

Shane Snyder's avatar
Shane Snyder committed
309 310 311 312
    /* initialize the handle cache */
    hret = margo_handle_cache_init(mid);
    if(hret != HG_SUCCESS) goto err;

313
    ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid, 
314
        ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
Shane Snyder's avatar
Shane Snyder committed
315 316
    if(ret != 0) goto err;

Matthieu Dorier's avatar
Matthieu Dorier committed
317 318 319
    mid->shutdown_rpc_id = MARGO_REGISTER(mid, "__shutdown__", 
            void, margo_shutdown_out_t, remote_shutdown_ult);

Shane Snyder's avatar
Shane Snyder committed
320 321
    return mid;

Shane Snyder's avatar
Shane Snyder committed
322 323
err:
    if(mid)
324
    {
Shane Snyder's avatar
Shane Snyder committed
325
        margo_handle_cache_destroy(mid);
326
        margo_timer_list_free(mid->timer_list);
Shane Snyder's avatar
Shane Snyder committed
327 328
        ABT_mutex_free(&mid->finalize_mutex);
        ABT_cond_free(&mid->finalize_cond);
329
        free(mid);
330
    }
Shane Snyder's avatar
Shane Snyder committed
331
    return MARGO_INSTANCE_NULL;
332 333
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
334 335 336 337
/*
 * Releases everything owned by an instance and frees the instance itself.
 * Called by whichever of margo_finalize() / margo_wait_for_finalize() drops
 * the last reference.
 */
static void margo_cleanup(margo_instance_id mid)
{
    int i;

    /* call finalize callbacks */
    struct margo_finalize_cb* fcb = mid->finalize_cb;
    while(fcb) {
        (fcb->callback)(fcb->uargs);
        struct margo_finalize_cb* tmp = fcb;
        fcb = fcb->next;
        free(tmp);
    }

    margo_timer_list_free(mid->timer_list);

    /* delete the hash used for multiplexing */
    delete_multiplexing_hash(mid);

    ABT_mutex_free(&mid->finalize_mutex);
    ABT_cond_free(&mid->finalize_cond);

    /* only join/free execution streams that margo_init() recorded as ours */
    if (mid->owns_progress_pool)
    {
        ABT_xstream_join(mid->progress_xstream);
        ABT_xstream_free(&mid->progress_xstream);
    }

    if (mid->num_handler_pool_threads > 0)
    {
        for (i = 0; i < mid->num_handler_pool_threads; i++)
        {
            ABT_xstream_join(mid->rpc_xstreams[i]);
            ABT_xstream_free(&mid->rpc_xstreams[i]);
        }
        free(mid->rpc_xstreams);
    }

    margo_handle_cache_destroy(mid);

    /* Mercury (and possibly Argobots) are torn down only when the instance
     * was created through margo_init() */
    if (mid->margo_init)
    {
        if (mid->hg_context)
            HG_Context_destroy(mid->hg_context);
        if (mid->hg_class)
            HG_Finalize(mid->hg_class);
        if (mid->abt_init)
            ABT_finalize();
    }

    free(mid);
}

386
/*
 * Shuts an instance down: stops the progress ULT, wakes every caller blocked
 * in margo_wait_for_finalize(), and cleans up if no such caller still holds
 * a reference.
 */
void margo_finalize(margo_instance_id mid)
{
    int do_cleanup;

    /* tell progress thread to wrap things up */
    mid->hg_progress_shutdown_flag = 1;

    /* wait for it to shutdown cleanly */
    ABT_thread_join(mid->hg_progress_tid);
    ABT_thread_free(&mid->hg_progress_tid);

    ABT_mutex_lock(mid->finalize_mutex);
    mid->finalize_flag = 1;
    ABT_cond_broadcast(mid->finalize_cond);

    /* drop the reference taken in margo_init_pool() */
    mid->refcount--;
    do_cleanup = mid->refcount == 0;

    ABT_mutex_unlock(mid->finalize_mutex);

    /* if there was no one waiting on the finalize at the time of the finalize
     * broadcast, then we're safe to clean up. Otherwise, let the finalizer do
     * it */
    if (do_cleanup)
        margo_cleanup(mid);

    return;
}

/*
 * Blocks the caller until margo_finalize() has been called on the instance.
 * The waiter holds a reference while blocked; whoever drops the last
 * reference performs the cleanup.
 */
void margo_wait_for_finalize(margo_instance_id mid)
{
    int do_cleanup;

    ABT_mutex_lock(mid->finalize_mutex);

    mid->refcount++;

    /* loop: re-check the flag every time the condition wait returns */
    while(!mid->finalize_flag)
        ABT_cond_wait(mid->finalize_cond, mid->finalize_mutex);

    mid->refcount--;
    do_cleanup = mid->refcount == 0;

    ABT_mutex_unlock(mid->finalize_mutex);

    if (do_cleanup)
        margo_cleanup(mid);

    return;
}

437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
/*
 * Registers a callback to be run by margo_cleanup().  Callbacks are pushed
 * at the head of the list, so the most recently pushed one runs first.
 *
 * mid:   instance to attach the callback to
 * cb:    function to call; a NULL cb is ignored
 * uargs: argument passed to cb
 *
 * The function returns void, so an allocation failure cannot be reported;
 * in that case the callback is simply not registered.
 */
void margo_push_finalize_callback(
            margo_instance_id mid,
            void(*cb)(void*),                  
            void* uargs)
{
    struct margo_finalize_cb* fcb;

    if(cb == NULL) return;

    fcb = malloc(sizeof(*fcb));
    if(fcb == NULL) return; /* previously dereferenced unchecked */

    fcb->callback = cb;
    fcb->uargs = uargs;
    fcb->next = mid->finalize_cb;
    mid->finalize_cb = fcb;
}

454 455
/*
 * Registers an RPC with Mercury and attaches a struct margo_rpc_data to it
 * so the owning instance can later be recovered from the RPC id.
 *
 * Returns the RPC id, or 0 on failure.
 */
hg_id_t margo_register_name(margo_instance_id mid, const char *func_name,
    hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb)
{
    struct margo_rpc_data* margo_data;
    hg_return_t hret;
    hg_id_t id;

    id = HG_Register_name(mid->hg_class, func_name, in_proc_cb, out_proc_cb, rpc_cb);
    if(id <= 0)
        return(0);

    /* register the margo data with the RPC, unless it already has some */
    margo_data = (struct margo_rpc_data*)HG_Registered_data(mid->hg_class, id);
    if(!margo_data)
    {
        margo_data = malloc(sizeof(*margo_data));
        if(!margo_data)
            return(0);
        margo_data->mid = mid;
        margo_data->user_data = NULL;
        margo_data->user_free_callback = NULL;
        hret = HG_Register_data(mid->hg_class, id, margo_data, margo_rpc_data_free);
        if(hret != HG_SUCCESS)
        {
            free(margo_data);
            return(0);
        }
    }

    return(id);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516
/* Sets the instance's enable_remote_shutdown flag. */
void margo_enable_remote_shutdown(margo_instance_id mid)
{
    const int enabled = 1;

    mid->enable_remote_shutdown = enabled;
}

/*
 * Sends the internal "__shutdown__" RPC to a remote instance.
 *
 * mid:         local instance used to issue the RPC
 * remote_addr: address of the instance to shut down
 *
 * Returns the `ret` field of the remote reply, or -1 if the RPC could not
 * be created, forwarded, or decoded.
 */
int margo_shutdown_remote_instance(
        margo_instance_id mid,
        hg_addr_t remote_addr)
{
    hg_return_t hret;
    hg_handle_t handle;
    margo_shutdown_out_t out;
    int ret;

    hret = margo_create(mid, remote_addr,
                        mid->shutdown_rpc_id, &handle);
    if(hret != HG_SUCCESS) return -1;

    hret = margo_forward(handle, NULL);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return -1;
    }

    hret = margo_get_output(handle, &out);
    if(hret != HG_SUCCESS)
    {
        margo_destroy(handle);
        return -1;
    }

    /* copy the result out before the output structure is released */
    ret = out.ret;

    margo_free_output(handle, &out);
    margo_destroy(handle);

    return ret;
}

523 524
/*
 * Registers an RPC and, for a non-default mplex_id, records which pool is
 * associated with the (RPC id, mplex_id) pair.
 *
 * Returns the RPC id, or 0 on failure.  Registering the same pair twice is
 * not an error; the existing table entry is kept.
 */
hg_id_t margo_register_name_mplex(margo_instance_id mid, const char *func_name,
    hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb,
    uint8_t mplex_id, ABT_pool pool)
{
    struct mplex_key key;
    struct mplex_element *element;
    hg_id_t id;

    id = margo_register_name(mid, func_name, in_proc_cb, out_proc_cb, rpc_cb);
    if(id <= 0)
        return(0);

    /* nothing to do, we'll let the handler pool take this directly */
    if(mplex_id == MARGO_DEFAULT_MPLEX_ID)
        return(id);

    /* zero the whole key first: padding bytes take part in the hash */
    memset(&key, 0, sizeof(key));
    key.id = id;
    key.mplex_id = mplex_id;

    HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element);
    if(element)
        return(id);

    /* calloc so that user_data and user_free_callback start out NULL */
    element = calloc(1, sizeof(*element));
    if(!element)
        return(0);
    /* memcpy rather than struct assignment, so the zeroed padding bytes of
     * the key are carried over verbatim */
    memcpy(&element->key, &key, sizeof(key));
    element->pool = pool;

    HASH_ADD(hh, mid->mplex_table, key, sizeof(key), element);

    return(id);
}

558 559
/* Thin wrapper around HG_Registered_name() using the instance's class. */
hg_return_t margo_registered_name(margo_instance_id mid, const char *func_name,
    hg_id_t *id, hg_bool_t *flag)
{
    return(HG_Registered_name(mid->hg_class, func_name, id, flag));
}

564
/*
 * Reports whether func_name is registered for the given mplex_id.
 *
 * id:   receives the RPC id looked up by margo_registered_name()
 * flag: receives the plain registration status when mplex_id is 0 or the
 *       name is unknown; otherwise 1 if the (id, mplex_id) pair is in the
 *       multiplexing table and 0 if it is not
 *
 * Returns the Mercury error from the name lookup, or HG_SUCCESS.
 */
hg_return_t margo_registered_name_mplex(margo_instance_id mid, const char *func_name,
    uint8_t mplex_id, hg_id_t *id, hg_bool_t *flag)
{
    hg_bool_t b;
    hg_return_t ret = margo_registered_name(mid, func_name, id, &b);
    if(ret != HG_SUCCESS) 
        return ret;
    if((!b) || (!mplex_id)) {
        *flag = b;
        return ret;
    }

    struct mplex_key key;
    struct mplex_element *element;

    /* zero the whole key first: padding bytes take part in the hash */
    memset(&key, 0, sizeof(key));
    key.id = *id;
    key.mplex_id = mplex_id;

    HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element);
    if(!element) {
        *flag = 0;
        return HG_SUCCESS;
    }

    assert(element->key.id == *id && element->key.mplex_id == mplex_id);

    *flag = 1;
    return HG_SUCCESS;
}

595 596 597 598 599 600 601
/*
 * Attaches user data (and an optional destructor) to a registered RPC.
 * Data attached earlier is released through its own free callback, if it
 * had one, before being replaced.
 *
 * Returns HG_OTHER_ERROR when the RPC has no margo data attached,
 * HG_SUCCESS otherwise.
 */
hg_return_t margo_register_data(
    margo_instance_id mid,
    hg_id_t id,
    void *data,
    void (*free_callback)(void *)) 
{
    struct margo_rpc_data* margo_data 
        = (struct margo_rpc_data*) HG_Registered_data(mid->hg_class, id);
    if(!margo_data) return HG_OTHER_ERROR;
    if(margo_data->user_data && margo_data->user_free_callback) {
        (margo_data->user_free_callback)(margo_data->user_data);
    }
    margo_data->user_data = data;
    margo_data->user_free_callback = free_callback;
    return HG_SUCCESS;
}

/* Returns the user data attached to an RPC id, or NULL when the RPC carries
 * no margo data. */
void* margo_registered_data(margo_instance_id mid, hg_id_t id)
{
    struct margo_rpc_data* rpc_data;

    rpc_data = (struct margo_rpc_data*) HG_Registered_data(margo_get_class(mid), id);
    return rpc_data ? rpc_data->user_data : NULL;
}

620 621 622 623
/* Thin wrapper around HG_Registered_disable_response() using the instance's
 * class. */
hg_return_t margo_registered_disable_response(
    margo_instance_id mid,
    hg_id_t id,
    int disable_flag)
{
    return(HG_Registered_disable_response(mid->hg_class, id, disable_flag));
}
627

628
struct lookup_cb_evt
629
{
630
    hg_return_t hret;
631 632 633 634 635 636
    hg_addr_t addr;
};

static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info)
{
    struct lookup_cb_evt evt;
637
    evt.hret = info->ret;
638
    evt.addr = info->info.lookup.addr;
Matthieu Dorier's avatar
Matthieu Dorier committed
639
    ABT_eventual eventual = (ABT_eventual)(info->arg);
640 641

    /* propagate return code out through eventual */
Matthieu Dorier's avatar
Matthieu Dorier committed
642
    ABT_eventual_set(eventual, &evt, sizeof(evt));
643

644 645 646
    return(HG_SUCCESS);
}

647 648 649 650
/*
 * Looks up an address by name, blocking the calling ULT until Mercury
 * reports completion.
 *
 * addr: receives the address reported by the lookup callback
 *
 * Returns HG_NOMEM_ERROR if the eventual cannot be created, the error from
 * HG_Addr_lookup() if the lookup cannot be posted, or the completion status
 * otherwise.
 */
hg_return_t margo_addr_lookup(
    margo_instance_id mid,
    const char   *name,
    hg_addr_t    *addr)
{
    hg_return_t hret;
    struct lookup_cb_evt *evt;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(*evt), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
        (void*)eventual, name, HG_OP_ID_IGNORE);
    if(hret == HG_SUCCESS)
    {
        /* evt points into the eventual; copy out before freeing it */
        ABT_eventual_wait(eventual, (void**)&evt);
        *addr = evt->addr;
        hret = evt->hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

/* Thin wrapper around HG_Addr_free() using the instance's class. */
hg_return_t margo_addr_free(
    margo_instance_id mid,
    hg_addr_t addr)
{
    return(HG_Addr_free(mid->hg_class, addr));
}
683

684 685 686 687 688
/* Thin wrapper around HG_Addr_self() using the instance's class. */
hg_return_t margo_addr_self(
    margo_instance_id mid,
    hg_addr_t *addr)
{
    return(HG_Addr_self(mid->hg_class, addr));
}

691 692 693 694 695 696 697 698 699
/* Thin wrapper around HG_Addr_dup() using the instance's class. */
hg_return_t margo_addr_dup(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_addr_t *new_addr)
{
    hg_return_t hret;

    hret = HG_Addr_dup(mid->hg_class, addr, new_addr);
    return hret;
}

hg_return_t margo_addr_to_string(
700
    margo_instance_id mid,
701 702 703 704 705 706 707 708 709 710
    char *buf,
    hg_size_t *buf_size,
    hg_addr_t addr)
{
    return(HG_Addr_to_string(mid->hg_class, buf, buf_size, addr));
}

/*
 * Obtains a Mercury handle for (addr, id): first from the instance's handle
 * cache and, if the cache cannot supply one, from HG_Create().
 */
hg_return_t margo_create(margo_instance_id mid, hg_addr_t addr,
    hg_id_t id, hg_handle_t *handle)
{
    hg_return_t hret;

    /* look for a handle to reuse */
    hret = margo_handle_cache_get(mid, addr, id, handle);
    if(hret != HG_SUCCESS)
    {
        /* else try creating a new handle */
        hret = HG_Create(mid->hg_context, addr, id, handle);
    }

    return hret;
}

724
/*
 * Releases a handle: returns it to the owning instance's handle cache when
 * the cache accepts it, otherwise destroys it with HG_Destroy().
 */
hg_return_t margo_destroy(hg_handle_t handle)
{
    margo_instance_id mid;
    hg_return_t hret;

    /* use the handle to get the associated mid */
    mid = margo_hg_handle_get_instance(handle);

    /* the lookup can yield MARGO_INSTANCE_NULL; there is no cache to
     * consult in that case, so fall straight back to Mercury */
    if(mid == MARGO_INSTANCE_NULL)
        return HG_Destroy(handle);

    /* recycle this handle if it came from the handle cache */
    hret = margo_handle_cache_put(mid, handle);
    if(hret != HG_SUCCESS)
    {
        /* else destroy the handle manually */
        hret = HG_Destroy(handle);
    }

    return hret;
}

static hg_return_t margo_cb(const struct hg_cb_info *info)
{
    hg_return_t hret = info->ret;
Matthieu Dorier's avatar
Matthieu Dorier committed
746
    ABT_eventual eventual = (ABT_eventual)(info->arg);
747 748

    /* propagate return code out through eventual */
Matthieu Dorier's avatar
Matthieu Dorier committed
749
    ABT_eventual_set(eventual, &hret, sizeof(hret));
750 751 752 753 754 755 756
    
    return(HG_SUCCESS);
}

/* Blocking forward: posts the RPC with margo_iforward() and waits for its
 * completion status with margo_wait(). */
hg_return_t margo_forward(
    hg_handle_t handle,
    void *in_struct)
{
    hg_return_t hret;
    margo_request req;

    hret = margo_iforward(handle, in_struct, &req);
    if(hret != HG_SUCCESS) 
        return hret;
    return margo_wait(req);
}

/*
 * Non-blocking forward.
 *
 * req: on success receives the request to pass to margo_wait() or
 *      margo_test(); on failure it is set to ABT_EVENTUAL_NULL
 *
 * Returns HG_NOMEM_ERROR if the eventual cannot be created, otherwise the
 * return code of HG_Forward().
 */
hg_return_t margo_iforward(
    hg_handle_t handle,
    void *in_struct,
    margo_request* req)
{
    hg_return_t hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    *req = eventual;

    hret = HG_Forward(handle, margo_cb, (void*)eventual, in_struct);
    if(hret != HG_SUCCESS)
    {
        /* callers do not wait on a request that failed to post, so the
         * eventual would otherwise leak.  Assumes Mercury does not invoke
         * the callback when the call itself fails -- TODO confirm. */
        ABT_eventual_free(&eventual);
        *req = ABT_EVENTUAL_NULL;
    }

    return(hret);
}
785

Matthieu Dorier's avatar
Matthieu Dorier committed
786 787 788 789
/* Blocks until the operation behind req completes, frees the request, and
 * returns the completion status stored by margo_cb(). */
hg_return_t margo_wait(margo_request req)
{
    hg_return_t* waited_hret;
    hg_return_t  hret;

    ABT_eventual_wait(req, (void**)&waited_hret);
    /* copy the value out: waited_hret points into the eventual */
    hret = *waited_hret;
    ABT_eventual_free(&req);

    return(hret);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
798 799 800 801 802
/* Tests a request through ABT_eventual_test(): the result is written to
 * *flag and the Argobots return code is handed back unchanged. */
int margo_test(margo_request req, int* flag)
{
    int abt_ret;

    abt_ret = ABT_eventual_test(req, NULL, flag);
    return abt_ret;
}

803 804 805 806
/* Argument handed to margo_forward_timeout_cb() by margo_forward_timed(). */
typedef struct
{
    /* handle of the forward operation to cancel on timeout */
    hg_handle_t handle;
} margo_forward_timeout_cb_dat;
807

808 809 810 811 812 813 814 815 816 817 818
/* Timer callback armed by margo_forward_timed(): cancels the Mercury
 * operation on the handle carried in arg once the forward has timed out. */
static void margo_forward_timeout_cb(void *arg)
{
    margo_forward_timeout_cb_dat *dat = arg;

    HG_Cancel(dat->handle);
}

hg_return_t margo_forward_timed(
819
    hg_handle_t handle,
820 821
    void *in_struct,
    double timeout_ms)
822 823
{
    int ret;
824
    hg_return_t hret;
825
    margo_instance_id mid;
826
    ABT_eventual eventual;
827
    hg_return_t* waited_hret;
828 829
    margo_timer_t forward_timer;
    margo_forward_timeout_cb_dat timeout_cb_dat;
830 831 832 833 834 835 836

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

837 838 839
    /* use the handle to get the associated mid */
    mid = margo_hg_handle_get_instance(handle);

840 841 842 843 844
    /* set a timer object to expire when this forward times out */
    timeout_cb_dat.handle = handle;
    margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb,
        &timeout_cb_dat, timeout_ms);

Matthieu Dorier's avatar
Matthieu Dorier committed
845
    hret = HG_Forward(handle, margo_cb, (void*)eventual, in_struct);
846
    if(hret == HG_SUCCESS)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
847 848 849 850 851
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

852 853 854 855 856 857 858 859
    /* convert HG_CANCELED to HG_TIMEOUT to indicate op timed out */
    if(hret == HG_CANCELED)
        hret = HG_TIMEOUT;

    /* remove timer if it is still in place (i.e., not timed out) */
    if(hret != HG_TIMEOUT)
        margo_timer_destroy(mid, &forward_timer);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
860 861 862 863 864 865 866 867
    ABT_eventual_free(&eventual);

    return(hret);
}

/* Blocking respond: posts the response with margo_irespond() and waits for
 * its completion status with margo_wait(). */
hg_return_t margo_respond(
    hg_handle_t handle,
    void *out_struct)
{
    hg_return_t hret;
    margo_request req;

    hret = margo_irespond(handle,out_struct,&req);
    if(hret != HG_SUCCESS)
        return hret;
    return margo_wait(req);
}

/*
 * Non-blocking respond.
 *
 * req: on success receives the request to pass to margo_wait() or
 *      margo_test(); on failure it is set to ABT_EVENTUAL_NULL
 *
 * Returns HG_NOMEM_ERROR if the eventual cannot be created, otherwise the
 * return code of HG_Respond().
 */
hg_return_t margo_irespond(
    hg_handle_t handle,
    void *out_struct,
    margo_request* req)
{
    hg_return_t hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hg_return_t), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);
    }

    *req = eventual;

    hret = HG_Respond(handle, margo_cb, (void*)eventual, out_struct);
    if(hret != HG_SUCCESS)
    {
        /* callers do not wait on a request that failed to post, so the
         * eventual would otherwise leak.  Assumes Mercury does not invoke
         * the callback when the call itself fails -- TODO confirm. */
        ABT_eventual_free(&eventual);
        *req = ABT_EVENTUAL_NULL;
    }

    return(hret);
}

896 897 898 899 900 901 902
/* Thin wrapper around HG_Bulk_create() using the instance's class. */
hg_return_t margo_bulk_create(
    margo_instance_id mid,
    hg_uint32_t count,
    void **buf_ptrs,
    const hg_size_t *buf_sizes,
    hg_uint8_t flags,
    hg_bulk_t *handle)
{
    return(HG_Bulk_create(mid->hg_class, count,
        buf_ptrs, buf_sizes, flags, handle));
}
907

908 909 910 911
/* Thin wrapper around HG_Bulk_free(). */
hg_return_t margo_bulk_free(
    hg_bulk_t handle)
{
    return(HG_Bulk_free(handle));
}

914 915 916 917 918 919 920 921
/* Thin wrapper around HG_Bulk_deserialize() using the instance's class. */
hg_return_t margo_bulk_deserialize(
    margo_instance_id mid,
    hg_bulk_t *handle,
    const void *buf,
    hg_size_t buf_size)
{
    hg_return_t hret;

    hret = HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size);
    return hret;
}
922

923
hg_return_t margo_bulk_transfer(
924
    margo_instance_id mid,
925
    hg_bulk_op_t op,
926
    hg_addr_t origin_addr,
927 928 929 930
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
931
    size_t size)
Matthieu Dorier's avatar
Matthieu Dorier committed
932 933 934 935
{  
    margo_request req;
    hg_return_t hret = margo_bulk_itransfer(mid,op,origin_addr,
                          origin_handle, origin_offset, local_handle,
Matthieu Dorier's avatar
Matthieu Dorier committed
936
                          local_offset, size, &req);
Matthieu Dorier's avatar
Matthieu Dorier committed
937 938 939 940 941 942 943 944 945 946 947 948 949 950 951
    if(hret != HG_SUCCESS)
        return hret;
    return margo_wait(req);
}

/*
 * Non-blocking bulk transfer.
 *
 * req: on success receives the request to pass to margo_wait() or
 *      margo_test(); on failure it is set to ABT_EVENTUAL_NULL
 *
 * Returns HG_NOMEM_ERROR if the eventual cannot be created, otherwise the
 * return code of HG_Bulk_transfer().
 */
hg_return_t margo_bulk_itransfer(
    margo_instance_id mid,
    hg_bulk_op_t op,
    hg_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size,
    margo_request* req)
{
    hg_return_t hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    *req = eventual;

    hret = HG_Bulk_transfer(mid->hg_context, margo_cb,
        (void*)eventual, op, origin_addr, origin_handle, origin_offset, local_handle,
        local_offset, size, HG_OP_ID_IGNORE);
    if(hret != HG_SUCCESS)
    {
        /* callers do not wait on a request that failed to post, so the
         * eventual would otherwise leak.  Assumes Mercury does not invoke
         * the callback when the call itself fails -- TODO confirm. */
        ABT_eventual_free(&eventual);
        *req = ABT_EVENTUAL_NULL;
    }

    return(hret);
}

972 973 974 975
/* State shared between margo_thread_sleep() and its timer callback. */
typedef struct
{
    ABT_mutex mutex;    /* protects is_asleep */
    ABT_cond cond;      /* signalled when the timer fires */
    char is_asleep;     /* 1 while the sleeper should keep waiting */
} margo_thread_sleep_cb_dat;

static void margo_thread_sleep_cb(void *arg)
{
    margo_thread_sleep_cb_dat *sleep_cb_dat =
        (margo_thread_sleep_cb_dat *)arg;

    /* wake up the sleeping thread */
    ABT_mutex_lock(sleep_cb_dat->mutex);
986
    sleep_cb_dat->is_asleep = 0;
987 988 989 990 991 992 993
    ABT_cond_signal(sleep_cb_dat->cond);
    ABT_mutex_unlock(sleep_cb_dat->mutex);

    return;
}

void margo_thread_sleep(
994
    margo_instance_id mid,
995 996 997 998 999 1000 1001 1002
    double timeout_ms)
{
    margo_timer_t sleep_timer;
    margo_thread_sleep_cb_dat sleep_cb_dat;

    /* set data needed for sleep callback */
    ABT_mutex_create(&(sleep_cb_dat.mutex));
    ABT_cond_create(&(sleep_cb_dat.cond));
1003
    sleep_cb_dat.is_asleep = 1;
1004 1005

    /* initialize the sleep timer */
1006
    margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb,
1007 1008 1009 1010
        &sleep_cb_dat, timeout_ms);

    /* yield thread for specified timeout */
    ABT_mutex_lock(sleep_cb_dat.mutex);
1011 1012
    while(sleep_cb_dat.is_asleep)
        ABT_cond_wait(sleep_cb_dat.cond, sleep_cb_dat.mutex);
1013 1014
    ABT_mutex_unlock(sleep_cb_dat.mutex);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
1015 1016 1017 1018
    /* clean up */
    ABT_mutex_free(&sleep_cb_dat.mutex);
    ABT_cond_free(&sleep_cb_dat.cond);

1019 1020 1021
    return;
}

1022
/*
 * Retrieves the handler pool of a margo instance.
 *
 * On success stores mid->handler_pool in *pool and returns 0.
 * Returns -1, leaving *pool untouched, when mid is null.
 */
int margo_get_handler_pool(margo_instance_id mid, ABT_pool* pool)
{
    if(!mid)
    {
        return -1;
    }

    *pool = mid->handler_pool;
    return 0;
}
1031

1032 1033 1034 1035
/*
 * Returns the Mercury context stored in the margo instance.
 * mid is dereferenced unconditionally and must not be null.
 */
hg_context_t* margo_get_context(margo_instance_id mid)
{
    hg_context_t* context = mid->hg_context;

    return context;
}
1036

1037 1038 1039
/*
 * Returns the Mercury class stored in the margo instance.
 * mid is dereferenced unconditionally and must not be null.
 */
hg_class_t* margo_get_class(margo_instance_id mid)
{
    hg_class_t* cls = mid->hg_class;

    return cls;
}
Philip Carns's avatar
Philip Carns committed
1041

1042
/*
 * Looks up the margo instance associated with an RPC handle.
 *
 * Fetches the handle's hg_info with HG_Get_info() and delegates to
 * margo_hg_info_get_instance().  Returns MARGO_INSTANCE_NULL when no info
 * is available for the handle.
 */
margo_instance_id margo_hg_handle_get_instance(hg_handle_t h)
{
    const struct hg_info* handle_info = HG_Get_info(h);

    if(!handle_info)
    {
        return MARGO_INSTANCE_NULL;
    }

    return margo_hg_info_get_instance(handle_info);
}

margo_instance_id margo_hg_info_get_instance(const struct hg_info *info)
{
1051 1052 1053 1054
	struct margo_rpc_data* data = 
		(struct margo_rpc_data*) HG_Registered_data(info->hg_class, info->id);
	if(!data) return MARGO_INSTANCE_NULL;
	return data->mid;
1055 1056
}

1057
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint8_t mplex_id, ABT_pool *pool)
Philip Carns's avatar
Philip Carns committed
1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072
{
    struct mplex_key key;
    struct mplex_element *element;

    if(!mplex_id)
    {
        *pool = mid->handler_pool;
        return(0);
    }

    memset(&key, 0, sizeof(key));
    key.id = id;
    key.mplex_id = mplex_id;

    HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element);
1073 1074 1075 1076 1077 1078
    if(!element) {
        if(mplex_id == 0) // element does not exist and mplex is 0, return defau