model-net-sched-impl.c 18.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>

#include "model-net-sched-impl.h"
#include <codes/model-net-sched.h>
#include <codes/model-net-method.h>
#include <codes/quicklist.h>
#include <codes/codes.h>
15

16 17
#define MN_SCHED_DEBUG_VERBOSE 0

/* Debug trace helper. The printf call is always compiled (so its arguments
 * are still type-checked), but it only runs when MN_SCHED_DEBUG_VERBOSE is
 * non-zero. Wrapped in do/while(0) so it behaves as a single statement. */
#define dprintf(_fmt, ...) \
    do { \
        if (MN_SCHED_DEBUG_VERBOSE != 0) { \
            printf(_fmt, ##__VA_ARGS__); \
        } \
    } while (0)
22

23 24 25 26
/// scheduler-specific data structures 

typedef struct mn_sched_qitem {
    model_net_request req;
27
    mn_sched_params sched_params;
28 29
    // remaining bytes to send
    uint64_t rem;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
30
    tw_stime entry_time;
31 32 33 34 35 36 37
    // pointers to event structures 
    // sizes are given in the request struct
    void * remote_event;
    void * local_event;
    struct qlist_head ql;
} mn_sched_qitem;

38 39 40 41
// fcfs and round-robin each use a single queue
typedef struct mn_sched_queue {
    // method containing packet event to call
    const struct model_net_method *method;
42
    int is_recv_queue;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
43
    int queue_len;
44 45 46 47 48 49 50 51 52 53 54
    struct qlist_head reqs; // of type mn_sched_qitem
} mn_sched_queue;

/* Priority scheduler: an array of rr/fcfs sub-queues, one per priority
 * level. prio_next scans them from index 0 upward, so index 0 is served
 * first. */
typedef struct mn_sched_prio {
    mn_prio_params                   params;          // copied from config
    const model_net_sched_interface *sub_sched_iface; // used for every level
    mn_sched_queue                 **sub_scheds;      // params.num_prios entries
} mn_sched_prio;

/// scheduler-specific function decls and tables
55 56 57

/// FCFS
// void used to avoid ptr-to-ptr conv warnings
58 59 60
static void fcfs_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
61
        int                                 is_recv_queue,
62
        void                             ** sched);
63 64
static void fcfs_destroy (void *sched);
static void fcfs_add (
65 66 67 68 69 70 71 72 73 74
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp);
static void fcfs_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp);
75
static int  fcfs_next(
76 77 78 79 80
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
81
static void fcfs_next_rc(
82 83 84 85
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp);
86

87 88 89 90
// ROUND-ROBIN
static void rr_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
91
        int                                 is_recv_queue,
92
        void                             ** sched);
93 94
static void rr_destroy (void *sched);
static void rr_add (
95 96 97 98 99 100 101 102 103 104
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp);
static void rr_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp);
105
static int  rr_next(
106 107 108 109 110
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
111
static void rr_next_rc (
112 113 114 115
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp);
116 117 118
static void prio_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
119
        int                                 is_recv_queue,
120 121 122
        void                             ** sched);
static void prio_destroy (void *sched);
static void prio_add (
123 124 125 126 127 128 129 130 131 132
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp);
static void prio_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp);
133
static int  prio_next(
134 135 136 137 138
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp);
139
static void prio_next_rc (
140 141 142 143
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp);
144 145

/// function tables (names defined by X macro in model-net-sched.h)
146
static const model_net_sched_interface fcfs_tab = 
147
{ &fcfs_init, &fcfs_destroy, &fcfs_add, &fcfs_add_rc, &fcfs_next, &fcfs_next_rc};
148
static const model_net_sched_interface rr_tab = 
149
{ &rr_init, &rr_destroy, &rr_add, &rr_add_rc, &rr_next, &rr_next_rc};
150 151
static const model_net_sched_interface prio_tab =
{ &prio_init, &prio_destroy, &prio_add, &prio_add_rc, &prio_next, &prio_next_rc};
152 153

#define X(a,b,c) c,
154
const model_net_sched_interface * sched_interfaces[] = {
155 156 157 158 159 160
    SCHEDULER_TYPES
};
#undef X

/// FCFS implementation 

161 162 163
void fcfs_init(
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
164
        int                                 is_recv_queue,
165
        void                             ** sched){
166
    (void)params; // unused for fcfs
167 168
    *sched = malloc(sizeof(mn_sched_queue));
    mn_sched_queue *ss = *sched;
169
    ss->method = method;
170
    ss->is_recv_queue = is_recv_queue;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
171
    ss->queue_len = 0;
172 173 174 175 176 177 178 179
    INIT_QLIST_HEAD(&ss->reqs);
}

void fcfs_destroy(void *sched){
    free(sched);
}

void fcfs_add (
180 181 182 183 184 185 186 187 188
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp){
189
    (void)rc; // unneeded for fcfs
190
    mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
Jonathan Jenkins's avatar
Jonathan Jenkins committed
191
    q->entry_time = tw_now(lp);
192
    q->req = *req;
193
    q->sched_params = *sched_params;
194
    q->rem = req->msg_size;
195 196 197 198
    if (remote_event_size > 0){
        q->remote_event = malloc(remote_event_size);
        memcpy(q->remote_event, remote_event, remote_event_size);
    }
199
    else { q->remote_event = NULL; }
200 201 202 203
    if (local_event_size > 0){
        q->local_event = malloc(local_event_size);
        memcpy(q->local_event, local_event, local_event_size);
    }
204
    else { q->local_event = NULL; }
205
    mn_sched_queue *s = sched;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
206
    s->queue_len++;
207
    qlist_add_tail(&q->ql, &s->reqs);
208 209 210
    dprintf("%llu (mn):    adding %srequest from %llu to %llu, size %llu, at %lf\n",
            LLU(lp->gid), req->is_pull ? "pull " : "", LLU(req->src_lp),
            LLU(req->final_dest_lp), LLU(req->msg_size), tw_now(lp));
211 212
}

213
/* Reverse of fcfs_add: fcfs_add appends at the tail, so the tail item is
 * removed and freed together with its event copies. rc is unused. */
void fcfs_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp){
    (void)rc;
    mn_sched_queue *queue = sched;
    queue->queue_len--;

    struct qlist_head *tail = qlist_pop_back(&queue->reqs);
    assert(tail != NULL);
    mn_sched_qitem *item = qlist_entry(tail, mn_sched_qitem, ql);

    dprintf("%llu (mn): rc adding request from %llu to %llu\n", LLU(lp->gid),
            LLU(item->req.src_lp), LLU(item->req.final_dest_lp));

    // either event pointer may be NULL; free(NULL) does nothing
    free(item->remote_event);
    free(item->local_event);
    free(item);
}

228
int fcfs_next(
229 230 231 232 233
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
234
    mn_sched_queue *s = sched;
235 236 237 238 239 240
    struct qlist_head *ent = s->reqs.next;
    if (ent == &s->reqs){
        rc->rtn = -1;
        return -1;
    }
    mn_sched_qitem *q = qlist_entry(ent, mn_sched_qitem, ql);
241

242 243 244 245 246 247 248 249 250 251 252 253
    // issue the next packet
    int is_last_packet;
    uint64_t psize;
    if (q->req.packet_size >= q->rem) {
        psize = q->rem;
        is_last_packet = 1;
    }
    else{
        psize = q->req.packet_size;
        is_last_packet = 0;
    }

254
    if (s->is_recv_queue){
255 256 257 258
        dprintf("%llu (mn):    receiving message of size %llu (of %llu) "
                "from %llu to %llu at %1.5e (last:%d)\n",
                LLU(lp->gid), LLU(psize), LLU(q->rem), LLU(q->req.src_lp),
                LLU(q->req.final_dest_lp), tw_now(lp), is_last_packet);
259 260
        // note: we overloaded on the dest_mn_lp field - it's the dest of the
        // soruce in the case of a pull
261
        *poffset = s->method->model_net_method_recv_msg_event(q->req.category,
262
                q->req.final_dest_lp, q->req.dest_mn_lp, psize,
263
                q->req.is_pull, q->req.pull_size, 0.0, q->req.remote_event_size,
264
                q->remote_event, q->req.src_lp, lp);
265 266
    }
    else{
267 268
        dprintf("%llu (mn):    issuing packet of size %llu (of %llu) "
                "from %llu to %llu at %1.5e (last:%d)\n",
Jonathan Jenkins's avatar
Jonathan Jenkins committed
269
                LLU(lp->gid), LLU(psize), LLU(q->rem), LLU(q->req.src_lp),
270
                LLU(q->req.final_dest_lp), tw_now(lp), is_last_packet);
271
        *poffset = s->method->model_net_method_packet_event(&q->req,
272 273
                q->req.msg_size - q->rem, psize, 0.0, &q->sched_params,
                q->remote_event, q->local_event, lp, is_last_packet);
274
    }
275

276
    // if last packet - remove from list, free, save for rc
277
    if (is_last_packet){
278
        dprintf("last %spkt: %llu (%llu) to %llu, size %llu at %1.5e (pull:%d)\n",
Jonathan Jenkins's avatar
Jonathan Jenkins committed
279
                s->is_recv_queue ? "recv " : "send ",
280 281
                LLU(lp->gid), LLU(q->req.src_lp), LLU(q->req.final_dest_lp),
                LLU(q->req.is_pull ? q->req.pull_size : q->req.msg_size), tw_now(lp),
Jonathan Jenkins's avatar
Jonathan Jenkins committed
282
                q->req.is_pull);
283
        qlist_pop(&s->reqs);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
284
        s->queue_len--;
285
        rc->req = q->req;
286
        rc->sched_params = q->sched_params;
287 288 289 290 291 292 293 294 295 296 297
        void *e_dat = rc_event_save;
        if (q->req.remote_event_size > 0){
            memcpy(e_dat, q->remote_event, q->req.remote_event_size);
            e_dat = (char*) e_dat + q->req.remote_event_size;
            free(q->remote_event);
        }
        if (q->req.self_event_size > 0){
            memcpy(e_dat, q->local_event, q->req.self_event_size);
            free(q->local_event);
        }
        free(q);
298 299 300 301 302 303 304 305 306
        rc->rtn = 1;
    }
    else{
        q->rem -= psize;
        rc->rtn = 0;
    }
    return rc->rtn;
}

307
/* Reverse of fcfs_next, driven by rc->rtn:
 *   -1  nothing was issued; no-op
 *    0  undo the method event and give the packet's bytes back to the
 *       head request
 *    1  undo the method event and re-create the finished request from rc
 *       and rc_event_save, putting it back at the front of the queue
 * Any other rtn value trips assert(0) after the method event is undone.
 *
 * NOTE(review): entry_time is not saved in rc, so a re-created item's
 * entry_time is left unset here. */
void fcfs_next_rc(
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp){
    mn_sched_queue *s = sched;

    if (rc->rtn == -1)
        return; // no op

    if (s->is_recv_queue){
        dprintf("%llu (mn): rc receiving message\n", LLU(lp->gid));
        s->method->model_net_method_recv_msg_event_rc(lp);
    }
    else {
        dprintf("%llu (mn): rc issuing packet\n", LLU(lp->gid));
        s->method->model_net_method_packet_event_rc(lp);
    }

    switch (rc->rtn){
    case 0: {
        // a non-final packet is always exactly packet_size bytes
        mn_sched_qitem *head = qlist_entry(s->reqs.next, mn_sched_qitem, ql);
        head->rem += head->req.packet_size;
        break;
    }
    case 1: {
        mn_sched_qitem *q = malloc(sizeof *q);
        assert(q);
        q->req = rc->req;
        q->sched_params = rc->sched_params;

        // size of the final packet: the remainder, or a whole packet when
        // the message divides evenly (and is non-empty)
        q->rem = q->req.msg_size % q->req.packet_size;
        if (q->rem == 0 && q->req.msg_size != 0)
            q->rem = q->req.packet_size;

        // saved layout: remote event bytes, then local event bytes
        const char *saved = rc_event_save;
        q->remote_event = NULL;
        if (q->req.remote_event_size > 0){
            q->remote_event = malloc(q->req.remote_event_size);
            memcpy(q->remote_event, saved, q->req.remote_event_size);
            saved += q->req.remote_event_size;
        }
        q->local_event = NULL;
        if (q->req.self_event_size > 0){
            q->local_event = malloc(q->req.self_event_size);
            memcpy(q->local_event, saved, q->req.self_event_size);
        }

        // it was the head before fcfs_next removed it
        qlist_add(&q->ql, &s->reqs);
        s->queue_len++;
        break;
    }
    default:
        assert(0);
    }
}

364 365 366
/* Round-robin uses the fcfs queue layout unchanged; only rr_next and
 * rr_next_rc differ. */
void rr_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
        int                                 is_recv_queue,
        void                             ** sched){
    fcfs_init(method, params,
              is_recv_queue, sched);
}

/* A round-robin queue is an fcfs queue, so fcfs teardown applies. */
void rr_destroy (void *sched){
    fcfs_destroy(sched);
}

void rr_add (
379 380 381 382 383 384 385 386 387
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp){
388
    fcfs_add(req, sched_params, remote_event_size, remote_event,
389
            local_event_size, local_event, sched, rc, lp);
390
}
391

392
/* Undo rr_add; same as undoing fcfs_add (pop the tail item). */
void rr_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp){
    fcfs_add_rc(sched,
                rc, lp);
}

396
int rr_next(
397 398 399 400 401
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
402 403 404 405 406 407 408 409
    int ret = fcfs_next(poffset, sched, rc_event_save, rc, lp);
    // if error in fcfs or the request was finished & removed, then nothing to
    // do here
    if (ret == -1 || ret == 1)
        return ret;
    // otherwise request was successful, still in the queue
    else {
        mn_sched_queue *s = sched;
410
        qlist_add_tail(qlist_pop(&s->reqs), &s->reqs);
411
        return ret;
412 413 414
    }
}

415
/* Reverse of rr_next. When rtn == 0, rr_next moved the head item to the
 * tail, so that rotation is undone first (tail back to front). The rest is
 * undone by fcfs_next_rc. */
void rr_next_rc (
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp){
    if (rc->rtn == 0){
        mn_sched_queue *s = sched;
        struct qlist_head *rotated = qlist_pop_back(&s->reqs);
        qlist_add(rotated, &s->reqs);
    }
    fcfs_next_rc(sched, rc_event_save, rc, lp);
}

429 430 431
void prio_init (
        const struct model_net_method     * method, 
        const model_net_sched_cfg_params  * params,
432
        int                                 is_recv_queue,
433 434 435 436 437 438 439
        void                             ** sched){
    *sched = malloc(sizeof(mn_sched_prio));
    mn_sched_prio *ss = *sched;
    ss->params = params->u.prio;
    ss->sub_scheds = malloc(ss->params.num_prios*sizeof(mn_sched_queue*));
    ss->sub_sched_iface = sched_interfaces[ss->params.sub_stype];
    for (int i = 0; i < ss->params.num_prios; i++){
440 441
        ss->sub_sched_iface->init(method, params, is_recv_queue,
                (void**)&ss->sub_scheds[i]);
442 443 444 445 446 447 448 449 450 451 452 453 454
    }
}

/* Destroy every sub-queue, then free the sub-queue array and the priority
 * scheduler itself.
 *
 * Fix: the two frees used to sit inside the loop. For num_prios > 1 the
 * second iteration read ss->params and ss->sub_scheds after ss had been
 * freed (use-after-free), then freed both pointers again (double free). */
void prio_destroy (void *sched){
    mn_sched_prio *ss = sched;
    for (int i = 0; i < ss->params.num_prios; i++){
        ss->sub_sched_iface->destroy(ss->sub_scheds[i]);
    }
    free(ss->sub_scheds);
    free(ss);
}

void prio_add (
455 456 457 458 459 460 461 462 463
        const model_net_request * req,
        const mn_sched_params   * sched_params,
        int                       remote_event_size,
        void                    * remote_event,
        int                       local_event_size,
        void                    * local_event,
        void                    * sched,
        model_net_sched_rc      * rc,
        tw_lp                   * lp){
464 465
    // sched_msg_params is simply an int
    mn_sched_prio *ss = sched;
466
    int prio = sched_params->prio;
467 468 469 470 471
    if (prio == -1){
        // default prio - lowest possible 
        prio = ss->params.num_prios-1;
    }
    else if (prio >= ss->params.num_prios){
472 473
        tw_error(TW_LOC, "sched for lp %llu: invalid prio (%d vs [%d,%d))",
                LLU(lp->gid), prio, 0, ss->params.num_prios);
474
    }
475
    dprintf("%llu (mn):    adding with prio %d\n", LLU(lp->gid), prio);
476 477 478
    ss->sub_sched_iface->add(req, sched_params, remote_event_size,
            remote_event, local_event_size, local_event, ss->sub_scheds[prio],
            rc, lp);
479 480 481
    rc->prio = prio;
}

482
/* Undo prio_add by forwarding to the sub-scheduler's add_rc for the level
 * prio_add recorded in rc->prio. */
void prio_add_rc(void * sched, const model_net_sched_rc *rc, tw_lp *lp){
    mn_sched_prio *prio_sched = sched;
    dprintf("%llu (mn): rc adding with prio %d\n", LLU(lp->gid), rc->prio);
    mn_sched_queue *level = prio_sched->sub_scheds[rc->prio];
    prio_sched->sub_sched_iface->add_rc(level, rc, lp);
}

int prio_next(
490 491 492 493 494
        tw_stime              * poffset,
        void                  * sched,
        void                  * rc_event_save,
        model_net_sched_rc    * rc,
        tw_lp                 * lp){
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
    // check each priority, first one that's non-empty gets the next
    mn_sched_prio *ss = sched;
    for (int i = 0; i < ss->params.num_prios; i++){
        // TODO: this works for now while the other schedulers have the same
        // internal representation
        if (!qlist_empty(&ss->sub_scheds[i]->reqs)){
            rc->prio = i;
            return ss->sub_sched_iface->next(
                    poffset, ss->sub_scheds[i], rc_event_save, rc, lp);
        }
    }
    rc->prio = -1;
    return -1; // all sub schedulers had no work 
}

/* Undo prio_next. rc->prio == -1 means prio_next found no work and called
 * no sub-scheduler, so there is nothing to reverse. */
void prio_next_rc (
        void                     * sched,
        const void               * rc_event_save,
        const model_net_sched_rc * rc,
        tw_lp                    * lp){
    if (rc->prio == -1)
        return;
    mn_sched_prio *ss = sched;
    mn_sched_queue *level = ss->sub_scheds[rc->prio];
    ss->sub_sched_iface->next_rc(level, rc_event_save, rc, lp);
}

524 525 526 527 528 529 530 531
/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */