Commit a78fee52 authored by Misbah Mubarak

Updated congestion control in dragonfly: now messages are saved in a waiting...

Updated congestion control in dragonfly: now messages are saved in a waiting list at the routers in case of buffer overflows
parent 3a916687
......@@ -16,7 +16,6 @@ struct terminal_message
{
/* magic number */
int magic;
/* flit travel start time*/
tw_stime travel_start_time;
/* packet ID of the flit */
......@@ -78,6 +77,9 @@ struct terminal_message
/* LP ID of the sending node, has to be a network node in the dragonfly */
tw_lpid sender_node;
/* for reverse computation */
struct pending_router_msgs * saved_elem;
};
#endif /* end of include guard: DRAGONFLY_H */
......
......@@ -102,6 +102,15 @@ struct dragonfly_param
int num_global_channels;
};
/* One entry in a router's list of packets that are waiting for output
 * buffer space. Entries form a doubly linked list whose ends are kept in
 * router_state (head / tail). */
struct pending_router_msgs
{
/* doubly linked list links; NULL at the respective end of the list */
struct pending_router_msgs * next;
struct pending_router_msgs * prev;
/* heap copy of the remote-event payload that trailed the original message;
 * only meaningful when msg.remote_event_size_bytes is non-zero */
char * event_data;
/* by-value copy of the message that could not be forwarded */
terminal_message msg;
/* channel index the packet is waiting on; matched against the channel
 * whose buffer slot is released when the list is searched */
int output_chan;
/* LP id the packet is sent to once it leaves the waiting list */
int next_stop;
};
/* handles terminal and router events like packet generate/send/receive/buffer */
typedef enum event_t event_t;
......@@ -200,8 +209,6 @@ struct router_state
{
unsigned int router_id;
unsigned int group_id;
int* global_channel;
tw_stime* next_output_available_time;
tw_stime* next_credit_available_time;
......@@ -209,6 +216,7 @@ struct router_state
int* vc_occupancy;
int* output_vc_state;
int * global_channel;
int max_router_vc_occupancy;
const char * anno;
......@@ -216,6 +224,10 @@ struct router_state
int* prev_hist_num;
int* cur_hist_num;
struct pending_router_msgs * head;
struct pending_router_msgs * tail;
int num_waiting;
};
static short routing = MINIMAL;
......@@ -228,9 +240,112 @@ static tw_stime max_collective = 0;
/* running count of hops; router_packet_forward increments it for the last
 * chunk of a packet and the reverse handler decrements it again */
static long long total_hops = 0;
/* NOTE(review): presumably the number of packets that completed delivery;
 * the code updating it is not visible in this part of the file */
static long long N_finished_packets = 0;
/* forward declarations */
/* declared here because send_packet_from_router calls it before its
 * definition further down in the file */
static void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp);
/* returns the dragonfly router lp type for lp registration */
static const tw_lptype* dragonfly_get_router_lp_type(void);
/* Remove and free the element at the head of the pending-message list.
 *
 * head, tail: addresses of the list's head and tail pointers; both are
 *             updated in place.
 * msg:        unused; kept so existing callers keep compiling.
 *
 * Calling this on an empty list is a no-op. The element's payload copy is
 * released together with the element itself. */
void remove_pending_list_head(struct pending_router_msgs** head,
        struct pending_router_msgs** tail,
        terminal_message * msg)
{
    struct pending_router_msgs * elem = *head;

    (void)msg;

    /* nothing queued: avoid dereferencing a NULL head */
    if(elem == NULL)
        return;

    /* removing the only element also empties the tail */
    if(elem == *tail)
        *tail = NULL;

    *head = elem->next;
    if(*head)
        (*head)->prev = NULL;

    /* add_pending_router_message allocates event_data only when the saved
     * message carries a remote-event payload, so only free it in that case */
    if(elem->msg.remote_event_size_bytes)
        free(elem->event_data);
    free(elem);
}
/* Queue a message that cannot be forwarded yet because the output buffer
 * is full.
 *
 * head, tail: addresses of the list's head and tail pointers; updated in place.
 * msg:        message to save. It is copied by value; if
 *             msg->remote_event_size_bytes is non-zero, the payload stored
 *             directly behind the message (msg + 1) is copied as well.
 * chan:       channel index the packet is waiting on.
 * next_stop:  LP id the packet will be sent to once space is available.
 *
 * The new element is inserted at the HEAD of the list, which is what
 * remove_pending_list_head relies on to undo an insertion.
 * Aborts the process if memory cannot be allocated. */
void add_pending_router_message(struct pending_router_msgs** head,
        struct pending_router_msgs** tail,
        terminal_message * msg,
        int chan,
        int next_stop)
{
    struct pending_router_msgs * elem = malloc(sizeof *elem);
    if(elem == NULL)
    {
        fprintf(stderr, "add_pending_router_message: out of memory\n");
        abort();
    }

    memcpy(&(elem->msg), msg, sizeof(terminal_message));
    elem->prev = NULL;
    elem->next_stop = next_stop;
    elem->output_chan = chan;
    /* never leave the payload pointer indeterminate */
    elem->event_data = NULL;

    if(msg->remote_event_size_bytes)
    {
        /* the remote-event payload lives immediately after the message */
        elem->event_data = malloc(msg->remote_event_size_bytes);
        if(elem->event_data == NULL)
        {
            fprintf(stderr, "add_pending_router_message: out of memory\n");
            abort();
        }
        memcpy(elem->event_data, msg + 1, msg->remote_event_size_bytes);
    }

    /* push on the front of the list */
    elem->next = *head;
    if(*head)
        (*head)->prev = elem;
    else
        *tail = elem;
    *head = elem;
}
/* Unlink and return the first queued message (searching from the head)
 * that is waiting on channel `chan`.
 *
 * head, tail: addresses of the list's head and tail pointers; updated in
 *             place when the removed element sits at either end.
 * chan:       channel index to look for.
 *
 * Returns the unlinked element, or NULL when the list is empty or holds no
 * message for that channel. The element is NOT freed here, and its own
 * next/prev fields are left untouched. */
struct pending_router_msgs * remove_pending_router_msgs(struct pending_router_msgs** head,
        struct pending_router_msgs** tail,
        int chan)
{
    struct pending_router_msgs * cur;

    for(cur = *head; cur != NULL; cur = cur->next)
    {
        if(cur->output_chan != chan)
            continue;

        /* decide where the match sits before touching head/tail */
        const int at_head = (cur == *head);
        const int at_tail = (cur == *tail);

        if(at_head && at_tail)
        {
            /* sole element: the list becomes empty */
            *head = NULL;
            *tail = NULL;
        }
        else if(at_head)
        {
            /* element is at the head: successor becomes the new head */
            *head = cur->next;
            cur->next->prev = NULL;
        }
        else if(at_tail)
        {
            /* element is at the tail: predecessor becomes the new tail */
            *tail = cur->prev;
            cur->prev->next = NULL;
        }

        /* splice the neighbours together (covers the middle-of-list case) */
        if(cur->prev)
            cur->prev->next = cur->next;
        if(cur->next)
            cur->next->prev = cur->prev;

        return cur;
    }

    return NULL;
}
/* returns the dragonfly message size */
static int dragonfly_get_msg_sz(void)
{
......@@ -323,10 +438,8 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){
p->num_global_channels = p->num_routers/2;
p->num_groups = p->num_routers * p->num_cn + 1;
p->radix = p->num_vcs *
(p->num_cn + p->num_global_channels + p->num_routers);
(p->num_routers + p->num_global_channels + p->num_cn);
p->total_routers = p->num_groups * p->num_routers;
p->total_terminals = p->total_routers * p->num_cn;
if(!g_tw_mynode)
......@@ -513,6 +626,125 @@ static void dragonfly_packet_event_rc(tw_lp *sender)
return;
}
/* Send one packet chunk out of this router on the given output channel.
 *
 * s:           router LP state (port timers, VC occupancy, history counters).
 * bf:          event bitfield; c3 is set when the max-occupancy statistic is
 *              raised, c2 when the adaptive-routing history window rolls over.
 * msg:         message being forwarded; several saved_* fields and new_vc are
 *              written into it.
 * lp:          the router LP.
 * output_chan: virtual channel to send on; the port is output_chan / num_vcs.
 * next_stop:   LP id of the receiver (a router, or the destination terminal
 *              when it equals msg->dest_terminal_id).
 * event_data:  remote-event payload to attach; when NULL the payload stored
 *              directly behind msg (msg + 1) is used instead.
 */
void send_packet_from_router(router_state * s,
        tw_bf * bf,
        terminal_message * msg,
        tw_lp * lp,
        int output_chan,
        int next_stop,
        char * event_data)
{
    /* a credit goes back upstream before the packet is sent on */
    router_credit_send(s, bf, msg, lp);

    const int port = output_chan / s->params->num_vcs;

    /* global ports sit right after the local (intra-group) router ports */
    const int is_global = (port >= s->params->num_routers &&
            port < s->params->num_routers + s->params->num_global_channels);
    double link_bw = is_global ? s->params->global_bandwidth
                               : s->params->local_bandwidth;

    /* final hop if the receiver is the destination terminal itself */
    const int to_terminal = (next_stop == msg->dest_terminal_id);

    /* keep the previous port-free time in the message before advancing it */
    msg->saved_available_time = s->next_output_available_time[port];

    tw_stime xfer_time = g_tw_lookahead + bytes_to_ns(s->params->chunk_size, link_bw) + tw_rand_exponential(lp->rng, noise);
    s->next_output_available_time[port] = maxd(s->next_output_available_time[port], tw_now(lp));
    s->next_output_available_time[port] += xfer_time;

    /* the receiver can be a router or a terminal; terminals are reached
     * through the model-net event wrapper */
    tw_stime send_offset = s->next_output_available_time[port] - tw_now(lp);
    tw_event * out_ev;
    terminal_message * out_msg;
    void * out_payload;
    if(to_terminal)
    {
        out_ev = model_net_method_event_new(next_stop, send_offset, lp,
                DRAGONFLY, (void**)&out_msg, &out_payload);
    }
    else
    {
        out_ev = tw_event_new(next_stop, send_offset, lp);
        out_msg = tw_event_data(out_ev);
        out_payload = out_msg + 1;
    }

    memcpy(out_msg, msg, sizeof(terminal_message));
    if(msg->remote_event_size_bytes)
    {
        const void * payload_src = event_data ? (const void *)event_data
                                              : (const void *)(msg + 1);
        memcpy(out_payload, payload_src, msg->remote_event_size_bytes);
    }

    out_msg->last_hop = is_global ? GLOBAL : LOCAL;
    out_msg->local_id = s->router_id;

    /* for reverse computation */
    msg->new_vc = output_chan;

    /* for sending back credit in forward event handler */
    out_msg->saved_vc = output_chan;
    out_msg->intm_lp_id = lp->gid;

    /* occupy one buffer slot and track the high-water mark */
    s->vc_occupancy[output_chan]++;
    if(s->vc_occupancy[output_chan] > s->max_router_vc_occupancy)
    {
        bf->c3 = 1;
        msg->saved_occupancy = s->max_router_vc_occupancy;
        s->max_router_vc_occupancy = s->vc_occupancy[output_chan];
    }

    if(routing == PROG_ADAPTIVE)
    {
        if(tw_now(lp) - s->cur_hist_start_time[output_chan] < WINDOW_LENGTH)
        {
            /* still inside the current history window */
            s->cur_hist_num[output_chan]++;
        }
        else
        {
            /* window elapsed: roll current counters into "previous" and
             * save the overwritten values in the message */
            bf->c2 = 1;
            msg->saved_hist_num = s->prev_hist_num[output_chan];
            msg->saved_hist_start_time = s->cur_hist_start_time[output_chan];

            s->prev_hist_num[output_chan] = s->cur_hist_num[output_chan];

            s->cur_hist_start_time[output_chan] = tw_now(lp);
            s->cur_hist_num[output_chan] = 1;
        }
    }

    /* Pick the event type for the receiver and mark the channel as waiting
     * for credit once its buffer is full. */
    if(to_terminal)
    {
        out_msg->type = T_ARRIVE;
        out_msg->magic = terminal_magic_num;
        if(s->vc_occupancy[output_chan] >= s->params->cn_vc_size)
            s->output_vc_state[output_chan] = VC_CREDIT;
    }
    else
    {
        /* The packet has to be sent to another router */
        out_msg->type = R_FORWARD;
        out_msg->magic = router_magic_num;

        /* global and local channels have different buffer sizes */
        if(s->vc_occupancy[output_chan] >=
                (is_global ? s->params->global_vc_size : s->params->local_vc_size))
            s->output_vc_state[output_chan] = VC_CREDIT;
    }

    tw_event_send(out_ev);
    return;
}
/* given two group IDs, find the router of the src_gid that connects to the dest_gid*/
tw_lpid getRouterFromGroupID(int dest_gid,
int src_gid,
......@@ -542,7 +774,7 @@ tw_lpid getRouterFromGroupID(int dest_gid,
}
/*When a packet is sent from the current router and a buffer slot becomes available, a credit is sent back to schedule another packet event*/
void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
static void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
{
tw_event * buf_e;
tw_stime ts;
......@@ -695,7 +927,7 @@ static void packet_generate_send(terminal_state * s,
/* for reverse computation */
msg->saved_vc = chan;
/* simulation should exit */
/* simulation should exit */
if(chan == -1)
tw_error(TW_LOC, "\n No terminal buffers available, increase buffer size");
......@@ -707,7 +939,7 @@ static void packet_generate_send(terminal_state * s,
s->router_id/num_routers_per_mgrp,
s->router_id % num_routers_per_mgrp, &router_id);
e = tw_event_new(router_id, s->terminal_available_time - tw_now(lp), lp);
e = tw_event_new(router_id, s->terminal_available_time - tw_now(lp), lp);
m = tw_event_data(e);
memcpy(m, msg, sizeof(terminal_message));
......@@ -798,14 +1030,13 @@ static void packet_generate_send(terminal_state * s,
void * m_gen_data;
/* Keep the packet generate event a little behind packet send */
ts = s->terminal_available_time - tw_now(lp) + codes_local_latency(lp);
e_gen = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY, (void**)&m_gen,(void**)&m_gen_data);
e_gen = model_net_method_event_new(lp->gid, codes_local_latency(lp), lp, DRAGONFLY, (void**)&m_gen,(void**)&m_gen_data);
void *m_gen_data_src = model_net_method_get_edata(DRAGONFLY, msg);
memcpy(m_gen, msg, sizeof(terminal_message));
m_gen->chunk_id = msg->chunk_id + 1;
m_gen->type = T_GENERATE;
m_gen->type = T_GENERATE;
m_gen->travel_start_time = travel_start_time;
if (msg->remote_event_size_bytes){
......@@ -1538,11 +1769,13 @@ get_output_port( router_state * s,
if(s->global_channel[i] == local_router_id)
output_port = s->params->num_routers + i;
}
assert(output_port != -1);
}
else
{
output_port = local_router_id % s->params->num_routers;
}
//printf("\n Router id %d Next stop %d local router id %d output port %d ", s->router_id, next_stop, local_router_id, output_port);
// printf("\n output port not found %d next stop %d local router id %d group id %d intm grp id %d %d", output_port, next_stop, local_router_id, s->group_id, intm_grp_id, local_router_id%num_routers);
}
return output_port;
......@@ -1631,7 +1864,7 @@ static void router_packet_forward_rc( router_state * s,
terminal_message * msg,
tw_lp * lp)
{
router_rev_ecount++;
router_rev_ecount++;
router_ecount--;
uint64_t num_chunks = msg->num_chunks;
......@@ -1639,12 +1872,18 @@ static void router_packet_forward_rc( router_state * s,
total_hops--;
tw_rand_reverse_unif(lp->rng);
tw_rand_reverse_unif(lp->rng);
if(bf->c1)
{
s->num_waiting--;
remove_pending_list_head(&(s->head), &(s->tail), msg);
return;
}
tw_rand_reverse_unif(lp->rng);
int old_port = msg->sender_radix;
s->next_credit_available_time[old_port] = msg->saved_credit_time;
if(bf->c1)
return;
tw_rand_reverse_unif(lp->rng);
int output_chan = msg->new_vc;
......@@ -1683,19 +1922,17 @@ static void router_packet_forward( router_state * s,
bf->c2 = 0;
bf->c3 = 0;
tw_stime ts;
tw_event *e;
terminal_message *m;
int next_stop = -1, output_port = -1, output_chan = -1;
float bandwidth = s->params->local_bandwidth;
int i;
int buf_size = s->params->local_vc_size;
const dragonfly_param *p = s->params;
uint64_t num_chunks = msg->num_chunks;
if(msg->chunk_id == num_chunks - 1)
total_hops++;
router_credit_send(s, bf, msg, lp);
codes_mapping_get_lp_info(msg->dest_terminal_id, lp_group_name,
&mapping_grp_id, NULL, &mapping_type_id, NULL, &mapping_rep_id,
......@@ -1736,121 +1973,38 @@ static void router_packet_forward( router_state * s,
next_stop = get_next_stop(s, bf, msg, lp, msg->path_type, dest_router_id, intm_id);
}
output_port = get_output_port(s, bf, msg, lp, next_stop);
output_chan = output_port * s->params->num_vcs;
int global=0;
int buf_size = s->params->local_vc_size;
int begin_chan = output_port * s->params->num_vcs;
assert(output_port != -1 && begin_chan != -1 && output_port < s->params->radix);
assert(output_port != -1 && output_chan != -1 && output_port < s->params->radix);
// Allocate output Virtual Channel
if(output_port >= s->params->num_routers &&
output_port < s->params->num_routers + s->params->num_global_channels)
output_port < s->params->num_routers + s->params->num_global_channels)
{
bandwidth = s->params->global_bandwidth;
global = 1;
buf_size = s->params->global_vc_size;
buf_size = s->params->global_vc_size;
}
if(output_port >= s->params->num_routers + s->params->num_global_channels)
buf_size = s->params->cn_vc_size;
if(s->vc_occupancy[output_chan] >= buf_size)
buf_size = s->params->cn_vc_size;
for(i = begin_chan; i < begin_chan + p->num_vcs; i++)
{
if(s->vc_occupancy[i] < buf_size)
{
output_chan = i;
break;
}
}
if(output_chan == -1)
{
printf("\n %lf Router %ld buffers overflowed from incoming terminals channel %d occupancy %d radix %d next_stop %d buffer size %d ", tw_now(lp),(long int) lp->gid, output_chan, s->vc_occupancy[output_chan], s->params->radix, next_stop, buf_size);
bf->c1 = 1;
/* Here we want to add the packets in the waiting queue instead of terminating
* the simulation */
//printf("\n %lf Router %ld buffers overflowed from incoming terminals channel %d occupancy %d radix %d next_stop %d buffer size %d ", tw_now(lp),(long int) lp->gid, begin_chan, s->vc_occupancy[output_chan], s->params->radix, next_stop, buf_size);
bf->c1 = 1;
s->num_waiting++;
add_pending_router_message(&(s->head), &(s->tail), msg, begin_chan, next_stop);
return;
}
// If source router doesn't have global channel and buffer space is available, then assign to appropriate intra-group virtual channel
msg->saved_available_time = s->next_output_available_time[output_port];
ts = g_tw_lookahead + bytes_to_ns(s->params->chunk_size, bandwidth) + tw_rand_exponential(lp->rng, noise);
s->next_output_available_time[output_port] = maxd(s->next_output_available_time[output_port], tw_now(lp));
s->next_output_available_time[output_port] += ts;
// dest can be a router or a terminal, so we must check
void * m_data;
if (next_stop == msg->dest_terminal_id){
e = model_net_method_event_new(next_stop,
s->next_output_available_time[output_port] - tw_now(lp), lp,
DRAGONFLY, (void**)&m, &m_data);
}
else{
e = tw_event_new(next_stop, s->next_output_available_time[output_port] - tw_now(lp), lp);
m = tw_event_data(e);
m_data = m+1;
}
memcpy(m, msg, sizeof(terminal_message));
if (msg->remote_event_size_bytes){
memcpy(m_data, msg+1, msg->remote_event_size_bytes);
}
if(global)
m->last_hop=GLOBAL;
else
m->last_hop = LOCAL;
m->local_id = s->router_id;
/* for reverse computation */
msg->new_vc = output_chan;
/* for sending back credit in forward event handler */
m->saved_vc = output_chan;
m->intm_lp_id = lp->gid;
s->vc_occupancy[output_chan]++;
if(s->vc_occupancy[output_chan] > s->max_router_vc_occupancy)
{
bf->c3 = 1;
msg->saved_occupancy = s->max_router_vc_occupancy;
s->max_router_vc_occupancy = s->vc_occupancy[output_chan];
}
if(routing == PROG_ADAPTIVE)
{
if(tw_now(lp) - s->cur_hist_start_time[output_chan] >= WINDOW_LENGTH)
{
bf->c2 = 1;
msg->saved_hist_num = s->prev_hist_num[output_chan];
msg->saved_hist_start_time = s->cur_hist_start_time[output_chan];
s->prev_hist_num[output_chan] = s->cur_hist_num[output_chan];
s->cur_hist_start_time[output_chan] = tw_now(lp);
s->cur_hist_num[output_chan] = 1;
}
else
{
s->cur_hist_num[output_chan]++;
}
}
/* Determine the event type. If the packet has arrived at the final destination
router then it should arrive at the destination terminal next. */
if(next_stop == msg->dest_terminal_id)
{
m->type = T_ARRIVE;
m->magic = terminal_magic_num;
if(s->vc_occupancy[output_chan] >= s->params->cn_vc_size)
s->output_vc_state[output_chan] = VC_CREDIT;
}
else
{
/* The packet has to be sent to another router */
m->type = R_FORWARD;
m->magic = router_magic_num;
/* If this is a global channel then the buffer space is different */
if( global )
{
if(s->vc_occupancy[output_chan] >= s->params->global_vc_size)
s->output_vc_state[output_chan] = VC_CREDIT;
}
else
{
/* buffer space is less for local channels */
if( s->vc_occupancy[output_chan] >= s->params->local_vc_size)
s->output_vc_state[output_chan] = VC_CREDIT;
}
}
tw_event_send(e);
return;
send_packet_from_router(s, bf, msg, lp, output_chan, next_stop, NULL);
}
/* sets up the router virtual channels, global channels, local channels, compute node channels */
......@@ -1903,7 +2057,9 @@ void router_setup(router_state * r, tw_lp * lp)
r->cur_hist_num = (int*)malloc(p->radix * sizeof(int));
r->prev_hist_num = (int*)malloc(p->radix * sizeof(int));
r->max_router_vc_occupancy = 0;
r->head = NULL;
r->tail = NULL;
for(i=0; i < p->radix; i++)
{
// Set credit & router occupancy
......@@ -1968,6 +2124,12 @@ void router_buf_update_rc(
if(s->vc_occupancy[msg_indx] >= buf * num_chunks)
s->output_vc_state[msg_indx] = VC_CREDIT;
if(bf->c1)
{
struct pending_router_msgs * elem = msg->saved_elem;
add_pending_router_message(&(s->head), &(s->tail), elem, elem->output_chan, elem->next_stop);
}
}
/* Update the buffer space associated with this router LP */
void router_buf_update(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
......@@ -1975,6 +2137,8 @@ void router_buf_update(router_state * s, tw_bf * bf, terminal_message * msg, tw_
router_ecount++;
router_rev_ecount++;
bf->c1 = 0;
int msg_indx = msg->vc_index;
if(TRACK == lp->gid)
......@@ -1988,6 +2152,20 @@ void router_buf_update(router_state * s, tw_bf * bf, terminal_message * msg, tw_
//assert(s->vc_occupancy[msg_indx] > 0);
s->vc_occupancy[msg_indx]--;
s->output_vc_state[msg_indx] = VC_IDLE;
struct pending_router_msgs * elem =
remove_pending_router_msgs(&(s->head), &(s->tail), msg_indx);
if(elem)
{
bf->c1 = 1;
msg->saved_elem = elem;
s->num_waiting--;
send_packet_from_router(s, bf, &(elem->msg), lp, elem->output_chan, elem->next_stop, elem->event_data);
}
/* now traverse the waiting list and find the message waiting for this
* channel */
return;
}
......
......@@ -10,7 +10,7 @@ LPGROUPS
PARAMS
{
packet_size="2147483648";
message_size="280";
message_size="312";
modelnet_order=( "loggp" );
# scheduler options
modelnet_scheduler="fcfs";
......
......@@ -10,7 +10,7 @@ LPGROUPS
PARAMS
{
packet_size="512";
message_size="280";
message_size="312";
modelnet_order=( "simplenet" );
# scheduler options
modelnet_scheduler="priority";
......
......@@ -18,12 +18,12 @@ PARAMS
# modelnet_scheduler="round-robin";
num_vcs="1";
num_routers="4";
local_vc_size="32768";
global_vc_size="65536";
cn_vc_size="32768";
local_vc_size="2048";
global_vc_size="8192";
cn_vc_size="512";
local_bandwidth="5.25";
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="304";
message_size="312";
routing="minimal";
}
......@@ -9,7 +9,7 @@ LPGROUPS
}
PARAMS
{
message_size="304";
message_size="312";
modelnet_order=( "loggp" );
# scheduler options
modelnet_scheduler="fcfs-full";
......
......@@ -9,7 +9,7 @@ LPGROUPS
}
PARAMS
{
message_size="256";
message_size="312";
packet_size="1024";
modelnet_order=("simplep2p");
# scheduler options
......
......@@ -14,7 +14,7 @@ PARAMS
# scheduler options
modelnet_scheduler="fcfs";
# modelnet_scheduler="round-robin";
message_size="2048";
message_size="312";
n_dims="4";
dim_length="4,2,2,2";
link_bandwidth="2.0";
......
......@@ -10,7 +10,7 @@ LPGROUPS
PARAMS
{
packet_size="512";
message_size="304";
message_size="312";
modelnet_order=( "simplenet" );
# scheduler options
modelnet_scheduler="fcfs";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment