margo.c 33.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11

/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <abt.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
12 13 14

#include <margo-config.h>
#ifdef HAVE_ABT_SNOOZER
15
#include <abt-snoozer.h>
Matthieu Dorier's avatar
Matthieu Dorier committed
16
#endif
17
#include <time.h>
Philip Carns's avatar
bug fix  
Philip Carns committed
18
#include <math.h>
19 20

#include "margo.h"
21
#include "margo-timer.h"
Philip Carns's avatar
Philip Carns committed
22
#include "utlist.h"
Philip Carns's avatar
Philip Carns committed
23
#include "uthash.h"
24

25
/* Default upper bound on the Mercury progress timeout, in milliseconds. */
#define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100
/* Default size of the per-instance hg_handle_t cache. */
#define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32
27

Philip Carns's avatar
Philip Carns committed
28 29 30 31 32 33 34 35 36 37 38 39 40
/* Hash key for the per-instance RPC multiplexing table: one entry per
 * (RPC id, mplex id) pair.  Callers memset() a key before filling it in
 * because the whole struct is handed to uthash as sizeof(key) raw bytes. */
struct mplex_key
{
    hg_id_t id;        /* Mercury RPC id */
    uint32_t mplex_id; /* multiplexing id given at registration */
};

/* Entry in margo_instance.mplex_table: maps a key to the ABT pool that
 * margo_lookup_mplex() returns for that (RPC id, mplex id) pair. */
struct mplex_element
{
    struct mplex_key key;
    ABT_pool pool;
    UT_hash_handle hh; /* uthash linkage */
};

Philip Carns's avatar
Philip Carns committed
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
/* Running summary statistics for one diagnostic measurement (for example
 * the time spent in a single HG_Trigger() call). */
struct diag_data
{
    double min;        /* smallest sample recorded so far */
    double max;        /* largest sample recorded so far */
    double cumulative; /* sum of all samples recorded */
    int count;         /* number of samples recorded */
};

/* Fold one sample into a struct diag_data.
 *
 * __data: an lvalue of type struct diag_data (not a pointer to one).
 * __time: the sample value; evaluated exactly once.
 *
 * The first sample seeds both min and max.  The instance holding these
 * counters is zero-filled at creation, so without seeding, min would stay
 * at 0 forever (no non-negative sample is ever below the initial 0).
 */
#define __DIAG_UPDATE(__data, __time)\
do {\
    double margo_diag_sample_ = (double)(__time); \
    (__data).count++; \
    (__data).cumulative += margo_diag_sample_; \
    if((__data).count == 1 || margo_diag_sample_ > (__data).max) \
        (__data).max = margo_diag_sample_; \
    if((__data).count == 1 || margo_diag_sample_ < (__data).min) \
        (__data).min = margo_diag_sample_; \
} while(0)

Shane Snyder's avatar
Shane Snyder committed
57 58 59 60 61 62 63
/* One cached Mercury handle.  It is linked into the instance's in-use hash
 * (used_handle_hash) through hh and into its free list (free_handle_list)
 * through next. */
struct margo_handle_cache_el
{
    hg_handle_t handle;
    UT_hash_handle hh; /* in-use hash link */
    struct margo_handle_cache_el *next; /* free list link */
};

64 65
struct margo_instance
{
Shane Snyder's avatar
Shane Snyder committed
66
    /* mercury/argobots state */
67 68
    hg_context_t *hg_context;
    hg_class_t *hg_class;
69 70 71
    ABT_pool handler_pool;
    ABT_pool progress_pool;

72
    /* internal to margo for this particular instance */
Shane Snyder's avatar
Shane Snyder committed
73
    int margo_init;
74
    int abt_init;
75 76
    ABT_thread hg_progress_tid;
    int hg_progress_shutdown_flag;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
77
    ABT_xstream progress_xstream;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
78 79 80
    int owns_progress_pool;
    ABT_xstream *rpc_xstreams;
    int num_handler_pool_threads;
81
    unsigned int hg_progress_timeout_ub;
82 83 84

    /* control logic for callers waiting on margo to be finalized */
    int finalize_flag;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
85
    int refcount;
86 87 88
    ABT_mutex finalize_mutex;
    ABT_cond finalize_cond;

Philip Carns's avatar
Philip Carns committed
89 90
    /* hash table to track multiplexed rpcs registered with margo */
    struct mplex_element *mplex_table;
Philip Carns's avatar
Philip Carns committed
91

Shane Snyder's avatar
Shane Snyder committed
92 93 94
    /* linked list of free hg handles and a hash of in-use handles */
    struct margo_handle_cache_el *free_handle_list;
    struct margo_handle_cache_el *used_handle_hash;
95
    ABT_mutex handle_cache_mtx; /* mutex protecting access to above caches */
Shane Snyder's avatar
Shane Snyder committed
96

Philip Carns's avatar
Philip Carns committed
97 98 99 100 101 102 103 104 105 106 107
    /* optional diagnostics data tracking */
    /* NOTE: technically the following fields are subject to races if they
     * are updated from more than one thread at a time.  We will be careful
     * to only update the counters from the progress_fn,
     * which will serialize access.
     */
    int diag_enabled;
    struct diag_data diag_trigger_elapsed;
    struct diag_data diag_progress_elapsed_zero_timeout;
    struct diag_data diag_progress_elapsed_nonzero_timeout;
    struct diag_data diag_progress_timeout_value;
108 109
};

110 111 112 113 114 115
/* Per-RPC data that margo_register_name() attaches with HG_Register_data():
 * the registering instance plus optional user data and its destructor.
 * Released by margo_rpc_data_free(). */
struct margo_rpc_data
{
	margo_instance_id mid; /* instance that registered the RPC */
	void* user_data; /* set by margo_register_data() */
	void (*user_free_callback)(void *); /* applied to user_data by margo_rpc_data_free() */
};
116

117
/* ULT body that drives Mercury progress for an instance. */
static void hg_progress_fn(void *arg);
/* Destructor registered with HG_Register_data() for struct margo_rpc_data. */
static void margo_rpc_data_free(void *data);

/* Handle cache used by margo_create() / margo_destroy() to recycle
 * hg_handle_t objects. */
static hg_return_t margo_handle_cache_init(margo_instance_id mid);
static void margo_handle_cache_destroy(margo_instance_id mid);
static hg_return_t margo_handle_cache_get(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_id_t id,
    hg_handle_t *handle);
static hg_return_t margo_handle_cache_put(
    margo_instance_id mid,
    hg_handle_t handle);

Shane Snyder's avatar
Shane Snyder committed
127
margo_instance_id margo_init(const char *addr_str, int mode,
Shane Snyder's avatar
Shane Snyder committed
128
    int use_progress_thread, int rpc_thread_count)
129
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
130 131 132 133 134
    ABT_xstream progress_xstream = ABT_XSTREAM_NULL;
    ABT_pool progress_pool = ABT_POOL_NULL;
    ABT_xstream *rpc_xstreams = NULL;
    ABT_xstream rpc_xstream = ABT_XSTREAM_NULL;
    ABT_pool rpc_pool = ABT_POOL_NULL;
Shane Snyder's avatar
Shane Snyder committed
135 136
    hg_class_t *hg_class = NULL;
    hg_context_t *hg_context = NULL;
Shane Snyder's avatar
Shane Snyder committed
137
    int listen_flag = (mode == MARGO_CLIENT_MODE) ? HG_FALSE : HG_TRUE;
138
    int abt_init = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
139
    int i;
Shane Snyder's avatar
Shane Snyder committed
140 141 142
    int ret;
    struct margo_instance *mid = MARGO_INSTANCE_NULL;

Shane Snyder's avatar
Shane Snyder committed
143
    if(mode != MARGO_CLIENT_MODE && mode != MARGO_SERVER_MODE) goto err;
Shane Snyder's avatar
Shane Snyder committed
144

145 146 147 148 149 150
    if (ABT_initialized() == ABT_ERR_UNINITIALIZED)
    {
        ret = ABT_init(0, NULL); /* XXX: argc/argv not currently used by ABT ... */
        if(ret != 0) goto err;
        abt_init = 1;
    }
Shane Snyder's avatar
Shane Snyder committed
151

152
    /* set caller (self) ES to idle without polling */
Matthieu Dorier's avatar
Matthieu Dorier committed
153
#ifdef HAVE_ABT_SNOOZER
Shane Snyder's avatar
Shane Snyder committed
154 155
    ret = ABT_snoozer_xstream_self_set();
    if(ret != 0) goto err;
Matthieu Dorier's avatar
Matthieu Dorier committed
156
#endif
Jonathan Jenkins's avatar
Jonathan Jenkins committed
157 158 159

    if (use_progress_thread)
    {
Matthieu Dorier's avatar
Matthieu Dorier committed
160
#ifdef HAVE_ABT_SNOOZER
Jonathan Jenkins's avatar
Jonathan Jenkins committed
161
        ret = ABT_snoozer_xstream_create(1, &progress_pool, &progress_xstream);
Matthieu Dorier's avatar
Matthieu Dorier committed
162 163 164 165 166 167 168
		if (ret != ABT_SUCCESS) goto err;
#else
		ret = ABT_xstream_create(ABT_SCHED_NULL, &progress_xstream);
		if (ret != ABT_SUCCESS) goto err;
		ret = ABT_xstream_get_main_pools(progress_xstream, 1, &progress_pool);
		if (ret != ABT_SUCCESS) goto err;
#endif
Jonathan Jenkins's avatar
Jonathan Jenkins committed
169 170 171 172 173 174 175 176 177
    }
    else
    {
        ret = ABT_xstream_self(&progress_xstream);
        if (ret != ABT_SUCCESS) goto err;
        ret = ABT_xstream_get_main_pools(progress_xstream, 1, &progress_pool);
        if (ret != ABT_SUCCESS) goto err;
    }

178
    if (rpc_thread_count > 0)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
179
    {
180 181
        rpc_xstreams = calloc(rpc_thread_count, sizeof(*rpc_xstreams));
        if (rpc_xstreams == NULL) goto err;
Matthieu Dorier's avatar
Matthieu Dorier committed
182
#ifdef HAVE_ABT_SNOOZER
183 184 185
        ret = ABT_snoozer_xstream_create(rpc_thread_count, &rpc_pool,
                rpc_xstreams);
        if (ret != ABT_SUCCESS) goto err;
Matthieu Dorier's avatar
Matthieu Dorier committed
186
#else
187 188 189 190 191
        int j;
        ret = ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, ABT_TRUE, &rpc_pool);
        if (ret != ABT_SUCCESS) goto err;
        for(j=0; j<rpc_thread_count; j++) {
            ret = ABT_xstream_create(ABT_SCHED_NULL, rpc_xstreams+j);
Shane Snyder's avatar
Shane Snyder committed
192 193
            if (ret != ABT_SUCCESS) goto err;
        }
194 195 196 197 198 199 200 201 202 203 204 205
#endif
    }
    else if (rpc_thread_count == 0)
    {
        ret = ABT_xstream_self(&rpc_xstream);
        if (ret != ABT_SUCCESS) goto err;
        ret = ABT_xstream_get_main_pools(rpc_xstream, 1, &rpc_pool);
        if (ret != ABT_SUCCESS) goto err;
    }
    else
    {
        rpc_pool = progress_pool;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
206 207
    }

Shane Snyder's avatar
Shane Snyder committed
208 209 210 211 212 213
    hg_class = HG_Init(addr_str, listen_flag);
    if(!hg_class) goto err;

    hg_context = HG_Context_create(hg_class);
    if(!hg_context) goto err;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
214 215 216
    mid = margo_init_pool(progress_pool, rpc_pool, hg_context);
    if (mid == MARGO_INSTANCE_NULL) goto err;

Shane Snyder's avatar
Shane Snyder committed
217
    mid->margo_init = 1;
218
    mid->abt_init = abt_init;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
219 220 221
    mid->owns_progress_pool = use_progress_thread;
    mid->progress_xstream = progress_xstream;
    mid->num_handler_pool_threads = rpc_thread_count < 0 ? 0 : rpc_thread_count;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
222
    mid->rpc_xstreams = rpc_xstreams;
223

Jonathan Jenkins's avatar
Jonathan Jenkins committed
224 225 226
    return mid;

err:
Shane Snyder's avatar
Shane Snyder committed
227 228 229 230 231 232 233
    if(mid)
    {
        margo_timer_instance_finalize(mid);
        ABT_mutex_free(&mid->finalize_mutex);
        ABT_cond_free(&mid->finalize_cond);
        free(mid);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
234 235 236 237 238 239 240 241 242 243 244 245 246 247
    if (use_progress_thread && progress_xstream != ABT_XSTREAM_NULL)
    {
        ABT_xstream_join(progress_xstream);
        ABT_xstream_free(&progress_xstream);
    }
    if (rpc_thread_count > 0 && rpc_xstreams != NULL)
    {
        for (i = 0; i < rpc_thread_count; i++)
        {
            ABT_xstream_join(rpc_xstreams[i]);
            ABT_xstream_free(&rpc_xstreams[i]);
        }
        free(rpc_xstreams);
    }
Shane Snyder's avatar
Shane Snyder committed
248 249 250 251
    if(hg_context)
        HG_Context_destroy(hg_context);
    if(hg_class)
        HG_Finalize(hg_class);
252 253
    if(abt_init)
        ABT_finalize();
Jonathan Jenkins's avatar
Jonathan Jenkins committed
254 255 256 257
    return MARGO_INSTANCE_NULL;
}

margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
258
    hg_context_t *hg_context)
259 260
{
    int ret;
Shane Snyder's avatar
Shane Snyder committed
261
    hg_return_t hret;
262 263 264
    struct margo_instance *mid;

    mid = malloc(sizeof(*mid));
Shane Snyder's avatar
Shane Snyder committed
265
    if(!mid) goto err;
266
    memset(mid, 0, sizeof(*mid));
267

268 269 270
    ABT_mutex_create(&mid->finalize_mutex);
    ABT_cond_create(&mid->finalize_cond);

271 272
    mid->progress_pool = progress_pool;
    mid->handler_pool = handler_pool;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
273
    mid->hg_class = HG_Context_get_class(hg_context);
274
    mid->hg_context = hg_context;
275
    mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
276
    mid->refcount = 1;
277

278
    ret = margo_timer_instance_init(mid);
Shane Snyder's avatar
Shane Snyder committed
279
    if(ret != 0) goto err;
280

Shane Snyder's avatar
Shane Snyder committed
281 282 283 284
    /* initialize the handle cache */
    hret = margo_handle_cache_init(mid);
    if(hret != HG_SUCCESS) goto err;

285
    ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid, 
286
        ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
Shane Snyder's avatar
Shane Snyder committed
287 288
    if(ret != 0) goto err;

Shane Snyder's avatar
Shane Snyder committed
289 290
    return mid;

Shane Snyder's avatar
Shane Snyder committed
291 292
err:
    if(mid)
293
    {
Shane Snyder's avatar
Shane Snyder committed
294
        margo_handle_cache_destroy(mid);
Shane Snyder's avatar
Shane Snyder committed
295 296 297
        margo_timer_instance_finalize(mid);
        ABT_mutex_free(&mid->finalize_mutex);
        ABT_cond_free(&mid->finalize_cond);
298
        free(mid);
299
    }
Shane Snyder's avatar
Shane Snyder committed
300
    return MARGO_INSTANCE_NULL;
301 302
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
/* Release every resource held by an instance, then free it.  Runs once,
 * from whichever of margo_finalize() / margo_wait_for_finalize() drops the
 * refcount to zero; by then margo_finalize() has joined the progress ULT. */
static void margo_cleanup(margo_instance_id mid)
{
    int x;

    margo_timer_instance_finalize(mid);

    ABT_mutex_free(&mid->finalize_mutex);
    ABT_cond_free(&mid->finalize_cond);

    /* stop the execution streams margo_init() created, if any */
    if (mid->owns_progress_pool)
    {
        ABT_xstream_join(mid->progress_xstream);
        ABT_xstream_free(&mid->progress_xstream);
    }

    if (mid->num_handler_pool_threads > 0)
    {
        x = 0;
        while (x < mid->num_handler_pool_threads)
        {
            ABT_xstream *xs = &mid->rpc_xstreams[x];
            ABT_xstream_join(*xs);
            ABT_xstream_free(xs);
            x++;
        }
        free(mid->rpc_xstreams);
    }

    margo_handle_cache_destroy(mid);

    /* Mercury (and possibly Argobots) are only torn down for instances
     * created by margo_init(); margo_init_pool() leaves that to the caller. */
    if (mid->margo_init)
    {
        if (mid->hg_context) HG_Context_destroy(mid->hg_context);
        if (mid->hg_class) HG_Finalize(mid->hg_class);
        if (mid->abt_init) ABT_finalize();
    }

    free(mid);
}

343
/* Shut an instance down: stop the Mercury progress ULT, wake every thread
 * blocked in margo_wait_for_finalize(), and drop the instance's own
 * reference.  The instance is freed here only if no waiter still holds a
 * reference; otherwise the last waiter to leave frees it. */
void margo_finalize(margo_instance_id mid)
{
    int last_ref;

    /* ask hg_progress_fn to leave its loop, then wait for it to exit */
    mid->hg_progress_shutdown_flag = 1;
    ABT_thread_join(mid->hg_progress_tid);
    ABT_thread_free(&mid->hg_progress_tid);

    ABT_mutex_lock(mid->finalize_mutex);
    {
        mid->finalize_flag = 1;
        ABT_cond_broadcast(mid->finalize_cond);
        last_ref = (--mid->refcount == 0);
    }
    ABT_mutex_unlock(mid->finalize_mutex);

    if (last_ref)
        margo_cleanup(mid);
}

/* Block the caller until margo_finalize() has run on this instance.  The
 * caller holds a reference while waiting so the instance cannot be freed
 * underneath it; if it is the last one out, it performs the cleanup. */
void margo_wait_for_finalize(margo_instance_id mid)
{
    int last_ref;

    ABT_mutex_lock(mid->finalize_mutex);

    mid->refcount += 1;
    while (mid->finalize_flag == 0)
        ABT_cond_wait(mid->finalize_cond, mid->finalize_mutex);
    mid->refcount -= 1;
    last_ref = (mid->refcount == 0);

    ABT_mutex_unlock(mid->finalize_mutex);

    if (last_ref)
        margo_cleanup(mid);
}

394 395
/* Register an RPC by name with Mercury and attach margo's per-RPC data
 * (owning instance, empty user data) unless the RPC already carries some.
 * Returns the RPC id, or 0 on any failure. */
hg_id_t margo_register_name(margo_instance_id mid, const char *func_name,
    hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb)
{
    struct margo_rpc_data *rpc_data;
    hg_id_t rpc_id;

    rpc_id = HG_Register_name(mid->hg_class, func_name, in_proc_cb, out_proc_cb, rpc_cb);
    if (rpc_id <= 0)
        return 0;

    /* already carries margo data (e.g. registered before): keep it as is */
    rpc_data = (struct margo_rpc_data *)HG_Registered_data(mid->hg_class, rpc_id);
    if (rpc_data != NULL)
        return rpc_id;

    rpc_data = malloc(sizeof(*rpc_data));
    if (rpc_data == NULL)
        return 0;
    rpc_data->mid = mid;
    rpc_data->user_data = NULL;
    rpc_data->user_free_callback = NULL;

    if (HG_Register_data(mid->hg_class, rpc_id, rpc_data, margo_rpc_data_free) != HG_SUCCESS)
    {
        free(rpc_data);
        return 0;
    }
    return rpc_id;
}

426 427 428
/* Register an RPC (see margo_register_name()) and, for a non-default
 * mplex_id, record which pool margo_lookup_mplex() should report for the
 * (RPC id, mplex_id) pair.  An existing table entry is left unchanged.
 * Returns the RPC id, or 0 on failure. */
hg_id_t margo_register_name_mplex(margo_instance_id mid, const char *func_name,
    hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb,
    uint32_t mplex_id, ABT_pool pool)
{
    struct mplex_key lookup;
    struct mplex_element *entry;
    hg_id_t rpc_id = margo_register_name(mid, func_name, in_proc_cb, out_proc_cb, rpc_cb);

    if (rpc_id <= 0)
        return 0;

    /* the default mplex id is served by the handler pool: no entry needed */
    if (mplex_id == MARGO_DEFAULT_MPLEX_ID)
        return rpc_id;

    /* the key is hashed as raw bytes, so clear any padding first */
    memset(&lookup, 0, sizeof(lookup));
    lookup.id = rpc_id;
    lookup.mplex_id = mplex_id;

    HASH_FIND(hh, mid->mplex_table, &lookup, sizeof(lookup), entry);
    if (entry == NULL)
    {
        entry = malloc(sizeof(*entry));
        if (entry == NULL)
            return 0;
        entry->key = lookup;
        entry->pool = pool;
        HASH_ADD(hh, mid->mplex_table, key, sizeof(lookup), entry);
    }
    return rpc_id;
}

461 462
/* Ask Mercury whether func_name is registered on this instance's class;
 * thin wrapper around HG_Registered_name(). */
hg_return_t margo_registered_name(margo_instance_id mid, const char *func_name,
    hg_id_t *id, hg_bool_t *flag)
{
    hg_class_t *cls = mid->hg_class;
    return HG_Registered_name(cls, func_name, id, flag);
}

467 468 469 470 471 472 473
/* Attach user data (and an optional destructor) to an RPC previously
 * registered through margo.  Returns HG_OTHER_ERROR if the RPC carries no
 * margo data.  Previously attached user data is replaced, not freed. */
hg_return_t margo_register_data(
    margo_instance_id mid,
    hg_id_t id,
    void *data,
    void (*free_callback)(void *))
{
    struct margo_rpc_data *rpc_data =
        (struct margo_rpc_data *)HG_Registered_data(mid->hg_class, id);

    if (rpc_data == NULL)
        return HG_OTHER_ERROR;

    rpc_data->user_free_callback = free_callback;
    rpc_data->user_data = data;
    return HG_SUCCESS;
}

/* Return the user data attached with margo_register_data(), or NULL when
 * the RPC carries no margo data. */
void* margo_registered_data(margo_instance_id mid, hg_id_t id)
{
    struct margo_rpc_data *rpc_data =
        (struct margo_rpc_data *)HG_Registered_data(margo_get_class(mid), id);
    return (rpc_data != NULL) ? rpc_data->user_data : NULL;
}

489 490 491 492
/* Enable or disable responses for an RPC id; thin wrapper around
 * HG_Registered_disable_response(). */
hg_return_t margo_registered_disable_response(
    margo_instance_id mid,
    hg_id_t id,
    int disable_flag)
{
    hg_class_t *cls = mid->hg_class;
    return HG_Registered_disable_response(cls, id, disable_flag);
}
496

497
struct lookup_cb_evt
498
{
499
    hg_return_t hret;
500 501 502 503 504 505
    hg_addr_t addr;
};

static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info)
{
    struct lookup_cb_evt evt;
506
    evt.hret = info->ret;
507
    evt.addr = info->info.lookup.addr;
Matthieu Dorier's avatar
Matthieu Dorier committed
508
    ABT_eventual eventual = (ABT_eventual)(info->arg);
509 510

    /* propagate return code out through eventual */
Matthieu Dorier's avatar
Matthieu Dorier committed
511
    ABT_eventual_set(eventual, &evt, sizeof(evt));
512

513 514 515
    return(HG_SUCCESS);
}

516 517 518 519
/* Resolve a Mercury address by name, blocking the calling ULT until the
 * lookup completes.  On success *addr receives the resolved address.
 * Returns HG_NOMEM_ERROR if the eventual cannot be created, otherwise the
 * lookup's own status. */
hg_return_t margo_addr_lookup(
    margo_instance_id mid,
    const char   *name,
    hg_addr_t    *addr)
{
    ABT_eventual done;
    struct lookup_cb_evt *result;
    hg_return_t hret;

    if (ABT_eventual_create(sizeof(*result), &done) != 0)
        return HG_NOMEM_ERROR;

    hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
        (void*)done, name, HG_OP_ID_IGNORE);
    if (hret == HG_SUCCESS)
    {
        /* wait for margo_addr_lookup_cb() to publish the result */
        ABT_eventual_wait(done, (void**)&result);
        *addr = result->addr;
        hret = result->hret;
    }

    ABT_eventual_free(&done);
    return hret;
}

/* Release an address obtained through this instance (HG_Addr_free()). */
hg_return_t margo_addr_free(
    margo_instance_id mid,
    hg_addr_t addr)
{
    hg_class_t *cls = mid->hg_class;
    return HG_Addr_free(cls, addr);
}
552

553 554 555 556 557
/* Obtain this instance's own address (HG_Addr_self()). */
hg_return_t margo_addr_self(
    margo_instance_id mid,
    hg_addr_t *addr)
{
    hg_class_t *cls = mid->hg_class;
    return HG_Addr_self(cls, addr);
}

560 561 562 563 564 565 566 567 568
/* Duplicate an address into *new_addr (HG_Addr_dup()). */
hg_return_t margo_addr_dup(
    margo_instance_id mid,
    hg_addr_t addr,
    hg_addr_t *new_addr)
{
    hg_class_t *cls = mid->hg_class;
    return HG_Addr_dup(cls, addr, new_addr);
}

hg_return_t margo_addr_to_string(
569
    margo_instance_id mid,
570 571 572 573 574 575 576 577 578 579
    char *buf,
    hg_size_t *buf_size,
    hg_addr_t addr)
{
    return(HG_Addr_to_string(mid->hg_class, buf, buf_size, addr));
}

/* Obtain a handle for RPC id targeting addr: reuse one from the instance's
 * handle cache when possible, otherwise create a fresh one with HG_Create(). */
hg_return_t margo_create(margo_instance_id mid, hg_addr_t addr,
    hg_id_t id, hg_handle_t *handle)
{
    hg_return_t hret = margo_handle_cache_get(mid, addr, id, handle);

    if (hret == HG_SUCCESS)
        return hret;

    /* no reusable handle: fall back to a brand new Mercury handle */
    return HG_Create(mid->hg_context, addr, id, handle);
}

593
/* Release a handle obtained from margo_create().  If the handle's RPC is
 * associated with a margo instance, the handle is first offered back to
 * that instance's handle cache; whatever the cache does not take is
 * destroyed with HG_Destroy().  Returns the status of the operation that
 * finally disposed of the handle. */
hg_return_t margo_destroy(hg_handle_t handle)
{
    margo_instance_id mid;
    hg_return_t hret;

    /* use the handle to get the associated mid */
    mid = margo_hg_handle_get_instance(handle);

    /* margo_hg_handle_get_instance() yields MARGO_INSTANCE_NULL when the
     * RPC carries no margo data; there is then no instance whose cache could
     * take the handle back, so destroy it directly instead of handing a
     * NULL instance to margo_handle_cache_put(). */
    if(mid == MARGO_INSTANCE_NULL)
        return HG_Destroy(handle);

    /* recycle this handle if it came from the handle cache */
    hret = margo_handle_cache_put(mid, handle);
    if(hret != HG_SUCCESS)
    {
        /* else destroy the handle manually */
        hret = HG_Destroy(handle);
    }

    return hret;
}

static hg_return_t margo_cb(const struct hg_cb_info *info)
{
    hg_return_t hret = info->ret;
Matthieu Dorier's avatar
Matthieu Dorier committed
615
    ABT_eventual eventual = (ABT_eventual)(info->arg);
616 617

    /* propagate return code out through eventual */
Matthieu Dorier's avatar
Matthieu Dorier committed
618
    ABT_eventual_set(eventual, &hret, sizeof(hret));
619 620 621 622 623 624 625
    
    return(HG_SUCCESS);
}

/* Blocking forward: margo_iforward() followed by margo_wait(). */
hg_return_t margo_forward(
    hg_handle_t handle,
    void *in_struct)
{
    margo_request req;
    hg_return_t hret = margo_iforward(handle, in_struct, &req);

    return (hret == HG_SUCCESS) ? margo_wait(req) : hret;
}

/* Non-blocking forward.  On success *req holds a request that must be
 * completed with margo_wait() (which also releases it).  On failure the
 * request's eventual is released here, since no caller will wait on it,
 * and *req is set to ABT_EVENTUAL_NULL.  Returns HG_NOMEM_ERROR if the
 * eventual cannot be created, otherwise the status of HG_Forward(). */
hg_return_t margo_iforward(
    hg_handle_t handle,
    void *in_struct,
    margo_request* req)
{
    hg_return_t hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hg_return_t), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);
    }

    hret = HG_Forward(handle, margo_cb, (void*)eventual, in_struct);
    if(hret != HG_SUCCESS)
    {
        /* the operation was not posted: margo_forward() and other callers
         * return without margo_wait(), so nobody else would free this */
        ABT_eventual_free(&eventual);
        *req = ABT_EVENTUAL_NULL;
        return hret;
    }

    *req = eventual;
    return HG_SUCCESS;
}
654

Matthieu Dorier's avatar
Matthieu Dorier committed
655 656 657 658
/* Block until a request from margo_iforward()/margo_irespond()/
 * margo_bulk_itransfer() completes, release it, and return its status. */
hg_return_t margo_wait(margo_request req)
{
    hg_return_t *result;
    hg_return_t status;

    ABT_eventual_wait(req, (void**)&result);
    /* copy out before the free: result points at the eventual's storage */
    status = *result;
    ABT_eventual_free(&req);

    return status;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
667 668 669 670 671
/* Non-blocking completion check for a request; *flag is set by
 * ABT_eventual_test().  The request is not released. */
int margo_test(margo_request req, int* flag)
{
    int rc = ABT_eventual_test(req, NULL, flag);
    return rc;
}

672 673 674 675
/* Argument handed to margo_forward_timeout_cb() by the forward timer. */
typedef struct
{
    hg_handle_t handle; /* handle whose forward is cancelled when the timer fires */
} margo_forward_timeout_cb_dat;
676

677 678 679 680 681 682 683 684 685 686 687
/* Timer callback for margo_forward_timed(): cancels the in-flight Mercury
 * operation on the handle stored in arg. */
static void margo_forward_timeout_cb(void *arg)
{
    margo_forward_timeout_cb_dat *dat = arg;

    HG_Cancel(dat->handle);
}

hg_return_t margo_forward_timed(
688
    hg_handle_t handle,
689 690
    void *in_struct,
    double timeout_ms)
691 692
{
    int ret;
693
    hg_return_t hret;
694
    margo_instance_id mid;
695
    ABT_eventual eventual;
696
    hg_return_t* waited_hret;
697 698
    margo_timer_t forward_timer;
    margo_forward_timeout_cb_dat timeout_cb_dat;
699 700 701 702 703 704 705

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

706 707 708
    /* use the handle to get the associated mid */
    mid = margo_hg_handle_get_instance(handle);

709 710 711 712 713
    /* set a timer object to expire when this forward times out */
    timeout_cb_dat.handle = handle;
    margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb,
        &timeout_cb_dat, timeout_ms);

Matthieu Dorier's avatar
Matthieu Dorier committed
714
    hret = HG_Forward(handle, margo_cb, (void*)eventual, in_struct);
715
    if(hret == HG_SUCCESS)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
716 717 718 719 720
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

721 722 723 724 725 726 727 728
    /* convert HG_CANCELED to HG_TIMEOUT to indicate op timed out */
    if(hret == HG_CANCELED)
        hret = HG_TIMEOUT;

    /* remove timer if it is still in place (i.e., not timed out) */
    if(hret != HG_TIMEOUT)
        margo_timer_destroy(mid, &forward_timer);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
729 730 731 732 733 734 735 736
    ABT_eventual_free(&eventual);

    return(hret);
}

/* Blocking respond: margo_irespond() followed by margo_wait(). */
hg_return_t margo_respond(
    hg_handle_t handle,
    void *out_struct)
{
    margo_request req;
    hg_return_t hret = margo_irespond(handle, out_struct, &req);

    return (hret == HG_SUCCESS) ? margo_wait(req) : hret;
}

/* Non-blocking respond.  On success *req holds a request to be completed
 * with margo_wait().  On failure the request's eventual is released here,
 * since no caller will wait on it, and *req is set to ABT_EVENTUAL_NULL.
 * Returns HG_NOMEM_ERROR if the eventual cannot be created, otherwise the
 * status of HG_Respond(). */
hg_return_t margo_irespond(
    hg_handle_t handle,
    void *out_struct,
    margo_request* req)
{
    hg_return_t hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hg_return_t), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);
    }

    hret = HG_Respond(handle, margo_cb, (void*)eventual, out_struct);
    if(hret != HG_SUCCESS)
    {
        /* not posted: margo_respond() returns without margo_wait() */
        ABT_eventual_free(&eventual);
        *req = ABT_EVENTUAL_NULL;
        return hret;
    }

    *req = eventual;
    return HG_SUCCESS;
}

765 766 767 768 769 770 771
/* Create a bulk handle over count caller buffers (HG_Bulk_create()). */
hg_return_t margo_bulk_create(
    margo_instance_id mid,
    hg_uint32_t count,
    void **buf_ptrs,
    const hg_size_t *buf_sizes,
    hg_uint8_t flags,
    hg_bulk_t *handle)
{
    hg_class_t *cls = mid->hg_class;
    return HG_Bulk_create(cls, count, buf_ptrs, buf_sizes, flags, handle);
}
776

777 778 779 780
/* Release a bulk handle (HG_Bulk_free()). */
hg_return_t margo_bulk_free(
    hg_bulk_t handle)
{
    hg_return_t hret = HG_Bulk_free(handle);
    return hret;
}

783 784 785 786 787 788 789 790
/* Rebuild a bulk handle from its serialized form (HG_Bulk_deserialize()). */
hg_return_t margo_bulk_deserialize(
    margo_instance_id mid,
    hg_bulk_t *handle,
    const void *buf,
    hg_size_t buf_size)
{
    hg_class_t *cls = mid->hg_class;
    return HG_Bulk_deserialize(cls, handle, buf, buf_size);
}
791

792
hg_return_t margo_bulk_transfer(
793
    margo_instance_id mid,
794
    hg_bulk_op_t op,
795
    hg_addr_t origin_addr,
796 797 798 799
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
800
    size_t size)
Matthieu Dorier's avatar
Matthieu Dorier committed
801 802 803 804
{  
    margo_request req;
    hg_return_t hret = margo_bulk_itransfer(mid,op,origin_addr,
                          origin_handle, origin_offset, local_handle,
Matthieu Dorier's avatar
Matthieu Dorier committed
805
                          local_offset, size, &req);
Matthieu Dorier's avatar
Matthieu Dorier committed
806 807 808 809 810 811 812 813 814 815 816 817 818 819 820
    if(hret != HG_SUCCESS)
        return hret;
    return margo_wait(req);
}

/* Non-blocking bulk transfer of size bytes between origin_handle (at
 * origin_addr, offset origin_offset) and local_handle (offset
 * local_offset).  On success *req holds a request to be completed with
 * margo_wait().  On failure the request's eventual is released here and
 * *req is set to ABT_EVENTUAL_NULL.  Returns HG_NOMEM_ERROR if the
 * eventual cannot be created, otherwise the status of HG_Bulk_transfer(). */
hg_return_t margo_bulk_itransfer(
    margo_instance_id mid,
    hg_bulk_op_t op,
    hg_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size,
    margo_request* req)
{
    hg_return_t hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hg_return_t), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);
    }

    hret = HG_Bulk_transfer(mid->hg_context, margo_cb,
        (void*)eventual, op, origin_addr, origin_handle, origin_offset, local_handle,
        local_offset, size, HG_OP_ID_IGNORE);
    if(hret != HG_SUCCESS)
    {
        /* not posted: margo_bulk_transfer() returns without margo_wait() */
        ABT_eventual_free(&eventual);
        *req = ABT_EVENTUAL_NULL;
        return hret;
    }

    *req = eventual;
    return HG_SUCCESS;
}

842 843 844 845
/* State shared between margo_thread_sleep() and its timer callback. */
typedef struct
{
    ABT_mutex mutex; /* protects is_asleep */
    ABT_cond cond;   /* signalled when is_asleep is cleared */
    char is_asleep;  /* 1 while the sleeper should keep waiting */
} margo_thread_sleep_cb_dat;

static void margo_thread_sleep_cb(void *arg)
{
    margo_thread_sleep_cb_dat *sleep_cb_dat =
        (margo_thread_sleep_cb_dat *)arg;

    /* wake up the sleeping thread */
    ABT_mutex_lock(sleep_cb_dat->mutex);
856
    sleep_cb_dat->is_asleep = 0;
857 858 859 860 861 862 863
    ABT_cond_signal(sleep_cb_dat->cond);
    ABT_mutex_unlock(sleep_cb_dat->mutex);

    return;
}

void margo_thread_sleep(
864
    margo_instance_id mid,
865 866 867 868 869 870 871 872
    double timeout_ms)
{
    margo_timer_t sleep_timer;
    margo_thread_sleep_cb_dat sleep_cb_dat;

    /* set data needed for sleep callback */
    ABT_mutex_create(&(sleep_cb_dat.mutex));
    ABT_cond_create(&(sleep_cb_dat.cond));
873
    sleep_cb_dat.is_asleep = 1;
874 875

    /* initialize the sleep timer */
876
    margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb,
877 878 879 880
        &sleep_cb_dat, timeout_ms);

    /* yield thread for specified timeout */
    ABT_mutex_lock(sleep_cb_dat.mutex);
881 882
    while(sleep_cb_dat.is_asleep)
        ABT_cond_wait(sleep_cb_dat.cond, sleep_cb_dat.mutex);
883 884
    ABT_mutex_unlock(sleep_cb_dat.mutex);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
885 886 887 888
    /* clean up */
    ABT_mutex_free(&sleep_cb_dat.mutex);
    ABT_cond_free(&sleep_cb_dat.cond);

889 890 891
    return;
}

892
/* Address of the instance's default handler pool (not a copy of it). */
ABT_pool* margo_get_handler_pool(margo_instance_id mid)
{
    ABT_pool *pool = &mid->handler_pool;
    return pool;
}
896

897 898 899 900
/* Mercury context driven by this instance. */
hg_context_t* margo_get_context(margo_instance_id mid)
{
    hg_context_t *ctx = mid->hg_context;
    return ctx;
}
901

902 903 904
/* Mercury class associated with this instance's context. */
hg_class_t* margo_get_class(margo_instance_id mid)
{
    hg_class_t *cls = mid->hg_class;
    return cls;
}
Philip Carns's avatar
Philip Carns committed
906

907
/* Margo instance that registered the RPC behind handle h, or
 * MARGO_INSTANCE_NULL if Mercury has no info for h or the RPC carries no
 * margo data. */
margo_instance_id margo_hg_handle_get_instance(hg_handle_t h)
{
    const struct hg_info *info = HG_Get_info(h);

    return (info != NULL) ? margo_hg_info_get_instance(info)
                          : MARGO_INSTANCE_NULL;
}

margo_instance_id margo_hg_info_get_instance(const struct hg_info *info)
{
916 917 918 919
	struct margo_rpc_data* data = 
		(struct margo_rpc_data*) HG_Registered_data(info->hg_class, info->id);
	if(!data) return MARGO_INSTANCE_NULL;
	return data->mid;
920 921
}

Philip Carns's avatar
Philip Carns committed
922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940
/*
 * Resolve the Argobots pool that should run RPC `id` on multiplex id
 * `mplex_id`.
 *
 * mplex_id 0 selects the instance's default handler_pool. Any other value
 * is looked up in mid->mplex_table, keyed on the (id, mplex_id) pair.
 *
 * @return 0 with *pool set on success; -1 (leaving *pool untouched) when no
 *         entry exists for the pair.
 */
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool)
{
    struct mplex_key lookup;
    struct mplex_element *match = NULL;

    if(mplex_id == 0)
    {
        *pool = mid->handler_pool;
        return(0);
    }

    /* uthash hashes and compares the key as raw bytes (sizeof(lookup)), so
     * any padding must be zeroed before the fields are filled in */
    memset(&lookup, 0, sizeof(lookup));
    lookup.id = id;
    lookup.mplex_id = mplex_id;

    HASH_FIND(hh, mid->mplex_table, &lookup, sizeof(lookup), match);
    if(match == NULL)
        return(-1);

    assert(match->key.id == id && match->key.mplex_id == mplex_id);
    *pool = match->pool;
    return(0);
}

948
static void margo_rpc_data_free(void* ptr)
Philip Carns's avatar
Philip Carns committed
949
{
950 951 952 953 954 955
	struct margo_rpc_data* data = (struct margo_rpc_data*) ptr;
	if(data->user_data && data->user_free_callback) {
		data->user_free_callback(data->user_data);
	}
	free(ptr);
}
Philip Carns's avatar
Philip Carns committed
956

957 958 959 960 961 962 963
/* dedicated thread function to drive Mercury progress */
static void hg_progress_fn(void* foo)
{
    int ret;
    unsigned int actual_count;
    struct margo_instance *mid = (struct margo_instance *)foo;
    size_t size;
964
    unsigned int hg_progress_timeout = mid->hg_progress_timeout_ub;
965 966
    double next_timer_exp;
    int trigger_happened;
967 968
    double tm1, tm2;
    int diag_enabled = 0;
969

970 971 972 973
    while(!mid->hg_progress_shutdown_flag)
    {
        trigger_happened = 0;
        do {
974 975 976 977
            /* save value of instance diag variable, in case it is modified
             * while we are in loop 
             */
            diag_enabled = mid->diag_enabled;
978

979
            if(diag_enabled) tm1 = ABT_get_wtime();
980
            ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count);
981 982 983 984 985
            if(diag_enabled)
            {
                tm2 = ABT_get_wtime();
                __DIAG_UPDATE(mid->diag_trigger_elapsed, (tm2-tm1));
            }
Philip Carns's avatar
Philip Carns committed
986

987 988 989
            if(ret == HG_SUCCESS && actual_count > 0)
                trigger_happened = 1;
        } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
Philip Carns's avatar
Philip Carns committed
990

991 992
        if(trigger_happened)
            ABT_thread_yield();
Philip Carns's avatar
Philip Carns committed
993

994
        ABT_pool_get_size(mid->progress_pool, &size);
995
        /* Are there any other threads executing in this pool that are *not*
996 997 998 999
         * blocked ?  If so then, we can't sleep here or else those threads 
         * will not get a chance to execute.
         * TODO: check is ABT_pool_get_size returns the number of ULT/tasks
         * that can be executed including this one, or not including this one.
1000
         */
1001
        if(size > 0)
1002 1003 1004 1005 1006 1007 1008 1009
        {
            /* TODO: this is being executed more than is necessary (i.e.
             * in cases where there are other legitimate ULTs eligible
             * for execution that are not blocking on any events, Margo
             * or otherwise). Maybe we need an abt scheduling tweak here
             * to make sure that this ULT is the lowest priority in that
             * scenario.
             */
1010
            if(diag_enabled) tm1 = ABT_get_wtime();
1011
            ret = HG_Progress(mid->hg_context, 0);
1012 1013 1014 1015 1016 1017
            if(diag_enabled)
            {
                tm2 = ABT_get_wtime();
                __DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1));
                __DIAG_UPDATE(mid->diag_progress_timeout_value, 0);
            }
1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036
            if(ret == HG_SUCCESS)
            {
                /* Mercury completed something; loop around to trigger
                 * callbacks 
                 */
            }
            else if(ret == HG_TIMEOUT)
            {
                /* No completion; yield here to allow other ULTs to run */
                ABT_thread_yield();
            }
            else
            {
                /* TODO: error handling */
                fprintf(stderr, "WARNING: unexpected return code (%d) from HG_Progress()\n", ret);
            }
        }
        else
        {
1037
            hg_progress_timeout = mid->hg_progress_timeout_ub;
1038 1039 1040 1041 1042 1043 1044 1045 1046
            ret = margo_timer_get_next_expiration(mid, &next_timer_exp);
            if(ret == 0)
            {
                /* there is a queued timer, don't block long enough
                 * to keep this timer waiting
                 */
                if(next_timer_exp >= 0.0)
                {
                    next_timer_exp *= 1000; /* convert to milliseconds */
1047
                    if(next_timer_exp < mid->hg_progress_timeout_ub)
1048 1049 1050 1051 1052 1053 1054
                        hg_progress_timeout = (unsigned int)next_timer_exp;
                }
                else
                {
                    hg_progress_timeout = 0;
                }
            }
1055
            if(diag_enabled) tm1 = ABT_get_wtime();
1056
            ret = HG_Progress(mid->hg_context, hg_progress_timeout);
1057 1058 1059 1060 1061 1062 1063 1064 1065 1066
            if(diag_enabled)
            {
                tm2 = ABT_get_wtime();
                if(hg_progress_timeout == 0)
                    __DIAG_UPDATE(mid->diag_progress_elapsed_zero_timeout, (tm2-tm1));
                else
                    __DIAG_UPDATE(mid->diag_progress_elapsed_nonzero_timeout, (tm2-tm1));
                    
                __DIAG_UPDATE(mid->diag_progress_timeout_value, hg_progress_timeout);
            }
1067 1068 1069 1070 1071 1072
            if(ret != HG_SUCCESS && ret != HG_TIMEOUT)
            {
                /* TODO: error handling */
                fprintf(stderr, "WARNING: unexpected return code (%d) from HG_Progress()\n", ret);
            }
        }