Commit d9571f96 authored by Nikhil's avatar Nikhil

Begin work for adding network graph

Change-Id: I968accd0eef11f9490a882dc7141788962b72621
parent 84bd42a0
......@@ -36,6 +36,7 @@ struct message_list {
union {
terminal_message dfly_msg;
em_message em_msg;
network_graph_message net_graph_msg;
};
char* event_data;
message_list *next;
......
/*
* Copyright (C) 2014 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
//CHANGE: modify to match you header file name
#ifndef NETWORK_GRAPH_H
#define NETWORK_GRAPH_H
#ifdef __cplusplus
extern "C" {
#endif
#include <ross.h>
//CHANGE: modify to match the struct
typedef struct network_graph_message network_graph_message;
//CHANGE: modify the struct name - add to message_list union in common-net.h
struct network_graph_message
{
//common entries:
int magic; /* magic number */
short type; /* event type of the flit */
tw_stime travel_start_time; /* flit travel start time*/
unsigned long long packet_ID; /* packet ID of the flit */
char category[CATEGORY_NAME_MAX]; /* category: comes from codes */
tw_lpid final_dest_gid; /* final destination LP ID, this comes from codes can be a server or any other LP type*/
tw_lpid sender_lp; /*sending LP ID from CODES, can be a server or any other LP type */
tw_lpid sender_mn_lp; // source modelnet id (think NIC)
tw_lpid src_terminal_id; /* source terminal ID - mostly same as sender_mn_lp */
tw_lpid dest_terminal_id; /* destination modelnet id */
int dest_terminal; /* logical id of destination modelnet id */
/* packet/message identifier and status */
uint64_t chunk_id; //which chunk of packet I am
uint64_t packet_size; //what is the size of my packet
uint64_t message_id; //seq number at message level - NIC specified
uint64_t total_size; //total size of the message
int remote_event_size_bytes; // data size for target event at destination
int local_event_size_bytes; // data size for event at source
int is_pull;
uint64_t pull_size;
tw_stime msg_start_time;
//info for path traversal
short my_N_hop; /* hops traversed so far */
unsigned int intm_lp_id; /* Intermediate LP ID that sent this packet */
int last_hop; /* last hop of the message, can be a terminal, local router or global router */
int vc_index; /* stores port info */
int output_chan; /* virtual channel within port */
//info for reverse computation
short saved_vc;
short saved_port;
model_net_event_return event_rc;
tw_stime saved_available_time;
tw_stime saved_avg_time;
tw_stime saved_rcv_time;
tw_stime saved_busy_time;
tw_stime saved_total_time;
tw_stime saved_hist_start_time;
tw_stime saved_sample_time;
};
#ifdef __cplusplus
}
#endif
#endif
#include <ross.h>
#include "codes/jenkins-hash.h"
#include "codes/codes_mapping.h"
#include "codes/codes.h"
#include "codes/model-net.h"
#include "codes/model-net-method.h"
#include "codes/model-net-lp.h"
//CHANGE: use the network file created
#include "codes/net/network-graph.h"
#include "codes/net/common-net.h"
#include "sys/file.h"
#include "codes/quickhash.h"
#include "codes/rc-stack.h"
#include <vector>
#include <graphviz/cgraph.h>
#include <cstdio>
#define CREDIT_SZ 8
#define HASH_TABLE_SIZE 4999
#define DEBUG 0
#define MAX_STATS 65536
//CHANGE: define them for the local network
#define LOCAL_NETWORK_NAME NETWORK_GRAPH
#define LOCAL_NETWORK_ROUTER_NAME NETWORK_GRAPH_ROUTER
#define LOCAL_MSG_STRUCT network_graph_message
#define LOCAL_MSG_NAME_FROM_UNION net_graph_msg
#define LP_CONFIG_NM_TERM (model_net_lp_config_names[LOCAL_NETWORK_NAME])
#define LP_METHOD_NM_TERM (model_net_method_names[LOCAL_NETWORK_NAME])
#define LP_CONFIG_NM_ROUT (model_net_lp_config_names[LOCAL_NETWORK_ROUTER_NAME])
#define LP_METHOD_NM_ROUT (model_net_method_names[LOCAL_NETWORK_ROUTER_NAME])
static long packet_gen = 0, packet_fin = 0;
static double maxd(double a, double b) { return a < b ? b : a; }
typedef struct local_param local_param;
static uint64_t num_params = 0;
static local_param * all_params = NULL;
static const config_anno_map_t * anno_map = NULL;
/* global variables for codes mapping */
static char lp_group_name[MAX_NAME_LENGTH];
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;
/* router magic number */
static int router_magic_num = 0;
/* terminal magic number */
static int terminal_magic_num = 0;
static int sample_bytes_written = 0;
static int sample_rtr_bytes_written = 0;
static char local_cn_sample_file[MAX_NAME_LENGTH];
static char local_rtr_sample_file[MAX_NAME_LENGTH];
static char network_graph_file[MAX_NAME_LENGTH];
static void init_message_list(message_list *thism,
LOCAL_MSG_STRUCT *inmsg) {
thism->LOCAL_MSG_NAME_FROM_UNION = *inmsg;
thism->event_data = NULL;
thism->next = NULL;
thism->prev = NULL;
thism->in_alt_q = 0;
thism->altq_next = NULL;
thism->altq_prev = NULL;
}
class End {
public:
int port;
int id;
bool isTerm;
End(int _port, int _id, bool _isTerm) {
port = _port;
id = _id;
isTerm = _isTerm;
}
}
class Switch_info {
public:
std::string name;
long long guid;
int radix;
vector<End> ports;
};
class Terminal_info {
public:
std::string name;
long long guid;
int radix;
vector<End> ports;
};
struct local_param
{
double link_bandwidth;/* bandwidth of each link */
double cn_bandwidth;/* injection bandwidth */
int num_vcs; /* number of virtual channels */
int vc_size; /* buffer size of the router-router channels */
int cn_vc_size; /* buffer size of the compute node channels */
int chunk_size; /* full-sized packets are broken into smaller chunks.*/
int router_delay; /* delay at each router */
int routing; /* type of routing */
//derived param
double cn_delay; /* bandwidth based time for 1 byte */
double link_delay; /* bandwidth based time for 1 byte */
double credit_delay; /* how long for credit to arrive - all bytes */
//CHANGE: add network specific data here
int num_switches, num_terminals;
vector<Switch_info> switch_id_to_info;
vector<Terminal_info> terminal_id_to_info;
map<long long, int> switch_guid_to_id, terminal_guid_to_id;
map<std::string, int> switch_name_to_id, terminal_name_to_id;
};
struct local_router_sample
{
tw_lpid router_id;
tw_stime* busy_time;
int64_t* link_traffic_sample;
tw_stime end_time;
long fwd_events;
long rev_events;
};
struct local_cn_sample
{
tw_lpid terminal_id;
long fin_chunks_sample;
long data_size_sample;
double fin_hops_sample;
tw_stime fin_chunks_time;
tw_stime busy_time_sample;
tw_stime end_time;
long fwd_events;
long rev_events;
};
/* handles terminal and router events like packet generate/send/receive/buffer */
typedef struct terminal_state terminal_state;
typedef struct router_state router_state;
/* compute node data (think NIC) structure */
struct terminal_state
{
unsigned int terminal_id; //what is my local id
const char * anno;
const local_param *params;
int radix; //my radix
//which router I am connected to
vector<tw_lpid> conn_gids;
// Each terminal will have input/output channel(s) with the router
int** vc_occupancy; // NUM_VC
tw_stime *terminal_available_time;
//available messages to be sent
message_list ***terminal_msgs;
message_list ***terminal_msgs_tail;
int *total_terminal_length;
int **terminal_length;
int *in_send_loop;
int issueIdle;
//packet aggregation
struct qhash_table *rank_tbl;
//transient storage for reverse computation
struct rc_stack * st;
//stats collection
uint64_t packet_counter;
int packet_gen;
int packet_fin;
struct mn_stats local_stats_array[CATEGORY_MAX];
tw_stime total_time;
uint64_t total_msg_size;
double total_hops;
long finished_msgs;
long finished_chunks;
long finished_packets;
//sampling
tw_stime *last_buf_full;
tw_stime *busy_time;
char output_buf[4096];
long fin_chunks_sample;
long data_size_sample;
double fin_hops_sample;
tw_stime fin_chunks_time;
tw_stime *busy_time_sample;
char sample_buf[4096];
struct local_cn_sample * sample_stat;
int op_arr_size;
int max_arr_size;
/* for logging forward and reverse events */
long fwd_events;
long rev_events;
int *lft;
};
//CHANGE: may need to change if more functionality is desired
/* event types */
enum event_t
{
T_GENERATE=1,
T_ARRIVE,
T_SEND,
T_BUFFER,
R_SEND,
R_ARRIVE,
R_BUFFER,
};
typedef enum event_t event_t;
//CHANGE: may need to change if more functionality is desired
/* whether the last hop of a packet was global, local or a terminal */
enum last_hop
{
ROUTER=1,
TERMINAL
};
//CHANGE: may need to change if more functionality is desired
enum ROUTING_ALGO
{
STATIC = 0,
ADAPTIVE,
};
static char routing_folder[MAX_NAME_LENGTH];
struct router_state
{
//who am I
unsigned int router_id;
const char * anno;
const local_param *params;
int radix;
//CHANGE: may need to be changed if linear storage is not desired
//array/linked list based storage of info about ports/vcs
vector<tw_lpid> conn_gids;
tw_stime* next_output_available_time;
message_list ***pending_msgs;
message_list ***pending_msgs_tail;
message_list ***queued_msgs;
message_list ***queued_msgs_tail;
int** vc_occupancy;
int *in_send_loop;
int *queued_count;
//for reverse computation
struct rc_stack * st;
//sampling and stats
int64_t* link_traffic;
tw_stime* last_buf_full;
tw_stime* busy_time;
tw_stime* busy_time_sample;
struct local_router_sample * rsamples;
int op_arr_size;
int max_arr_size;
long fwd_events, rev_events;
int64_t * link_traffic_sample;
char output_buf[4096];
char output_buf2[4096];
//CHANGE: add network specific data here
int *lft;
};
struct VC_Entry {
int vc;
message_list* entry;
};
//global stats
static tw_stime local_total_time = 0;
static tw_stime local_max_latency = 0;
static long long total_hops = 0;
static long long N_finished_packets = 0;
static long long total_msg_sz = 0;
static long long N_finished_msgs = 0;
static long long N_finished_chunks = 0;
/* returns the message size */
static int local_get_msg_sz(void)
{
return sizeof(LOCAL_MSG_STRUCT);
}
//CHANGE: network specific params have to be read here
static void local_read_config(const char * anno, local_param *params){
local_param *p = params;
// general params - do not change unless you intent to modify them
int rc = configuration_get_value_double(&config, "PARAMS", "link_bandwidth",
anno, &p->link_bandwidth);
if(rc) {
p->link_bandwidth = 5.25;
fprintf(stderr, "Bandwidth of links not specified, setting to %lf\n",
p->link_bandwidth);
}
rc = configuration_get_value_double(&config, "PARAMS", "cn_bandwidth",
anno, &p->cn_bandwidth);
if(rc) {
p->cn_bandwidth = 5.25;
fprintf(stderr, "Bandwidth of compute node channels not specified, setting "
"to %lf\n", p->cn_bandwidth);
}
rc = configuration_get_value_int(&config, "PARAMS", "num_vcs", anno,
&p->num_vcs);
if(rc) {
p->num_vcs = 1;
}
rc = configuration_get_value_int(&config, "PARAMS", "chunk_size", anno,
&p->chunk_size);
if(rc) {
p->chunk_size = 512;
fprintf(stderr, "Chunk size for packets is not specified, setting to %d\n",
p->chunk_size);
}
rc = configuration_get_value_int(&config, "PARAMS", "vc_size", anno,
&p->vc_size);
if(rc) {
p->vc_size = 32768;
fprintf(stderr, "Buffer size of link channels not specified, setting to %d\n",
p->vc_size);
}
rc = configuration_get_value_int(&config, "PARAMS", "cn_vc_size", anno,
&p->cn_vc_size);
if(rc) {
p->cn_vc_size = 65536;
fprintf(stderr, "Buffer size of compute node channels not specified, "
"setting to %d\n", p->cn_vc_size);
}
char routing_str[MAX_NAME_LENGTH];
configuration_get_value(&config, "PARAMS", "routing", anno, routing_str,
MAX_NAME_LENGTH);
if(strcmp(routing_str, "static") == 0) {
p->routing = STATIC;
} else {
p->routing = STATIC;
}
if(!g_tw_mynode) printf("Routing is %d\n", p->routing);
routing_folder[0] = '\0';
rc = configuration_get_value(&config, "PARAMS", "routing_folder", anno, routing_folder,
MAX_NAME_LENGTH);
if(routing_folder[0] == '\0') {
if(p->routing == STATIC) {
tw_error(TW_LOC, "routing_folder has to be provided with dump_topo || static routing");
}
}
p->router_delay = 50;
configuration_get_value_int(&config, "PARAMS", "router_delay", anno,
&p->router_delay);
configuration_get_value(&config, "PARAMS", "cn_sample_file", anno,
local_cn_sample_file, MAX_NAME_LENGTH);
configuration_get_value(&config, "PARAMS", "rt_sample_file", anno,
local_rtr_sample_file, MAX_NAME_LENGTH);
//CHANGE: add network specific parameters here
rc = configuration_get_value_int(&config, "PARAMS", "num_switches", anno,
&p->num_switches);
rc = configuration_get_value_int(&config, "PARAMS", "num_terminals", anno,
&p->num_terminals);
p->switch_id_to_info.resize(p->num_switches);
p->terminal_id_to_info.resize(p->num_terminals);
configuration_get_value(&config, "PARAMS", "network_graph_file", anno,
network_graph_file, MAX_NAME_LENGTH);
FILE *input_file = fopen(network_graph_file, "r");
Agraph_t *input_graph = agread(input_file, NULL);
int num_switches = 0, num_terminals = 0;
Agnode_t *g_node;
for(g_node = agfstnode(input_graph); g_node; g_node = agnxtnode(input_graph, g_nodes)) {
char *name = agnameof(g_node);
if(name[0] == 'S') {
switch_info next_switch;
next_switch.name = std::string(name);
switch_name_to_id[next_switch.name] = num_switches;
char * comment = agget(g_node, "comment");
sscanf(comment, "%x,radix=%d", &next_switch.guid, &next_switch.radix);
switch_id_to_info[num_switches] = next_switch;
switch_id_to_info[num_switches].ports.resize(next_switch.radix);
switch_guid_to_id[next_switch.guid] = num_switches;
num_switches++;
}
if(name[0] == 'T') {
terminal_id_to_info next_term;
next_term.name = std::string(name);
int term_id;
sscanf(name, "T<%d", &term_id);
term_name_to_id[next_term.name] = term_id;
char * comment = agget(g_node, "comment");
sscanf(comment, "%x,radix=%d", &next_term.guid, &next_term.radix);
term_id_to_info[term_id] = next_term;
term_id_to_info[term_id].ports.resize(next_term.radix);
term_guid_to_id[next_term.guid] = term_id;
if(term_id > num_terminals) num_terminals = term_id;
}
}
assert(num_terminals == p->num_terminals);
assert(num_switches == p->num_switches);
for(g_node = agfstnode(input_graph); g_node; g_node = agnxtnode(input_graph, g_nodes)) {
char *name = agnameof(g_node);
int index_in_info;
if(name[0] == 'S') {
index_in_info = switch_name_to_id[std::string(name)];
} else {
index_in_info = term_name_to_id[std::string(name)];
}
for (e = agfstedge(g,g_node); e; e = agnxtedge(g,e,g_node)) {
char *partner = agnameof(e->node);
char *edge_comment = agget(e, "comment");
int srcPort, dstPort;
sscanf(edge_comment, "P%d->P%d", &srcPort, &dstPort);
int dstId; bool isTerm;
if(partner[0] == 'S') {
dstId = switch_name_to_id[std::string(partner)];
isTerm = false;
} else {
dstId = term_name_to_id[std::string(partner)];
isTerm = true;
}
if(name[0] == 'S') {
switch_id_to_info[index_in_info].ports[srcPort] = End(dstPort, dstId, isTerm);
} else {
term_id_to_info[index_in_info].ports[srcPort] = End(dstPort, dstId, isTerm);
}
}
}
agclose(input_graph);
fclose(input_file);
//CHANGE: derived parameters often are computed based on network specifics
//general derived parameters
p->cn_delay = bytes_to_ns(1, p->cn_bandwidth);
p->link_delay = bytes_to_ns(1, p->link_bandwidth);
p->credit_delay = 50; //using a constant
uint32_t h1 = 0, h2 = 0;
bj_hashlittle2(LP_METHOD_NM_TERM, strlen(LP_METHOD_NM_TERM), &h1, &h2);
terminal_magic_num = h1 + h2;
bj_hashlittle2(LP_METHOD_NM_ROUT, strlen(LP_METHOD_NM_ROUT), &h1, &h2);
router_magic_num = h1 + h2;
}
static void local_configure(){
anno_map = codes_mapping_get_lp_anno_map(LP_CONFIG_NM_TERM);
assert(anno_map);
num_params = anno_map->num_annos + (anno_map->has_unanno_lp > 0);
all_params = (local_param *)malloc(num_params * sizeof(*all_params));
for (int i = 0; i < anno_map->num_annos; i++){
const char * anno = anno_map->annotations[i].ptr;
local_read_config(anno, &all_params[i]);
}
if (anno_map->has_unanno_lp > 0){
local_read_config(NULL, &all_params[anno_map->num_annos]);
}
}
/* report statistics like average and maximum packet latency, average number of hops traversed */
static void local_report_stats()
{
long long avg_hops, total_finished_packets, total_finished_chunks;
long long total_finished_msgs, final_msg_sz;
tw_stime avg_time, max_time;
int total_minimal_packets, total_nonmin_packets;
long total_gen, total_fin;
MPI_Reduce( &total_hops, &avg_hops, 1, MPI_LONG_LONG, MPI_SUM, 0,
MPI_COMM_WORLD);
MPI_Reduce( &N_finished_packets, &total_finished_packets, 1, MPI_LONG_LONG,
MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &N_finished_msgs, &total_finished_msgs, 1, MPI_LONG_LONG, MPI_SUM,
0, MPI_COMM_WORLD);
MPI_Reduce( &N_finished_chunks, &total_finished_chunks, 1, MPI_LONG_LONG,
MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &total_msg_sz, &final_msg_sz, 1, MPI_LONG_LONG, MPI_SUM, 0,
MPI_COMM_WORLD);
MPI_Reduce( &local_total_time, &avg_time, 1,MPI_DOUBLE, MPI_SUM, 0,
MPI_COMM_WORLD);
MPI_Reduce( &local_max_latency, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0,
MPI_COMM_WORLD);
MPI_Reduce( &packet_gen, &total_gen, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &packet_fin, &total_fin, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
/* print statistics */
if(!g_tw_mynode)
{
printf(" Average number of hops traversed %f average chunk latency %lf us "
"maximum chunk latency %lf us avg message size %lf bytes finished "
"messages %lld finished chunks %lld \n",
(float)avg_hops/total_finished_chunks,
avg_time/(total_finished_chunks*1000), max_time/1000,
(float)final_msg_sz/total_finished_msgs, total_finished_msgs,
total_finished_chunks);
printf("\n Total packets generated %ld finished %ld \n", total_gen, total_fin);
}
return;
}
/* parse external file with give forwarding tables
* file name: 0x<guid>.lft
* content: '0x<terminal guid> <egress port>' per line, e.g.:
* `head 0x0000000100000000.lf`
* 0x0000000100000000 0
* 0x00000040000000ff 22
* 0x0000000100000001 19
*/
typedef struct {
uint64_t guid;
uint64_t port;
} guid_port_combi_t;
static int cmp_guids(const void *g1, const void *g2)
{
uint64_t guid1 = *((uint64_t *) g1);
uint64_t guid2 = ((guid_port_combi_t *) g2)->guid;
if (guid1 == guid2)
return 0;
else
return 1;
}
static int read_static_lft(long long guid, int & *lft, local_param * params)
{
char dir_name[512];
char file_name[512];
char *io_dir = routing_folder;
sprintf(dir_name, "%s", io_dir);
sprintf(file_name, "%s/0x%016"PRIx64".lft", dir_name, guid);
FILE *file = NULL;
if (!(file = fopen(file_name, "r")))
return -1;
char line[UINT8_MAX];
char *p = NULL, *e = NULL;
uint64_t dest_guid = 0, port = 0;
int num_added = 0;
//temp storage for the entire table
guid_port_combi_t *tmpLFT = calloc(params->num_terminals + params->num_switches, sizeof(guid_port_combi_t));
//storage for port to take for the destination terminal
lft = malloc(params->num_terminals * sizeof(int));
/* init all with -1 so that we find missing routing entries */
for (int i = 0; i < params->num_terminals; i++) lft[i] = -1;
while (fgets(line, sizeof(line), file)) {
p = line;
while (isspace(*p))
p++;
if (*p == '\0' || *p == '\n' || *p == '#')
continue;
dest_guid = strtoull(p, &e, 16);
if (e == p || (!isspace(*e) && *e != '#' && *e != '\0')) {
errno = EINVAL;
return -1;
}
p = e;
while (isspace(*p))
p++;
port = strtoull(p, &e, 0);
if (e == p || (!isspace(*e) && *e != '#' && *e != '\0')) {
errno = EINVAL;
return -1;
}
tmpLFT[num_added].guid = dest_guid;
tmpLFT[num_added].port = port;
num_added++;
}
/* we want a real LFT with terminal_id as lookup index, so have to
* convert the read lft, which might contain entries for switches
* or is permutated, into the correct format
*/
ft_terminal_state tmpTerm;
for (int dest_num = 0; dest_num < params->num_terminals; dest_num++) {
tmpTerm.terminal_id = dest_num;
uint64_t key = term_id_to_info[dest_num].guid;
size_t size = params->num_terminals + params->num_switches;
guid_port_combi_t *elem = lfind(&key, tmpLFT, &size,
sizeof(guid_port_combi_t), cmp_guids);
assert(elem);
lft[dest_num] = elem->port - 1;
}
#if FATTREE_DEBUG
printf("I am guid=%016"PRIx64") and my LFT is:\n", guid);
for (int dest_num = 0; dest_num < params->num_terminals; dest_num++)
printf("\tdest %d -> egress port %d\n", dest_num, lft[dest_num]);
#endif