Commit c5628a48 authored by Shane Snyder's avatar Shane Snyder
Browse files

make timer data structures/funcs per margo id

parent 7bc62935
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
static int use_abt_sleep = 0; static int use_abt_sleep = 0;
static int sleep_seconds = 2; static int sleep_seconds = 2;
static margo_instance_id mid;
static void sleep_fn(void *arg); static void sleep_fn(void *arg);
...@@ -26,7 +27,6 @@ int main(int argc, char **argv) ...@@ -26,7 +27,6 @@ int main(int argc, char **argv)
int ret; int ret;
ABT_xstream xstream; ABT_xstream xstream;
ABT_pool pool; ABT_pool pool;
margo_instance_id mid;
hg_context_t *hg_context; hg_context_t *hg_context;
hg_class_t *hg_class; hg_class_t *hg_class;
...@@ -154,7 +154,7 @@ static void sleep_fn(void *arg) ...@@ -154,7 +154,7 @@ static void sleep_fn(void *arg)
int my_tid = *(int *)arg; int my_tid = *(int *)arg;
if(use_abt_sleep) if(use_abt_sleep)
margo_thread_sleep(sleep_seconds*1000.0); margo_thread_sleep(mid, sleep_seconds*1000.0);
else else
sleep(sleep_seconds); sleep(sleep_seconds);
......
...@@ -156,9 +156,11 @@ hg_return_t margo_addr_lookup( ...@@ -156,9 +156,11 @@ hg_return_t margo_addr_lookup(
/** /**
* Suspends the calling ULT for a specified time duration * Suspends the calling ULT for a specified time duration
* @param [in] mid Margo instance
* @param [in] timeout_ms timeout duration in milliseconds * @param [in] timeout_ms timeout duration in milliseconds
*/ */
void margo_thread_sleep( void margo_thread_sleep(
margo_instance_id mid,
double timeout_ms); double timeout_ms);
/** /**
......
...@@ -12,50 +12,99 @@ ...@@ -12,50 +12,99 @@
#include <assert.h> #include <assert.h>
#include <abt.h> #include <abt.h>
#include "margo.h"
#include "margo-timer.h" #include "margo-timer.h"
#include "utlist.h" #include "utlist.h"
static void margo_timer_queue(margo_timer_t *timer); /* structure for mapping margo instance ids to corresponding timer instances */
struct margo_timer_instance
{
margo_instance_id mid;
ABT_mutex mutex;
margo_timer_t *queue_head;
};
#define MAX_TIMER_INSTANCES 8
static int timer_inst_table_size = 0;
static struct margo_timer_instance *timer_inst_table[MAX_TIMER_INSTANCES] = {NULL};
static struct margo_timer_instance *margo_get_timer_instance(
margo_instance_id mid);
static void margo_timer_queue(
struct margo_timer_instance *timer_inst,
margo_timer_t *timer);
static ABT_mutex timer_mutex = ABT_MUTEX_NULL;
static margo_timer_t *timer_head = NULL;
void margo_timer_sys_init() int margo_timer_instance_init(
margo_instance_id mid)
{ {
ABT_mutex_create(&timer_mutex); struct margo_timer_instance *timer_inst;
return;
if(timer_inst_table_size >= MAX_TIMER_INSTANCES)
return(-1);
timer_inst = malloc(sizeof(timer_inst));
if(!timer_inst)
return(-1);
timer_inst->mid = mid;
ABT_mutex_create(&(timer_inst->mutex));
timer_inst->queue_head = NULL;
/* add this instance to the table of active timer instances */
timer_inst_table[timer_inst_table_size++] = timer_inst;
return(0);
} }
void margo_timer_sys_shutdown() void margo_timer_instance_finalize(
margo_instance_id mid)
{ {
struct margo_timer_instance *timer_inst;
margo_timer_t *cur; margo_timer_t *cur;
int i = 0;
if(timer_mutex == ABT_MUTEX_NULL) timer_inst = margo_get_timer_instance(mid);
if(!timer_inst)
return; return;
ABT_mutex_lock(timer_mutex); ABT_mutex_lock(timer_inst->mutex);
/* delete any remaining timers from the queue */ /* delete any remaining timers from the queue */
while(timer_head) while(timer_inst->queue_head)
{
cur = timer_inst->queue_head;
DL_DELETE(timer_inst->queue_head, cur);
}
ABT_mutex_unlock(timer_inst->mutex);
ABT_mutex_free(&(timer_inst->mutex));
/* remove this timer instance from the active table */
while(timer_inst_table[i] != timer_inst)
i++;
while(i < timer_inst_table_size - 1)
{ {
cur = timer_head; timer_inst_table[i] = timer_inst_table[i+1];
DL_DELETE(timer_head, cur); i++;
} }
ABT_mutex_unlock(timer_mutex); timer_inst_table[i] = NULL;
ABT_mutex_free(&timer_mutex); timer_inst_table_size--;
timer_mutex = ABT_MUTEX_NULL; free(timer_inst);
return; return;
} }
void margo_timer_init( void margo_timer_init(
margo_instance_id mid,
margo_timer_t *timer, margo_timer_t *timer,
margo_timer_cb_fn cb_fn, margo_timer_cb_fn cb_fn,
void *cb_dat, void *cb_dat,
double timeout_ms) double timeout_ms)
{ {
assert(timer_mutex != ABT_MUTEX_NULL); struct margo_timer_instance *timer_inst;
timer_inst = margo_get_timer_instance(mid);
assert(timer_inst);
assert(timer); assert(timer);
memset(timer, 0, sizeof(*timer)); memset(timer, 0, sizeof(*timer));
...@@ -64,68 +113,100 @@ void margo_timer_init( ...@@ -64,68 +113,100 @@ void margo_timer_init(
timer->expiration = ABT_get_wtime() + (timeout_ms/1000); timer->expiration = ABT_get_wtime() + (timeout_ms/1000);
timer->prev = timer->next = NULL; timer->prev = timer->next = NULL;
margo_timer_queue(timer); margo_timer_queue(timer_inst, timer);
return; return;
} }
void margo_timer_destroy( void margo_timer_destroy(
margo_instance_id mid,
margo_timer_t *timer) margo_timer_t *timer)
{ {
assert(timer_mutex != ABT_MUTEX_NULL); struct margo_timer_instance *timer_inst;
timer_inst = margo_get_timer_instance(mid);
assert(timer_inst);
assert(timer); assert(timer);
ABT_mutex_lock(timer_mutex); ABT_mutex_lock(timer_inst->mutex);
if(timer->prev || timer->next) if(timer->prev || timer->next)
DL_DELETE(timer_head, timer); DL_DELETE(timer_inst->queue_head, timer);
ABT_mutex_unlock(timer_mutex); ABT_mutex_unlock(timer_inst->mutex);
return; return;
} }
void margo_check_timers() void margo_check_timers(
margo_instance_id mid)
{ {
margo_timer_t *cur; margo_timer_t *cur;
struct margo_timer_instance *timer_inst;
double now = ABT_get_wtime(); double now = ABT_get_wtime();
assert(timer_mutex != ABT_MUTEX_NULL); timer_inst = margo_get_timer_instance(mid);
assert(timer_inst);
ABT_mutex_lock(timer_mutex); ABT_mutex_lock(timer_inst->mutex);
/* iterate through timer list, performing timeout action /* iterate through timer list, performing timeout action
* for all elements which have passed expiration time * for all elements which have passed expiration time
*/ */
while(timer_head && (timer_head->expiration < now)) while(timer_inst->queue_head && (timer_inst->queue_head->expiration < now))
{ {
cur = timer_head; cur = timer_inst->queue_head;
DL_DELETE(timer_head, cur); DL_DELETE(timer_inst->queue_head, cur);
cur->prev = cur->next = NULL; cur->prev = cur->next = NULL;
/* execute callback */ /* execute callback */
cur->cb_fn(cur->cb_dat); cur->cb_fn(cur->cb_dat);
} }
ABT_mutex_unlock(timer_mutex); ABT_mutex_unlock(timer_inst->mutex);
return; return;
} }
static void margo_timer_queue(margo_timer_t *timer) static struct margo_timer_instance *margo_get_timer_instance(
margo_instance_id mid)
{
struct margo_timer_instance *timer_inst = NULL;
int i = 0;
/* find the timer instance using the given margo id */
while(timer_inst_table[i] != NULL)
{
if(timer_inst_table[i]->mid == mid)
{
timer_inst = timer_inst_table[i];
break;
}
i++;
}
if(timer_inst)
assert(timer_inst->mutex != ABT_MUTEX_NULL);
return(timer_inst);
}
static void margo_timer_queue(
struct margo_timer_instance *timer_inst,
margo_timer_t *timer)
{ {
margo_timer_t *cur; margo_timer_t *cur;
ABT_mutex_lock(timer_mutex); ABT_mutex_lock(timer_inst->mutex);
/* if list of timers is empty, put ourselves on it */ /* if list of timers is empty, put ourselves on it */
if(!timer_head) if(!(timer_inst->queue_head))
{ {
DL_APPEND(timer_head, timer); DL_APPEND(timer_inst->queue_head, timer);
} }
else else
{ {
/* something else already in queue, keep it sorted in ascending order /* something else already in queue, keep it sorted in ascending order
* of expiration time * of expiration time
*/ */
cur = timer_head; cur = timer_inst->queue_head;
do do
{ {
/* walk backwards through queue */ /* walk backwards through queue */
...@@ -135,19 +216,18 @@ static void margo_timer_queue(margo_timer_t *timer) ...@@ -135,19 +216,18 @@ static void margo_timer_queue(margo_timer_t *timer)
*/ */
if(cur->expiration < timer->expiration) if(cur->expiration < timer->expiration)
{ {
DL_APPEND_ELEM(timer_head, cur, timer); DL_APPEND_ELEM(timer_inst->queue_head, cur, timer);
break; break;
} }
}while(cur != timer_head); }while(cur != timer_inst->queue_head);
/* if we never found one with an expiration before this one, then /* if we never found one with an expiration before this one, then
* this one is the new head * this one is the new head
*/ */
if(timer->prev == NULL && timer->next == NULL) if(timer->prev == NULL && timer->next == NULL)
DL_PREPEND(timer_head, timer); DL_PREPEND(timer_inst->queue_head, timer);
} }
ABT_mutex_unlock(timer_mutex); ABT_mutex_unlock(timer_inst->mutex);
return; return;
} }
...@@ -24,25 +24,30 @@ typedef struct margo_timed_element ...@@ -24,25 +24,30 @@ typedef struct margo_timed_element
/** /**
* Initializes the margo timer interface * Initializes the margo timer interface
* @param [in] mid Margo instance
* @returns 0 on success, -1 on failure
*/ */
void margo_timer_sys_init( int margo_timer_instance_init(
void); margo_instance_id mid);
/** /**
* Shuts down the margo timer interface * Shuts down the margo timer interface
* @param [in] mid Margo instance
*/ */
void margo_timer_sys_shutdown( void margo_timer_instance_finalize(
void); margo_instance_id mid);
/** /**
* Initializes a margo timer object which will perform some action * Initializes a margo timer object which will perform some action
* after a specified time duration * after a specified time duration
* @param [in] mid Margo instance
* @param [in] timer pointer to margo timer object to be initialized * @param [in] timer pointer to margo timer object to be initialized
* @param [in] cb_fn callback function for timeout action * @param [in] cb_fn callback function for timeout action
* @param [in] cb_dat callback data passed to the callback function * @param [in] cb_dat callback data passed to the callback function
* @param [in] timeout_ms timeout duration in milliseconds * @param [in] timeout_ms timeout duration in milliseconds
*/ */
void margo_timer_init( void margo_timer_init(
margo_instance_id mid,
margo_timer_t *timer, margo_timer_t *timer,
margo_timer_cb_fn cb_fn, margo_timer_cb_fn cb_fn,
void *cb_dat, void *cb_dat,
...@@ -50,17 +55,19 @@ void margo_timer_init( ...@@ -50,17 +55,19 @@ void margo_timer_init(
/** /**
* Destroys a margo timer object which was previously initialized * Destroys a margo timer object which was previously initialized
* @param [in] mid Margo instance
* @param [in] timer pointer to margo timer object to be destroyed * @param [in] timer pointer to margo timer object to be destroyed
*/ */
void margo_timer_destroy( void margo_timer_destroy(
margo_instance_id mid,
margo_timer_t *timer); margo_timer_t *timer);
/** /**
* Checks for expired timers and performs specified timeout action * Checks for expired timers and performs specified timeout action
* @param [in] mid Margo instance
*/ */
void margo_check_timers( void margo_check_timers(
void); margo_instance_id mid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -80,13 +80,19 @@ margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool, ...@@ -80,13 +80,19 @@ margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
ABT_mutex_create(&mid->finalize_mutex); ABT_mutex_create(&mid->finalize_mutex);
ABT_cond_create(&mid->finalize_cond); ABT_cond_create(&mid->finalize_cond);
margo_timer_sys_init();
mid->progress_pool = progress_pool; mid->progress_pool = progress_pool;
mid->handler_pool = handler_pool; mid->handler_pool = handler_pool;
mid->hg_class = hg_class; mid->hg_class = hg_class;
mid->hg_context = hg_context; mid->hg_context = hg_context;
ret = margo_timer_instance_init(mid);
if(ret != 0)
{
fprintf(stderr, "Error: margo_timer_instance_init()\n");
free(mid);
return(MARGO_INSTANCE_NULL);
}
ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid, ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid,
ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid); ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
if(ret != 0) if(ret != 0)
...@@ -134,7 +140,7 @@ void margo_finalize(margo_instance_id mid) ...@@ -134,7 +140,7 @@ void margo_finalize(margo_instance_id mid)
* a small amount of memory. * a small amount of memory.
*/ */
#if 0 #if 0
margo_timer_sys_shutdown(); margo_timer_instance_finalize(mid);
ABT_mutex_free(&mid->finalize_mutex); ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond); ABT_cond_free(&mid->finalize_cond);
...@@ -210,7 +216,7 @@ static void hg_progress_fn(void* foo) ...@@ -210,7 +216,7 @@ static void hg_progress_fn(void* foo)
} }
/* check for any expired timers */ /* check for any expired timers */
margo_check_timers(); margo_check_timers(mid);
} }
return; return;
...@@ -279,7 +285,7 @@ hg_return_t margo_forward_timed( ...@@ -279,7 +285,7 @@ hg_return_t margo_forward_timed(
/* set a timer object to expire when this forward times out */ /* set a timer object to expire when this forward times out */
timeout_cb_dat.handle = handle; timeout_cb_dat.handle = handle;
margo_timer_init(&forward_timer, margo_forward_timeout_cb, margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb,
&timeout_cb_dat, timeout_ms); &timeout_cb_dat, timeout_ms);
hret = HG_Forward(handle, margo_cb, &eventual, in_struct); hret = HG_Forward(handle, margo_cb, &eventual, in_struct);
...@@ -291,7 +297,7 @@ hg_return_t margo_forward_timed( ...@@ -291,7 +297,7 @@ hg_return_t margo_forward_timed(
/* remove timer if it is still in place (i.e., not timed out) */ /* remove timer if it is still in place (i.e., not timed out) */
if(hret != HG_TIMEOUT) if(hret != HG_TIMEOUT)
margo_timer_destroy(&forward_timer); margo_timer_destroy(mid, &forward_timer);
ABT_eventual_free(&eventual); ABT_eventual_free(&eventual);
...@@ -476,6 +482,7 @@ static void margo_thread_sleep_cb(void *arg) ...@@ -476,6 +482,7 @@ static void margo_thread_sleep_cb(void *arg)
} }
void margo_thread_sleep( void margo_thread_sleep(
margo_instance_id mid,
double timeout_ms) double timeout_ms)
{ {
margo_timer_t sleep_timer; margo_timer_t sleep_timer;
...@@ -486,7 +493,7 @@ void margo_thread_sleep( ...@@ -486,7 +493,7 @@ void margo_thread_sleep(
ABT_cond_create(&(sleep_cb_dat.cond)); ABT_cond_create(&(sleep_cb_dat.cond));
/* initialize the sleep timer */ /* initialize the sleep timer */
margo_timer_init(&sleep_timer, margo_thread_sleep_cb, margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb,
&sleep_cb_dat, timeout_ms); &sleep_cb_dat, timeout_ms);
/* yield thread for specified timeout */ /* yield thread for specified timeout */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment