Commit f61a0c6c authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

First pass on the congestion control in torus model, fixing stat collection in dragonfly

parent 7a7092e1
......@@ -71,7 +71,8 @@ struct terminal_message
/* for reverse computation */
short path_type;
tw_stime saved_available_time;
tw_stime saved_credit_time;
tw_stime saved_avg_time;
tw_stime saved_start_time;
tw_stime saved_collective_init_time;
tw_stime saved_hist_start_time;
tw_stime msg_start_time;
......
......@@ -40,8 +40,7 @@ struct nodes_message
nodes_event_t type;
/* for reverse computation */
int saved_src_dim;
int saved_src_dir;
int saved_channel;
/* coordinates of the destination torus nodes */
int* dest;
......@@ -71,6 +70,9 @@ struct nodes_message
/* for reverse computation of a node's fan in*/
int saved_fan_nodes;
int source_channel;
int saved_queue;
/* chunk id of the flit (distinguishes flits) */
int chunk_id;
......
......@@ -14,7 +14,7 @@ PARAMS
modelnet_order=( "dragonfly" );
# scheduler options
modelnet_scheduler="fcfs";
chunk_size="64";
chunk_size="512";
# modelnet_scheduler="round-robin";
num_vcs="1";
num_routers="6";
......@@ -24,6 +24,6 @@ PARAMS
local_bandwidth="5.25";
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="528";
message_size="544";
routing="adaptive";
}
......@@ -13,7 +13,7 @@
#include "codes/model-net.h"
#include "codes/rc-stack.h"
#define TRACE -1
#define TRACE 0
#define TRACK 0
char workload_type[128];
......@@ -344,7 +344,7 @@ static void mpi_queue_update(struct mpi_queue_ptrs* mpi_queue, struct codes_work
/* prints the elements of a queue (for debugging purposes). */
static void printCompletedQueue(nw_state* s, tw_lp* lp)
{
if(TRACE == lp->gid)
if(TRACE == s->nw_id)
{
printf("\n %lf contents of completed operations queue ", tw_now(lp));
struct completed_requests* current = s->completed_reqs;
......@@ -370,7 +370,7 @@ static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, du
s->saved_pending_wait = NULL;
}
*/
if(lp->gid == TRACE)
if(s->nw_id == TRACE)
printf("\n %lf reverse -- notify waits req id %d ", tw_now(lp), completed_req);
printCompletedQueue(s, lp);
......@@ -380,7 +380,7 @@ static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, du
/* if a wait-elem exists, it means the request ID has been matched*/
if(m->u.rc.matched_op == 2)
{
if(lp->gid == TRACE)
if(s->nw_id == TRACE)
{
printf("\n %lf matched req id %d ", tw_now(lp), completed_req);
printCompletedQueue(s, lp);
......@@ -409,7 +409,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
struct pending_waits* wait_elem = s->pending_waits;
m->u.rc.matched_op = 0;
if(lp->gid == TRACE)
if(s->nw_id == TRACE)
printf("\n %lf notify waits req id %d ", tw_now(lp), completed_req);
if(!wait_elem)
......@@ -438,7 +438,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
{
if(wait_elem->mpi_op->u.waits.req_ids[i] == completed_req)
{
if(lp->gid == TRACE)
if(s->nw_id == TRACE)
printCompletedQueue(s, lp);
m->u.rc.matched_op = 1;
wait_elem->num_completed++;
......@@ -447,7 +447,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
if(wait_elem->num_completed == required_count)
{
if(lp->gid == TRACE)
if(s->nw_id == TRACE)
{
printf("\n %lf req %d completed %d", tw_now(lp), completed_req, wait_elem->num_completed);
printCompletedQueue(s, lp);
......@@ -513,7 +513,7 @@ static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, nw_message * m, struct c
static void codes_exec_mpi_wait_all_rc(nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op)
{
if(lp->gid == TRACE)
if(s->nw_id == TRACE)
{
printf("\n %lf codes exec mpi waitall reverse %d ", tw_now(lp), m->u.rc.found_match);
printCompletedQueue(s, lp);
......@@ -837,7 +837,6 @@ static void codes_exec_comp_delay(
msg->msg_type = MPI_OP_GET_NEXT;
tw_event_send(e);
}
/* reverse computation operation for MPI irecv */
......@@ -1257,7 +1256,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
s->nw_id, s->num_completed);
m->u.rc.saved_op = mpi_op;
if(mpi_op->op_type == CODES_WK_END)
if(mpi_op->op_type == CODES_WK_END && s->num_completed == 50000)
{
//rc_stack_push(lp, mpi_op, free, s->st);
s->elapsed_time = tw_now(lp) - s->start_time;
......
......@@ -147,7 +147,6 @@ struct terminal_state
int* vc_occupancy; // NUM_VC
int num_vcs;
tw_stime terminal_available_time;
tw_stime next_credit_available_time;
terminal_message_list **terminal_msgs;
terminal_message_list **terminal_msgs_tail;
int in_send_loop;
......@@ -248,7 +247,6 @@ struct router_state
int* global_channel;
tw_stime* next_output_available_time;
tw_stime* next_credit_available_time;
tw_stime* cur_hist_start_time;
terminal_message_list ***pending_msgs;
terminal_message_list ***pending_msgs_tail;
......@@ -739,7 +737,6 @@ void router_setup(router_state * r, tw_lp * lp)
r->global_channel = (int*)malloc(p->num_global_channels * sizeof(int));
r->next_output_available_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
r->next_credit_available_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
r->cur_hist_start_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
r->link_traffic = (int*)malloc(p->radix * sizeof(int));
r->cur_hist_num = (int*)malloc(p->radix * sizeof(int));
......@@ -760,7 +757,6 @@ void router_setup(router_state * r, tw_lp * lp)
{
// Set credit & router occupancy
r->next_output_available_time[i]=0;
r->next_credit_available_time[i]=0;
r->cur_hist_start_time[i] = 0;
r->link_traffic[i]=0;
r->cur_hist_num[i] = 0;
......@@ -1256,7 +1252,6 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
if(msg->path_type == NON_MINIMAL)
nonmin_count--;
s->next_credit_available_time = msg->saved_credit_time;
uint64_t num_chunks = msg->packet_size / s->params->chunk_size;
N_finished_chunks--;
......@@ -1296,8 +1291,6 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
hash_link = qhash_search(s->rank_tbl, &key);
tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
total_hops -= msg->my_N_hop;
mn_stats* stat;
stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
if(bf->c1)
......@@ -1312,16 +1305,16 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
dragonfly_max_latency = msg->saved_available_time;
if(bf->c7)
if(!hash_link)
{
s->finished_msgs--;
total_msg_sz -= msg->total_size;
N_finished_msgs--;
stat->recv_time -= tw_now(lp) - msg->travel_start_time;
s->total_msg_time -= (tw_now(lp) - msg->msg_start_time);
stat->recv_time -= msg->saved_start_time;
s->total_msg_time -= msg->saved_start_time;
s->total_msg_size -= msg->total_size;
dragonfly_total_time -= (tw_now(lp) - msg->travel_start_time);
dragonfly_total_time -= msg->saved_avg_time;
struct dfly_qhash_entry * d_entry_pop = (struct dfly_qhash_entry*)rc_stack_pop(s->st);
qhash_add(s->rank_tbl, &key, &(d_entry_pop->hash_link));
......@@ -1354,7 +1347,7 @@ void send_remote_event(terminal_state * s, terminal_message * msg, tw_lp * lp, t
model_net_set_msg_param(MN_MSG_PARAM_START_TIME, MN_MSG_PARAM_START_TIME_VAL, &(msg->msg_start_time));
model_net_event_mctx(net_id, &mc_src, &mc_dst, msg->category,
msg->event_rc = model_net_event_mctx(net_id, &mc_src, &mc_dst, msg->category,
msg->sender_lp, msg->pull_size, ts,
remote_event_size, tmp_ptr, 0, NULL, lp);
}
......@@ -1539,10 +1532,12 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
N_finished_msgs++;
total_msg_sz += msg->total_size;
dragonfly_total_time += tw_now( lp ) - msg->travel_start_time;
stat->recv_time += tw_now(lp) - msg->travel_start_time;
msg->saved_avg_time = tw_now( lp ) - msg->travel_start_time;
dragonfly_total_time += msg->saved_avg_time;
msg->saved_start_time = (tw_now(lp) - msg->msg_start_time);
stat->recv_time += msg->saved_start_time;
s->finished_msgs++;
s->total_msg_time += (tw_now(lp) - msg->msg_start_time);
s->total_msg_time += msg->saved_start_time;
s->total_msg_size += msg->total_size;
if (dragonfly_max_latency < tw_now( lp ) - msg->travel_start_time) {
......@@ -1923,7 +1918,7 @@ dragonfly_terminal_final( terminal_state * s,
int written = 0;
if(!s->terminal_id)
written = sprintf(s->output_buf, "# Format <LP id> <Terminal ID> <Avg Msg Size> <Avg Msg Time> <# Msgs finished>");
written = sprintf(s->output_buf, "# Format <LP id> <Terminal ID> <Total Msg Size> <Total Msg Time> <# Msgs finished>\n");
written += sprintf(s->output_buf + written, "%lu %lu %ld %lf %ld %ld\n", lp->gid, s->terminal_id, s->total_msg_size, s->total_msg_time, s->finished_msgs, s->finished_packets);
lp_io_write(lp->gid, "dragonfly-msg-stats", written, s->output_buf);
......@@ -2042,9 +2037,7 @@ get_next_stop(router_state * s,
if(msg->last_hop == TERMINAL && path == NON_MINIMAL)
{
if(dest_group_id != s->group_id)
{
msg->intm_group_id = intm_id;
}
}
/******************** DECIDE THE DESTINATION GROUP ***********************/
/* It means that the packet has arrived at the inter-mediate group for non-minimal routing. Reset the group now. */
......@@ -2200,7 +2193,7 @@ static int do_adaptive_routing( router_state * s,
assert(num_nonmin_hops <= 6);
/* average the local queues of the router */
unsigned int q_avg = 0;
/*unsigned int q_avg = 0;
int i;
for( i = 0; i < s->params->radix; i++)
{
......@@ -2209,7 +2202,7 @@ static int do_adaptive_routing( router_state * s,
s->vc_occupancy[i][2];
}
q_avg = q_avg / (s->params->radix - 1);
*/
//int min_out_chan = minimal_out_port;
//int nonmin_out_chan = nonmin_out_port;
......@@ -2311,15 +2304,13 @@ router_packet_receive( router_state * s,
} else if(msg->last_hop == TERMINAL && routing == ADAPTIVE) {
next_stop = do_adaptive_routing(s, bf, msg, lp, dest_router_id, intm_id);
} else {
if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
assert(msg->path_type == MINIMAL || msg->path_type == NON_MINIMAL);
if(routing == MINIMAL || routing == NON_MINIMAL)
msg->path_type = routing; /*defaults to the routing algorithm if we
don't have adaptive routing here*/
next_stop = get_next_stop(s, bf, msg, lp, msg->path_type, dest_router_id,
intm_id);
}
assert(msg->path_type == MINIMAL || msg->path_type == NON_MINIMAL);
terminal_message_list * cur_chunk = (terminal_message_list *)malloc(
sizeof(terminal_message_list));
init_terminal_message_list(cur_chunk, msg);
......
......@@ -16,11 +16,11 @@
#include "codes/model-net-lp.h"
#include "codes/net/torus.h"
#define CHUNK_SIZE 32
#define DEBUG 1
#define MEAN_INTERVAL 100
#define TRACE -1
#define STATICQ 0
/* collective specific parameters */
#define TREE_DEGREE 4
#define LEVEL_DELAY 1000
......@@ -34,16 +34,41 @@
static double maxd(double a, double b) { return a < b ? b : a; }
enum routing_algo
{
STATIC=0,
ADAPTIVE
};
/* Torus network model implementation of codes, implements the modelnet API */
typedef struct nodes_message_list nodes_message_list;
struct nodes_message_list {
nodes_message msg;
char* event_data;
nodes_message_list *next;
nodes_message_list *prev;
};
void init_nodes_message_list(nodes_message_list *this, nodes_message *inmsg) {
this->msg = *inmsg;
this->event_data = NULL;
this->next = NULL;
this->prev = NULL;
}
void delete_nodes_message_list(nodes_message_list *this) {
if(this->event_data != NULL) free(this->event_data);
free(this);
}
typedef struct torus_param torus_param;
struct torus_param
{
int n_dims; /*Dimension of the torus network, 5-D, 7-D or any other*/
int* dim_length; /*Length of each torus dimension*/
double link_bandwidth;/* bandwidth for each torus link */
double cn_bandwidth; /* injection bandwidth */
int buffer_size; /* number of buffer slots for each vc in flits*/
//int num_net_traces; /* number of network traces to be mapped on torus */
int num_vc; /* number of virtual channels for each torus link */
float mean_process;/* mean process time for each flit */
int chunk_size; /* chunk is the smallest unit--default set to 32 */
......@@ -57,6 +82,10 @@ struct torus_param
double head_delay;
double credit_delay;
double cn_delay;
double router_delay;
int routing;
};
/* codes mapping group name, lp type name */
......@@ -95,6 +124,22 @@ struct nodes_state
tw_stime** next_flit_generate_time;
/* buffer size for each torus virtual channel */
int** buffer;
/* Head and tail of the terminal messages list */
nodes_message_list **terminal_msgs;
nodes_message_list **terminal_msgs_tail;
int all_term_length;
int issueIdle;
int *terminal_length, *queued_length;
/* pending packets to be sent out */
nodes_message_list ***pending_msgs;
nodes_message_list ***pending_msgs_tail;
nodes_message_list ***queued_msgs;
nodes_message_list ***queued_msgs_tail;
nodes_message_list **other_msgs;
nodes_message_list **other_msgs_tail;
int *in_send_loop;
/* traffic through each torus link */
int64_t *link_traffic;
/* coordinates of the current torus node */
int* dim_position;
/* neighbor LP ids for this torus node */
......@@ -136,6 +181,111 @@ struct nodes_state
const torus_param * params;
};
static void append_to_node_message_list(
nodes_state *ns,
nodes_message_list ** thisq,
nodes_message_list ** thistail,
int index,
nodes_message_list *msg) {
if(thisq[index] == NULL) {
thisq[index] = msg;
} else {
thistail[index]->next = msg;
msg->prev = thistail[index];
}
thistail[index] = msg;
}
static void prepend_to_node_message_list(
nodes_state *ns,
nodes_message_list ** thisq,
nodes_message_list ** thistail,
int index,
nodes_message_list *msg) {
if(thisq[index] == NULL) {
thistail[index] = msg;
} else {
thisq[index]->prev = msg;
msg->next = thisq[index];
}
thisq[index] = msg;
}
static void create_prepend_to_node_message_list(
nodes_state *ns,
nodes_message_list ** thisq,
nodes_message_list ** thistail,
int index,
nodes_message *msg) {
nodes_message_list* new_entry = (nodes_message_list*)malloc(
sizeof(nodes_message_list));
init_nodes_message_list(new_entry, msg);
if(msg->remote_event_size_bytes) {
void *m_data = model_net_method_get_edata(TORUS, msg);
new_entry->event_data = (void*)malloc(msg->remote_event_size_bytes);
memcpy(new_entry->event_data, m_data, msg->remote_event_size_bytes);
}
prepend_to_node_message_list(ns, thisq, thistail, index, new_entry);
}
static nodes_message_list* return_head(
nodes_state *ns,
nodes_message_list ** thisq,
nodes_message_list ** thistail,
int index) {
nodes_message_list *head = thisq[index];
if(head != NULL) {
thisq[index] = head->next;
if(head->next != NULL) {
head->next->prev = NULL;
head->next = NULL;
} else {
thistail[index] = NULL;
}
}
return head;
}
static nodes_message_list* return_tail(
nodes_state *ns,
nodes_message_list ** thisq,
nodes_message_list ** thistail,
int index) {
nodes_message_list *tail = thistail[index];
if(tail->prev != NULL) {
tail->prev->next = NULL;
thistail[index] = tail->prev;
tail->prev = NULL;
} else {
thistail[index] = NULL;
thisq[index] = NULL;
}
return tail;
}
static void copy_nodes_list_entry( nodes_message_list *cur_entry,
nodes_message *msg) {
nodes_message *cur_msg = &cur_entry->msg;
strcpy(msg->category, cur_msg->category);
msg->travel_start_time = cur_msg->travel_start_time;
msg->packet_ID = cur_msg->packet_ID;
msg->final_dest_gid = cur_msg->final_dest_gid;
msg->dest_lp = cur_msg->dest_lp;
msg->sender_svr = cur_msg->sender_svr;
msg->sender_node = cur_msg->sender_node;
msg->my_N_hop = cur_msg->my_N_hop;
msg->next_stop = cur_msg->next_stop;
msg->packet_size = cur_msg->packet_size;
msg->chunk_id = cur_msg->chunk_id;
msg->is_pull = cur_msg->is_pull;
msg->pull_size = cur_msg->pull_size;
msg->local_event_size_bytes = cur_msg->local_event_size_bytes;
msg->remote_event_size_bytes = cur_msg->remote_event_size_bytes;
if(msg->local_event_size_bytes + msg->remote_event_size_bytes > 0) {
void *m_data = model_net_method_get_edata(TORUS, msg);
memcpy(m_data, cur_entry->event_data,
msg->local_event_size_bytes + msg->remote_event_size_bytes);
}
}
static void torus_read_config(
const char * anno,
torus_param * params){
......@@ -204,23 +354,11 @@ static void torus_read_config(
i++;
token = strtok(NULL,",");
}
/*int num_nodes = 1;
for( i = 0; i < p->n_dims; i++)
num_nodes *= p->dim_length[i];
configuration_get_value_int(&config, "PARAMS", "num_net_traces", anno, &p->num_net_traces);
if(!p->num_net_traces) {
p->num_net_traces = num_nodes;
fprintf(stderr, "Number of network traces not specified, setting to %d",
p->num_net_traces);
}
// Number of network traces should be <= number of torus network nodes `
assert(p->num_net_traces <= num_nodes);*/
// create derived parameters
// factor is an exclusive prefix product
p->router_delay = 50;
p->routing = STATIC;
p->factor = malloc(p->n_dims * sizeof(int));
p->factor[0] = 1;
for(i = 1; i < p->n_dims; i++)
......@@ -373,6 +511,7 @@ static tw_stime torus_packet_event(
msg->sender_svr= req->src_lp;
msg->sender_node = sender->gid;
msg->packet_size = packet_size;
msg->travel_start_time = tw_now(sender);
msg->remote_event_size_bytes = 0;
msg->local_event_size_bytes = 0;
msg->chunk_id = 0;
......@@ -400,6 +539,32 @@ static tw_stime torus_packet_event(
return xfer_to_nic_time;
}
/*Sends a 8-byte credit back to the torus node LP that sent the message */
static void credit_send( nodes_state * s,
tw_bf * bf,
tw_lp * lp,
nodes_message * msg,
int sq)
{
tw_event * e;
nodes_message *m;
tw_stime ts;
ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng);
e = model_net_method_event_new(msg->sender_node, ts, lp, TORUS,
(void**)&m, NULL);
if(sq == -1) {
m->source_direction = msg->source_direction;
m->source_dim = msg->source_dim;
m->source_channel = msg->source_channel;
} else {
m->source_direction = msg->saved_queue % 2;
m->source_dim = msg->saved_queue / 2;
m->source_channel = msg->saved_channel;
}
m->type = CREDIT;
tw_event_send(e);
}
/*Initialize the torus model, this initialization part is borrowed from Ning's torus model */
static void torus_init( nodes_state * s,
tw_lp * lp )
......@@ -432,6 +597,48 @@ static void torus_init( nodes_state * s,
s->next_flit_generate_time =
(tw_stime**)malloc(2*p->n_dims*sizeof(tw_stime*));
for(i=0; i < 2*p->n_dims; i++)
{
s->buffer[i] = (int*)malloc(p->num_vc * sizeof(int));
s->next_link_available_time[i] =
(tw_stime*)malloc(p->num_vc * sizeof(tw_stime));
s->next_credit_available_time[i] =
(tw_stime*)malloc(p->num_vc * sizeof(tw_stime));
s->next_flit_generate_time[i] =
(tw_stime*)malloc(p->num_vc * sizeof(tw_stime));
}
s->terminal_length = (int*)malloc(2*p->n_dims*sizeof(int));
s->queued_length = (int*)malloc(2*p->n_dims*sizeof(int));
s->terminal_msgs =
(nodes_message_list**)malloc(2*p->n_dims*sizeof(nodes_message_list*));
s->terminal_msgs_tail =
(nodes_message_list**)malloc(2*p->n_dims*sizeof(nodes_message_list*));
s->pending_msgs =
(nodes_message_list***)malloc(2*p->n_dims*sizeof(nodes_message_list**));
s->pending_msgs_tail =
(nodes_message_list***)malloc(2*p->n_dims*sizeof(nodes_message_list**));
s->queued_msgs =
(nodes_message_list***)malloc(2*p->n_dims*sizeof(nodes_message_list**));
s->queued_msgs_tail =
(nodes_message_list***)malloc(2*p->n_dims*sizeof(nodes_message_list**));
for(i = 0; i < 2*p->n_dims; i++) {
s->pending_msgs[i] =
(nodes_message_list**)malloc(p->num_vc*sizeof(nodes_message_list*));
s->pending_msgs_tail[i] =
(nodes_message_list**)malloc(p->num_vc*sizeof(nodes_message_list*));
s->queued_msgs[i] =
(nodes_message_list**)malloc(p->num_vc*sizeof(nodes_message_list*));
s->queued_msgs_tail[i] =
(nodes_message_list**)malloc(p->num_vc*sizeof(nodes_message_list*));
}
s->other_msgs =
(nodes_message_list**)malloc(2*p->n_dims*sizeof(nodes_message_list*));
s->other_msgs_tail =
(nodes_message_list**)malloc(2*p->n_dims*sizeof(nodes_message_list*));
s->in_send_loop =
(int *)malloc(2*p->n_dims*sizeof(int));
s->link_traffic = (int64_t *)malloc(2*p->n_dims*sizeof(int64_t));
for(i=0; i < 2*p->n_dims; i++)
{
s->buffer[i] = (int*)malloc(p->num_vc * sizeof(int));
......@@ -441,6 +648,15 @@ static void torus_init( nodes_state * s,
(tw_stime*)malloc(p->num_vc * sizeof(tw_stime));
s->next_flit_generate_time[i] =
(tw_stime*)malloc(p->num_vc * sizeof(tw_stime));
s->terminal_msgs[i] = NULL;
s->terminal_msgs_tail[i] = NULL;
s->other_msgs[i] = NULL;
s->other_msgs_tail[i] = NULL;
s->in_send_loop[i] = 0;
s->terminal_length[i] = 0;
s->queued_length[i] = 0;
s->all_term_length = 0;
s->issueIdle = 0;