model-net-sched-impl.c 17.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "model-net-sched-impl.h"
#include "codes/model-net-sched.h"
#include "codes/model-net-method.h"
#include "codes/quicklist.h"

// Set to 1 to trace scheduler activity on stdout.
#define MN_SCHED_DEBUG_VERBOSE 0

// NOTE(review): this macro shadows POSIX dprintf(int fd, const char *fmt,
// ...) for the rest of this file; consider renaming it.
#if MN_SCHED_DEBUG_VERBOSE
#define dprintf(_fmt, ...) printf(_fmt, ##__VA_ARGS__)
#else
// Expands to a single no-op statement, so `dprintf(...);` is always a
// well-formed statement (even as an unbraced if/else body) and its
// arguments are never evaluated.
#define dprintf(_fmt, ...) do { } while (0)
#endif

23 24 25 26
/// scheduler-specific data structures 

typedef struct mn_sched_qitem {
    model_net_request req;
27
    mn_sched_params sched_params;
28 29 30 31 32 33 34 35 36
    // remaining bytes to send
    uint64_t rem;
    // pointers to event structures 
    // sizes are given in the request struct
    void * remote_event;
    void * local_event;
    struct qlist_head ql;
} mn_sched_qitem;

37 38 39 40
// fcfs and round-robin each use a single queue
typedef struct mn_sched_queue {
    // method containing packet event to call
    const struct model_net_method *method;
41
    int is_recv_queue;
42 43 44 45 46 47 48 49 50 51 52
    struct qlist_head reqs; // of type mn_sched_qitem
} mn_sched_queue;

// priority scheduler consists of a bunch of rr/fcfs queues
typedef struct mn_sched_prio {
    mn_prio_params params;
    const model_net_sched_interface *sub_sched_iface;
    mn_sched_queue ** sub_scheds; // one for each params.num_prios
} mn_sched_prio;

/// scheduler-specific function decls and tables
53 54 55

/// FCFS
// void used to avoid ptr-to-ptr conv warnings
56 57 58
static void fcfs_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
59
        int                                 is_recv_queue,
60
        void                             ** sched);
61 62
static void fcfs_destroy (void *sched);
static void fcfs_add (
63 64 65 66 67 68 69 70 71
        model_net_request     * req,
        const mn_sched_params * sched_params,
        int                     remote_event_size,
        void                  * remote_event,
        int                     local_event_size,
        void                  * local_event,
        void                  * sched,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
72
static void fcfs_add_rc(void *sched, model_net_sched_rc *rc, tw_lp *lp);
73
static int  fcfs_next(
74 75 76 77 78
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
79 80 81 82 83
static void fcfs_next_rc(
        void               * sched,
        void               * rc_event_save,
        model_net_sched_rc * rc,
        tw_lp              * lp);
84

85 86 87 88
// ROUND-ROBIN
static void rr_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
89
        int                                 is_recv_queue,
90
        void                             ** sched);
91 92
static void rr_destroy (void *sched);
static void rr_add (
93 94 95 96 97 98 99 100 101
        model_net_request     * req,
        const mn_sched_params * sched_params,
        int                     remote_event_size,
        void                  * remote_event,
        int                     local_event_size,
        void                  * local_event,
        void                  * sched,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
102
static void rr_add_rc(void *sched, model_net_sched_rc *rc, tw_lp *lp);
103
static int  rr_next(
104 105 106 107 108
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
109 110 111 112 113 114 115 116
static void rr_next_rc (
        void               * sched,
        void               * rc_event_save,
        model_net_sched_rc * rc,
        tw_lp              * lp);
static void prio_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
117
        int                                 is_recv_queue,
118 119 120
        void                             ** sched);
static void prio_destroy (void *sched);
static void prio_add (
121 122 123 124 125 126 127 128 129
        model_net_request     * req,
        const mn_sched_params * sched_params,
        int                     remote_event_size,
        void                  * remote_event,
        int                     local_event_size,
        void                  * local_event,
        void                  * sched,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
130 131
static void prio_add_rc(void *sched, model_net_sched_rc *rc, tw_lp *lp);
static int  prio_next(
132 133 134 135 136
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
137 138 139 140 141
static void prio_next_rc (
        void               * sched,
        void               * rc_event_save,
        model_net_sched_rc * rc,
        tw_lp              * lp);
142 143

/// function tables (names defined by X macro in model-net-sched.h)
144
static const model_net_sched_interface fcfs_tab = 
145
{ &fcfs_init, &fcfs_destroy, &fcfs_add, &fcfs_add_rc, &fcfs_next, &fcfs_next_rc};
146
static const model_net_sched_interface rr_tab = 
147
{ &rr_init, &rr_destroy, &rr_add, &rr_add_rc, &rr_next, &rr_next_rc};
148 149
static const model_net_sched_interface prio_tab =
{ &prio_init, &prio_destroy, &prio_add, &prio_add_rc, &prio_next, &prio_next_rc};
150 151

#define X(a,b,c) c,
152
const model_net_sched_interface * sched_interfaces[] = {
153 154 155 156 157 158
    SCHEDULER_TYPES
};
#undef X

/// FCFS implementation 

159 160 161
void fcfs_init(
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
162
        int                                 is_recv_queue,
163 164 165
        void                             ** sched){
    *sched = malloc(sizeof(mn_sched_queue));
    mn_sched_queue *ss = *sched;
166
    ss->method = method;
167
    ss->is_recv_queue = is_recv_queue;
168 169 170 171 172 173 174 175
    INIT_QLIST_HEAD(&ss->reqs);
}

void fcfs_destroy(void *sched){
    free(sched);
}

void fcfs_add (
176 177 178 179 180 181 182 183 184
        model_net_request     * req,
        const mn_sched_params * sched_params,
        int                     remote_event_size,
        void                  * remote_event,
        int                     local_event_size,
        void                  * local_event,
        void                  * sched,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
185 186
    mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
    q->req = *req;
187
    q->sched_params = *sched_params;
188 189 190 191 192
    q->rem = req->is_pull ? PULL_MSG_SIZE : req->msg_size;
    if (remote_event_size > 0){
        q->remote_event = malloc(remote_event_size);
        memcpy(q->remote_event, remote_event, remote_event_size);
    }
193
    else { q->remote_event = NULL; }
194 195 196 197
    if (local_event_size > 0){
        q->local_event = malloc(local_event_size);
        memcpy(q->local_event, local_event, local_event_size);
    }
198
    else { q->local_event = NULL; }
199
    mn_sched_queue *s = sched;
200
    qlist_add_tail(&q->ql, &s->reqs);
201 202
    dprintf("%lu (mn):    adding request from %lu to %lu\n", lp->gid,
            req->src_lp, req->final_dest_lp);
203 204 205
}

// Reverse of fcfs_add: pop the tail entry (the item fcfs_add appended,
// assuming rollbacks run in reverse event order) and free it along with its
// event copies.
static void fcfs_add_rc(void *sched, model_net_sched_rc *rc, tw_lp *lp){
    (void)rc;
    mn_sched_queue *s = sched;
    struct qlist_head *ent = qlist_pop_back(&s->reqs);
    assert(ent != NULL);
    mn_sched_qitem *q = qlist_entry(ent, mn_sched_qitem, ql);
    dprintf("%lu (mn): rc adding request from %lu to %lu\n", lp->gid,
            q->req.src_lp, q->req.final_dest_lp);
    // free'ing NULLs is a no-op
    free(q->remote_event);
    free(q->local_event);
    free(q);
}

218
int fcfs_next(
219 220 221 222 223
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
224
    mn_sched_queue *s = sched;
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
    struct qlist_head *ent = s->reqs.next;
    if (ent == &s->reqs){
        rc->rtn = -1;
        return -1;
    }
    mn_sched_qitem *q = qlist_entry(ent, mn_sched_qitem, ql);
    
    // issue the next packet
    int is_last_packet;
    uint64_t psize;
    if (q->req.packet_size >= q->rem) {
        psize = q->rem;
        is_last_packet = 1;
    }
    else{
        psize = q->req.packet_size;
        is_last_packet = 0;
    }

244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
    if (s->is_recv_queue){
        dprintf("%lu (mn):    receiving message of size %lu (of %lu) "
                "from %lu to %lu\n",
                lp->gid, psize, q->rem, q->req.src_lp, q->req.final_dest_lp);
        *poffset = s->method->model_net_method_recv_msg_event(q->req.category,
                q->req.final_dest_lp, psize, q->req.is_pull, q->req.msg_size,
                0.0, q->req.remote_event_size, q->remote_event, q->req.src_lp,
                lp);
    }
    else{
        dprintf("%lu (mn):    issuing packet of size %lu (of %lu) "
                "from %lu to %lu\n",
                lp->gid, psize, q->rem, q->req.src_lp, q->req.final_dest_lp);
        *poffset = s->method->model_net_method_packet_event(q->req.category,
                q->req.final_dest_lp, psize, q->req.is_pull, q->req.msg_size,
                0.0, &q->sched_params, q->req.remote_event_size, q->remote_event,
                q->req.self_event_size, q->local_event, q->req.src_lp, lp,
                is_last_packet);
    }
263

264
    // if last packet - remove from list, free, save for rc
265 266
    if (is_last_packet){
        qlist_pop(&s->reqs);
267
        rc->req = q->req;
268
        rc->sched_params = q->sched_params;
269 270 271 272 273 274 275 276 277 278 279
        void *e_dat = rc_event_save;
        if (q->req.remote_event_size > 0){
            memcpy(e_dat, q->remote_event, q->req.remote_event_size);
            e_dat = (char*) e_dat + q->req.remote_event_size;
            free(q->remote_event);
        }
        if (q->req.self_event_size > 0){
            memcpy(e_dat, q->local_event, q->req.self_event_size);
            free(q->local_event);
        }
        free(q);
280 281 282 283 284 285 286 287 288
        rc->rtn = 1;
    }
    else{
        q->rem -= psize;
        rc->rtn = 0;
    }
    return rc->rtn;
}

289 290 291 292 293 294
void fcfs_next_rc(
        void               * sched,
        void               * rc_event_save,
        model_net_sched_rc * rc,
        tw_lp              * lp){
    mn_sched_queue *s = sched;
295 296 297 298
    if (rc->rtn == -1){
        // no op
    }
    else{
299 300 301 302 303 304 305 306
        if (s->is_recv_queue){
            dprintf("%lu (mn): rc receiving message\n", lp->gid);
            s->method->model_net_method_recv_msg_event_rc(lp);
        }
        else {
            dprintf("%lu (mn): rc issuing packet\n", lp->gid);
            s->method->model_net_method_packet_event_rc(lp);
        }
307 308 309 310 311 312 313
        if (rc->rtn == 0){
            // just get the front and increment rem
            mn_sched_qitem *q = qlist_entry(s->reqs.next, mn_sched_qitem, ql);
            // just increment rem
            q->rem += q->req.packet_size;
        }
        else if (rc->rtn == 1){
314 315 316 317
            // re-create the q item
            mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
            assert(q);
            q->req = rc->req;
318
            q->sched_params = rc->sched_params;
319 320
            q->rem = (q->req.is_pull ? PULL_MSG_SIZE : q->req.msg_size) % 
                q->req.packet_size;
321 322 323 324 325 326 327 328 329
            if (q->rem == 0){ // processed exactly a packet's worth of data
                q->rem = q->req.packet_size;
            }
            void * e_dat = rc_event_save;
            if (q->req.remote_event_size > 0){
                q->remote_event = malloc(q->req.remote_event_size);
                memcpy(q->remote_event, e_dat, q->req.remote_event_size);
                e_dat = (char*) e_dat + q->req.remote_event_size;
            }
330
            else { q->remote_event = NULL; }
331 332 333 334
            if (q->req.self_event_size > 0) {
                q->local_event = malloc(q->req.self_event_size);
                memcpy(q->local_event, e_dat, q->req.self_event_size);
            }
335
            else { q->local_event = NULL; }
336 337
            // add back to front of list
            qlist_add(&q->ql, &s->reqs);
338 339 340 341 342 343 344
        }
        else {
            assert(0);
        }
    }
}

345 346 347
void rr_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
348
        int                                 is_recv_queue,
349 350
        void                             ** sched){
    // same underlying representation
351
    fcfs_init(method, params, is_recv_queue, sched);
352 353 354
}

// Same underlying representation as FCFS: delegate to fcfs_destroy.
static void rr_destroy (void *sched){
    fcfs_destroy(sched);
}

void rr_add (
360 361 362 363 364 365 366 367 368 369
        model_net_request     * req,
        const mn_sched_params * sched_params,
        int                     remote_event_size,
        void                  * remote_event,
        int                     local_event_size,
        void                  * local_event,
        void                  * sched,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
    fcfs_add(req, sched_params, remote_event_size, remote_event,
370
            local_event_size, local_event, sched, rc, lp);
371
}
372

373
void rr_add_rc(void *sched, model_net_sched_rc *rc, tw_lp *lp){
374
    fcfs_add_rc(sched, rc, lp);
375 376
}

377
int rr_next(
378 379 380 381 382
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
383 384 385 386 387 388 389 390 391 392
    int ret = fcfs_next(poffset, sched, rc_event_save, rc, lp);
    // if error in fcfs or the request was finished & removed, then nothing to
    // do here
    if (ret == -1 || ret == 1)
        return ret;
    // otherwise request was successful, still in the queue
    else {
        mn_sched_queue *s = sched;
        qlist_add_tail(&s->reqs, qlist_pop(&s->reqs));
        return ret;
393 394 395
    }
}

396 397 398 399 400
void rr_next_rc (
        void               * sched,
        void               * rc_event_save,
        model_net_sched_rc * rc,
        tw_lp              * lp){
401 402 403 404 405
    // only time we need to do something apart from fcfs is on a successful
    // rr_next that didn't remove the item from the queue
    if (rc->rtn == 0){
        mn_sched_queue *s = sched;
        qlist_add(qlist_pop_back(&s->reqs), &s->reqs);
406
    }
407
    fcfs_next_rc(sched, rc_event_save, rc, lp);
408 409
}

410 411 412
void prio_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
413
        int                                 is_recv_queue,
414 415 416 417 418 419 420
        void                             ** sched){
    *sched = malloc(sizeof(mn_sched_prio));
    mn_sched_prio *ss = *sched;
    ss->params = params->u.prio;
    ss->sub_scheds = malloc(ss->params.num_prios*sizeof(mn_sched_queue*));
    ss->sub_sched_iface = sched_interfaces[ss->params.sub_stype];
    for (int i = 0; i < ss->params.num_prios; i++){
421 422
        ss->sub_sched_iface->init(method, params, is_recv_queue,
                (void**)&ss->sub_scheds[i]);
423 424 425 426 427 428 429 430 431 432 433 434 435
    }
}

// Destroy every sub-queue through the sub-scheduler interface, then free
// the sub-queue array and the scheduler itself. Passing NULL is a no-op.
static void prio_destroy (void *sched){
    mn_sched_prio *ss = sched;
    if (ss == NULL){
        return;
    }
    for (int i = 0; i < ss->params.num_prios; i++){
        ss->sub_sched_iface->destroy(ss->sub_scheds[i]);
    }
    // the containers are released exactly once, after all sub-queues
    free(ss->sub_scheds);
    free(ss);
}

void prio_add (
436 437 438 439 440 441 442 443 444
        model_net_request     * req,
        const mn_sched_params * sched_params,
        int                     remote_event_size,
        void                  * remote_event,
        int                     local_event_size,
        void                  * local_event,
        void                  * sched,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
445 446
    // sched_msg_params is simply an int
    mn_sched_prio *ss = sched;
447 448
    int prio = sched_params->prio;
    dprintf("%lu (mn):    adding with prio %d\n", lp->gid, prio);
449 450 451 452 453 454 455 456
    if (prio == -1){
        // default prio - lowest possible 
        prio = ss->params.num_prios-1;
    }
    else if (prio >= ss->params.num_prios){
        tw_error(TW_LOC, "sched for lp %lu: invalid prio (%d vs [%d,%d))",
                lp->gid, prio, 0, ss->params.num_prios);
    }
457 458 459
    ss->sub_sched_iface->add(req, sched_params, remote_event_size,
            remote_event, local_event_size, local_event, ss->sub_scheds[prio],
            rc, lp);
460 461 462 463 464 465
    rc->prio = prio;
}

// Reverse of prio_add: undo the add on the sub-queue recorded in rc->prio.
static void prio_add_rc(void * sched, model_net_sched_rc *rc, tw_lp *lp){
    mn_sched_prio *ss = sched;
    dprintf("%lu (mn): rc adding with prio %d\n", lp->gid, rc->prio);
    ss->sub_sched_iface->add_rc(ss->sub_scheds[rc->prio], rc, lp);
}

int prio_next(
471 472 473 474 475
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504
    // check each priority, first one that's non-empty gets the next
    mn_sched_prio *ss = sched;
    for (int i = 0; i < ss->params.num_prios; i++){
        // TODO: this works for now while the other schedulers have the same
        // internal representation
        if (!qlist_empty(&ss->sub_scheds[i]->reqs)){
            rc->prio = i;
            return ss->sub_sched_iface->next(
                    poffset, ss->sub_scheds[i], rc_event_save, rc, lp);
        }
    }
    rc->prio = -1;
    return -1; // all sub schedulers had no work 
}

void prio_next_rc (
        void               * sched,
        void               * rc_event_save,
        model_net_sched_rc * rc,
        tw_lp              * lp){
    // Reverse of prio_next. rc->prio == -1 means prio_next found every
    // sub-queue empty, so nothing was issued and there is nothing to undo.
    if (rc->prio == -1){
        return;
    }
    // otherwise hand the rollback to the sub-queue that served the packet
    mn_sched_prio *prio_sched = sched;
    mn_sched_queue *served = prio_sched->sub_scheds[rc->prio];
    prio_sched->sub_sched_iface->next_rc(served, rc_event_save, rc, lp);
}

505 506 507 508 509 510 511 512
/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */