From 969b53fc9abb745d1c38cd4f0839cdefd34c66a6 Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Tue, 19 Apr 2016 12:26:15 -0500 Subject: [PATCH] add dynamic progress timeout to margo --- src/margo-timer.c | 29 +++++++++++++++++++++++++++++ src/margo-timer.h | 11 +++++++++++ src/margo.c | 24 +++++++++++++++++++++++- 3 files changed, 63 insertions(+), 1 deletion(-) diff --git a/src/margo-timer.c b/src/margo-timer.c index b28b7c7..e6bb482 100644 --- a/src/margo-timer.c +++ b/src/margo-timer.c @@ -177,6 +177,35 @@ void margo_check_timers( return; } +/* returns 0 and sets 'next_timer_exp' if the timer instance + * has timers queued up, -1 otherwise + */ +int margo_timer_get_next_expiration( + margo_instance_id mid, + double *next_timer_exp) +{ + struct margo_timer_instance *timer_inst; + double now = ABT_get_wtime(); + int ret; + + timer_inst = margo_get_timer_instance(mid); + assert(timer_inst); + + ABT_mutex_lock(timer_inst->mutex); + if(timer_inst->queue_head) + { + *next_timer_exp = timer_inst->queue_head->expiration - now; + ret = 0; + } + else + { + ret = -1; + } + ABT_mutex_unlock(timer_inst->mutex); + + return(ret); +} + static struct margo_timer_instance *margo_get_timer_instance( margo_instance_id mid) { diff --git a/src/margo-timer.h b/src/margo-timer.h index 7c072ff..5fc3ae4 100644 --- a/src/margo-timer.h +++ b/src/margo-timer.h @@ -69,6 +69,17 @@ void margo_timer_destroy( void margo_check_timers( margo_instance_id mid); +/** + * Determines the amount of time (in seconds) until the next timer + * is set to expire + * @param [in] mid Margo instance + * @param [out] time until next timer expiration + * @returns 0 when there is a queued timer which will expire, -1 otherwise + */ +int margo_timer_get_next_expiration( + margo_instance_id mid, + double *next_timer_exp); + #ifdef __cplusplus } #endif diff --git a/src/margo.c b/src/margo.c index 1f0d6ad..f20cd95 100644 --- a/src/margo.c +++ b/src/margo.c @@ -22,6 +22,8 @@ */ #include +#define MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */ + struct margo_instance { /* provided by caller */ @@ -189,6 +191,8 @@ static void hg_progress_fn(void* foo) unsigned int actual_count; struct margo_instance *mid = (struct margo_instance *)foo; size_t size; + unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB; + double next_timer_exp; while(!mid->hg_progress_shutdown_flag) { @@ -215,7 +219,25 @@ static void hg_progress_fn(void* foo) else { ABT_mutex_unlock(mid->finalize_mutex); - HG_Progress(mid->hg_context, 100); + + ret = margo_timer_get_next_expiration(mid, &next_timer_exp); + if(ret == 0) + { + /* there is a queued timer, don't block long enough + * to keep this timer waiting + */ + if(next_timer_exp >= 0.0) + { + next_timer_exp *= 1000; /* convert to milliseconds */ + if(next_timer_exp < MERCURY_PROGRESS_TIMEOUT_UB) + hg_progress_timeout = (unsigned int)next_timer_exp; + } + else + { + hg_progress_timeout = 0; + } + } + HG_Progress(mid->hg_context, hg_progress_timeout); } } -- 2.26.2