margo-timer.c 5.87 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14

/*
 * (C) 2016 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

#include <abt.h>
15
#include "margo.h"
16 17 18 19
#include "margo-timer.h"
#include "utlist.h"


20 21 22 23 24 25 26 27 28 29 30
/* structure for mapping margo instance ids to corresponding timer instances */
struct margo_timer_instance
{
    margo_instance_id mid;
    ABT_mutex mutex;
    margo_timer_t *queue_head;
};

#define MAX_TIMER_INSTANCES 8
static int timer_inst_table_size = 0;
static struct margo_timer_instance *timer_inst_table[MAX_TIMER_INSTANCES] = {NULL};
31

32 33 34 35 36
static struct margo_timer_instance *margo_get_timer_instance(
    margo_instance_id mid);
static void margo_timer_queue(
    struct margo_timer_instance *timer_inst,
    margo_timer_t *timer);
37 38


39 40
int margo_timer_instance_init(
    margo_instance_id mid)
41
{
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
    struct margo_timer_instance *timer_inst;

    if(timer_inst_table_size >= MAX_TIMER_INSTANCES)
        return(-1);

    timer_inst = malloc(sizeof(timer_inst));
    if(!timer_inst)
        return(-1);

    timer_inst->mid = mid;
    ABT_mutex_create(&(timer_inst->mutex));
    timer_inst->queue_head = NULL;

    /* add this instance to the table of active timer instances */
    timer_inst_table[timer_inst_table_size++] = timer_inst;

    return(0);
59 60
}

61 62
void margo_timer_instance_finalize(
    margo_instance_id mid)
63
{
64
    struct margo_timer_instance *timer_inst;
Shane Snyder's avatar
Shane Snyder committed
65
    margo_timer_t *cur;
66
    int i = 0;
67

68 69
    timer_inst = margo_get_timer_instance(mid);
    if(!timer_inst)
70 71
        return;

72
    ABT_mutex_lock(timer_inst->mutex);
Shane Snyder's avatar
Shane Snyder committed
73
    /* delete any remaining timers from the queue */
74 75 76 77 78 79 80 81 82 83 84 85
    while(timer_inst->queue_head)
    {
        cur = timer_inst->queue_head;
        DL_DELETE(timer_inst->queue_head, cur);
    }
    ABT_mutex_unlock(timer_inst->mutex);
    ABT_mutex_free(&(timer_inst->mutex));

    /* remove this timer instance from the active table */
    while(timer_inst_table[i] != timer_inst)
        i++;
    while(i < timer_inst_table_size - 1)
86
    {
87 88
        timer_inst_table[i] = timer_inst_table[i+1];
        i++;
89
    }
90 91 92
    timer_inst_table[i] = NULL;
    timer_inst_table_size--;
    free(timer_inst);
Shane Snyder's avatar
Shane Snyder committed
93

94 95 96
    return;
}

Shane Snyder's avatar
Shane Snyder committed
97
void margo_timer_init(
98
    margo_instance_id mid,
Shane Snyder's avatar
Shane Snyder committed
99
    margo_timer_t *timer,
100 101
    margo_timer_cb_fn cb_fn,
    void *cb_dat,
Shane Snyder's avatar
Shane Snyder committed
102
    double timeout_ms)
103
{
104 105 106 107
    struct margo_timer_instance *timer_inst;

    timer_inst = margo_get_timer_instance(mid);
    assert(timer_inst);
Shane Snyder's avatar
Shane Snyder committed
108
    assert(timer);
109

Shane Snyder's avatar
Shane Snyder committed
110 111 112 113 114
    memset(timer, 0, sizeof(*timer));
    timer->cb_fn = cb_fn;
    timer->cb_dat = cb_dat;
    timer->expiration = ABT_get_wtime() + (timeout_ms/1000);
    timer->prev = timer->next = NULL;
115

116
    margo_timer_queue(timer_inst, timer);
117 118 119 120

    return;
}

Shane Snyder's avatar
Shane Snyder committed
121
void margo_timer_destroy(
122
    margo_instance_id mid,
Shane Snyder's avatar
Shane Snyder committed
123
    margo_timer_t *timer)
124
{
125 126 127 128
    struct margo_timer_instance *timer_inst;

    timer_inst = margo_get_timer_instance(mid);
    assert(timer_inst);
Shane Snyder's avatar
Shane Snyder committed
129
    assert(timer);
130

131
    ABT_mutex_lock(timer_inst->mutex);
Shane Snyder's avatar
Shane Snyder committed
132
    if(timer->prev || timer->next)
133 134
        DL_DELETE(timer_inst->queue_head, timer);
    ABT_mutex_unlock(timer_inst->mutex);
135

136 137 138
    return;
}

139 140
void margo_check_timers(
    margo_instance_id mid)
141
{
142
    int ret;
Shane Snyder's avatar
Shane Snyder committed
143
    margo_timer_t *cur;
144
    struct margo_timer_instance *timer_inst;
145
    ABT_pool *handler_pool;
146 147
    double now = ABT_get_wtime();

148 149
    timer_inst = margo_get_timer_instance(mid);
    assert(timer_inst);
150

151
    ABT_mutex_lock(timer_inst->mutex);
152 153 154 155

    /* iterate through timer list, performing timeout action
     * for all elements which have passed expiration time
     */
156
    while(timer_inst->queue_head && (timer_inst->queue_head->expiration < now))
157
    {
158 159
        cur = timer_inst->queue_head;
        DL_DELETE(timer_inst->queue_head, cur);
Shane Snyder's avatar
Shane Snyder committed
160
        cur->prev = cur->next = NULL;
161

162 163
        /* schedule callback on the handler pool */
        handler_pool = margo_get_handler_pool(mid);
164 165 166 167 168 169 170 171 172 173
        if(*handler_pool != ABT_POOL_NULL)
        {
            ret = ABT_thread_create(*handler_pool, cur->cb_fn, cur->cb_dat,
                ABT_THREAD_ATTR_NULL, NULL);
            assert(ret == ABT_SUCCESS);
        }
        else
        {
            cur->cb_fn(cur->cb_dat);
        }
174
    }
175
    ABT_mutex_unlock(timer_inst->mutex);
176 177 178 179

    return;
}

180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
static struct margo_timer_instance *margo_get_timer_instance(
    margo_instance_id mid)
{
    struct margo_timer_instance *timer_inst = NULL;
    int i = 0;

    /* find the timer instance using the given margo id */
    while(timer_inst_table[i] != NULL)
    {
        if(timer_inst_table[i]->mid == mid)
        {
            timer_inst = timer_inst_table[i];
            break;
        }
        i++;
    }

    if(timer_inst)
        assert(timer_inst->mutex != ABT_MUTEX_NULL);

    return(timer_inst);
}

static void margo_timer_queue(
    struct margo_timer_instance *timer_inst,
    margo_timer_t *timer)
206
{
Shane Snyder's avatar
Shane Snyder committed
207
    margo_timer_t *cur;
208

209
    ABT_mutex_lock(timer_inst->mutex);
210 211

    /* if list of timers is empty, put ourselves on it */
212
    if(!(timer_inst->queue_head))
213
    {
214
        DL_APPEND(timer_inst->queue_head, timer);
215 216 217 218 219 220
    }
    else
    {
        /* something else already in queue, keep it sorted in ascending order
         * of expiration time
         */
221
        cur = timer_inst->queue_head;
222 223 224 225 226 227 228
        do
        {
            /* walk backwards through queue */
            cur = cur->prev;
            /* as soon as we find an element that expires before this one, 
             * then we add ours after it
             */
Shane Snyder's avatar
Shane Snyder committed
229
            if(cur->expiration < timer->expiration)
230
            {
231
                DL_APPEND_ELEM(timer_inst->queue_head, cur, timer);
232 233
                break;
            }
234
        }while(cur != timer_inst->queue_head);
235 236 237 238

        /* if we never found one with an expiration before this one, then
         * this one is the new head
         */
Shane Snyder's avatar
Shane Snyder committed
239
        if(timer->prev == NULL && timer->next == NULL)
240
            DL_PREPEND(timer_inst->queue_head, timer);
241
    }
242
    ABT_mutex_unlock(timer_inst->mutex);
243 244 245

    return;
}