Commit acef4b65 authored by Shane Snyder's avatar Shane Snyder

cleanup timer interface

parent 1bedc928
......@@ -13,27 +13,25 @@ extern "C" {
typedef void (*margo_timer_cb_fn)(void *);
typedef void* margo_timer_handle;
typedef struct margo_timed_element_s
typedef struct margo_timed_element
{
margo_timer_cb_fn cb_fn;
void *cb_dat;
double expiration;
struct margo_timed_element_s *next;
struct margo_timed_element_s *prev;
} margo_timed_element;
struct margo_timed_element *next;
struct margo_timed_element *prev;
} margo_timer_t;
/**
* Initializes margo's timer interface
* Initializes the margo timer interface
*/
void margo_timer_init(
void margo_timer_sys_init(
void);
/**
* Cleans up margo timer data structures
* Shuts down the margo timer interface
*/
void margo_timer_cleanup(
void margo_timer_sys_shutdown(
void);
/**
......@@ -44,25 +42,25 @@ void margo_thread_sleep(
double timeout_ms);
/**
* Creates a margo timer object to perform some action after a
* specified time duration
* Initializes a margo timer object which will perform some action
* after a specified time duration
* @param [in] timer pointer to margo timer object to be initialized
* @param [in] cb_fn callback function for timeout action
* @param [in] cb_dat callback data passed to the callback function
* @param [in] timeout_ms timeout duration in milliseconds
* @param [out] handle handle used to reference the created timer object
*/
void margo_timer_create(
void margo_timer_init(
margo_timer_t *timer,
margo_timer_cb_fn cb_fn,
void *cb_dat,
double timeout_ms,
margo_timer_handle *handle);
double timeout_ms);
/**
* Frees resources used by the referenced timer object
* @param [in] handle handle of timer object to free
* Destroys a margo timer object which was previously initialized
* @param [in] timer pointer to margo timer object to be destroyed
*/
void margo_timer_free(
margo_timer_handle handle);
void margo_timer_destroy(
margo_timer_t *timer);
/**
* Checks for expired timers and performs specified timeout action
......
......@@ -16,54 +16,54 @@
#include "utlist.h"
static void margo_queue_timer(margo_timed_element *el);
static void margo_timer_queue(margo_timer_t *timer);
static ABT_mutex timer_mutex = ABT_MUTEX_NULL;
static margo_timed_element *timer_head = NULL;
static margo_timer_t *timer_head = NULL;
void margo_timer_init()
void margo_timer_sys_init()
{
ABT_mutex_create(&timer_mutex);
return;
}
void margo_timer_cleanup()
void margo_timer_sys_shutdown()
{
margo_timed_element *cur;
margo_timer_t *cur;
if(timer_mutex == ABT_MUTEX_NULL)
return;
ABT_mutex_lock(timer_mutex);
/* free any remaining timers */
/* delete any remaining timers from the queue */
while(timer_head)
{
cur = timer_head;
DL_DELETE(timer_head, cur);
free(cur);
}
ABT_mutex_unlock(timer_mutex);
ABT_mutex_free(&timer_mutex);
timer_mutex = ABT_MUTEX_NULL;
return;
}
typedef struct
{
ABT_mutex sleep_mutex;
ABT_cond sleep_cond;
ABT_mutex mutex;
ABT_cond cond;
} margo_thread_sleep_cb_dat;
static void margo_thread_sleep_cb(void *arg)
{
margo_thread_sleep_cb_dat *cb_dat =
margo_thread_sleep_cb_dat *sleep_cb_dat =
(margo_thread_sleep_cb_dat *)arg;
/* wake up the sleeping thread */
ABT_mutex_lock(cb_dat->sleep_mutex);
ABT_cond_signal(cb_dat->sleep_cond);
ABT_mutex_unlock(cb_dat->sleep_mutex);
ABT_mutex_lock(sleep_cb_dat->mutex);
ABT_cond_signal(sleep_cb_dat->cond);
ABT_mutex_unlock(sleep_cb_dat->mutex);
return;
}
......@@ -71,67 +71,54 @@ static void margo_thread_sleep_cb(void *arg)
void margo_thread_sleep(
double timeout_ms)
{
margo_thread_sleep_cb_dat *cb_dat;
margo_timer_t sleep_timer;
margo_thread_sleep_cb_dat sleep_cb_dat;
/* set data needed for callback */
cb_dat = malloc(sizeof(margo_thread_sleep_cb_dat));
assert(cb_dat);
memset(cb_dat, 0 , sizeof(*cb_dat));
ABT_mutex_create(&(cb_dat->sleep_mutex));
ABT_cond_create(&(cb_dat->sleep_cond));
/* set data needed for sleep callback */
ABT_mutex_create(&(sleep_cb_dat.mutex));
ABT_cond_create(&(sleep_cb_dat.cond));
/* create timer */
margo_timer_create(margo_thread_sleep_cb, cb_dat, timeout_ms, NULL);
/* initialize the sleep timer */
margo_timer_init(&sleep_timer, margo_thread_sleep_cb,
&sleep_cb_dat, timeout_ms);
/* yield thread for specified timeout */
ABT_mutex_lock(cb_dat->sleep_mutex);
ABT_cond_wait(cb_dat->sleep_cond, cb_dat->sleep_mutex);
ABT_mutex_unlock(cb_dat->sleep_mutex);
ABT_mutex_lock(sleep_cb_dat.mutex);
ABT_cond_wait(sleep_cb_dat.cond, sleep_cb_dat.mutex);
ABT_mutex_unlock(sleep_cb_dat.mutex);
return;
}
void margo_timer_create(
void margo_timer_init(
margo_timer_t *timer,
margo_timer_cb_fn cb_fn,
void *cb_dat,
double timeout_ms,
margo_timer_handle *handle)
double timeout_ms)
{
margo_timed_element *el;
el = malloc(sizeof(margo_timed_element));
assert(el);
memset(el, 0, sizeof(*el));
el->cb_fn = cb_fn;
el->cb_dat = cb_dat;
el->expiration = ABT_get_wtime() + (timeout_ms/1000);
el->prev = el->next = NULL;
assert(timer_mutex != ABT_MUTEX_NULL);
assert(timer);
margo_queue_timer(el);
memset(timer, 0, sizeof(*timer));
timer->cb_fn = cb_fn;
timer->cb_dat = cb_dat;
timer->expiration = ABT_get_wtime() + (timeout_ms/1000);
timer->prev = timer->next = NULL;
if(handle)
*handle = (margo_timer_handle)el;
margo_timer_queue(timer);
return;
}
void margo_timer_free(
margo_timer_handle handle)
void margo_timer_destroy(
margo_timer_t *timer)
{
assert(handle);
assert(timer_mutex != ABT_MUTEX_NULL);
margo_timed_element *el;
el = (margo_timed_element *)handle;
assert(timer);
ABT_mutex_lock(timer_mutex);
if(el->prev || el->next)
{
DL_DELETE(timer_head, el);
if(el->cb_dat)
free(el->cb_dat);
free(el);
}
if(timer->prev || timer->next)
DL_DELETE(timer_head, timer);
ABT_mutex_unlock(timer_mutex);
return;
......@@ -139,7 +126,7 @@ void margo_timer_free(
void margo_check_timers()
{
margo_timed_element *cur;
margo_timer_t *cur;
double now = ABT_get_wtime();
assert(timer_mutex != ABT_MUTEX_NULL);
......@@ -153,29 +140,26 @@ void margo_check_timers()
{
cur = timer_head;
DL_DELETE(timer_head, cur);
cur->prev = cur->next = NULL;
/* execute callback */
cur->cb_fn(cur->cb_dat);
free(cur);
}
ABT_mutex_unlock(timer_mutex);
return;
}
static void margo_queue_timer(margo_timed_element *el)
static void margo_timer_queue(margo_timer_t *timer)
{
margo_timed_element *cur;
assert(timer_mutex != ABT_MUTEX_NULL);
margo_timer_t *cur;
ABT_mutex_lock(timer_mutex);
/* if list of timers is empty, put ourselves on it */
if(!timer_head)
{
DL_APPEND(timer_head, el);
DL_APPEND(timer_head, timer);
}
else
{
......@@ -190,9 +174,9 @@ static void margo_queue_timer(margo_timed_element *el)
/* as soon as we find an element that expires before this one,
* then we add ours after it
*/
if(cur->expiration < el->expiration)
if(cur->expiration < timer->expiration)
{
DL_APPEND_ELEM(timer_head, cur, el);
DL_APPEND_ELEM(timer_head, cur, timer);
break;
}
}while(cur != timer_head);
......@@ -200,8 +184,8 @@ static void margo_queue_timer(margo_timed_element *el)
/* if we never found one with an expiration before this one, then
* this one is the new head
*/
if(el->prev == NULL && el->next == NULL)
DL_PREPEND(timer_head, el);
if(timer->prev == NULL && timer->next == NULL)
DL_PREPEND(timer_head, timer);
}
ABT_mutex_unlock(timer_mutex);
......
......@@ -79,7 +79,7 @@ margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
ABT_mutex_create(&mid->finalize_mutex);
ABT_cond_create(&mid->finalize_cond);
margo_timer_init();
margo_timer_sys_init();
mid->progress_pool = progress_pool;
mid->handler_pool = handler_pool;
......@@ -133,7 +133,7 @@ void margo_finalize(margo_instance_id mid)
* a small amount of memory.
*/
#if 0
margo_timer_cleanup();
margo_timer_sys_shutdown();
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
......@@ -245,15 +245,15 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
typedef struct
{
hg_handle_t handle;
} margo_hg_cancel_cb_dat;
} margo_forward_timeout_cb_dat;
static void margo_forward_timeout_cb(void *arg)
{
margo_hg_cancel_cb_dat *cb_dat =
(margo_hg_cancel_cb_dat *)arg;
margo_forward_timeout_cb_dat *timeout_cb_dat =
(margo_forward_timeout_cb_dat *)arg;
/* cancel the Mercury op if the forward timed out */
HG_Core_cancel(cb_dat->handle);
HG_Core_cancel(timeout_cb_dat->handle);
return;
}
......@@ -263,12 +263,12 @@ hg_return_t margo_forward_timed(
void *in_struct,
double timeout_ms)
{
int ret;
hg_return_t hret = HG_TIMEOUT;
ABT_eventual eventual;
int ret;
hg_return_t* waited_hret;
margo_hg_cancel_cb_dat *cb_dat;
margo_timer_handle timer_handle;
margo_timer_t forward_timer;
margo_forward_timeout_cb_dat timeout_cb_dat;
ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0)
......@@ -276,13 +276,10 @@ hg_return_t margo_forward_timed(
return(HG_NOMEM_ERROR);
}
/* create a timer that expires when this request has timed out */
cb_dat = malloc(sizeof(margo_hg_cancel_cb_dat));
assert(cb_dat);
memset(cb_dat, 0, sizeof(*cb_dat));
cb_dat->handle = handle;
margo_timer_create(margo_forward_timeout_cb, cb_dat,
timeout_ms, &timer_handle);
/* set a timer object to expire when this forward times out */
timeout_cb_dat.handle = handle;
margo_timer_init(&forward_timer, margo_forward_timeout_cb,
&timeout_cb_dat, timeout_ms);
hret = HG_Forward(handle, margo_cb, &eventual, in_struct);
if(hret == 0)
......@@ -293,7 +290,7 @@ hg_return_t margo_forward_timed(
/* remove timer if it is still in place (i.e., not timed out) */
if(hret != HG_TIMEOUT)
margo_timer_free(timer_handle);
margo_timer_destroy(&forward_timer);
ABT_eventual_free(&eventual);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment