Commit 8f00fd30 authored by Matthieu Dorier's avatar Matthieu Dorier

made timer list a member of the margo_instance structure

parent b6420b1a
......@@ -18,78 +18,47 @@
/* structure for mapping margo instance ids to corresponding timer instances */
struct margo_timer_instance
struct margo_timer_list
{
margo_instance_id mid;
ABT_mutex mutex;
margo_timer_t *queue_head;
};
#define MAX_TIMER_INSTANCES 8
static int timer_inst_table_size = 0;
static struct margo_timer_instance *timer_inst_table[MAX_TIMER_INSTANCES] = {NULL};
static struct margo_timer_instance *margo_get_timer_instance(
margo_instance_id mid);
static void margo_timer_queue(
struct margo_timer_instance *timer_inst,
struct margo_timer_list *timer_lst,
margo_timer_t *timer);
int margo_timer_instance_init(
margo_instance_id mid)
struct margo_timer_list* margo_timer_list_create()
{
struct margo_timer_instance *timer_inst;
if(timer_inst_table_size >= MAX_TIMER_INSTANCES)
return(-1);
struct margo_timer_list *timer_lst;
timer_inst = malloc(sizeof(*timer_inst));
if(!timer_inst)
return(-1);
timer_lst = malloc(sizeof(*timer_lst));
if(!timer_lst)
return NULL;
timer_inst->mid = mid;
ABT_mutex_create(&(timer_inst->mutex));
timer_inst->queue_head = NULL;
ABT_mutex_create(&(timer_lst->mutex));
timer_lst->queue_head = NULL;
/* add this instance to the table of active timer instances */
timer_inst_table[timer_inst_table_size++] = timer_inst;
return(0);
return timer_lst;
}
void margo_timer_instance_finalize(
margo_instance_id mid)
void margo_timer_list_free(struct margo_timer_list* timer_lst)
{
struct margo_timer_instance *timer_inst;
margo_timer_t *cur;
int i = 0;
timer_inst = margo_get_timer_instance(mid);
if(!timer_inst)
return;
ABT_mutex_lock(timer_inst->mutex);
ABT_mutex_lock(timer_lst->mutex);
/* delete any remaining timers from the queue */
while(timer_inst->queue_head)
while(timer_lst->queue_head)
{
cur = timer_inst->queue_head;
DL_DELETE(timer_inst->queue_head, cur);
cur = timer_lst->queue_head;
DL_DELETE(timer_lst->queue_head, cur);
}
ABT_mutex_unlock(timer_inst->mutex);
ABT_mutex_free(&(timer_inst->mutex));
ABT_mutex_unlock(timer_lst->mutex);
ABT_mutex_free(&(timer_lst->mutex));
/* remove this timer instance from the active table */
while(timer_inst_table[i] != timer_inst)
i++;
while(i < timer_inst_table_size - 1)
{
timer_inst_table[i] = timer_inst_table[i+1];
i++;
}
timer_inst_table[i] = NULL;
timer_inst_table_size--;
free(timer_inst);
free(timer_lst);
return;
}
......@@ -101,10 +70,10 @@ void margo_timer_init(
void *cb_dat,
double timeout_ms)
{
struct margo_timer_instance *timer_inst;
struct margo_timer_list *timer_lst;
timer_inst = margo_get_timer_instance(mid);
assert(timer_inst);
timer_lst = margo_get_timer_list(mid);
assert(timer_lst);
assert(timer);
memset(timer, 0, sizeof(*timer));
......@@ -113,7 +82,7 @@ void margo_timer_init(
timer->expiration = ABT_get_wtime() + (timeout_ms/1000);
timer->prev = timer->next = NULL;
margo_timer_queue(timer_inst, timer);
margo_timer_queue(timer_lst, timer);
return;
}
......@@ -122,16 +91,16 @@ void margo_timer_destroy(
margo_instance_id mid,
margo_timer_t *timer)
{
struct margo_timer_instance *timer_inst;
struct margo_timer_list *timer_lst;
timer_inst = margo_get_timer_instance(mid);
assert(timer_inst);
timer_lst = margo_get_timer_list(mid);
assert(timer_lst);
assert(timer);
ABT_mutex_lock(timer_inst->mutex);
ABT_mutex_lock(timer_lst->mutex);
if(timer->prev || timer->next)
DL_DELETE(timer_inst->queue_head, timer);
ABT_mutex_unlock(timer_inst->mutex);
DL_DELETE(timer_lst->queue_head, timer);
ABT_mutex_unlock(timer_lst->mutex);
return;
}
......@@ -141,22 +110,22 @@ void margo_check_timers(
{
int ret;
margo_timer_t *cur;
struct margo_timer_instance *timer_inst;
struct margo_timer_list *timer_lst;
ABT_pool handler_pool;
double now = ABT_get_wtime();
timer_inst = margo_get_timer_instance(mid);
assert(timer_inst);
timer_lst = margo_get_timer_list(mid);
assert(timer_lst);
ABT_mutex_lock(timer_inst->mutex);
ABT_mutex_lock(timer_lst->mutex);
/* iterate through timer list, performing timeout action
* for all elements which have passed expiration time
*/
while(timer_inst->queue_head && (timer_inst->queue_head->expiration < now))
while(timer_lst->queue_head && (timer_lst->queue_head->expiration < now))
{
cur = timer_inst->queue_head;
DL_DELETE(timer_inst->queue_head, cur);
cur = timer_lst->queue_head;
DL_DELETE(timer_lst->queue_head, cur);
cur->prev = cur->next = NULL;
/* schedule callback on the handler pool */
......@@ -172,7 +141,7 @@ void margo_check_timers(
cur->cb_fn(cur->cb_dat);
}
}
ABT_mutex_unlock(timer_inst->mutex);
ABT_mutex_unlock(timer_lst->mutex);
return;
}
......@@ -184,70 +153,47 @@ int margo_timer_get_next_expiration(
margo_instance_id mid,
double *next_timer_exp)
{
struct margo_timer_instance *timer_inst;
struct margo_timer_list *timer_lst;
double now = ABT_get_wtime();
int ret;
timer_inst = margo_get_timer_instance(mid);
assert(timer_inst);
timer_lst = margo_get_timer_list(mid);
assert(timer_lst);
ABT_mutex_lock(timer_inst->mutex);
if(timer_inst->queue_head)
ABT_mutex_lock(timer_lst->mutex);
if(timer_lst->queue_head)
{
*next_timer_exp = timer_inst->queue_head->expiration - now;
*next_timer_exp = timer_lst->queue_head->expiration - now;
ret = 0;
}
else
{
ret = -1;
}
ABT_mutex_unlock(timer_inst->mutex);
ABT_mutex_unlock(timer_lst->mutex);
return(ret);
}
static struct margo_timer_instance *margo_get_timer_instance(
margo_instance_id mid)
{
struct margo_timer_instance *timer_inst = NULL;
int i = 0;
/* find the timer instance using the given margo id */
while(timer_inst_table[i] != NULL)
{
if(timer_inst_table[i]->mid == mid)
{
timer_inst = timer_inst_table[i];
break;
}
i++;
}
if(timer_inst)
assert(timer_inst->mutex != ABT_MUTEX_NULL);
return(timer_inst);
}
static void margo_timer_queue(
struct margo_timer_instance *timer_inst,
struct margo_timer_list *timer_lst,
margo_timer_t *timer)
{
margo_timer_t *cur;
ABT_mutex_lock(timer_inst->mutex);
ABT_mutex_lock(timer_lst->mutex);
/* if list of timers is empty, put ourselves on it */
if(!(timer_inst->queue_head))
if(!(timer_lst->queue_head))
{
DL_APPEND(timer_inst->queue_head, timer);
DL_APPEND(timer_lst->queue_head, timer);
}
else
{
/* something else already in queue, keep it sorted in ascending order
* of expiration time
*/
cur = timer_inst->queue_head;
cur = timer_lst->queue_head;
do
{
/* walk backwards through queue */
......@@ -257,18 +203,18 @@ static void margo_timer_queue(
*/
if(cur->expiration < timer->expiration)
{
DL_APPEND_ELEM(timer_inst->queue_head, cur, timer);
DL_APPEND_ELEM(timer_lst->queue_head, cur, timer);
break;
}
}while(cur != timer_inst->queue_head);
}while(cur != timer_lst->queue_head);
/* if we never found one with an expiration before this one, then
* this one is the new head
*/
if(timer->prev == NULL && timer->next == NULL)
DL_PREPEND(timer_inst->queue_head, timer);
DL_PREPEND(timer_lst->queue_head, timer);
}
ABT_mutex_unlock(timer_inst->mutex);
ABT_mutex_unlock(timer_lst->mutex);
return;
}
......@@ -23,19 +23,16 @@ typedef struct margo_timed_element
} margo_timer_t;
/**
* Initializes the margo timer interface
* @param [in] mid Margo instance
* @returns 0 on success, -1 on failure
* Creates a margo_timer_list.
* @returns a new margo_timer_list, or NULL if failed
*/
int margo_timer_instance_init(
margo_instance_id mid);
struct margo_timer_list* margo_timer_list_create();
/**
* Shuts down the margo timer interface
* @param [in] mid Margo instance
* Frees the timer list
* @param [in] timer_lst timer list to free
*/
void margo_timer_instance_finalize(
margo_instance_id mid);
void margo_timer_list_free(struct margo_timer_list* timer_lst);
/**
* Initializes a margo timer object which will perform some action
......@@ -80,6 +77,12 @@ int margo_timer_get_next_expiration(
margo_instance_id mid,
double *next_timer_exp);
/**
* Gets the margo_timer_list from the margo instance.
* This function is defined in margo.c.
*/
struct margo_timer_list *margo_get_timer_list(margo_instance_id mid);
#ifdef __cplusplus
}
#endif
......
......@@ -70,6 +70,8 @@ struct margo_finalize_cb
struct margo_finalize_cb* next;
};
struct margo_timer_list; /* defined in margo-timer.c */
struct margo_instance
{
/* mercury/argobots state */
......@@ -96,6 +98,9 @@ struct margo_instance
ABT_cond finalize_cond;
struct margo_finalize_cb* finalize_cb;
/* timer data */
struct margo_timer_list* timer_list;
/* hash table to track multiplexed rpcs registered with margo */
struct mplex_element *mplex_table;
......@@ -238,7 +243,7 @@ margo_instance_id margo_init(const char *addr_str, int mode,
err:
if(mid)
{
margo_timer_instance_finalize(mid);
margo_timer_list_free(mid->timer_list);
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
free(mid);
......@@ -289,8 +294,8 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
mid->refcount = 1;
mid->finalize_cb = NULL;
ret = margo_timer_instance_init(mid);
if(ret != 0) goto err;
mid->timer_list = margo_timer_list_create();
if(mid->timer_list == NULL) goto err;
/* initialize the handle cache */
hret = margo_handle_cache_init(mid);
......@@ -306,7 +311,7 @@ err:
if(mid)
{
margo_handle_cache_destroy(mid);
margo_timer_instance_finalize(mid);
margo_timer_list_free(mid->timer_list);
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
free(mid);
......@@ -327,7 +332,7 @@ static void margo_cleanup(margo_instance_id mid)
free(tmp);
}
margo_timer_instance_finalize(mid);
margo_timer_list_free(mid->timer_list);
/* delete the hash used for multiplexing */
delete_multiplexing_hash(mid);
......@@ -1442,3 +1447,8 @@ finish:
ABT_mutex_unlock(mid->handle_cache_mtx);
return hret;
}
struct margo_timer_list *margo_get_timer_list(margo_instance_id mid)
{
return mid->timer_list;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment