margo-timer.c 5.03 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14

/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

#include <abt.h>
15
#include "margo.h"
16 17 18 19
#include "margo-timer.h"
#include "utlist.h"


20
/* structure for mapping margo instance ids to corresponding timer instances */
21
struct margo_timer_list
22 23 24 25 26 27
{
    ABT_mutex mutex;
    margo_timer_t *queue_head;
};

static void margo_timer_queue(
28
    struct margo_timer_list *timer_lst,
29
    margo_timer_t *timer);
30 31


32
struct margo_timer_list* margo_timer_list_create()
33
{
34
    struct margo_timer_list *timer_lst;
35

36 37 38
    timer_lst = malloc(sizeof(*timer_lst));
    if(!timer_lst)
        return NULL;
39

40 41
    ABT_mutex_create(&(timer_lst->mutex));
    timer_lst->queue_head = NULL;
42

43
    return timer_lst;
44 45
}

46
void margo_timer_list_free(struct margo_timer_list* timer_lst)
47
{
Shane Snyder's avatar
Shane Snyder committed
48
    margo_timer_t *cur;
49

50
    ABT_mutex_lock(timer_lst->mutex);
Shane Snyder's avatar
Shane Snyder committed
51
    /* delete any remaining timers from the queue */
52
    while(timer_lst->queue_head)
53
    {
54 55
        cur = timer_lst->queue_head;
        DL_DELETE(timer_lst->queue_head, cur);
56
    }
57 58
    ABT_mutex_unlock(timer_lst->mutex);
    ABT_mutex_free(&(timer_lst->mutex));
59

60
    free(timer_lst);
Shane Snyder's avatar
Shane Snyder committed
61

62 63 64
    return;
}

Shane Snyder's avatar
Shane Snyder committed
65
void margo_timer_init(
66
    margo_instance_id mid,
Shane Snyder's avatar
Shane Snyder committed
67
    margo_timer_t *timer,
68 69
    margo_timer_cb_fn cb_fn,
    void *cb_dat,
Shane Snyder's avatar
Shane Snyder committed
70
    double timeout_ms)
71
{
72
    struct margo_timer_list *timer_lst;
73

74 75
    timer_lst = margo_get_timer_list(mid);
    assert(timer_lst);
Shane Snyder's avatar
Shane Snyder committed
76
    assert(timer);
77

Shane Snyder's avatar
Shane Snyder committed
78 79 80 81 82
    memset(timer, 0, sizeof(*timer));
    timer->cb_fn = cb_fn;
    timer->cb_dat = cb_dat;
    timer->expiration = ABT_get_wtime() + (timeout_ms/1000);
    timer->prev = timer->next = NULL;
83

84
    margo_timer_queue(timer_lst, timer);
85 86 87 88

    return;
}

Shane Snyder's avatar
Shane Snyder committed
89
void margo_timer_destroy(
90
    margo_instance_id mid,
Shane Snyder's avatar
Shane Snyder committed
91
    margo_timer_t *timer)
92
{
93
    struct margo_timer_list *timer_lst;
94

95 96
    timer_lst = margo_get_timer_list(mid);
    assert(timer_lst);
Shane Snyder's avatar
Shane Snyder committed
97
    assert(timer);
98

99
    ABT_mutex_lock(timer_lst->mutex);
Shane Snyder's avatar
Shane Snyder committed
100
    if(timer->prev || timer->next)
101 102
        DL_DELETE(timer_lst->queue_head, timer);
    ABT_mutex_unlock(timer_lst->mutex);
103

104 105 106
    return;
}

107 108
void margo_check_timers(
    margo_instance_id mid)
109
{
110
    int ret;
Shane Snyder's avatar
Shane Snyder committed
111
    margo_timer_t *cur;
112
    struct margo_timer_list *timer_lst;
113
    ABT_pool handler_pool;
Philip Carns's avatar
Philip Carns committed
114
    double now;
115

116 117
    timer_lst = margo_get_timer_list(mid);
    assert(timer_lst);
118

119
    ABT_mutex_lock(timer_lst->mutex);
120

Philip Carns's avatar
Philip Carns committed
121 122 123
    if(timer_lst->queue_head)
        now = ABT_get_wtime();

124 125 126
    /* iterate through timer list, performing timeout action
     * for all elements which have passed expiration time
     */
127
    while(timer_lst->queue_head && (timer_lst->queue_head->expiration < now))
128
    {
129 130
        cur = timer_lst->queue_head;
        DL_DELETE(timer_lst->queue_head, cur);
Shane Snyder's avatar
Shane Snyder committed
131
        cur->prev = cur->next = NULL;
132

133
        /* schedule callback on the handler pool */
134 135
        margo_get_handler_pool(mid, &handler_pool);
        if(handler_pool != ABT_POOL_NULL)
136
        {
137
            ret = ABT_thread_create(handler_pool, cur->cb_fn, cur->cb_dat,
138 139 140 141 142 143 144
                ABT_THREAD_ATTR_NULL, NULL);
            assert(ret == ABT_SUCCESS);
        }
        else
        {
            cur->cb_fn(cur->cb_dat);
        }
145
    }
146
    ABT_mutex_unlock(timer_lst->mutex);
147 148 149 150

    return;
}

151 152 153 154 155 156 157
/* returns 0 and sets 'next_timer_exp' if the timer instance
 * has timers queued up, -1 otherwise
 */
int margo_timer_get_next_expiration(
    margo_instance_id mid,
    double *next_timer_exp)
{
158
    struct margo_timer_list *timer_lst;
Philip Carns's avatar
Philip Carns committed
159
    double now;
160 161
    int ret;

162 163
    timer_lst = margo_get_timer_list(mid);
    assert(timer_lst);
164

165 166
    ABT_mutex_lock(timer_lst->mutex);
    if(timer_lst->queue_head)
167
    {
Philip Carns's avatar
Philip Carns committed
168
        now = ABT_get_wtime();
169
        *next_timer_exp = timer_lst->queue_head->expiration - now;
170 171 172 173 174 175
        ret = 0;
    }
    else
    {
        ret = -1;
    }
176
    ABT_mutex_unlock(timer_lst->mutex);
177 178 179 180

    return(ret);
}

181
static void margo_timer_queue(
182
    struct margo_timer_list *timer_lst,
183
    margo_timer_t *timer)
184
{
Shane Snyder's avatar
Shane Snyder committed
185
    margo_timer_t *cur;
186

187
    ABT_mutex_lock(timer_lst->mutex);
188 189

    /* if list of timers is empty, put ourselves on it */
190
    if(!(timer_lst->queue_head))
191
    {
192
        DL_APPEND(timer_lst->queue_head, timer);
193 194 195 196 197 198
    }
    else
    {
        /* something else already in queue, keep it sorted in ascending order
         * of expiration time
         */
199
        cur = timer_lst->queue_head;
200 201 202 203 204 205 206
        do
        {
            /* walk backwards through queue */
            cur = cur->prev;
            /* as soon as we find an element that expires before this one, 
             * then we add ours after it
             */
Shane Snyder's avatar
Shane Snyder committed
207
            if(cur->expiration < timer->expiration)
208
            {
209
                DL_APPEND_ELEM(timer_lst->queue_head, cur, timer);
210 211
                break;
            }
212
        }while(cur != timer_lst->queue_head);
213 214 215 216

        /* if we never found one with an expiration before this one, then
         * this one is the new head
         */
Shane Snyder's avatar
Shane Snyder committed
217
        if(timer->prev == NULL && timer->next == NULL)
218
            DL_PREPEND(timer_lst->queue_head, timer);
219
    }
220
    ABT_mutex_unlock(timer_lst->mutex);
221 222 223

    return;
}