Commit a6b2a8f2 authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

Combining router receive and send events, leads to reduced event population

parent 2f0c8d1e
......@@ -161,10 +161,8 @@ enum event_t
{
T_GENERATE=1,
T_ARRIVE,
T_SEND,
T_BUFFER,
R_SEND,
R_ARRIVE,
R_FORWARD,
R_BUFFER,
D_COLLECTIVE_INIT,
D_COLLECTIVE_FAN_IN,
......@@ -582,12 +580,12 @@ void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw
else
printf("\n Invalid message type");
// Assume it takes 0.1 ns of serialization latency for processing the credits in the queue
msg->sender_radix = sender_radix;
assert(sender_radix < s->params->radix );
int output_port = msg->saved_vc / p->num_vcs;
msg->saved_available_time = s->next_credit_available_time[sender_radix];
s->next_credit_available_time[sender_radix] = maxd(tw_now(lp), s->next_credit_available_time[output_port]);
msg->saved_credit_time = s->next_credit_available_time[sender_radix];
s->next_credit_available_time[sender_radix] = maxd(tw_now(lp), s->next_credit_available_time[sender_radix]);
ts = credit_delay + 0.1 + tw_rand_exponential(lp->rng, (double)credit_delay/1000);
s->next_credit_available_time[sender_radix]+=ts;
......@@ -617,9 +615,7 @@ static void packet_generate_send_rc(terminal_state * s,
terminal_message * msg,
tw_lp * lp)
{
term_pending_ecount++;
int i;
term_ecount++;
tw_rand_reverse_unif(lp->rng);
s->terminal_available_time = msg->saved_available_time;
......@@ -629,6 +625,9 @@ static void packet_generate_send_rc(terminal_state * s,
s->packet_counter--;
s->output_vc_state[vc] = VC_IDLE;
if(bf->c2)
s->max_term_vc_occupancy = msg->saved_occupancy;
if (msg->chunk_id == (msg->num_chunks-1)){
codes_local_latency_reverse(lp);
}
......@@ -649,8 +648,8 @@ static void packet_generate_send(terminal_state * s,
terminal_message * msg,
tw_lp * lp)
{
bf->c1 = 0;
term_ecount++;
term_pending_ecount++;
const dragonfly_param *p = s->params;
......@@ -680,8 +679,6 @@ static void packet_generate_send(terminal_state * s,
s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp));
s->terminal_available_time += ts;
if(msg->chunk_id == 0)
msg->travel_start_time = tw_now(lp);
int chan = -1, j;
for(j = 0; j < p->num_vcs; j++)
......@@ -711,12 +708,15 @@ static void packet_generate_send(terminal_state * s,
e = tw_event_new(router_id, s->terminal_available_time - tw_now(lp), lp);
m = tw_event_data(e);
memcpy(m, msg, sizeof(terminal_message));
if(msg->chunk_id == 0)
m->travel_start_time = tw_now(lp);
m->magic = router_magic_num;
m->origin_router_id = s->router_id;
m->type = R_ARRIVE;
m->type = R_FORWARD;
m->src_terminal_id = lp->gid;
m->chunk_id = msg->chunk_id;
m->saved_vc = chan;
m->last_hop = TERMINAL;
m->intm_group_id = -1;
m->path_type = -1;
......@@ -762,7 +762,11 @@ static void packet_generate_send(terminal_state * s,
s->vc_occupancy[chan]++;
if(s->vc_occupancy[chan] > s->max_term_vc_occupancy)
{
bf->c2 = 1;
msg->saved_occupancy = s->max_term_vc_occupancy;
s->max_term_vc_occupancy = s->vc_occupancy[chan];
}
if(s->vc_occupancy[chan] >= s->params->cn_vc_size)
s->output_vc_state[chan] = VC_CREDIT;
......@@ -794,7 +798,7 @@ static void packet_generate_send(terminal_state * s,
void *m_gen_data_src = model_net_method_get_edata(DRAGONFLY, msg);
memcpy(m_gen, msg, sizeof(terminal_message));
m_gen->chunk_id = ++(msg->chunk_id);
m_gen->chunk_id = msg->chunk_id + 1;
m_gen->type = T_GENERATE;
if (msg->remote_event_size_bytes){
......@@ -818,7 +822,7 @@ static void packet_arrive_rc(terminal_state * s,
tw_lp * lp)
{
term_ecount++;
uint64_t num_chunks = msg->packet_size / s->params->chunk_size;
uint64_t num_chunks = msg->num_chunks;
if (msg->packet_size % s->params->chunk_size)
num_chunks++;
......@@ -864,7 +868,7 @@ static void packet_arrive(terminal_state * s,
tw_lp * lp)
{
++term_ecount;
uint64_t num_chunks = msg->packet_size / s->params->chunk_size;
uint64_t num_chunks = msg->num_chunks;
if (msg->packet_size % s->params->chunk_size)
num_chunks++;
......@@ -879,6 +883,7 @@ static void packet_arrive(terminal_state * s,
if(msg->path_type != MINIMAL && msg->path_type != NON_MINIMAL)
printf("\n Wrong message path type %d ", msg->path_type);
#if DEBUG == 1
if( msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1)
{
......@@ -1610,19 +1615,31 @@ static int do_adaptive_routing( router_state * s,
return next_stop;
}
static void router_packet_send_rc( router_state * s,
static void router_packet_forward_rc( router_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
uint64_t num_chunks = msg->num_chunks;
router_ecount++;
if(msg->chunk_id == num_chunks - 1)
total_hops--;
tw_rand_reverse_unif(lp->rng);
tw_rand_reverse_unif(lp->rng);
int old_port = msg->sender_radix;
s->next_credit_available_time[old_port] = msg->saved_credit_time;
if(bf->c1)
return;
tw_rand_reverse_unif(lp->rng);
int output_chan = msg->old_vc;
int output_chan = msg->new_vc;
int output_port = output_chan / s->params->num_vcs;
if(bf->c3)
s->max_router_vc_occupancy = msg->saved_occupancy;
if(routing == PROG_ADAPTIVE)
{
if(bf->c2)
......@@ -1641,7 +1658,7 @@ static void router_packet_send_rc( router_state * s,
}
/* routes the current packet to the next stop */
static void router_packet_send( router_state * s,
static void router_packet_forward( router_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
......@@ -1659,10 +1676,18 @@ static void router_packet_send( router_state * s,
int next_stop = -1, output_port = -1, output_chan = -1;
float bandwidth = s->params->local_bandwidth;
uint64_t num_chunks = msg->packet_size/s->params->chunk_size;
uint64_t num_chunks = msg->num_chunks;
if(msg->packet_size % s->params->chunk_size)
num_chunks++;
if(!num_chunks)
num_chunks = 1;
if(msg->chunk_id == num_chunks - 1)
total_hops++;
router_credit_send(s, bf, msg, lp);
codes_mapping_get_lp_info(msg->dest_terminal_id, lp_group_name,
&mapping_grp_id, NULL, &mapping_type_id, NULL, &mapping_rep_id,
&mapping_offset);
......@@ -1754,15 +1779,20 @@ static void router_packet_send( router_state * s,
else
m->last_hop = LOCAL;
m->saved_vc = output_chan;
m->local_id = s->router_id;
msg->old_vc = output_chan;
/* for reverse computation */
msg->new_vc = output_chan;
/* for sending back credit in forward event handler */
m->saved_vc = output_chan;
m->intm_lp_id = lp->gid;
s->vc_occupancy[output_chan]++;
if(s->vc_occupancy[output_chan] > s->max_router_vc_occupancy)
{
bf->c3 = 1;
msg->saved_occupancy = s->max_router_vc_occupancy;
s->max_router_vc_occupancy = s->vc_occupancy[output_chan];
}
if(routing == PROG_ADAPTIVE)
{
if(tw_now(lp) - s->cur_hist_start_time[output_chan] >= WINDOW_LENGTH)
......@@ -1794,7 +1824,7 @@ static void router_packet_send( router_state * s,
else
{
/* The packet has to be sent to another router */
m->type = R_ARRIVE;
m->type = R_FORWARD;
m->magic = router_magic_num;
/* If this is a global channel then the buffer space is different */
......@@ -1814,58 +1844,6 @@ static void router_packet_send( router_state * s,
return;
}
static void router_packet_receive_rc( router_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp )
{
router_ecount++;
uint64_t num_chunks = msg->packet_size/s->params->chunk_size;
if(msg->packet_size % s->params->chunk_size)
num_chunks++;
if(msg->chunk_id == num_chunks - 1)
total_hops--;
tw_rand_reverse_unif(lp->rng);
tw_rand_reverse_unif(lp->rng);
int output_port = msg->saved_vc/s->params->num_vcs;
s->next_credit_available_time[output_port] = msg->saved_available_time;
}
/* Packet arrives at the router and a credit is sent back to the sending terminal/router */
static void router_packet_receive( router_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp )
{
router_ecount++;
uint64_t num_chunks = msg->packet_size/s->params->chunk_size;
if(msg->packet_size % s->params->chunk_size)
num_chunks++;
if(msg->chunk_id == num_chunks - 1)
total_hops++;
tw_event *e;
terminal_message *m;
tw_stime ts;
ts = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
if(msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1)
printf("\n packet %lld chunk %d received at router %d ", msg->packet_ID, msg->chunk_id, (int)lp->gid);
router_credit_send(s, bf, msg, lp);
// router self message - no need for method_event
e = tw_event_new(lp->gid, ts, lp);
m = tw_event_data(e);
memcpy(m, msg, sizeof(terminal_message) + msg->remote_event_size_bytes);
m->type = R_SEND;
tw_event_send(e);
return;
}
/* sets up the router virtual channels, global channels, local channels, compute node channels */
void router_setup(router_state * r, tw_lp * lp)
{
......@@ -2006,12 +1984,8 @@ void router_event(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp *
assert(msg->magic == router_magic_num);
switch(msg->type)
{
case R_SEND: // Router has sent a packet to an intra-group router (local channel)
router_packet_send(s, bf, msg, lp);
break;
case R_ARRIVE: // Router has received a packet from an intra-group router (local channel)
router_packet_receive(s, bf, msg, lp);
case R_FORWARD: // Router has sent a packet to an intra-group router (local channel)
router_packet_forward(s, bf, msg, lp);
break;
case R_BUFFER:
......@@ -2084,12 +2058,8 @@ void router_rc_event_handler(router_state * s, tw_bf * bf, terminal_message * ms
num_chunks++;
switch(msg->type)
{
case R_SEND:
router_packet_send_rc(s, bf, msg, lp);
break;
case R_ARRIVE:
router_packet_receive_rc(s, bf, msg, lp);
case R_FORWARD:
router_packet_forward_rc(s, bf, msg, lp);
break;
case R_BUFFER:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment