Commit 6cbfff60 authored by Misbah Mubarak

Updates to the torus network model: (a) add bubble flow control; (b) update credit-based flow control

parent b1f0e3cc
......@@ -33,7 +33,11 @@ struct nodes_message
/* message saved collective time */
tw_stime saved_collective_init_time;
/* for saving recv and total times*/
tw_stime saved_recv_time;
tw_stime saved_total_time;
/* packet ID */
unsigned long long packet_ID;
/* event type of the message */
......
......@@ -25,5 +25,5 @@ PARAMS
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="552";
routing="minimal";
routing="adaptive";
}
......@@ -10,7 +10,7 @@ LPGROUPS
PARAMS
{
packet_size="512";
message_size="488";
message_size="552";
modelnet_order=( "torus" );
# scheduler options
modelnet_scheduler="fcfs";
......
......@@ -25,5 +25,5 @@ PARAMS
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="512";
routing="minimal";
routing="nonminimal";
}
......@@ -163,8 +163,8 @@ static void handle_kickoff_rev_event(
if(m->incremented_flag)
return;
ns->msg_sent_count--;
model_net_event_rc(net_id, lp, PAYLOAD_SZ);
ns->msg_sent_count--;
tw_rand_reverse_unif(lp->rng);
}
static void handle_kickoff_event(
......@@ -222,8 +222,6 @@ static void handle_kickoff_event(
ns->msg_sent_count++;
model_net_event(net_id, "test", global_dest, PAYLOAD_SZ, 0.0, sizeof(svr_msg), (const void*)m_remote, sizeof(svr_msg), (const void*)m_local, lp);
ns->msg_sent_count++;
issue_event(ns, lp);
return;
}
......
......@@ -316,7 +316,7 @@ static int dragonfly_hash_func(void *k, int table_size)
struct dfly_hash_key *tmp = (struct dfly_hash_key *)k;
uint32_t pc = 0, pb = 0;
bj_hashlittle2(tmp, sizeof(*tmp), &pc, &pb);
return (int)(pc % (table_size - 1));
return (int)(pc % (uint32_t)(table_size - 1));
}
/* convert GiB/s and bytes to ns */
......@@ -1316,7 +1316,9 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
if(bf->c7)
{
assert(!hash_link);
if(hash_link)
printf("\n Num chunks %d ", tmp->num_chunks);
//assert(!hash_link);
s->finished_msgs--;
total_msg_sz -= msg->total_size;
N_finished_msgs--;
......@@ -1405,12 +1407,6 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
int num_chunks = msg->packet_size / s->params->chunk_size;
int total_chunks = msg->total_size / s->params->chunk_size;
if(msg->chunk_id == num_chunks - 1)
{
bf->c31 = 1;
s->packet_fin++;
packet_fin++;
}
if(msg->total_size % s->params->chunk_size)
total_chunks++;
......@@ -1429,6 +1425,12 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
if(msg->path_type == NON_MINIMAL)
nonmin_count++;
if(msg->chunk_id == num_chunks - 1)
{
bf->c31 = 1;
s->packet_fin++;
packet_fin++;
}
if(msg->path_type != MINIMAL && msg->path_type != NON_MINIMAL)
printf("\n Wrong message path type %d ", msg->path_type);
......
......@@ -15,6 +15,7 @@
#include "codes/model-net-method.h"
#include "codes/model-net-lp.h"
#include "codes/net/torus.h"
#include "codes/rc-stack.h"
#define DEBUG 1
#define MEAN_INTERVAL 100
......@@ -36,7 +37,6 @@
/* Return the larger of two doubles. When the comparison a < b is false
 * (including equal values), a is returned. */
static double maxd(double a, double b)
{
    if (a < b) {
        return b;
    }
    return a;
}
/* Torus network model implementation of codes, implements the modelnet API */
typedef struct nodes_message_list nodes_message_list;
struct nodes_message_list {
nodes_message msg;
......@@ -175,6 +175,9 @@ struct nodes_state
const char * anno;
/* LPs configuration */
const torus_param * params;
/* create the RC stack */
struct rc_stack * st;
};
static void append_to_node_message_list(
......@@ -552,11 +555,9 @@ static void credit_send( nodes_state * s,
if(sq == -1) {
m->source_direction = msg->source_direction;
m->source_dim = msg->source_dim;
m->source_channel = msg->source_channel;
} else {
m->source_direction = msg->saved_queue % 2;
m->source_dim = msg->saved_queue / 2;
m->source_channel = msg->saved_channel;
}
m->type = CREDIT;
tw_event_send(e);
......@@ -568,6 +569,8 @@ static void torus_init( nodes_state * s,
int i, j;
char anno[MAX_NAME_LENGTH];
rc_stack_create(&s->st);
codes_mapping_get_lp_info(lp->gid, grp_name, &mapping_grp_id, NULL, &mapping_type_id, anno, &mapping_rep_id, &mapping_offset);
if (anno[0] == '\0'){
s->anno = NULL;
......@@ -782,7 +785,7 @@ static void send_remote_event(nodes_state * s,
tw_stime ts;
nodes_message * m;
ts = (1/s->params->link_bandwidth) * msg->remote_event_size_bytes;
e = codes_event_new(s->origin_svr, ts, lp);
e = tw_event_new(s->origin_svr, ts, lp);
m = tw_event_data(e);
char* tmp_ptr = (char*)msg;
tmp_ptr += torus_get_msg_sz();
......@@ -1071,6 +1074,8 @@ static void enqueue_msg( nodes_state * ns,
int num_chunks = msg->packet_size/ns->params->chunk_size;
if(msg->packet_size % ns->params->chunk_size)
num_chunks++;
if(!num_chunks)
num_chunks = 1;
ns->packet_counter++;
......@@ -1144,6 +1149,46 @@ static void enqueue_msg( nodes_state * ns,
}
/* record the maximum ROSS event size */
/* Reverse handler for a GENERATE event.
 *
 * Rolls back one random-number draw and the packet counter, removes one
 * chunk entry per chunk of the packet from the tail of the terminal queue
 * selected by the message's source dimension/direction, undoes the
 * flag-guarded state changes recorded in bf, and finally subtracts the
 * packet from the per-category send statistics.
 *
 * s   - node LP state being rolled back
 * bf  - bitfield recorded when the event was processed (c8 and c11 are read)
 * msg - the GENERATE message being reversed
 * lp  - the LP whose RNG stream is rolled back
 */
static void packet_generate_rc( nodes_state * s,
        tw_bf * bf,
        nodes_message * msg,
        tw_lp * lp )
{
    tw_rand_reverse_unif(lp->rng);
    s->packet_counter--;

    /* Two queues (one per direction) for every dimension. */
    int queue = msg->source_direction + (msg->source_dim * 2);

    /* Chunks in the packet: round up, and never fewer than one. */
    uint64_t num_chunks = msg->packet_size/s->params->chunk_size;
    if (msg->packet_size % s->params->chunk_size) {
        num_chunks++;
    }
    if (num_chunks == 0) {
        num_chunks = 1;
    }

    /* Take one entry per chunk back off the tail of the terminal queue. */
    for (uint64_t remaining = num_chunks; remaining > 0; remaining--) {
        nodes_message_list *removed = return_tail(s,
                s->terminal_msgs, s->terminal_msgs_tail, queue);
        s->terminal_length[queue] -= s->params->chunk_size;
        s->all_term_length -= s->params->chunk_size;
        delete_nodes_message_list(removed);
    }

    if (bf->c11) {
        s->issueIdle = 0;
    }

    if (bf->c8) {
        /* Undo the extra random draw and clear the send-loop flag again. */
        tw_rand_reverse_unif(lp->rng);
        s->in_send_loop[queue] = 0;
    }

    /* Remove this packet from the send statistics of its category. */
    mn_stats *stat = model_net_find_stats(msg->category, s->torus_stats_array);
    stat->send_count--;
    stat->send_bytes -= msg->packet_size;
    stat->send_time -= (1/s->params->link_bandwidth) * msg->packet_size;
}
/*Generates a packet. If there is a buffer slot available, then the packet is
injected in the network. Else, a buffer overflow exception is thrown.
TODO: We might want to modify this so that if the buffer is full, the packet
......@@ -1161,10 +1206,78 @@ static void packet_generate( nodes_state * s,
tw_lpid dst_lp = msg->dest_lp;
enqueue_msg( s, bf, msg, lp, dst_lp, ts );
}
/* Reverse handler for a SEND event.
 *
 * Walks the flags recorded in bf and undoes the matching state change for
 * each one that is set. The undo steps are applied in a fixed order; the
 * only early exit is the c1 case.
 *
 * s   - node LP state being rolled back
 * bf  - bitfield recorded when the event was processed
 * msg - the SEND message being reversed (also supplies saved_available_time)
 * lp  - the LP whose RNG stream is rolled back
 */
static void packet_send_rc(nodes_state * s,
        tw_bf * bf,
        nodes_message * msg,
        tw_lp * lp)
{
    /* Two queues (one per direction) for every dimension. */
    int link_q = msg->source_direction + (msg->source_dim * 2);

    /* c1: the forward handler cleared in_send_loop and returned early, so
     * restoring the flag is the only thing to undo. */
    if (bf->c1) {
        s->in_send_loop[link_q] = 1;
        return;
    }

    /* c3: give back the chunk of buffer space that was claimed. */
    if (bf->c3) {
        s->buffer[link_q][STATICQ] -= s->params->chunk_size;
    }

    if (bf->c4) {
        s->in_send_loop[link_q] = 1;
    }

    /* Roll back one random draw and the link-availability timestamp. */
    tw_rand_reverse_unif(lp->rng);
    s->next_link_available_time[link_q][0] = msg->saved_available_time;

    /* c20: undo link traffic accounted with the packet's remainder bytes. */
    if (bf->c20) {
        s->link_traffic[link_q] -= msg->packet_size % s->params->chunk_size;
    }

    /* c21: undo link traffic accounted with a full chunk. */
    if (bf->c21) {
        s->link_traffic[link_q] -= s->params->chunk_size;
    }

    if (bf->c6) {
        codes_local_latency_reverse(lp);
    }

    /* c31: put the chunk back at the head of the terminal queue and
     * restore the terminal queue lengths. */
    if (bf->c31) {
        create_prepend_to_node_message_list(s, s->terminal_msgs,
                s->terminal_msgs_tail, link_q, msg);
        s->terminal_length[link_q] += s->params->chunk_size;
        s->all_term_length += s->params->chunk_size;
    }

    /* c8: put the chunk back at the head of the pending queue. */
    if (bf->c8) {
        create_prepend_to_node_message_list(s, s->pending_msgs[link_q],
                s->pending_msgs_tail[link_q], STATICQ, msg);
    }

    if (bf->c22) {
        s->issueIdle = 1;
        codes_local_latency_reverse(lp);
    }

    if (bf->c9) {
        tw_rand_reverse_unif(lp->rng);
    }

    if (bf->c10) {
        s->in_send_loop[link_q] = 1;
    }
}
/* send a packet from one torus node to another torus node
A packet can be up to 256 bytes on BG/L and BG/P and up to 512 bytes on BG/Q */
static void packet_send( nodes_state * s,
tw_bf * bf,
tw_bf * bf,
nodes_message * msg,
tw_lp * lp )
{
......@@ -1183,13 +1296,14 @@ static void packet_send( nodes_state * s,
&& s->terminal_msgs[queue] == NULL) {
bf->c1 = 1;
s->in_send_loop[queue] = 0;
//printf("[%d] Empty queue; return\n", lp->gid);
return;
}
nodes_message_list *cur_entry = s->pending_msgs[queue][STATICQ];
if(s->params->routing == STATIC && cur_entry == NULL) {
if(cur_entry == NULL) {
/* Bubble flow control method here, checking if there are 2 empty
* buffer slots only then forward newly injected packets */
if((s->buffer[queue][STATICQ] + (2 * s->params->chunk_size) <= s->params->buffer_size)) {
bf->c3 = 1;
s->buffer[queue][STATICQ] += s->params->chunk_size;
......@@ -1201,14 +1315,15 @@ static void packet_send( nodes_state * s,
s->in_send_loop[queue] = 0;
return;
}
} else {
bf->c5 = 1;
}
}
int num_chunks = cur_entry->msg.packet_size/s->params->chunk_size;
if(cur_entry->msg.packet_size % s->params->chunk_size)
num_chunks++;
if(!num_chunks)
num_chunks = 1;
double bytetime;
if((cur_entry->msg.packet_size % s->params->chunk_size) && (cur_entry->msg.chunk_id == num_chunks - 1))
{
......@@ -1234,13 +1349,6 @@ static void packet_send( nodes_state * s,
}
m->type = ARRIVAL;
m->source_channel = STATICQ;
if(msg->packet_ID == TRACE)
printf("\n lp %d packet %lld flit id %d being sent to %d after time %lf ",
(int) lp->gid, m->packet_ID, m->chunk_id, m->next_stop,
s->next_link_available_time[queue][0] - tw_now(lp));
m->sender_node = lp->gid;
m->local_event_size_bytes = 0; /* We just deliver the local event here */
......@@ -1275,7 +1383,9 @@ static void packet_send( nodes_state * s,
}
}
/* isT=1 means that we can send the newly injected packets */
if(isT) {
bf->c31 = 1;
cur_entry = return_head(s, s->terminal_msgs, s->terminal_msgs_tail,
queue);
s->terminal_length[queue] -= s->params->chunk_size;
......@@ -1320,6 +1430,74 @@ static void packet_send( nodes_state * s,
}
}
/* Reverse handler for an ARRIVAL event.
 *
 * Rolls back the random draw and hop count of the arrival, then undoes the
 * flag-guarded work recorded in bf:
 *   c1        - one additional random draw
 *   c2        - final-chunk bookkeeping (receive statistics, finished-packet
 *               counter, total time, total hops); nested c3 restores
 *               max_latency and nested c31 reverses the pull-response event
 *   c6        - the chunk that was queued for forwarding is taken back off
 *               the tail of whichever list it was appended to (c30: queued,
 *               c9/c11: pending, c8: other) and freed; nested c13 undoes the
 *               scheduling of a SEND event
 *
 * s   - node LP state being rolled back
 * bf  - bitfield recorded when the event was processed
 * msg - the ARRIVAL message being reversed; carries the saved_* values
 * lp  - the LP whose RNG stream is rolled back
 */
static void packet_arrive_rc(nodes_state * s,
        tw_bf * bf,
        nodes_message * msg,
        tw_lp * lp)
{
    tw_rand_reverse_unif(lp->rng);

    /* NOTE(review): the hop count is decremented before it is subtracted
     * from total_hops below. If packet_arrive adds the already-incremented
     * hop count to total_hops, this subtracts one hop too few - confirm
     * against the forward handler. */
    msg->my_N_hop--;

    if(bf->c1)
    {
        tw_rand_reverse_unif(lp->rng);
    }

    if(bf->c2)
    {
        struct mn_stats* stat;
        stat = model_net_find_stats(msg->category, s->torus_stats_array);
        stat->recv_count--;
        stat->recv_bytes -= msg->packet_size;
        stat->recv_time = msg->saved_recv_time;
        N_finished_packets--;
        /* packet_arrive stores the value total_time had *before* adding the
         * packet latency, so the saved value must be restored by assignment
         * (as is done for recv_time above). Subtracting it would leave only
         * the latency increment in total_time. */
        total_time = msg->saved_total_time;
        total_hops -= msg->my_N_hop;

        if(bf->c3)
        {
            max_latency = msg->saved_available_time;
        }

        if(bf->c31)
        {
            model_net_event_rc2(lp, &msg->event_rc);
        }
    }

    if(bf->c6)
    {
        nodes_message_list * cur_entry = NULL;

        if(bf->c30)
        {
            cur_entry = return_tail(s, s->queued_msgs[msg->saved_queue],
                    s->queued_msgs_tail[msg->saved_queue], STATICQ);
            s->queued_length[msg->saved_queue] -= s->params->chunk_size;
        }

        if(bf->c9 || bf->c11)
        {
            cur_entry = return_tail(s, s->pending_msgs[msg->saved_queue],
                    s->pending_msgs_tail[msg->saved_queue], STATICQ);
            s->buffer[msg->saved_queue][STATICQ] -= s->params->chunk_size;
            tw_rand_reverse_unif(lp->rng);
        }

        if(bf->c8)
        {
            cur_entry = return_tail(s, s->other_msgs,
                    s->other_msgs_tail, msg->saved_queue);
        }

        delete_nodes_message_list(cur_entry);

        if(bf->c13)
        {
            codes_local_latency_reverse(lp);
            s->in_send_loop[msg->saved_queue] = 0;
        }
    }
}
/*Processes the packet after it arrives from the neighboring torus node
* routes it to the next compute node if this is not the destination
* OR if this is the destination then a remote event at the server is issued. */
......@@ -1348,17 +1526,22 @@ static void packet_arrive( nodes_state * s,
if(msg->packet_size % s->params->chunk_size)
num_chunks++;
if(!num_chunks)
num_chunks = 1;
if( msg->chunk_id == num_chunks - 1 )
{
bf->c2 = 1;
stat = model_net_find_stats(msg->category, s->torus_stats_array);
stat->recv_count++;
stat->recv_bytes += msg->packet_size;
stat->recv_time += tw_now( lp ) - msg->travel_start_time;
msg->saved_recv_time = stat->recv_time;
stat->recv_time += tw_now( lp ) - msg->travel_start_time;
/*count the number of packets completed overall*/
N_finished_packets++;
total_time += tw_now( lp ) - msg->travel_start_time;
msg->saved_total_time = total_time;
total_time += tw_now( lp ) - msg->travel_start_time;
total_hops += msg->my_N_hop;
if (max_latency < tw_now( lp ) - msg->travel_start_time) {
......@@ -1371,6 +1554,7 @@ static void packet_arrive( nodes_state * s,
{
void *tmp_ptr = model_net_method_get_edata(TORUS, msg);
if (msg->is_pull){
bf->c31 = 1;
int net_id = model_net_get_id(LP_METHOD_NM);
struct codes_mctx mc_dst =
codes_mctx_set_global_direct(msg->sender_node);
......@@ -1394,18 +1578,11 @@ static void packet_arrive( nodes_state * s,
{
bf->c6 = 1;
int tmp_dir, tmp_dim, queue;
int tmp_dirD, tmp_dimD, queueD;
tw_lpid dst_lp = msg->dest_lp;
tw_lpid dst_lpD = msg->dest_lp;
dimension_order_routing(s, &dst_lp, &tmp_dim, &tmp_dir);
queue = tmp_dir + (tmp_dim * 2);
queueD = tmp_dir + ( tmp_dim * 2 );
tmp_dimD = tmp_dim;
tmp_dirD = tmp_dir;
dst_lpD = dst_lp;
nodes_message_list * cur_chunk = (nodes_message_list *)malloc(
sizeof(nodes_message_list));
init_nodes_message_list(cur_chunk, msg);
......@@ -1426,12 +1603,11 @@ static void packet_arrive( nodes_state * s,
cur_chunk->msg.source_dim = tmp_dim;
cur_chunk->msg.source_direction = tmp_dir;
msg->saved_queue = queue;
msg->saved_channel = STATICQ;
if(s->buffer[queue][STATICQ] + multfactor * s->params->chunk_size
if(s->buffer[queue][STATICQ] + s->params->chunk_size
> s->params->buffer_size) {
bf->c30 = 1;
cur_chunk->msg.saved_queue =
msg->source_direction + ( msg->source_dim * 2 );
cur_chunk->msg.saved_channel = msg->source_channel;
append_to_node_message_list(s, s->queued_msgs[queue],
s->queued_msgs_tail[queue], STATICQ, cur_chunk);
s->queued_length[queue] += s->params->chunk_size;
......@@ -1465,9 +1641,11 @@ static void packet_arrive( nodes_state * s,
bf->c8 = 1;
append_to_node_message_list(s, s->other_msgs,
s->other_msgs_tail, queue, cur_chunk);
cur_chunk->msg.saved_queue =
msg->source_direction + ( msg->source_dim * 2 );
}
}
if(s->in_send_loop[msg->saved_queue] == 0) {
if(s->in_send_loop[queue] == 0) {
bf->c13 = 1;
ts = codes_local_latency(lp);
e = model_net_method_event_new(lp->gid, ts, lp, TORUS, (void**)&m,
......@@ -1475,7 +1653,7 @@ static void packet_arrive( nodes_state * s,
m->type = SEND;
m->source_direction = cur_chunk->msg.source_direction;
m->source_dim = cur_chunk->msg.source_dim;
s->in_send_loop[msg->saved_queue] = 1;
s->in_send_loop[queue] = 1;
tw_event_send(e);
}
}
......@@ -1502,7 +1680,8 @@ static void torus_report_stats()
void
final( nodes_state * s, tw_lp * lp )
{
model_net_print_stats(lp->gid, &s->torus_stats_array);
rc_stack_destroy(s->st);
model_net_print_stats(lp->gid, s->torus_stats_array);
free(s->next_link_available_time);
free(s->next_credit_available_time);
free(s->next_flit_generate_time);
......@@ -1522,11 +1701,50 @@ final( nodes_state * s, tw_lp * lp )
//free(s->params->half_length);
}
/* Reverse handler for a CREDIT event.
 *
 * Re-adds the chunk of buffer occupancy that processing the credit
 * subtracted, then undoes whichever follow-up action bf records:
 *   c2 - a chunk moved into the pending list goes back to the head of the
 *        queued list
 *   c3 - a chunk moved into the pending list goes back to the head of the
 *        other-messages list
 *   c5 - the scheduling of a send event is undone
 *
 * s   - node LP state being rolled back
 * bf  - bitfield recorded when the event was processed
 * msg - the CREDIT message being reversed
 * lp  - the LP whose RNG stream is rolled back
 */
static void packet_buffer_process_rc(nodes_state * s,
        tw_bf * bf,
        nodes_message * msg,
        tw_lp * lp)
{
    /* Two queues (one per direction) for every dimension. */
    int q = msg->source_direction + ( msg->source_dim * 2 );

    s->buffer[q][STATICQ] += s->params->chunk_size;

    if (bf->c2) {
        nodes_message_list *moved = return_tail(s,
                s->pending_msgs[q], s->pending_msgs_tail[q],
                STATICQ);
        prepend_to_node_message_list(s, s->queued_msgs[q],
                s->queued_msgs_tail[q], STATICQ, moved);
        s->queued_length[q] += s->params->chunk_size;
        tw_rand_reverse_unif(lp->rng);
        s->buffer[q][STATICQ] -= s->params->chunk_size;
    }

    if (bf->c3) {
        nodes_message_list *moved = return_tail(s,
                s->pending_msgs[q], s->pending_msgs_tail[q],
                STATICQ);
        prepend_to_node_message_list(s, s->other_msgs,
                s->other_msgs_tail, q, moved);
        tw_rand_reverse_unif(lp->rng);
        s->buffer[q][STATICQ] -= s->params->chunk_size;
    }

    if (bf->c5) {
        codes_local_latency_reverse(lp);
        s->in_send_loop[q] = 0;
    }
}
/* increments the buffer count after a credit arrives from the remote compute node */
static void packet_buffer_process( nodes_state * ns, tw_bf * bf, nodes_message * msg, tw_lp * lp )
{
int queue = msg->source_direction + ( msg->source_dim * 2 );
ns->buffer[queue][msg->source_channel] -= ns->params->chunk_size;
ns->buffer[queue][STATICQ] -= ns->params->chunk_size;
if(ns->queued_msgs[queue][STATICQ] != NULL) {
bf->c2 = 1;
......@@ -1568,75 +1786,20 @@ static void node_rc_handler(nodes_state * s, tw_bf * bf, nodes_message * msg, tw
{
switch(msg->type)
{
case GENERATE:
{
s->packet_counter--;
//saved_dim = msg->saved_src_dim;
//saved_dir = msg->saved_src_dir;
uint64_t num_chunks = msg->packet_size/s->params->chunk_size;
if(msg->packet_size % s->params->chunk_size)
num_chunks++;
if(!num_chunks)
num_chunks = 1;
//s->next_flit_generate_time[(saved_dim * 2) + saved_dir][0] = msg->saved_available_time;
codes_local_latency_reverse(lp);
if(bf->c1)
codes_local_latency_reverse(lp);
mn_stats* stat;
stat = model_net_find_stats(msg->category, s->torus_stats_array);
stat->send_count--;
stat->send_bytes -= msg->packet_size;
stat->send_time -= (1/s->params->link_bandwidth) * msg->packet_size;
}
break;
case GENERATE:
packet_generate_rc(s, bf, msg, lp);
break;
case ARRIVAL:
{
tw_rand_reverse_unif(lp->rng);
tw_rand_reverse_unif(lp->rng);
int next_dim = msg->source_dim;
int next_dir = msg->source_direction;
uint64_t num_chunks = msg->packet_size/s->params->chunk_size;
if(msg->packet_size % s->params->chunk_size)
num_chunks++;
s->next_credit_available_time[next_dir + ( next_dim * 2 )][0] = msg->saved_available_time;
if(bf->c2)
{
struct mn_stats* stat;
stat = model_net_find_stats(msg->category, s->torus_stats_array);
stat->recv_count--;
stat->recv_bytes -= msg->packet_size;
stat->recv_time -= tw_now(lp) - msg->travel_start_time;
N_finished_packets--;
total_time -= tw_now( lp ) - msg->travel_start_time;
total_hops -= msg->my_N_hop;
}
msg->my_N_hop--;
if (lp->gid == msg->dest_lp &&
msg->chunk_id == num_chunks-1 &&
msg->remote_event_size_bytes && msg->is_pull){
model_net_event_rc2(lp, &msg->event_rc);
}
}
packet_arrive_rc(s, bf, msg, lp);
break;
case SEND:
{
if(bf->c2)
{
}
}
packet_send_rc(s, bf, msg, lp);
break;
case CREDIT:
{
s->buffer[ msg->source_direction + ( msg->source_dim * 2 ) ][ 0 ]++;
}
packet_buffer_process_rc(s, bf, msg, lp);
break;
case T_COLLECTIVE_INIT:
......
......@@ -18,7 +18,7 @@ PARAMS
n_dims="4";
dim_length="4,2,2,2";
link_bandwidth="2.0";
buffer_size="16384";
buffer_size="4096";
num_vc="1";
chunk_size="32";
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment