Commit d9921257 authored by Shane Snyder's avatar Shane Snyder

margo_forward_timed() uses new timer interface

parent 132a30de
...@@ -13,6 +13,8 @@ extern "C" { ...@@ -13,6 +13,8 @@ extern "C" {
typedef void (*margo_timer_cb_fn)(void *); typedef void (*margo_timer_cb_fn)(void *);
typedef void* margo_timer_handle;
typedef struct margo_timed_element_s typedef struct margo_timed_element_s
{ {
margo_timer_cb_fn cb_fn; margo_timer_cb_fn cb_fn;
...@@ -22,20 +24,49 @@ typedef struct margo_timed_element_s ...@@ -22,20 +24,49 @@ typedef struct margo_timed_element_s
struct margo_timed_element_s *prev; struct margo_timed_element_s *prev;
} margo_timed_element; } margo_timed_element;
/**
* Initializes margo's timer interface
*/
void margo_timer_init( void margo_timer_init(
void); void);
/**
* Cleans up margo timer data structures
*/
void margo_timer_cleanup( void margo_timer_cleanup(
void); void);
/**
* Suspends the calling ULT for a specified time duration
* @param [in] timeout_ms timeout duration in milliseconds
*/
void margo_thread_sleep( void margo_thread_sleep(
double timeout_ms); double timeout_ms);
void margo_create_timer( /**
* Creates a margo timer object to perform some action after a
* specified time duration
* @param [in] cb_fn callback function for timeout action
* @param [in] cb_dat callback data passed to the callback function
* @param [in] timeout_ms timeout duration in milliseconds
* @param [out] handle handle used to reference the created timer object
*/
void margo_timer_create(
margo_timer_cb_fn cb_fn, margo_timer_cb_fn cb_fn,
void *cb_dat, void *cb_dat,
double timeout_ms); double timeout_ms,
margo_timer_handle *handle);
/**
* Frees resources used by the referenced timer object
* @param [in] handle handle of timer object to free
*/
void margo_timer_free(
margo_timer_handle handle);
/**
* Checks for expired timers and performs specified timeout action
*/
void margo_check_timers( void margo_check_timers(
void); void);
......
...@@ -68,7 +68,8 @@ static void margo_thread_sleep_cb(void *arg) ...@@ -68,7 +68,8 @@ static void margo_thread_sleep_cb(void *arg)
return; return;
} }
void margo_thread_sleep(double timeout_ms) void margo_thread_sleep(
double timeout_ms)
{ {
margo_thread_sleep_cb_dat *cb_dat; margo_thread_sleep_cb_dat *cb_dat;
...@@ -80,7 +81,7 @@ void margo_thread_sleep(double timeout_ms) ...@@ -80,7 +81,7 @@ void margo_thread_sleep(double timeout_ms)
ABT_cond_create(&(cb_dat->sleep_cond)); ABT_cond_create(&(cb_dat->sleep_cond));
/* create timer */ /* create timer */
margo_create_timer(margo_thread_sleep_cb, cb_dat, timeout_ms); margo_timer_create(margo_thread_sleep_cb, cb_dat, timeout_ms, NULL);
/* yield thread for specified timeout */ /* yield thread for specified timeout */
ABT_mutex_lock(cb_dat->sleep_mutex); ABT_mutex_lock(cb_dat->sleep_mutex);
...@@ -90,12 +91,17 @@ void margo_thread_sleep(double timeout_ms) ...@@ -90,12 +91,17 @@ void margo_thread_sleep(double timeout_ms)
return; return;
} }
void margo_create_timer(margo_timer_cb_fn cb_fn, void *cb_dat, double timeout_ms) void margo_timer_create(
margo_timer_cb_fn cb_fn,
void *cb_dat,
double timeout_ms,
margo_timer_handle *handle)
{ {
margo_timed_element *el; margo_timed_element *el;
el = malloc(sizeof(margo_timed_element)); el = malloc(sizeof(margo_timed_element));
assert(el); assert(el);
memset(el, 0, sizeof(*el));
el->cb_fn = cb_fn; el->cb_fn = cb_fn;
el->cb_dat = cb_dat; el->cb_dat = cb_dat;
el->expiration = ABT_get_wtime() + (timeout_ms/1000); el->expiration = ABT_get_wtime() + (timeout_ms/1000);
...@@ -103,6 +109,31 @@ void margo_create_timer(margo_timer_cb_fn cb_fn, void *cb_dat, double timeout_ms ...@@ -103,6 +109,31 @@ void margo_create_timer(margo_timer_cb_fn cb_fn, void *cb_dat, double timeout_ms
margo_queue_timer(el); margo_queue_timer(el);
if(handle)
*handle = (margo_timer_handle)el;
return;
}
void margo_timer_free(
margo_timer_handle handle)
{
assert(handle);
assert(timer_mutex != ABT_MUTEX_NULL);
margo_timed_element *el;
el = (margo_timed_element *)handle;
ABT_mutex_lock(timer_mutex);
if(el->prev || el->next)
{
DL_DELETE(timer_head, el);
if(el->cb_dat)
free(el->cb_dat);
free(el);
}
ABT_mutex_unlock(timer_mutex);
return; return;
} }
...@@ -122,7 +153,6 @@ void margo_check_timers() ...@@ -122,7 +153,6 @@ void margo_check_timers()
{ {
cur = timer_head; cur = timer_head;
DL_DELETE(timer_head, cur); DL_DELETE(timer_head, cur);
cur->next = cur->prev = NULL;
/* execute callback */ /* execute callback */
cur->cb_fn(cur->cb_dat); cur->cb_fn(cur->cb_dat);
......
...@@ -125,8 +125,6 @@ void margo_finalize(margo_instance_id mid) ...@@ -125,8 +125,6 @@ void margo_finalize(margo_instance_id mid)
ABT_cond_broadcast(mid->finalize_cond); ABT_cond_broadcast(mid->finalize_cond);
ABT_mutex_unlock(mid->finalize_mutex); ABT_mutex_unlock(mid->finalize_mutex);
margo_timer_cleanup();
/* TODO: yuck, there is a race here if someone was really waiting for /* TODO: yuck, there is a race here if someone was really waiting for
* finalize; we can't destroy the data structures out from under them. * finalize; we can't destroy the data structures out from under them.
* We could fix this by reference counting so that the last caller * We could fix this by reference counting so that the last caller
...@@ -135,6 +133,8 @@ void margo_finalize(margo_instance_id mid) ...@@ -135,6 +133,8 @@ void margo_finalize(margo_instance_id mid)
* a small amount of memory. * a small amount of memory.
*/ */
#if 0 #if 0
margo_timer_cleanup();
ABT_mutex_free(&mid->finalize_mutex); ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond); ABT_cond_free(&mid->finalize_cond);
free(mid); free(mid);
...@@ -231,6 +231,21 @@ static hg_return_t margo_cb(const struct hg_cb_info *info) ...@@ -231,6 +231,21 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
return(HG_SUCCESS); return(HG_SUCCESS);
} }
typedef struct
{
hg_handle_t handle;
} margo_hg_cancel_cb_dat;
static void margo_forward_timeout_cb(void *arg)
{
margo_hg_cancel_cb_dat *cb_dat =
(margo_hg_cancel_cb_dat *)arg;
/* cancel the Mercury op if the forward timed out */
HG_Core_cancel(cb_dat->handle);
return;
}
hg_return_t margo_forward_timed( hg_return_t margo_forward_timed(
margo_instance_id mid, margo_instance_id mid,
hg_handle_t handle, hg_handle_t handle,
...@@ -241,6 +256,8 @@ hg_return_t margo_forward_timed( ...@@ -241,6 +256,8 @@ hg_return_t margo_forward_timed(
ABT_eventual eventual; ABT_eventual eventual;
int ret; int ret;
hg_return_t* waited_hret; hg_return_t* waited_hret;
margo_hg_cancel_cb_dat *cb_dat;
margo_timer_handle timer_handle;
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0) if(ret != 0)
...@@ -248,7 +265,13 @@ hg_return_t margo_forward_timed( ...@@ -248,7 +265,13 @@ hg_return_t margo_forward_timed(
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
} }
/* TODO: timer interface: add element */ /* create a timer that expires when this request has timed out */
cb_dat = malloc(sizeof(margo_hg_cancel_cb_dat));
assert(cb_dat);
memset(cb_dat, 0, sizeof(*cb_dat));
cb_dat->handle = handle;
margo_timer_create(margo_forward_timeout_cb, cb_dat,
timeout_ms, &timer_handle);
hret = HG_Forward(handle, margo_cb, &eventual, in_struct); hret = HG_Forward(handle, margo_cb, &eventual, in_struct);
if(hret == 0) if(hret == 0)
...@@ -257,7 +280,9 @@ hg_return_t margo_forward_timed( ...@@ -257,7 +280,9 @@ hg_return_t margo_forward_timed(
hret = *waited_hret; hret = *waited_hret;
} }
/* TODO: remove timer if it is still in place */ /* remove timer if it is still in place (i.e., not timed out) */
if(hret != HG_TIMEOUT)
margo_timer_free(timer_handle);
ABT_eventual_free(&eventual); ABT_eventual_free(&eventual);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment