margo-timer.c 6.49 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14

/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

#include <abt.h>
15
#include "margo.h"
16
17
18
19
#include "margo-timer.h"
#include "utlist.h"


20
21
22
23
24
25
26
27
28
29
30
/* structure for mapping margo instance ids to corresponding timer instances */
struct margo_timer_instance
{
    margo_instance_id mid;
    ABT_mutex mutex;
    margo_timer_t *queue_head;
};

#define MAX_TIMER_INSTANCES 8
static int timer_inst_table_size = 0;
static struct margo_timer_instance *timer_inst_table[MAX_TIMER_INSTANCES] = {NULL};
31

32
33
34
35
36
static struct margo_timer_instance *margo_get_timer_instance(
    margo_instance_id mid);
static void margo_timer_queue(
    struct margo_timer_instance *timer_inst,
    margo_timer_t *timer);
37
38


39
40
int margo_timer_instance_init(
    margo_instance_id mid)
41
{
42
43
44
45
46
    struct margo_timer_instance *timer_inst;

    if(timer_inst_table_size >= MAX_TIMER_INSTANCES)
        return(-1);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
47
    timer_inst = malloc(sizeof(*timer_inst));
48
49
50
51
52
53
54
55
56
57
58
    if(!timer_inst)
        return(-1);

    timer_inst->mid = mid;
    ABT_mutex_create(&(timer_inst->mutex));
    timer_inst->queue_head = NULL;

    /* add this instance to the table of active timer instances */
    timer_inst_table[timer_inst_table_size++] = timer_inst;

    return(0);
59
60
}

61
62
void margo_timer_instance_finalize(
    margo_instance_id mid)
63
{
64
    struct margo_timer_instance *timer_inst;
Shane Snyder's avatar
Shane Snyder committed
65
    margo_timer_t *cur;
66
    int i = 0;
67

68
69
    timer_inst = margo_get_timer_instance(mid);
    if(!timer_inst)
70
71
        return;

72
    ABT_mutex_lock(timer_inst->mutex);
Shane Snyder's avatar
Shane Snyder committed
73
    /* delete any remaining timers from the queue */
74
75
76
77
78
79
80
81
82
83
84
85
    while(timer_inst->queue_head)
    {
        cur = timer_inst->queue_head;
        DL_DELETE(timer_inst->queue_head, cur);
    }
    ABT_mutex_unlock(timer_inst->mutex);
    ABT_mutex_free(&(timer_inst->mutex));

    /* remove this timer instance from the active table */
    while(timer_inst_table[i] != timer_inst)
        i++;
    while(i < timer_inst_table_size - 1)
86
    {
87
88
        timer_inst_table[i] = timer_inst_table[i+1];
        i++;
89
    }
90
91
92
    timer_inst_table[i] = NULL;
    timer_inst_table_size--;
    free(timer_inst);
Shane Snyder's avatar
Shane Snyder committed
93

94
95
96
    return;
}

Shane Snyder's avatar
Shane Snyder committed
97
void margo_timer_init(
98
    margo_instance_id mid,
Shane Snyder's avatar
Shane Snyder committed
99
    margo_timer_t *timer,
100
101
    margo_timer_cb_fn cb_fn,
    void *cb_dat,
Shane Snyder's avatar
Shane Snyder committed
102
    double timeout_ms)
103
{
104
105
106
107
    struct margo_timer_instance *timer_inst;

    timer_inst = margo_get_timer_instance(mid);
    assert(timer_inst);
Shane Snyder's avatar
Shane Snyder committed
108
    assert(timer);
109

Shane Snyder's avatar
Shane Snyder committed
110
111
112
113
114
    memset(timer, 0, sizeof(*timer));
    timer->cb_fn = cb_fn;
    timer->cb_dat = cb_dat;
    timer->expiration = ABT_get_wtime() + (timeout_ms/1000);
    timer->prev = timer->next = NULL;
115

116
    margo_timer_queue(timer_inst, timer);
117
118
119
120

    return;
}

Shane Snyder's avatar
Shane Snyder committed
121
void margo_timer_destroy(
122
    margo_instance_id mid,
Shane Snyder's avatar
Shane Snyder committed
123
    margo_timer_t *timer)
124
{
125
126
127
128
    struct margo_timer_instance *timer_inst;

    timer_inst = margo_get_timer_instance(mid);
    assert(timer_inst);
Shane Snyder's avatar
Shane Snyder committed
129
    assert(timer);
130

131
    ABT_mutex_lock(timer_inst->mutex);
Shane Snyder's avatar
Shane Snyder committed
132
    if(timer->prev || timer->next)
133
134
        DL_DELETE(timer_inst->queue_head, timer);
    ABT_mutex_unlock(timer_inst->mutex);
135

136
137
138
    return;
}

139
140
void margo_check_timers(
    margo_instance_id mid)
141
{
142
    int ret;
Shane Snyder's avatar
Shane Snyder committed
143
    margo_timer_t *cur;
144
    struct margo_timer_instance *timer_inst;
145
    ABT_pool *handler_pool;
146
147
    double now = ABT_get_wtime();

148
149
    timer_inst = margo_get_timer_instance(mid);
    assert(timer_inst);
150

151
    ABT_mutex_lock(timer_inst->mutex);
152
153
154
155

    /* iterate through timer list, performing timeout action
     * for all elements which have passed expiration time
     */
156
    while(timer_inst->queue_head && (timer_inst->queue_head->expiration < now))
157
    {
158
159
        cur = timer_inst->queue_head;
        DL_DELETE(timer_inst->queue_head, cur);
Shane Snyder's avatar
Shane Snyder committed
160
        cur->prev = cur->next = NULL;
161

162
163
        /* schedule callback on the handler pool */
        handler_pool = margo_get_handler_pool(mid);
164
165
166
167
168
169
170
171
172
173
        if(*handler_pool != ABT_POOL_NULL)
        {
            ret = ABT_thread_create(*handler_pool, cur->cb_fn, cur->cb_dat,
                ABT_THREAD_ATTR_NULL, NULL);
            assert(ret == ABT_SUCCESS);
        }
        else
        {
            cur->cb_fn(cur->cb_dat);
        }
174
    }
175
    ABT_mutex_unlock(timer_inst->mutex);
176
177
178
179

    return;
}

180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/* returns 0 and sets 'next_timer_exp' if the timer instance
 * has timers queued up, -1 otherwise
 */
int margo_timer_get_next_expiration(
    margo_instance_id mid,
    double *next_timer_exp)
{
    struct margo_timer_instance *timer_inst;
    double now = ABT_get_wtime();
    int ret;

    timer_inst = margo_get_timer_instance(mid);
    assert(timer_inst);

    ABT_mutex_lock(timer_inst->mutex);
    if(timer_inst->queue_head)
    {
        *next_timer_exp = timer_inst->queue_head->expiration - now;
        ret = 0;
    }
    else
    {
        ret = -1;
    }
    ABT_mutex_unlock(timer_inst->mutex);

    return(ret);
}

209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
static struct margo_timer_instance *margo_get_timer_instance(
    margo_instance_id mid)
{
    struct margo_timer_instance *timer_inst = NULL;
    int i = 0;

    /* find the timer instance using the given margo id */
    while(timer_inst_table[i] != NULL)
    {
        if(timer_inst_table[i]->mid == mid)
        {
            timer_inst = timer_inst_table[i];
            break;
        }
        i++;
    }

    if(timer_inst)
        assert(timer_inst->mutex != ABT_MUTEX_NULL);

    return(timer_inst);
}

static void margo_timer_queue(
    struct margo_timer_instance *timer_inst,
    margo_timer_t *timer)
235
{
Shane Snyder's avatar
Shane Snyder committed
236
    margo_timer_t *cur;
237

238
    ABT_mutex_lock(timer_inst->mutex);
239
240

    /* if list of timers is empty, put ourselves on it */
241
    if(!(timer_inst->queue_head))
242
    {
243
        DL_APPEND(timer_inst->queue_head, timer);
244
245
246
247
248
249
    }
    else
    {
        /* something else already in queue, keep it sorted in ascending order
         * of expiration time
         */
250
        cur = timer_inst->queue_head;
251
252
253
254
255
256
257
        do
        {
            /* walk backwards through queue */
            cur = cur->prev;
            /* as soon as we find an element that expires before this one, 
             * then we add ours after it
             */
Shane Snyder's avatar
Shane Snyder committed
258
            if(cur->expiration < timer->expiration)
259
            {
260
                DL_APPEND_ELEM(timer_inst->queue_head, cur, timer);
261
262
                break;
            }
263
        }while(cur != timer_inst->queue_head);
264
265
266
267

        /* if we never found one with an expiration before this one, then
         * this one is the new head
         */
Shane Snyder's avatar
Shane Snyder committed
268
        if(timer->prev == NULL && timer->next == NULL)
269
            DL_PREPEND(timer_inst->queue_head, timer);
270
    }
271
    ABT_mutex_unlock(timer_inst->mutex);
272
273
274

    return;
}