model-net-sched-impl.c 18.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <assert.h>
#include <stdlib.h>
#include <string.h>

#include "model-net-sched-impl.h"
11
12
13
14
#include <codes/model-net-sched.h>
#include <codes/model-net-method.h>
#include <codes/quicklist.h>
#include <codes/codes.h>
15

16
17
/* Set to 1 to trace scheduler activity (adds, rollbacks, issued packets) on
 * stdout via dprintf below. */
#define MN_SCHED_DEBUG_VERBOSE 0

/* Debug trace helper: forwards to printf only when MN_SCHED_DEBUG_VERBOSE is
 * non-zero. The arguments are still compiled (and format-checked) when
 * tracing is off; the branch is just dead code. `##__VA_ARGS__` is the GNU
 * comma-elision extension.
 * NOTE(review): this macro shadows the POSIX dprintf(int, const char *, ...)
 * function for the rest of the translation unit. */
#define dprintf(_fmt, ...)                          \
    do {                                            \
        if (MN_SCHED_DEBUG_VERBOSE) {               \
            printf(_fmt, ##__VA_ARGS__);            \
        }                                           \
    } while (0)
22

23
24
25
26
/// scheduler-specific data structures 

typedef struct mn_sched_qitem {
    model_net_request req;
27
    mn_sched_params sched_params;
28
29
    // remaining bytes to send
    uint64_t rem;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
30
    tw_stime entry_time;
31
32
33
34
35
36
37
    // pointers to event structures 
    // sizes are given in the request struct
    void * remote_event;
    void * local_event;
    struct qlist_head ql;
} mn_sched_qitem;

38
39
40
41
// fcfs and round-robin each use a single queue
typedef struct mn_sched_queue {
    // method containing packet event to call
    const struct model_net_method *method;
42
    int is_recv_queue;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
43
    int queue_len;
44
45
46
47
48
49
50
51
52
53
54
    struct qlist_head reqs; // of type mn_sched_qitem
} mn_sched_queue;

// priority scheduler: one sub-queue per priority level, all driven through
// the same sub-scheduler table; level 0 is checked first by prio_next
typedef struct mn_sched_prio {
    mn_prio_params params;                             // num_prios, sub_stype
    const model_net_sched_interface *sub_sched_iface;  // sub-queue functions
    mn_sched_queue ** sub_scheds;                      // params.num_prios queues
} mn_sched_prio;

/// scheduler-specific function decls and tables
55
56
57

/// FCFS
// void used to avoid ptr-to-ptr conv warnings
58
59
60
static void fcfs_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
61
        int                                 is_recv_queue,
62
        void                             ** sched);
63
64
static void fcfs_destroy (void *sched);
static void fcfs_add (
65
66
67
68
69
70
71
72
73
74
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp);
static void fcfs_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp);
75
static int  fcfs_next(
76
77
78
79
80
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
81
static void fcfs_next_rc(
82
83
84
85
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp);
86

87
88
89
90
// ROUND-ROBIN
static void rr_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
91
        int                                 is_recv_queue,
92
        void                             ** sched);
93
94
static void rr_destroy (void *sched);
static void rr_add (
95
96
97
98
99
100
101
102
103
104
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp);
static void rr_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp);
105
static int  rr_next(
106
107
108
109
110
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
111
static void rr_next_rc (
112
113
114
115
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp);
116
117
118
static void prio_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
119
        int                                 is_recv_queue,
120
121
122
        void                             ** sched);
static void prio_destroy (void *sched);
static void prio_add (
123
124
125
126
127
128
129
130
131
132
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp);
static void prio_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp);
133
static int  prio_next(
134
135
136
137
138
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
139
static void prio_next_rc (
140
141
142
143
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp);
144
145

/// function tables (names defined by X macro in model-net-sched.h)
146
static const model_net_sched_interface fcfs_tab = 
147
{ &fcfs_init, &fcfs_destroy, &fcfs_add, &fcfs_add_rc, &fcfs_next, &fcfs_next_rc};
148
static const model_net_sched_interface rr_tab = 
149
{ &rr_init, &rr_destroy, &rr_add, &rr_add_rc, &rr_next, &rr_next_rc};
150
151
static const model_net_sched_interface prio_tab =
{ &prio_init, &prio_destroy, &prio_add, &prio_add_rc, &prio_next, &prio_next_rc};
152
153

#define X(a,b,c) c,
154
const model_net_sched_interface * sched_interfaces[] = {
155
156
157
158
159
160
    SCHEDULER_TYPES
};
#undef X

/// FCFS implementation 

161
162
163
void fcfs_init(
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
164
        int                                 is_recv_queue,
165
        void                             ** sched){
166
    (void)params; // unused for fcfs
167
168
    *sched = malloc(sizeof(mn_sched_queue));
    mn_sched_queue *ss = *sched;
169
    ss->method = method;
170
    ss->is_recv_queue = is_recv_queue;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
171
    ss->queue_len = 0;
172
173
174
175
176
177
178
179
    INIT_QLIST_HEAD(&ss->reqs);
}

void fcfs_destroy(void *sched){
    free(sched);
}

void fcfs_add (
180
181
182
183
184
185
186
187
188
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp){
189
    (void)rc; // unneeded for fcfs
190
    mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
Jonathan Jenkins's avatar
Jonathan Jenkins committed
191
    q->entry_time = tw_now(lp);
192
    q->req = *req;
193
    q->sched_params = *sched_params;
194
    q->rem = req->msg_size;
195
196
197
198
    if (remote_event_size > 0){
        q->remote_event = malloc(remote_event_size);
        memcpy(q->remote_event, remote_event, remote_event_size);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
199
    else { q->remote_event = NULL; }
200
201
202
203
    if (local_event_size > 0){
        q->local_event = malloc(local_event_size);
        memcpy(q->local_event, local_event, local_event_size);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
204
    else { q->local_event = NULL; }
205
    mn_sched_queue *s = sched;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
206
    s->queue_len++;
207
    qlist_add_tail(&q->ql, &s->reqs);
208
209
210
    dprintf("%llu (mn):    adding %srequest from %llu to %llu, size %llu, at %lf\n",
            LLU(lp->gid), req->is_pull ? "pull " : "", LLU(req->src_lp),
            LLU(req->final_dest_lp), LLU(req->msg_size), tw_now(lp));
211
212
}

213
/* Undo fcfs_add by removing and freeing the tail item.
 *
 * The tail holds the request being undone only if undo runs in the reverse
 * order of adds/nexts on this queue (presumably guaranteed by the caller's
 * rollback order — confirm). */
void fcfs_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp){
    (void)rc;
    mn_sched_queue *queue = sched;
    queue->queue_len--;

    struct qlist_head *tail = qlist_pop_back(&queue->reqs);
    assert(tail != NULL);
    mn_sched_qitem *item = qlist_entry(tail, mn_sched_qitem, ql);

    dprintf("%llu (mn): rc adding request from %llu to %llu\n", LLU(lp->gid),
            LLU(item->req.src_lp), LLU(item->req.final_dest_lp));

    // payload pointers are NULL when fcfs_add copied nothing; free(NULL) is ok
    free(item->remote_event);
    free(item->local_event);
    free(item);
}

228
int fcfs_next(
229
230
231
232
233
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
234
    mn_sched_queue *s = sched;
235
236
237
238
239
240
    struct qlist_head *ent = s->reqs.next;
    if (ent == &s->reqs){
        rc->rtn = -1;
        return -1;
    }
    mn_sched_qitem *q = qlist_entry(ent, mn_sched_qitem, ql);
241

242
243
244
245
246
247
248
249
250
251
252
253
    // issue the next packet
    int is_last_packet;
    uint64_t psize;
    if (q->req.packet_size >= q->rem) {
        psize = q->rem;
        is_last_packet = 1;
    }
    else{
        psize = q->req.packet_size;
        is_last_packet = 0;
    }

254
    if (s->is_recv_queue){
255
256
257
258
        dprintf("%llu (mn):    receiving message of size %llu (of %llu) "
                "from %llu to %llu at %1.5e (last:%d)\n",
                LLU(lp->gid), LLU(psize), LLU(q->rem), LLU(q->req.src_lp),
                LLU(q->req.final_dest_lp), tw_now(lp), is_last_packet);
259
260
        // note: we overloaded on the dest_mn_lp field - it's the dest of the
        // soruce in the case of a pull
261
        *poffset = s->method->model_net_method_recv_msg_event(q->req.category,
262
                q->req.final_dest_lp, q->req.dest_mn_lp, psize,
263
                q->req.is_pull, q->req.pull_size, 0.0, q->req.remote_event_size,
264
                q->remote_event, q->req.src_lp, lp);
265
266
    }
    else{
267
268
        dprintf("%llu (mn):    issuing packet of size %llu (of %llu) "
                "from %llu to %llu at %1.5e (last:%d)\n",
Jonathan Jenkins's avatar
Jonathan Jenkins committed
269
                LLU(lp->gid), LLU(psize), LLU(q->rem), LLU(q->req.src_lp),
270
                LLU(q->req.final_dest_lp), tw_now(lp), is_last_packet);
271
        *poffset = s->method->model_net_method_packet_event(&q->req,
272
273
                q->req.msg_size - q->rem, psize, 0.0, &q->sched_params,
                q->remote_event, q->local_event, lp, is_last_packet);
274
    }
275

Jonathan Jenkins's avatar
Jonathan Jenkins committed
276
    // if last packet - remove from list, free, save for rc
277
    if (is_last_packet){
278
        dprintf("last %spkt: %llu (%llu) to %llu, size %llu at %1.5e (pull:%d)\n",
Jonathan Jenkins's avatar
Jonathan Jenkins committed
279
                s->is_recv_queue ? "recv " : "send ",
280
281
                LLU(lp->gid), LLU(q->req.src_lp), LLU(q->req.final_dest_lp),
                LLU(q->req.is_pull ? q->req.pull_size : q->req.msg_size), tw_now(lp),
Jonathan Jenkins's avatar
Jonathan Jenkins committed
282
                q->req.is_pull);
283
        qlist_pop(&s->reqs);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
284
        s->queue_len--;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
285
        rc->req = q->req;
286
        rc->sched_params = q->sched_params;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
287
288
289
290
291
292
293
294
295
296
297
        void *e_dat = rc_event_save;
        if (q->req.remote_event_size > 0){
            memcpy(e_dat, q->remote_event, q->req.remote_event_size);
            e_dat = (char*) e_dat + q->req.remote_event_size;
            free(q->remote_event);
        }
        if (q->req.self_event_size > 0){
            memcpy(e_dat, q->local_event, q->req.self_event_size);
            free(q->local_event);
        }
        free(q);
298
299
300
301
302
303
304
305
306
        rc->rtn = 1;
    }
    else{
        q->rem -= psize;
        rc->rtn = 0;
    }
    return rc->rtn;
}

307
/* Reverse of fcfs_next.
 *
 * If rc->rtn is not -1, the method's packet / recv-msg event is undone
 * first. The queue is then restored according to rc->rtn:
 *   -1  fcfs_next found nothing to do: no-op.
 *    0  the head item is still queued: give back the packet_size bytes that
 *       a non-final packet always consumes.
 *    1  the head item was finished and freed: rebuild it from rc and
 *       rc_event_save (remote event bytes, then self event bytes) and put it
 *       back at the head. */
void fcfs_next_rc(
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp){
    mn_sched_queue *s = sched;
    if (rc->rtn == -1){
        return; // fcfs_next issued nothing
    }

    if (s->is_recv_queue){
        dprintf("%llu (mn): rc receiving message\n", LLU(lp->gid));
        s->method->model_net_method_recv_msg_event_rc(lp);
    }
    else {
        dprintf("%llu (mn): rc issuing packet\n", LLU(lp->gid));
        s->method->model_net_method_packet_event_rc(lp);
    }

    if (rc->rtn == 0){
        // the partially-sent item must still be at the head
        assert(!qlist_empty(&s->reqs));
        mn_sched_qitem *q = qlist_entry(s->reqs.next, mn_sched_qitem, ql);
        q->rem += q->req.packet_size;
    }
    else if (rc->rtn == 1){
        // re-create the q item
        mn_sched_qitem *q = malloc(sizeof *q);
        assert(q != NULL);
        q->req = rc->req;
        q->sched_params = rc->sched_params;
        // bytes left before the final packet: the tail that did not fill a
        // whole packet, or exactly one packet if msg_size divides evenly
        q->rem = q->req.msg_size % q->req.packet_size;
        if (q->rem == 0 && q->req.msg_size != 0){
            q->rem = q->req.packet_size;
        }
        // NOTE(review): entry_time is not saved in rc and cannot be restored;
        // zero it rather than leave it indeterminate.
        q->entry_time = 0;

        const char *saved = rc_event_save;
        q->remote_event = NULL;
        if (q->req.remote_event_size > 0){
            q->remote_event = malloc(q->req.remote_event_size);
            assert(q->remote_event != NULL);
            memcpy(q->remote_event, saved, q->req.remote_event_size);
            saved += q->req.remote_event_size;
        }
        q->local_event = NULL;
        if (q->req.self_event_size > 0) {
            q->local_event = malloc(q->req.self_event_size);
            assert(q->local_event != NULL);
            memcpy(q->local_event, saved, q->req.self_event_size);
        }

        // fcfs_next removed it from the head, so it goes back at the front
        qlist_add(&q->ql, &s->reqs);
        s->queue_len++;
    }
    else {
        assert(0);
    }
}

364
365
366
/* Round-robin keeps exactly the FCFS queue layout, so construction is
 * delegated to fcfs_init unchanged. */
void rr_init (
        const struct model_net_method     * method,
        const model_net_sched_cfg_params  * params,
        int                                 is_recv_queue,
        void                             ** sched){
    fcfs_init(method, params, is_recv_queue, sched);
}

/* Tear down a round-robin queue; it is an FCFS queue underneath. */
void rr_destroy (void *sched){
    fcfs_destroy(sched);
}

void rr_add (
379
380
381
382
383
384
385
386
387
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp){
388
    fcfs_add(req, sched_params, remote_event_size, remote_event,
389
            local_event_size, local_event, sched, rc, lp);
390
}
391

392
/* Undo rr_add: identical to undoing an FCFS add. */
void rr_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp){
    fcfs_add_rc(sched, rc, lp);
}

396
int rr_next(
397
398
399
400
401
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
402
403
404
405
406
407
408
409
    int ret = fcfs_next(poffset, sched, rc_event_save, rc, lp);
    // if error in fcfs or the request was finished & removed, then nothing to
    // do here
    if (ret == -1 || ret == 1)
        return ret;
    // otherwise request was successful, still in the queue
    else {
        mn_sched_queue *s = sched;
410
        qlist_add_tail(qlist_pop(&s->reqs), &s->reqs);
411
        return ret;
412
413
414
    }
}

415
/* Reverse of rr_next.
 *
 * When rr_next rotated a still-unfinished request to the tail (rtn == 0), move
 * it back to the head first. fcfs_next_rc then restores its byte count at the
 * head; in every case it also undoes the method event. */
void rr_next_rc (
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp){
    if (rc->rtn == 0){
        mn_sched_queue *queue = sched;
        struct qlist_head *back = qlist_pop_back(&queue->reqs);
        qlist_add(back, &queue->reqs);
    }
    fcfs_next_rc(sched, rc_event_save, rc, lp);
}

429
430
431
void prio_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
432
        int                                 is_recv_queue,
433
434
435
436
437
438
439
        void                             ** sched){
    *sched = malloc(sizeof(mn_sched_prio));
    mn_sched_prio *ss = *sched;
    ss->params = params->u.prio;
    ss->sub_scheds = malloc(ss->params.num_prios*sizeof(mn_sched_queue*));
    ss->sub_sched_iface = sched_interfaces[ss->params.sub_stype];
    for (int i = 0; i < ss->params.num_prios; i++){
440
441
        ss->sub_sched_iface->init(method, params, is_recv_queue,
                (void**)&ss->sub_scheds[i]);
442
443
444
445
446
447
448
449
450
451
452
453
454
    }
}

/* Destroy every sub-queue, then the sub-queue array and the scheduler
 * itself. The array and container are freed exactly once, after the loop.
 * Freeing them inside the loop caused a use-after-free and double free
 * whenever num_prios > 1. A NULL sched is ignored. */
void prio_destroy (void *sched){
    mn_sched_prio *ss = sched;
    if (ss == NULL)
        return;
    for (int i = 0; i < ss->params.num_prios; i++){
        ss->sub_sched_iface->destroy(ss->sub_scheds[i]);
    }
    free(ss->sub_scheds);
    free(ss);
}

void prio_add (
455
456
457
458
459
460
461
462
463
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp){
464
465
    // sched_msg_params is simply an int
    mn_sched_prio *ss = sched;
466
    int prio = sched_params->prio;
467
468
469
470
471
    if (prio == -1){
        // default prio - lowest possible 
        prio = ss->params.num_prios-1;
    }
    else if (prio >= ss->params.num_prios){
472
473
        tw_error(TW_LOC, "sched for lp %llu: invalid prio (%d vs [%d,%d))",
                LLU(lp->gid), prio, 0, ss->params.num_prios);
474
    }
475
    dprintf("%llu (mn):    adding with prio %d\n", LLU(lp->gid), prio);
476
477
478
    ss->sub_sched_iface->add(req, sched_params, remote_event_size,
            remote_event, local_event_size, local_event, ss->sub_scheds[prio],
            rc, lp);
479
480
481
    rc->prio = prio;
}

482
/* Undo prio_add. The sub-queue that received the request is the one
 * prio_add recorded in rc->prio; that sub-scheduler's add_rc does the work. */
void prio_add_rc(void * sched, const model_net_sched_rc *rc, tw_lp *lp){
    mn_sched_prio *ss = sched;
    dprintf("%llu (mn): rc adding with prio %d\n", LLU(lp->gid), rc->prio);
    void *target = ss->sub_scheds[rc->prio];
    ss->sub_sched_iface->add_rc(target, rc, lp);
}

int prio_next(
490
491
492
493
494
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
    // check each priority, first one that's non-empty gets the next
    mn_sched_prio *ss = sched;
    for (int i = 0; i < ss->params.num_prios; i++){
        // TODO: this works for now while the other schedulers have the same
        // internal representation
        if (!qlist_empty(&ss->sub_scheds[i]->reqs)){
            rc->prio = i;
            return ss->sub_sched_iface->next(
                    poffset, ss->sub_scheds[i], rc_event_save, rc, lp);
        }
    }
    rc->prio = -1;
    return -1; // all sub schedulers had no work 
}

/* Reverse of prio_next. rc->prio == -1 means prio_next found every level
 * empty and did nothing; otherwise, delegate to the sub-scheduler it used. */
void prio_next_rc (
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp){
    if (rc->prio == -1){
        return;
    }
    mn_sched_prio *ss = sched;
    ss->sub_sched_iface->next_rc(ss->sub_scheds[rc->prio], rc_event_save,
            rc, lp);
}

524
525
526
527
528
529
530
531
/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */