Commit f4d8c7a4 authored by Jonathan Jenkins's avatar Jonathan Jenkins

refactored round-robin sched to just use fcfs + minor q tweaks

parent 0164aa2d
......@@ -175,12 +175,7 @@ void fcfs_add (
void * sched,
model_net_sched_rc * rc,
tw_lp * lp){
// NOTE: in optimistic mode, we currently do not have a good way to
// reliably free and re-initialize the q item and the local/remote events
// when processing next/next_rc events. Hence, the memory leaks. Later on
// we'll figure out a better way to handle this.
mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
assert(q);
q->req = *req;
q->rem = req->is_pull ? PULL_MSG_SIZE : req->msg_size;
if (remote_event_size > 0){
......@@ -336,34 +331,12 @@ void rr_add (
void * sched,
model_net_sched_rc * rc,
tw_lp * lp){
// NOTE: in optimistic mode, we currently do not have a good way to
// reliably free and re-initialize the q item and the local/remote events
// when processing next/next_rc events. Hence, the memory leaks. Later on
// we'll figure out a better way to handle this.
mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
q->req = *req;
q->rem = req->is_pull ? PULL_MSG_SIZE : req->msg_size;
if (remote_event_size > 0){
q->remote_event = malloc(remote_event_size);
memcpy(q->remote_event, remote_event, remote_event_size);
}
else { q->remote_event = NULL; }
if (local_event_size > 0){
q->local_event = malloc(local_event_size);
memcpy(q->local_event, local_event, local_event_size);
}
else{ q->local_event = NULL; }
mn_sched_queue *s = sched;
qlist_add_tail(&q->ql, &s->reqs);
fcfs_add(req, sched_msg_params, remote_event_size, remote_event,
local_event_size, local_event, sched, rc, lp);
}
void rr_add_rc(void *sched, model_net_sched_rc *rc, tw_lp *lp){
mn_sched_queue *s = sched;
struct qlist_head *ent = qlist_pop_back(&s->reqs);
assert(ent != NULL);
mn_sched_qitem *q = qlist_entry(ent, mn_sched_qitem, ql);
if (q->remote_event) free(q->remote_event);
if (q->local_event) free(q->local_event);
free(q);
fcfs_add_rc(sched, rc, lp);
}
int rr_next(
......@@ -372,55 +345,17 @@ int rr_next(
void * rc_event_save,
model_net_sched_rc * rc,
tw_lp * lp){
mn_sched_queue *s = sched;
struct qlist_head *ent = qlist_pop(&s->reqs);
if (ent == NULL){
rc->rtn = -1;
return -1;
}
mn_sched_qitem *q = qlist_entry(ent, mn_sched_qitem, ql);
// issue the next packet
int is_last_packet;
uint64_t psize;
if (q->req.packet_size >= q->rem) {
psize = q->rem;
is_last_packet = 1;
}
else{
psize = q->req.packet_size;
is_last_packet = 0;
}
dprintf("%lu (mn): issuing packet of size %lu (of %lu) from %lu to %lu\n",
lp->gid, psize, q->rem, q->req.src_lp, q->req.final_dest_lp);
*poffset = s->method->model_net_method_packet_event(q->req.category,
q->req.final_dest_lp, psize, q->req.is_pull, q->req.msg_size, 0.0,
q->req.remote_event_size, q->remote_event, q->req.self_event_size,
q->local_event, q->req.src_lp, lp, is_last_packet);
// if last packet - remove from list, free, save for rc
if (is_last_packet){
rc->req = q->req;
void *e_dat = rc_event_save;
if (q->req.remote_event_size > 0){
memcpy(e_dat, q->remote_event, q->req.remote_event_size);
e_dat = (char*) e_dat + q->req.remote_event_size;
free(q->remote_event);
}
if (q->req.self_event_size > 0){
memcpy(e_dat, q->local_event, q->req.self_event_size);
free(q->local_event);
}
free(q);
rc->rtn = 1;
}
else{
q->rem -= psize;
qlist_add_tail(&q->ql, &s->reqs);
rc->rtn = 0;
int ret = fcfs_next(poffset, sched, rc_event_save, rc, lp);
// if error in fcfs or the request was finished & removed, then nothing to
// do here
if (ret == -1 || ret == 1)
return ret;
// otherwise request was successful, still in the queue
else {
mn_sched_queue *s = sched;
qlist_add_tail(&s->reqs, qlist_pop(&s->reqs));
return ret;
}
return rc->rtn;
}
void rr_next_rc (
......@@ -428,56 +363,13 @@ void rr_next_rc (
void * rc_event_save,
model_net_sched_rc * rc,
tw_lp * lp){
mn_sched_queue *s = sched;
if (rc->rtn == -1){
// no op
}
else {
s->method->model_net_method_packet_event_rc(lp);
if (rc->rtn == 0){
// increment rem and put item back to front of list
struct qlist_head *ent = qlist_pop_back(&s->reqs);
qlist_add(ent, &s->reqs);
mn_sched_qitem *q = qlist_entry(ent, mn_sched_qitem, ql);
q->rem += q->req.packet_size;
dprintf("%lu (mn): rc issuing packet of size %lu (of %lu) from "
"%lu to %lu\n",
lp->gid, q->req.packet_size, q->rem, q->req.src_lp,
q->req.final_dest_lp);
}
else if (rc->rtn == 1){
// re-create the q item
mn_sched_qitem *q = malloc(sizeof(mn_sched_qitem));
assert(q);
q->req = rc->req;
q->rem = (q->req.is_pull ? PULL_MSG_SIZE : q->req.msg_size) %
q->req.packet_size;
if (q->rem == 0){ // processed exactly a packet's worth of data
q->rem = q->req.packet_size;
}
void * e_dat = rc_event_save;
if (q->req.remote_event_size > 0){
q->remote_event = malloc(q->req.remote_event_size);
memcpy(q->remote_event, e_dat, q->req.remote_event_size);
e_dat = (char*) e_dat + q->req.remote_event_size;
}
else { q->remote_event = NULL; }
if (q->req.self_event_size > 0) {
q->local_event = malloc(q->req.self_event_size);
memcpy(q->local_event, e_dat, q->req.self_event_size);
}
else { q->local_event = NULL; }
// add back to front of list
qlist_add(&q->ql, &s->reqs);
dprintf("%lu (mn): rc issuing packet of size %lu (of %lu) from "
"%lu to %lu\n",
lp->gid, q->rem, q->rem, q->req.src_lp,
q->req.final_dest_lp);
}
else {
assert(0);
}
// only time we need to do something apart from fcfs is on a successful
// rr_next that didn't remove the item from the queue
if (rc->rtn == 0){
mn_sched_queue *s = sched;
qlist_add(qlist_pop_back(&s->reqs), &s->reqs);
}
fcfs_next_rc(sched, rc_event_save, rc, lp);
}
void prio_init (
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment