Commit 5574637a authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

Merging packet generate and send functions in dragonfly

parent 730a18c7
......@@ -358,10 +358,10 @@ static void dragonfly_report_stats()
/* print statistics */
if(!g_tw_mynode)
{
printf("\n total finished packets %d ", total_finished_packets);
printf("\n total finished packets %d ", total_finished_packets);
printf(" Average number of hops traversed %f average message latency %lf us maximum message latency %lf us \n", (float)avg_hops/total_finished_packets, avg_time/(total_finished_packets*1000), max_time/1000);
if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
printf("\n ADAPTIVE ROUTING STATS: %d percent packets routed minimally %d percent packets routed non-minimally completed packets %d ", total_minimal_packets, total_nonmin_packets, total_completed_packets);
printf("\n ADAPTIVE ROUTING STATS: %d packets routed minimally %d packets routed non-minimally completed packets %d ", total_minimal_packets, total_nonmin_packets, total_completed_packets);
}
return;
......@@ -573,37 +573,46 @@ void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw
return;
}
static void packet_generate_rc(terminal_state * s,
static void packet_generate_send_rc(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
int i;
tw_rand_reverse_unif(lp->rng);
if(bf->c1)
tw_rand_reverse_unif(lp->rng);
mn_stats* stat;
s->terminal_available_time = msg->saved_available_time;
tw_rand_reverse_unif(lp->rng);
int vc = msg->saved_vc;
s->vc_occupancy[vc]--;
s->packet_counter--;
s->output_vc_state[vc] = VC_IDLE;
if (msg->chunk_id == (msg->num_chunks-1)){
codes_local_latency_reverse(lp);
}
struct mn_stats* stat;
stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
stat->send_count--;
stat->send_bytes -= msg->packet_size;
stat->send_time -= (1/s->params->cn_bandwidth) * msg->packet_size;
}
/* generates packet at the current dragonfly compute node */
static void packet_generate(terminal_state * s,
static void packet_generate_send(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
const dragonfly_param *p = s->params;
const dragonfly_param *p = s->params;
tw_stime ts;
tw_event *e;
tw_lpid router_id;
terminal_message *m;
int i, total_event_size;
int chunk_id = msg->chunk_id;
uint64_t num_chunks = msg->packet_size / p->chunk_size;
if (msg->packet_size % s->params->chunk_size)
num_chunks++;
......@@ -615,8 +624,13 @@ static void packet_generate(terminal_state * s,
msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter + tw_rand_integer(lp->rng, 0, lp->gid + g_tw_nlp * s->packet_counter);
msg->travel_start_time = tw_now(lp);
ts = codes_local_latency(lp);
// Each packet is broken into chunks and then sent over the channel
msg->saved_available_time = s->terminal_available_time;
double head_delay = (1/s->params->cn_bandwidth) * s->params->chunk_size;
ts = head_delay + tw_rand_exponential(lp->rng, (double)head_delay/200);
s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp));
s->terminal_available_time += ts;
int chan = -1, j;
for(j = 0; j < p->num_vcs; j++)
{
......@@ -626,43 +640,85 @@ static void packet_generate(terminal_state * s,
break;
}
}
// this is a terminal event, so use the method-event version
void * m_data;
e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY,
(void**)&m, &m_data);
memcpy(m, msg, sizeof(terminal_message));
void * m_data_src = model_net_method_get_edata(DRAGONFLY, msg);
if (msg->remote_event_size_bytes){
memcpy(m_data, m_data_src, msg->remote_event_size_bytes);
}
if (msg->local_event_size_bytes){
memcpy((char*)m_data + msg->remote_event_size_bytes,
(char*)m_data_src + msg->remote_event_size_bytes,
msg->local_event_size_bytes);
}
m->intm_group_id = -1;
m->saved_vc=0;
m->chunk_id = msg->chunk_id;
m->magic = terminal_magic_num;
/* if(msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1)
printf("\n packet generated %lld at terminal %d chunk id %d ", msg->packet_ID, (int)lp->gid, i);
*/
m->output_chan = -1;
if(chan != -1) // If the input queue is available
/* for reverse computation */
msg->saved_vc = chan;
/* simulation should exit */
if(chan == -1)
tw_error(TW_LOC, "\n No terminal buffers available, increase buffer size");
//TODO: be annotation-aware
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
&mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", NULL, 1,
s->router_id/num_routers_per_mgrp,
s->router_id % num_routers_per_mgrp, &router_id);
e = tw_event_new(router_id, s->terminal_available_time - tw_now(lp), lp);
m = tw_event_data(e);
memcpy(m, msg, sizeof(terminal_message));
m->magic = router_magic_num;
m->origin_router_id = s->router_id;
m->type = R_ARRIVE;
m->src_terminal_id = lp->gid;
m->chunk_id = msg->chunk_id;
m->saved_vc = chan;
m->last_hop = TERMINAL;
m->intm_group_id = -1;
m->path_type = -1;
m->local_event_size_bytes = 0;
m->local_id = s->terminal_id;
if (msg->remote_event_size_bytes){
memcpy(m+1, model_net_method_get_edata(DRAGONFLY, msg),
msg->remote_event_size_bytes);
}
tw_event_send(e);
if(msg->chunk_id == msg->num_chunks - 1)
{
// Send the packet out
m->type = T_SEND;
tw_event_send(e);
// now that message is sent, issue an "idle" event to tell the scheduler
// when I'm next available
model_net_method_idle_event(codes_local_latency(lp) +
s->terminal_available_time - tw_now(lp), 0, lp);
/* local completion message */
if(msg->local_event_size_bytes > 0)
{
tw_event* e_new;
terminal_message* m_new;
void* local_event =
(char*)model_net_method_get_edata(DRAGONFLY, msg) +
msg->remote_event_size_bytes;
ts = g_tw_lookahead + (1/s->params->cn_bandwidth) * msg->local_event_size_bytes;
e_new = tw_event_new(msg->sender_lp, ts, lp);
m_new = tw_event_data(e_new);
memcpy(m_new, local_event, msg->local_event_size_bytes);
tw_event_send(e_new);
}
}
else
{
printf("\n Exceeded queue size, exitting %d", s->vc_occupancy[0]);
MPI_Finalize();
exit(-1);
} //else
s->packet_counter++;
s->vc_occupancy[chan]++;
if(s->vc_occupancy[chan] >= s->params->cn_vc_size)
s->output_vc_state[chan] = VC_CREDIT;
/* calculate statistics */
total_event_size = model_net_get_msg_sz(DRAGONFLY) +
msg->remote_event_size_bytes + msg->local_event_size_bytes;
struct mn_stats* stat;
stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
stat->send_count++;
stat->send_bytes += msg->packet_size;
stat->send_time += (1/p->cn_bandwidth) * msg->packet_size;
if(stat->max_event_size < total_event_size)
stat->max_event_size = total_event_size;
/* Now schedule another packet generate event */
if(chunk_id < num_chunks - 1)
if(msg->chunk_id < num_chunks - 1)
{
bf->c1 = 1;
/* Issue another packet generate event */
......@@ -678,7 +734,7 @@ static void packet_generate(terminal_state * s,
void *m_gen_data_src = model_net_method_get_edata(DRAGONFLY, msg);
memcpy(m_gen, msg, sizeof(terminal_message));
m_gen->chunk_id = ++chunk_id;
m_gen->chunk_id = ++(msg->chunk_id);
m_gen->type = T_GENERATE;
if (msg->remote_event_size_bytes){
......@@ -692,17 +748,7 @@ static void packet_generate(terminal_state * s,
tw_event_send(e_gen);
}
total_event_size = model_net_get_msg_sz(DRAGONFLY) +
msg->remote_event_size_bytes + msg->local_event_size_bytes;
mn_stats* stat;
stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
stat->send_count++;
stat->send_bytes += msg->packet_size;
stat->send_time += (1/p->cn_bandwidth) * msg->packet_size;
if(stat->max_event_size < total_event_size)
stat->max_event_size = total_event_size;
return;
}
......@@ -711,112 +757,25 @@ static void packet_send_rc(terminal_state * s,
terminal_message * msg,
tw_lp * lp)
{
s->terminal_available_time = msg->saved_available_time;
tw_rand_reverse_unif(lp->rng);
int vc = msg->saved_vc;
s->vc_occupancy[vc]--;
s->packet_counter--;
s->output_vc_state[vc] = VC_IDLE;
if (msg->chunk_id == (msg->num_chunks)-1){
codes_local_latency_reverse(lp);
}
}
/* sends the packet from the current dragonfly compute node to the attached router */
static void packet_send(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
tw_stime ts;
tw_event *e;
terminal_message *m;
tw_lpid router_id;
/* Route the packet to its source router */
int vc=msg->saved_vc;
// Each packet is broken into chunks and then sent over the channel
msg->saved_available_time = s->terminal_available_time;
double head_delay = (1/s->params->cn_bandwidth) * s->params->chunk_size;
ts = head_delay + tw_rand_exponential(lp->rng, (double)head_delay/200);
s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp));
s->terminal_available_time += ts;
//TODO: be annotation-aware
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
&mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", NULL, 1,
s->router_id/num_routers_per_mgrp, s->router_id % num_routers_per_mgrp, &router_id);
// we are sending an event to the router, so no method_event here
e = tw_event_new(router_id, s->terminal_available_time - tw_now(lp), lp);
//if(msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1)
// printf("\n terminal %d packet %lld chunk %d being sent to router %d router id %d ", (int)lp->gid, (long long)msg->packet_ID, msg->chunk_id, (int)router_id, s->router_id);
m = tw_event_data(e);
memcpy(m, msg, sizeof(terminal_message));
if (msg->remote_event_size_bytes){
memcpy(m+1, model_net_method_get_edata(DRAGONFLY, msg),
msg->remote_event_size_bytes);
}
m->magic = router_magic_num;
m->origin_router_id = s->router_id;
m->type = R_ARRIVE;
m->src_terminal_id = lp->gid;
m->saved_vc = vc;
m->last_hop = TERMINAL;
m->intm_group_id = -1;
m->path_type = -1;
m->local_event_size_bytes = 0;
m->local_id = s->terminal_id;
tw_event_send(e);
if(msg->chunk_id == msg->num_chunks - 1)
{
// now that message is sent, issue an "idle" event to tell the scheduler
// when I'm next available
model_net_method_idle_event(codes_local_latency(lp) +
s->terminal_available_time - tw_now(lp), 0, lp);
/* local completion message */
if(msg->local_event_size_bytes > 0)
{
tw_event* e_new;
terminal_message* m_new;
void* local_event =
(char*)model_net_method_get_edata(DRAGONFLY, msg) +
msg->remote_event_size_bytes;
ts = g_tw_lookahead + (1/s->params->cn_bandwidth) * msg->local_event_size_bytes;
e_new = tw_event_new(msg->sender_lp, ts, lp);
m_new = tw_event_data(e_new);
memcpy(m_new, local_event, msg->local_event_size_bytes);
tw_event_send(e_new);
}
}
s->packet_counter++;
s->vc_occupancy[vc]++;
if(s->vc_occupancy[vc] >= s->params->cn_vc_size)
s->output_vc_state[vc] = VC_CREDIT;
return;
}
static void packet_arrive_rc(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
completed_packets--;
uint64_t num_chunks = msg->packet_size / s->params->chunk_size;
if (msg->packet_size % s->params->chunk_size)
num_chunks++;
if(msg->chunk_id == num_chunks - 1)
completed_packets--;
if(msg->path_type == MINIMAL)
if(msg->path_type == MINIMAL && msg->chunk_id == num_chunks - 1)
minimal_count--;
if(msg->path_type == NON_MINIMAL)
if(msg->path_type == NON_MINIMAL && msg->chunk_id == num_chunks - 1)
nonmin_count--;
tw_rand_reverse_unif(lp->rng);
......@@ -855,12 +814,13 @@ static void packet_arrive(terminal_state * s,
if (msg->packet_size % s->params->chunk_size)
num_chunks++;
completed_packets++;
if(msg->chunk_id == num_chunks - 1)
completed_packets++;
if(msg->path_type == MINIMAL)
if(msg->path_type == MINIMAL && msg->chunk_id == num_chunks - 1)
minimal_count++;
if(msg->path_type == NON_MINIMAL)
if(msg->path_type == NON_MINIMAL && msg->chunk_id == num_chunks - 1)
nonmin_count++;
if(msg->path_type != MINIMAL && msg->path_type != NON_MINIMAL)
......@@ -1305,17 +1265,13 @@ terminal_event( terminal_state * s,
switch(msg->type)
{
case T_GENERATE:
packet_generate(s,bf,msg,lp);
packet_generate_send(s,bf,msg,lp);
break;
case T_ARRIVE:
packet_arrive(s,bf,msg,lp);
break;
case T_SEND:
packet_send(s,bf,msg,lp);
break;
case T_BUFFER:
terminal_buf_update(s, bf, msg, lp);
break;
......@@ -1995,13 +1951,9 @@ void terminal_rc_event_handler(terminal_state * s, tw_bf * bf, terminal_message
switch(msg->type)
{
case T_GENERATE:
packet_generate_rc(s, bf, msg, lp);
packet_generate_send_rc(s, bf, msg, lp);
break;
case T_SEND:
packet_send_rc(s, bf, msg, lp);
break;
case T_ARRIVE:
packet_arrive_rc(s, bf, msg, lp);
break;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment