/* margo-timer.c */

/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

#include <abt.h>
#include "margo.h"
#include "margo-timer.h"
#include "utlist.h"


/* structure for mapping margo instance ids to corresponding timer instances */
21
struct margo_timer_list
22 23 24 25 26 27
{
    ABT_mutex mutex;
    margo_timer_t *queue_head;
};

static void margo_timer_queue(
28
    struct margo_timer_list *timer_lst,
29
    margo_timer_t *timer);
30 31


32
struct margo_timer_list* margo_timer_list_create()
33
{
34
    struct margo_timer_list *timer_lst;
35

36 37 38
    timer_lst = malloc(sizeof(*timer_lst));
    if(!timer_lst)
        return NULL;
39

40 41
    ABT_mutex_create(&(timer_lst->mutex));
    timer_lst->queue_head = NULL;
42

43
    return timer_lst;
44 45
}

46
void margo_timer_list_free(struct margo_timer_list* timer_lst)
47
{
Shane Snyder's avatar
Shane Snyder committed
48
    margo_timer_t *cur;
49
    int i = 0;
50

51
    ABT_mutex_lock(timer_lst->mutex);
Shane Snyder's avatar
Shane Snyder committed
52
    /* delete any remaining timers from the queue */
53
    while(timer_lst->queue_head)
54
    {
55 56
        cur = timer_lst->queue_head;
        DL_DELETE(timer_lst->queue_head, cur);
57
    }
58 59
    ABT_mutex_unlock(timer_lst->mutex);
    ABT_mutex_free(&(timer_lst->mutex));
60

61
    free(timer_lst);
Shane Snyder's avatar
Shane Snyder committed
62

63 64 65
    return;
}

Shane Snyder's avatar
Shane Snyder committed
66
void margo_timer_init(
67
    margo_instance_id mid,
Shane Snyder's avatar
Shane Snyder committed
68
    margo_timer_t *timer,
69 70
    margo_timer_cb_fn cb_fn,
    void *cb_dat,
Shane Snyder's avatar
Shane Snyder committed
71
    double timeout_ms)
72
{
73
    struct margo_timer_list *timer_lst;
74

75 76
    timer_lst = margo_get_timer_list(mid);
    assert(timer_lst);
Shane Snyder's avatar
Shane Snyder committed
77
    assert(timer);
78

Shane Snyder's avatar
Shane Snyder committed
79 80 81 82 83
    memset(timer, 0, sizeof(*timer));
    timer->cb_fn = cb_fn;
    timer->cb_dat = cb_dat;
    timer->expiration = ABT_get_wtime() + (timeout_ms/1000);
    timer->prev = timer->next = NULL;
84

85
    margo_timer_queue(timer_lst, timer);
86 87 88 89

    return;
}

Shane Snyder's avatar
Shane Snyder committed
90
void margo_timer_destroy(
91
    margo_instance_id mid,
Shane Snyder's avatar
Shane Snyder committed
92
    margo_timer_t *timer)
93
{
94
    struct margo_timer_list *timer_lst;
95

96 97
    timer_lst = margo_get_timer_list(mid);
    assert(timer_lst);
Shane Snyder's avatar
Shane Snyder committed
98
    assert(timer);
99

100
    ABT_mutex_lock(timer_lst->mutex);
Shane Snyder's avatar
Shane Snyder committed
101
    if(timer->prev || timer->next)
102 103
        DL_DELETE(timer_lst->queue_head, timer);
    ABT_mutex_unlock(timer_lst->mutex);
104

105 106 107
    return;
}

108 109
/* Fires every timer on the queue of margo instance 'mid' whose expiration
 * is strictly earlier than the current ABT_get_wtime().
 *
 * Each expired timer is unlinked and marked as not queued, then its callback
 * is either handed to ABT_thread_create() on the instance's handler pool or,
 * when margo_get_handler_pool() yields ABT_POOL_NULL, called directly.
 */
void margo_check_timers(
    margo_instance_id mid)
{
    int ret;
    margo_timer_t *cur;
    struct margo_timer_list *timer_lst;
    ABT_pool handler_pool;
    double now = ABT_get_wtime();

    timer_lst = margo_get_timer_list(mid);
    assert(timer_lst);

    ABT_mutex_lock(timer_lst->mutex);

    /* the queue is sorted by expiration, so only the head needs checking;
     * stop at the first timer that has not expired yet
     */
    while(timer_lst->queue_head && (timer_lst->queue_head->expiration < now))
    {
        cur = timer_lst->queue_head;
        DL_DELETE(timer_lst->queue_head, cur);
        cur->prev = cur->next = NULL;

        /* schedule callback on the handler pool */
        margo_get_handler_pool(mid, &handler_pool);
        if(handler_pool != ABT_POOL_NULL)
        {
            ret = ABT_thread_create(handler_pool, cur->cb_fn, cur->cb_dat,
                ABT_THREAD_ATTR_NULL, NULL);
            assert(ret == ABT_SUCCESS);
        }
        else
        {
            /* NOTE(review): the callback runs here with the list mutex
             * held, so it must not call back into the timer API for this
             * instance -- confirm that no callback does
             */
            cur->cb_fn(cur->cb_dat);
        }
    }
    ABT_mutex_unlock(timer_lst->mutex);

    return;
}

/* Reports how long until the earliest queued timer of margo instance 'mid'
 * expires.
 *
 * returns 0 and sets 'next_timer_exp' if the timer instance has timers
 * queued up, -1 otherwise ('next_timer_exp' is then left unmodified).
 *
 * The value stored is the head timer's expiration minus the current
 * ABT_get_wtime(), i.e. the time remaining rather than an absolute time; it
 * is negative when that timer is already overdue.
 */
int margo_timer_get_next_expiration(
    margo_instance_id mid,
    double *next_timer_exp)
{
    struct margo_timer_list *timer_lst;
    double now = ABT_get_wtime();
    int ret = -1;

    timer_lst = margo_get_timer_list(mid);
    assert(timer_lst);

    ABT_mutex_lock(timer_lst->mutex);
    if(timer_lst->queue_head)
    {
        /* queue is sorted, so the head is the next timer to expire */
        *next_timer_exp = timer_lst->queue_head->expiration - now;
        ret = 0;
    }
    ABT_mutex_unlock(timer_lst->mutex);

    return(ret);
}

static void margo_timer_queue(
179
    struct margo_timer_list *timer_lst,
180
    margo_timer_t *timer)
181
{
Shane Snyder's avatar
Shane Snyder committed
182
    margo_timer_t *cur;
183

184
    ABT_mutex_lock(timer_lst->mutex);
185 186

    /* if list of timers is empty, put ourselves on it */
187
    if(!(timer_lst->queue_head))
188
    {
189
        DL_APPEND(timer_lst->queue_head, timer);
190 191 192 193 194 195
    }
    else
    {
        /* something else already in queue, keep it sorted in ascending order
         * of expiration time
         */
196
        cur = timer_lst->queue_head;
197 198 199 200 201 202 203
        do
        {
            /* walk backwards through queue */
            cur = cur->prev;
            /* as soon as we find an element that expires before this one, 
             * then we add ours after it
             */
Shane Snyder's avatar
Shane Snyder committed
204
            if(cur->expiration < timer->expiration)
205
            {
206
                DL_APPEND_ELEM(timer_lst->queue_head, cur, timer);
207 208
                break;
            }
209
        }while(cur != timer_lst->queue_head);
210 211 212 213

        /* if we never found one with an expiration before this one, then
         * this one is the new head
         */
Shane Snyder's avatar
Shane Snyder committed
214
        if(timer->prev == NULL && timer->next == NULL)
215
            DL_PREPEND(timer_lst->queue_head, timer);
216
    }
217
    ABT_mutex_unlock(timer_lst->mutex);
218 219 220

    return;
}