model-net-sched-impl.c 18.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "model-net-sched-impl.h"
#include "codes/model-net-sched.h"
#include "codes/model-net-method.h"
#include "codes/quicklist.h"

15 16
/* Set to nonzero to trace scheduler activity on stdout. */
#define MN_SCHED_DEBUG_VERBOSE 0

/* Debug trace helper. The printf call is always compiled, so its arguments
 * stay type-checked, but it only runs when MN_SCHED_DEBUG_VERBOSE is nonzero.
 * NOTE: within this file the macro shadows POSIX dprintf(int fd, ...). */
#define dprintf(_fmt, ...)                                  \
    do {                                                    \
        if (MN_SCHED_DEBUG_VERBOSE) {                       \
            printf(_fmt, ##__VA_ARGS__);                    \
        }                                                   \
    } while (0)
21

22 23 24 25
/// scheduler-specific data structures 

typedef struct mn_sched_qitem {
    model_net_request req;
26
    mn_sched_params sched_params;
27 28
    // remaining bytes to send
    uint64_t rem;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
29
    tw_stime entry_time;
30 31 32 33 34 35 36
    // pointers to event structures 
    // sizes are given in the request struct
    void * remote_event;
    void * local_event;
    struct qlist_head ql;
} mn_sched_qitem;

37 38 39 40
// fcfs and round-robin each use a single queue
typedef struct mn_sched_queue {
    // method containing packet event to call
    const struct model_net_method *method;
41
    int is_recv_queue;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
42
    int queue_len;
43 44 45 46 47 48 49 50 51 52 53
    struct qlist_head reqs; // of type mn_sched_qitem
} mn_sched_queue;

/* Priority scheduler: one rr/fcfs sub-queue per priority level, where index
 * 0 is the highest priority. */
struct mn_sched_prio {
    /* configuration, including num_prios and sub_stype */
    mn_prio_params params;
    /* function table of the sub-scheduler type */
    const model_net_sched_interface *sub_sched_iface;
    /* params.num_prios sub-queues */
    mn_sched_queue **sub_scheds;
};
typedef struct mn_sched_prio mn_sched_prio;

/// scheduler-specific function decls and tables
54 55 56

/// FCFS
// scheduler state is passed as void* to avoid pointer-to-pointer conversion warnings
57 58 59
static void fcfs_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
60
        int                                 is_recv_queue,
61
        void                             ** sched);
62 63
static void fcfs_destroy (void *sched);
static void fcfs_add (
64 65 66 67 68 69 70 71 72 73
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp);
static void fcfs_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp);
74
static int  fcfs_next(
75 76 77 78 79
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
80
static void fcfs_next_rc(
81 82 83 84
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp);
85

86 87 88 89
// ROUND-ROBIN
static void rr_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
90
        int                                 is_recv_queue,
91
        void                             ** sched);
92 93
static void rr_destroy (void *sched);
static void rr_add (
94 95 96 97 98 99 100 101 102 103
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp);
static void rr_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp);
104
static int  rr_next(
105 106 107 108 109
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
110
static void rr_next_rc (
111 112 113 114
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp);
115 116 117
static void prio_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
118
        int                                 is_recv_queue,
119 120 121
        void                             ** sched);
static void prio_destroy (void *sched);
static void prio_add (
122 123 124 125 126 127 128 129 130 131
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp);
static void prio_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp);
132
static int  prio_next(
133 134 135 136 137
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
138
static void prio_next_rc (
139 140 141 142
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp);
143 144

/// function tables (names defined by X macro in model-net-sched.h)
145
static const model_net_sched_interface fcfs_tab = 
146
{ &fcfs_init, &fcfs_destroy, &fcfs_add, &fcfs_add_rc, &fcfs_next, &fcfs_next_rc};
147
static const model_net_sched_interface rr_tab = 
148
{ &rr_init, &rr_destroy, &rr_add, &rr_add_rc, &rr_next, &rr_next_rc};
149 150
static const model_net_sched_interface prio_tab =
{ &prio_init, &prio_destroy, &prio_add, &prio_add_rc, &prio_next, &prio_next_rc};
151 152

#define X(a,b,c) c,
153
const model_net_sched_interface * sched_interfaces[] = {
154 155 156 157 158 159
    SCHEDULER_TYPES
};
#undef X

/// FCFS implementation 

160 161 162
void fcfs_init(
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
163
        int                                 is_recv_queue,
164 165 166
        void                             ** sched){
    *sched = malloc(sizeof(mn_sched_queue));
    mn_sched_queue *ss = *sched;
167
    ss->method = method;
168
    ss->is_recv_queue = is_recv_queue;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
169
    ss->queue_len = 0;
170 171 172 173 174 175 176 177
    INIT_QLIST_HEAD(&ss->reqs);
}

void fcfs_destroy(void *sched){
    free(sched);
}

void fcfs_add (
178 179 180 181 182 183 184 185 186
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp){
187
    mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
Jonathan Jenkins's avatar
Jonathan Jenkins committed
188
    q->entry_time = tw_now(lp);
189
    q->req = *req;
190
    q->sched_params = *sched_params;
191
    q->rem = req->msg_size;
192 193 194 195
    if (remote_event_size > 0){
        q->remote_event = malloc(remote_event_size);
        memcpy(q->remote_event, remote_event, remote_event_size);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
196
    else { q->remote_event = NULL; }
197 198 199 200
    if (local_event_size > 0){
        q->local_event = malloc(local_event_size);
        memcpy(q->local_event, local_event, local_event_size);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
201
    else { q->local_event = NULL; }
202
    mn_sched_queue *s = sched;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
203
    s->queue_len++;
204
    qlist_add_tail(&q->ql, &s->reqs);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
205 206 207
    dprintf("%lu (mn):    adding %srequest from %lu to %lu, size %lu, at %lf\n",
            lp->gid, req->is_pull ? "pull " : "", req->src_lp,
            req->final_dest_lp, req->msg_size, tw_now(lp));
208 209
}

210
/* Reverse of fcfs_add. fcfs_add appends at the tail, so the item to undo is
 * detached from the tail and released together with its event copies. */
void fcfs_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp){
    mn_sched_queue *queue = sched;
    struct qlist_head *tail;
    mn_sched_qitem *item;

    queue->queue_len--;
    tail = qlist_pop_back(&queue->reqs);
    assert(tail != NULL);
    item = qlist_entry(tail, mn_sched_qitem, ql);

    dprintf("%lu (mn): rc adding request from %lu to %lu\n", lp->gid,
            item->req.src_lp, item->req.final_dest_lp);

    /* either pointer may be NULL; free(NULL) does nothing */
    free(item->remote_event);
    free(item->local_event);
    free(item);
}

224
int fcfs_next(
225 226 227 228 229
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
230
    mn_sched_queue *s = sched;
231 232 233 234 235 236
    struct qlist_head *ent = s->reqs.next;
    if (ent == &s->reqs){
        rc->rtn = -1;
        return -1;
    }
    mn_sched_qitem *q = qlist_entry(ent, mn_sched_qitem, ql);
237

238 239 240 241 242 243 244 245 246 247 248 249
    // issue the next packet
    int is_last_packet;
    uint64_t psize;
    if (q->req.packet_size >= q->rem) {
        psize = q->rem;
        is_last_packet = 1;
    }
    else{
        psize = q->req.packet_size;
        is_last_packet = 0;
    }

250 251
    if (s->is_recv_queue){
        dprintf("%lu (mn):    receiving message of size %lu (of %lu) "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
252
                "from %lu to %lu at %1.5e (last:%d)\n",
Jonathan Jenkins's avatar
Jonathan Jenkins committed
253
                lp->gid, psize, q->rem, q->req.src_lp, q->req.final_dest_lp,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
254
                tw_now(lp), is_last_packet);
255 256
        // note: we overloaded on the dest_mn_lp field - it's the dest of the
        // soruce in the case of a pull
257
        *poffset = s->method->model_net_method_recv_msg_event(q->req.category,
258
                q->req.final_dest_lp, q->req.dest_mn_lp, psize,
259
                q->req.is_pull, q->req.pull_size, 0.0, q->req.remote_event_size,
260
                q->remote_event, q->req.src_lp, lp);
261 262 263
    }
    else{
        dprintf("%lu (mn):    issuing packet of size %lu (of %lu) "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
264
                "from %lu to %lu at %1.5e (last:%d)\n",
Jonathan Jenkins's avatar
Jonathan Jenkins committed
265
                lp->gid, psize, q->rem, q->req.src_lp, q->req.final_dest_lp,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
266
                tw_now(lp), is_last_packet);
267 268 269

        /* TODO: Add correct message ID here */
        *poffset = s->method->model_net_method_packet_event(&q->req, 0, 
270 271
                q->req.msg_size - q->rem, psize, 0.0, &q->sched_params,
                q->remote_event, q->local_event, lp, is_last_packet);
272
    }
273

Jonathan Jenkins's avatar
Jonathan Jenkins committed
274
    // if last packet - remove from list, free, save for rc
275
    if (is_last_packet){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
276 277 278
        dprintf("last %spkt: %lu (%lu) to %lu, size %lu at %1.5e (pull:%d)\n",
                s->is_recv_queue ? "recv " : "send ",
                lp->gid, q->req.src_lp, q->req.final_dest_lp,
279
                q->req.is_pull ? q->req.pull_size : q->req.msg_size, tw_now(lp),
Jonathan Jenkins's avatar
Jonathan Jenkins committed
280
                q->req.is_pull);
281
        qlist_pop(&s->reqs);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
282
        s->queue_len--;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
283
        rc->req = q->req;
284
        rc->sched_params = q->sched_params;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
285 286 287 288 289 290 291 292 293 294 295
        void *e_dat = rc_event_save;
        if (q->req.remote_event_size > 0){
            memcpy(e_dat, q->remote_event, q->req.remote_event_size);
            e_dat = (char*) e_dat + q->req.remote_event_size;
            free(q->remote_event);
        }
        if (q->req.self_event_size > 0){
            memcpy(e_dat, q->local_event, q->req.self_event_size);
            free(q->local_event);
        }
        free(q);
296 297 298 299 300 301 302 303 304
        rc->rtn = 1;
    }
    else{
        q->rem -= psize;
        rc->rtn = 0;
    }
    return rc->rtn;
}

305
/* Size of the final packet fcfs_next issues for a msg_size-byte message
 * split into packet_size-byte packets. This is also the value q->rem held
 * when that last packet went out.
 *
 * A zero-byte message goes out as a single zero-byte packet, so 0 is
 * returned and packet_size is never divided by. When msg_size > 0,
 * fcfs_next can only reach its last packet if packet_size > 0. */
static uint64_t fcfs_last_packet_size(uint64_t msg_size, uint64_t packet_size){
    if (msg_size == 0)
        return 0;
    uint64_t tail = msg_size % packet_size;
    return tail == 0 ? packet_size : tail;
}

/* Reverse of fcfs_next.
 *
 * rc->rtn == -1: fcfs_next found the queue empty, so there is nothing to undo.
 * rc->rtn ==  0: a non-final packet was issued. Undo the method event and
 *                return the packet's bytes to the head request.
 * rc->rtn ==  1: the final packet was issued and the item freed. Undo the
 *                method event, then rebuild the item at the head of the queue
 *                from rc->req, rc->sched_params and the bytes fcfs_next saved
 *                in rc_event_save (remote event first, then the self event).
 */
void fcfs_next_rc(
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp){
    mn_sched_queue *s = sched;
    if (rc->rtn == -1)
        return; // no op

    if (s->is_recv_queue){
        dprintf("%lu (mn): rc receiving message\n", lp->gid);
        s->method->model_net_method_recv_msg_event_rc(lp);
    }
    else {
        dprintf("%lu (mn): rc issuing packet\n", lp->gid);
        s->method->model_net_method_packet_event_rc(lp);
    }

    if (rc->rtn == 0){
        // non-final packets are always exactly packet_size bytes
        mn_sched_qitem *q = qlist_entry(s->reqs.next, mn_sched_qitem, ql);
        q->rem += q->req.packet_size;
    }
    else if (rc->rtn == 1){
        // re-create the q item
        mn_sched_qitem *q = malloc(sizeof *q);
        assert(q);
        q->req = rc->req;
        q->sched_params = rc->sched_params;
        // zero-byte messages must come back with rem == 0, not packet_size
        q->rem = fcfs_last_packet_size(q->req.msg_size, q->req.packet_size);
        // NOTE(review): entry_time is not saved in rc, so it cannot be
        // restored here and stays unset; no code in this file reads it,
        // confirm nothing else does.

        const char *e_dat = rc_event_save;
        q->remote_event = NULL;
        if (q->req.remote_event_size > 0){
            q->remote_event = malloc(q->req.remote_event_size);
            assert(q->remote_event);
            memcpy(q->remote_event, e_dat, q->req.remote_event_size);
            e_dat += q->req.remote_event_size;
        }
        q->local_event = NULL;
        if (q->req.self_event_size > 0) {
            q->local_event = malloc(q->req.self_event_size);
            assert(q->local_event);
            memcpy(q->local_event, e_dat, q->req.self_event_size);
        }
        // add back to front of list
        qlist_add(&q->ql, &s->reqs);
        s->queue_len++;
    }
    else {
        assert(0);
    }
}

361 362 363
/* Round-robin uses exactly the FCFS queue layout, so setup is delegated
 * unchanged to fcfs_init. */
void rr_init (const struct model_net_method *method,
              const model_net_sched_cfg_params *params,
              int is_recv_queue, void **sched){
    fcfs_init(method, params, is_recv_queue, sched);
}

/* Teardown is identical to FCFS because the queue layout is shared. */
void rr_destroy (void *sched){ fcfs_destroy(sched); }

void rr_add (
376 377 378 379 380 381 382 383 384
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp){
385
    fcfs_add(req, sched_params, remote_event_size, remote_event,
386
            local_event_size, local_event, sched, rc, lp);
387
}
388

389
/* Undo rr_add: enqueue is shared with FCFS, so its reverse is too. */
void rr_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp){ fcfs_add_rc(sched, rc, lp); }

393
int rr_next(
394 395 396 397 398
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
399 400 401 402 403 404 405 406
    int ret = fcfs_next(poffset, sched, rc_event_save, rc, lp);
    // if error in fcfs or the request was finished & removed, then nothing to
    // do here
    if (ret == -1 || ret == 1)
        return ret;
    // otherwise request was successful, still in the queue
    else {
        mn_sched_queue *s = sched;
407
        qlist_add_tail(qlist_pop(&s->reqs), &s->reqs);
408
        return ret;
409 410 411
    }
}

412
/* Reverse of rr_next. A rtn of 0 means rr_next rotated the head request to
 * the tail, so move it back to the head first. fcfs_next_rc then undoes the
 * packet itself. */
void rr_next_rc (void *sched, const void *rc_event_save,
                 const model_net_sched_rc *rc, tw_lp *lp){
    if (rc->rtn == 0){
        mn_sched_queue *queue = sched;
        struct qlist_head *rotated = qlist_pop_back(&queue->reqs);
        qlist_add(rotated, &queue->reqs);
    }
    fcfs_next_rc(sched, rc_event_save, rc, lp);
}

426 427 428
void prio_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
429
        int                                 is_recv_queue,
430 431 432 433 434 435 436
        void                             ** sched){
    *sched = malloc(sizeof(mn_sched_prio));
    mn_sched_prio *ss = *sched;
    ss->params = params->u.prio;
    ss->sub_scheds = malloc(ss->params.num_prios*sizeof(mn_sched_queue*));
    ss->sub_sched_iface = sched_interfaces[ss->params.sub_stype];
    for (int i = 0; i < ss->params.num_prios; i++){
437 438
        ss->sub_sched_iface->init(method, params, is_recv_queue,
                (void**)&ss->sub_scheds[i]);
439 440 441 442 443 444 445 446 447 448 449 450 451
    }
}

/* Destroy every sub-queue, then release the sub-queue array and the priority
 * scheduler itself. The container frees must happen after the loop: freeing
 * them inside it is a use-after-free/double free once num_prios > 1. */
void prio_destroy (void *sched){
    mn_sched_prio *ss = sched;
    for (int i = 0; i < ss->params.num_prios; i++){
        ss->sub_sched_iface->destroy(ss->sub_scheds[i]);
    }
    free(ss->sub_scheds);
    free(ss);
}

void prio_add (
452 453 454 455 456 457 458 459 460
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp){
461 462
    // sched_msg_params is simply an int
    mn_sched_prio *ss = sched;
463
    int prio = sched_params->prio;
464 465 466 467 468 469 470 471
    if (prio == -1){
        // default prio - lowest possible 
        prio = ss->params.num_prios-1;
    }
    else if (prio >= ss->params.num_prios){
        tw_error(TW_LOC, "sched for lp %lu: invalid prio (%d vs [%d,%d))",
                lp->gid, prio, 0, ss->params.num_prios);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
472
    dprintf("%lu (mn):    adding with prio %d\n", lp->gid, prio);
473 474 475
    ss->sub_sched_iface->add(req, sched_params, remote_event_size,
            remote_event, local_event_size, local_event, ss->sub_scheds[prio],
            rc, lp);
476 477 478
    rc->prio = prio;
}

479
/* Reverse of prio_add: forward to the add_rc of the sub-queue that prio_add
 * recorded in rc->prio. */
void prio_add_rc(void * sched, const model_net_sched_rc *rc, tw_lp *lp){
    mn_sched_prio *prio_sched = sched;
    mn_sched_queue *target = prio_sched->sub_scheds[rc->prio];
    dprintf("%lu (mn): rc adding with prio %d\n", lp->gid, rc->prio);
    prio_sched->sub_sched_iface->add_rc(target, rc, lp);
}

int prio_next(
487 488 489 490 491
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
    // check each priority, first one that's non-empty gets the next
    mn_sched_prio *ss = sched;
    for (int i = 0; i < ss->params.num_prios; i++){
        // TODO: this works for now while the other schedulers have the same
        // internal representation
        if (!qlist_empty(&ss->sub_scheds[i]->reqs)){
            rc->prio = i;
            return ss->sub_sched_iface->next(
                    poffset, ss->sub_scheds[i], rc_event_save, rc, lp);
        }
    }
    rc->prio = -1;
    return -1; // all sub schedulers had no work 
}

/* Reverse of prio_next. A recorded prio of -1 means prio_next found no work,
 * so there is nothing to undo. Otherwise forward to that sub-queue's
 * next_rc. */
void prio_next_rc (void *sched, const void *rc_event_save,
                   const model_net_sched_rc *rc, tw_lp *lp){
    if (rc->prio == -1)
        return;
    mn_sched_prio *prio_sched = sched;
    prio_sched->sub_sched_iface->next_rc(prio_sched->sub_scheds[rc->prio],
            rc_event_save, rc, lp);
}

521 522 523 524 525 526 527 528
/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */