model-net-sched-impl.c 12.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stdlib.h>
#include <assert.h>
#include <string.h> /* memcpy */

#include "model-net-sched-impl.h"
#include "codes/model-net-sched.h"
#include "codes/model-net-method.h"
#include "codes/quicklist.h"

/// scheduler-specific data structures 
/// NOTE: for now, scheduler data structures are the same - this may change in
/// later versions

// Per-LP scheduler state.  Both the FCFS and RR schedulers use this same
// layout: a back-pointer to the network method and a queue of pending
// message requests.
typedef struct mn_sched {
    // method containing packet event to call
    struct model_net_method *method;
    struct qlist_head reqs; // of type mn_sched_qitem
} mn_sched;

// One queued message request, plus deep copies of its optional remote/local
// event payloads (owned by this item; freed when the item is freed).
typedef struct mn_sched_qitem {
    model_net_request req;
    // remaining bytes to send
    uint64_t rem;
    // pointers to event structures 
    // sizes are given in the request struct
    void * remote_event;
    void * local_event;
    struct qlist_head ql; // linkage into mn_sched.reqs
} mn_sched_qitem;

/// scheduler-specific function decls and function table

/// FCFS
// void used to avoid ptr-to-ptr conv warnings
// Allocate and initialize scheduler state into *sched.
static void fcfs_init (struct model_net_method *method, void ** sched);
// Free the scheduler state.
static void fcfs_destroy (void *sched);
// Queue a request, deep-copying the optional remote/local event payloads.
static void fcfs_add (
        model_net_request *req, 
        int remote_event_size,
        void * remote_event,
        int local_event_size,
        void * local_event,
        void *sched, 
        model_net_sched_rc *rc, 
        tw_lp *lp);
// Reverse of fcfs_add: pop and free the most recently queued item.
static void fcfs_add_rc(void *sched, model_net_sched_rc *rc, tw_lp *lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
52
53
54
55
// Issue one packet for the head request.
// Returns -1 (queue empty), 0 (more packets remain), 1 (request finished).
static int  fcfs_next(tw_stime *poffset, void *sched, void *rc_event_save, 
        model_net_sched_rc *rc, tw_lp *lp);
// Reverse of fcfs_next, driven by the rc state saved by the forward event.
static void fcfs_next_rc(void *sched, void *rc_event_save,
        model_net_sched_rc *rc, tw_lp *lp);
56
57
58
59
60
61
62
63
64
65
66
67
68

// Round-robin scheduler: same state and add/add_rc behavior as FCFS; only
// the next/next_rc pair differs (unfinished requests rotate to the tail).
static void rr_init (struct model_net_method *method, void ** sched);
static void rr_destroy (void *sched);
static void rr_add (
        model_net_request *req,
        int remote_event_size,
        void * remote_event,
        int local_event_size,
        void * local_event,
        void *sched,
        model_net_sched_rc *rc,
        tw_lp *lp);
static void rr_add_rc(void *sched, model_net_sched_rc *rc, tw_lp *lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
69
70
71
72
// Issue one packet for the head request; return codes match fcfs_next.
static int  rr_next(tw_stime *poffset, void *sched, void *rc_event_save,
        model_net_sched_rc *rc, tw_lp *lp);
// Reverse of rr_next.
static void rr_next_rc (void *sched, void *rc_event_save,
        model_net_sched_rc *rc, tw_lp *lp);
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119

/// function tables (names defined by X macro in model-net-sched.h)
static model_net_sched_interface fcfs_tab = 
{ &fcfs_init, &fcfs_destroy, &fcfs_add, &fcfs_add_rc, &fcfs_next, &fcfs_next_rc};
static model_net_sched_interface rr_tab = 
{ &rr_init, &rr_destroy, &rr_add, &rr_add_rc, &rr_next, &rr_next_rc};

// Expand SCHEDULER_TYPES into an array of interface pointers indexed by the
// scheduler-type enum; the X macro's third field is the table address above.
#define X(a,b,c) c,
model_net_sched_interface * sched_interfaces[] = {
    SCHEDULER_TYPES
};
#undef X

/// FCFS implementation 

/* Allocate and initialize FCFS scheduler state.
 *
 * method - network method whose packet-event hooks the scheduler will call
 * sched  - out parameter; receives the newly allocated mn_sched
 */
void fcfs_init(struct model_net_method *method, void ** sched){
    *sched = malloc(sizeof(mn_sched));
    // check the allocation, consistent with the assert(q) in fcfs_add
    assert(*sched);
    mn_sched *ss = *sched;
    ss->method = method;
    INIT_QLIST_HEAD(&ss->reqs);
}

// Free the scheduler state itself.
// NOTE(review): any mn_sched_qitem entries still queued (and their copied
// remote/local events) are not freed here -- presumably acceptable given the
// known leak policy noted in fcfs_add, but confirm at teardown time.
void fcfs_destroy(void *sched){
    free(sched);
}

/* Queue a new message request at the tail of the FCFS list.  The optional
 * remote/local event payloads are deep-copied, so the caller's buffers need
 * not outlive this call.  rc and lp are unused here (kept for interface
 * uniformity). */
void fcfs_add (
        model_net_request *req, 
        int remote_event_size,
        void * remote_event,
        int local_event_size,
        void * local_event,
        void *sched, 
        model_net_sched_rc *rc, 
        tw_lp *lp){
    // NOTE: in optimistic mode, we currently do not have a good way to
    // reliably free and re-initialize the q item and the local/remote events
    // when processing next/next_rc events. Hence, the memory leaks. Later on
    // we'll figure out a better way to handle this.
    mn_sched_qitem *item = malloc(sizeof(mn_sched_qitem));
    assert(item);
    item->req = *req;
    item->rem = req->is_pull ? PULL_MSG_SIZE : req->msg_size;

    // deep-copy the optional event payloads; NULL when absent
    item->remote_event = NULL;
    if (remote_event_size > 0){
        item->remote_event = malloc(remote_event_size);
        memcpy(item->remote_event, remote_event, remote_event_size);
    }
    item->local_event = NULL;
    if (local_event_size > 0){
        item->local_event = malloc(local_event_size);
        memcpy(item->local_event, local_event, local_event_size);
    }

    mn_sched *state = sched;
    qlist_add_tail(&item->ql, &state->reqs);
}

/* Reverse handler for fcfs_add: remove the most recently queued item from
 * the tail and release it along with its copied event payloads. */
void fcfs_add_rc(void *sched, model_net_sched_rc *rc, tw_lp *lp){
    mn_sched *state = sched;
    struct qlist_head *tail = qlist_pop_back(&state->reqs);
    assert(tail != NULL);
    mn_sched_qitem *item = qlist_entry(tail, mn_sched_qitem, ql);
    // free'ing NULLs is a no-op 
    free(item->remote_event);
    free(item->local_event);
    free(item);
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
141
142
/* Issue one packet for the request at the head of the FCFS queue.
 *
 * poffset       - out: time offset reported by the method's packet event
 * rc_event_save - scratch space for saving event payloads for reverse comp.
 *
 * Returns -1 when the queue is empty, 1 when this was the request's final
 * packet (the item is dequeued, its state saved into rc / rc_event_save, and
 * its memory freed), 0 when more packets remain. */
int fcfs_next(tw_stime *poffset, void *sched, void *rc_event_save,
        model_net_sched_rc *rc, tw_lp *lp){
    mn_sched *s = sched;

    // empty queue -> nothing to schedule
    if (s->reqs.next == &s->reqs){
        rc->rtn = -1;
        return -1;
    }
    mn_sched_qitem *q = qlist_entry(s->reqs.next, mn_sched_qitem, ql);

    // the final packet carries whatever remainder is left
    int is_last_packet = (q->req.packet_size >= q->rem);
    uint64_t psize = is_last_packet ? q->rem : q->req.packet_size;

    *poffset = s->method->model_net_method_packet_event(q->req.category,
            q->req.final_dest_lp, psize, q->req.is_pull, q->req.msg_size, 0.0,
            q->req.remote_event_size, q->remote_event, q->req.self_event_size,
            q->local_event, q->req.src_lp, lp, is_last_packet);

    if (is_last_packet){
        // request complete: dequeue, stash state for reverse handling, free
        qlist_pop(&s->reqs);
        rc->req = q->req;
        char *save = rc_event_save;
        if (q->req.remote_event_size > 0){
            memcpy(save, q->remote_event, q->req.remote_event_size);
            save += q->req.remote_event_size;
            free(q->remote_event);
        }
        if (q->req.self_event_size > 0){
            memcpy(save, q->local_event, q->req.self_event_size);
            free(q->local_event);
        }
        free(q);
        rc->rtn = 1;
    }
    else{
        q->rem -= psize;
        rc->rtn = 0;
    }
    return rc->rtn;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
192
193
/* Reverse handler for fcfs_next: undo the packet event and, when the forward
 * event completed a request (rc->rtn == 1), rebuild the queue item from the
 * state saved in rc / rc_event_save and push it back onto the list head. */
void fcfs_next_rc(void *sched, void *rc_event_save, model_net_sched_rc *rc,
        tw_lp *lp){
    mn_sched *s = sched;
    if (rc->rtn == -1){
        // forward event found an empty queue - nothing to undo
    }
    else{
        s->method->model_net_method_packet_event_rc(lp);
        if (rc->rtn == 0){
            // forward event sent a non-final packet: the item is still at the
            // front of the list, just restore the remaining byte count
            mn_sched_qitem *q = qlist_entry(s->reqs.next, mn_sched_qitem, ql);
            q->rem += q->req.packet_size;
        }
        else if (rc->rtn == 1){
            // re-create the q item freed by the forward event
            mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
            assert(q);
            q->req = rc->req;
            // the final packet carried msg_size % packet_size bytes (or a
            // full packet when msg_size divides evenly)
            // NOTE(review): fcfs_add initializes rem from PULL_MSG_SIZE when
            // req.is_pull is set, so basing this on msg_size looks wrong for
            // pull requests -- confirm and mirror the pull size if so
            q->rem = q->req.msg_size % q->req.packet_size;
            if (q->rem == 0){ // processed exactly a packet's worth of data
                q->rem = q->req.packet_size;
            }
            // BUG FIX: NULL-initialize so the later unconditional free() in
            // fcfs_add_rc (and fcfs_next's guarded frees) never sees an
            // indeterminate pointer when an event payload is absent
            q->remote_event = NULL;
            q->local_event = NULL;
            void * e_dat = rc_event_save;
            if (q->req.remote_event_size > 0){
                q->remote_event = malloc(q->req.remote_event_size);
                memcpy(q->remote_event, e_dat, q->req.remote_event_size);
                e_dat = (char*) e_dat + q->req.remote_event_size;
            }
            if (q->req.self_event_size > 0) {
                q->local_event = malloc(q->req.self_event_size);
                memcpy(q->local_event, e_dat, q->req.self_event_size);
            }
            // add back to front of list
            qlist_add(&q->ql, &s->reqs);
        }
        else {
            assert(0);
        }
    }
}

/* Allocate and initialize round-robin scheduler state (same layout and
 * behavior as fcfs_init).
 *
 * method - network method whose packet-event hooks the scheduler will call
 * sched  - out parameter; receives the newly allocated mn_sched
 */
void rr_init (struct model_net_method *method, void ** sched){
    *sched = malloc(sizeof(mn_sched));
    // check the allocation, consistent with the malloc asserts in this file
    assert(*sched);
    mn_sched *ss = *sched;
    ss->method = method;
    INIT_QLIST_HEAD(&ss->reqs);
}

// Free the scheduler state itself.
// NOTE(review): queued mn_sched_qitem entries are not freed here, same as
// fcfs_destroy -- confirm this matches the intended teardown semantics.
void rr_destroy (void *sched){
    free(sched);
}

/* Queue a new message request at the tail of the round-robin list,
 * deep-copying the optional remote/local event payloads (mirrors fcfs_add).
 * rc and lp are unused here (kept for interface uniformity). */
void rr_add (
        model_net_request *req,
        int remote_event_size,
        void * remote_event,
        int local_event_size,
        void * local_event,
        void *sched,
        model_net_sched_rc *rc,
        tw_lp *lp){
    // NOTE: in optimistic mode, we currently do not have a good way to
    // reliably free and re-initialize the q item and the local/remote events
    // when processing next/next_rc events. Hence, the memory leaks. Later on
    // we'll figure out a better way to handle this.
    mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
    assert(q); // was unchecked; fcfs_add asserts its malloc, so match it
    q->req = *req;
    q->rem = req->is_pull ? PULL_MSG_SIZE : req->msg_size;
    if (remote_event_size > 0){
        q->remote_event = malloc(remote_event_size);
        memcpy(q->remote_event, remote_event, remote_event_size);
    }
    // BUG FIX: these pointers were left uninitialized when the corresponding
    // size was 0, and rr_add_rc / rr_next later read and free them -- UB
    else { q->remote_event = NULL; }
    if (local_event_size > 0){
        q->local_event = malloc(local_event_size);
        memcpy(q->local_event, local_event, local_event_size);
    }
    else { q->local_event = NULL; }
    mn_sched *s = sched;
    qlist_add_tail(&q->ql, &s->reqs);
}
/* Reverse handler for rr_add: remove the most recently queued item from the
 * tail and release it along with its copied event payloads. */
void rr_add_rc(void *sched, model_net_sched_rc *rc, tw_lp *lp){
    mn_sched *state = sched;
    struct qlist_head *tail = qlist_pop_back(&state->reqs);
    assert(tail != NULL);
    mn_sched_qitem *item = qlist_entry(tail, mn_sched_qitem, ql);
    if (item->remote_event != NULL) { free(item->remote_event); }
    if (item->local_event != NULL)  { free(item->local_event); }
    free(item);
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
282
283
/* Issue one packet for the request at the head of the round-robin queue.
 * An unfinished request is rotated to the tail so queued requests share the
 * link packet-by-packet.  Return codes match fcfs_next: -1 when the queue is
 * empty, 1 when this was the request's final packet (state saved into
 * rc / rc_event_save for reverse computation), 0 otherwise. */
int rr_next(tw_stime *poffset, void *sched, void *rc_event_save,
        model_net_sched_rc *rc, tw_lp *lp){
    mn_sched *s = sched;

    struct qlist_head *head = qlist_pop(&s->reqs);
    if (head == NULL){
        rc->rtn = -1;
        return -1;
    }
    mn_sched_qitem *q = qlist_entry(head, mn_sched_qitem, ql);

    // the final packet carries whatever remainder is left
    int is_last_packet = (q->req.packet_size >= q->rem);
    uint64_t psize = is_last_packet ? q->rem : q->req.packet_size;

    *poffset = s->method->model_net_method_packet_event(q->req.category,
            q->req.final_dest_lp, psize, q->req.is_pull, q->req.msg_size, 0.0,
            q->req.remote_event_size, q->remote_event, q->req.self_event_size,
            q->local_event, q->req.src_lp, lp, is_last_packet);

    if (is_last_packet){
        // request complete: stash state for reverse handling, then free
        rc->req = q->req;
        char *save = rc_event_save;
        if (q->req.remote_event_size > 0){
            memcpy(save, q->remote_event, q->req.remote_event_size);
            save += q->req.remote_event_size;
            free(q->remote_event);
        }
        if (q->req.self_event_size > 0){
            memcpy(save, q->local_event, q->req.self_event_size);
            free(q->local_event);
        }
        free(q);
        rc->rtn = 1;
    }
    else{
        // rotate: the unfinished request goes to the back of the line
        q->rem -= psize;
        qlist_add_tail(&q->ql, &s->reqs);
        rc->rtn = 0;
    }
    return rc->rtn;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
333
/* Reverse handler for rr_next: undo the packet event and either un-rotate
 * the item (rc->rtn == 0) or rebuild the freed queue item from the saved
 * state (rc->rtn == 1) and push it back onto the list head. */
void rr_next_rc (void *sched, void *rc_event_save, model_net_sched_rc *rc, tw_lp *lp){
    mn_sched *s = sched;
    if (rc->rtn == -1){
        // forward event found an empty queue - nothing to undo
    }
    else {
        s->method->model_net_method_packet_event_rc(lp);
        if (rc->rtn == 0){
            // forward event rotated the item to the tail: move it back to
            // the front and restore the remaining byte count
            struct qlist_head *ent = qlist_pop_back(&s->reqs);
            qlist_add(ent, &s->reqs);
            mn_sched_qitem *q = qlist_entry(ent, mn_sched_qitem, ql);
            q->rem += q->req.packet_size;
        }
        else if (rc->rtn == 1){
            // re-create the q item freed by the forward event
            mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
            assert(q);
            q->req = rc->req;
            // the final packet carried msg_size % packet_size bytes (or a
            // full packet when msg_size divides evenly)
            // NOTE(review): rr_add initializes rem from PULL_MSG_SIZE when
            // req.is_pull is set, so basing this on msg_size looks wrong for
            // pull requests -- confirm and mirror the pull size if so
            q->rem = q->req.msg_size % q->req.packet_size;
            if (q->rem == 0){ // processed exactly a packet's worth of data
                q->rem = q->req.packet_size;
            }
            // BUG FIX: NULL-initialize so the later frees in rr_add_rc /
            // rr_next never see an indeterminate pointer when an event
            // payload is absent
            q->remote_event = NULL;
            q->local_event = NULL;
            void * e_dat = rc_event_save;
            if (q->req.remote_event_size > 0){
                q->remote_event = malloc(q->req.remote_event_size);
                memcpy(q->remote_event, e_dat, q->req.remote_event_size);
                e_dat = (char*) e_dat + q->req.remote_event_size;
            }
            if (q->req.self_event_size > 0) {
                q->local_event = malloc(q->req.self_event_size);
                memcpy(q->local_event, e_dat, q->req.self_event_size);
            }
            // add back to front of list
            qlist_add(&q->ql, &s->reqs);
        }
        else {
            assert(0);
        }
    }
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */