...
 
Commits (5)
......@@ -41,6 +41,8 @@ include $(top_srcdir)/src/Makefile.subdir
include $(top_srcdir)/tests/Makefile.subdir
include $(top_srcdir)/doc/Makefile.subdir
LDADD += -lcgraph
if USE_DEBUG
AM_CPPFLAGS += -g
AM_CFLAGS += -g
......
......@@ -31,6 +31,7 @@ extern "C" {
#include "net/simplep2p.h"
#include "net/torus.h"
#include "net/express-mesh.h"
#include "net/network-graph.h"
extern int model_net_base_magic;
......@@ -140,6 +141,7 @@ typedef struct model_net_wrap_msg {
sp_message m_sp2p; // simplep2p
nodes_message m_torus; // torus
em_message m_em; // express-mesh
network_graph_message m_ng; // express-mesh
// add new ones here
} msg;
} model_net_wrap_msg;
......
......@@ -74,6 +74,8 @@ typedef struct mn_stats mn_stats;
X(LOGGP, "modelnet_loggp", "loggp", &loggp_method)\
X(EXPRESS_MESH, "modelnet_express_mesh", "express_mesh", &express_mesh_method)\
X(EXPRESS_MESH_ROUTER, "modelnet_express_mesh_router", "express_mesh_router", &express_mesh_router_method)\
X(NETWORK_GRAPH, "modelnet_network_graph", "network_graph", &network_graph_method)\
X(NETWORK_GRAPH_ROUTER, "modelnet_network_graph_router", "network_graph_router", &network_graph_router_method)\
X(DRAGONFLY_PLUS, "modelnet_dragonfly_plus", "dragonfly_plus", &dragonfly_plus_method)\
X(DRAGONFLY_PLUS_ROUTER, "modelnet_dragonfly_plus_router", "dragonfly_plus_router", &dragonfly_plus_router_method)\
X(MAX_NETS, NULL, NULL, NULL)
......
......@@ -36,6 +36,7 @@ struct message_list {
union {
terminal_message dfly_msg;
em_message em_msg;
network_graph_message net_graph_msg;
};
char* event_data;
message_list *next;
......
/*
* Copyright (C) 2014 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
//CHANGE: modify to match you header file name
#ifndef NETWORK_GRAPH_H
#define NETWORK_GRAPH_H
#ifdef __cplusplus
extern "C" {
#endif
#include <ross.h>
//CHANGE: modify to match the struct
typedef struct network_graph_message network_graph_message;
//CHANGE: modify the struct name - add to message_list union in common-net.h
struct network_graph_message
{
//common entries:
int magic; /* magic number */
short type; /* event type of the flit */
tw_stime travel_start_time; /* flit travel start time*/
unsigned long long packet_ID; /* packet ID of the flit */
char category[CATEGORY_NAME_MAX]; /* category: comes from codes */
tw_lpid final_dest_gid; /* final destination LP ID, this comes from codes can be a server or any other LP type*/
tw_lpid sender_lp; /*sending LP ID from CODES, can be a server or any other LP type */
tw_lpid sender_mn_lp; // source modelnet id (think NIC)
tw_lpid src_terminal_id; /* source terminal ID - mostly same as sender_mn_lp */
tw_lpid dest_terminal_id; /* destination modelnet id */
int dest_terminal; /* logical id of destination modelnet id */
/* packet/message identifier and status */
uint64_t chunk_id; //which chunk of packet I am
uint64_t packet_size; //what is the size of my packet
uint64_t message_id; //seq number at message level - NIC specified
uint64_t total_size; //total size of the message
int remote_event_size_bytes; // data size for target event at destination
int local_event_size_bytes; // data size for event at source
int is_pull;
uint64_t pull_size;
tw_stime msg_start_time;
//info for path traversal
short my_N_hop; /* hops traversed so far */
unsigned int intm_lp_id; /* Intermediate LP ID that sent this packet */
int last_hop; /* last hop of the message, can be a terminal, local router or global router */
int vc_index; /* stores port info */
int output_chan; /* virtual channel within port */
//info for reverse computation
short saved_vc;
short saved_port;
model_net_event_return event_rc;
tw_stime saved_available_time;
tw_stime saved_avg_time;
tw_stime saved_rcv_time;
tw_stime saved_busy_time;
tw_stime saved_total_time;
tw_stime saved_hist_start_time;
tw_stime saved_sample_time;
};
#ifdef __cplusplus
}
#endif
#endif
......@@ -99,11 +99,12 @@ nobase_include_HEADERS = \
codes/net/slimfly.h \
codes/net/fattree.h \
codes/net/loggp.h \
codes/net/network-graph.h \
codes/net/simplenet-upd.h \
codes/net/simplep2p.h \
codes/net/express-mesh.h \
codes/net/torus.h \
codes/codes-mpi-replay.h \
codes/codes-mpi-replay.h \
codes/configfile.h
......@@ -166,6 +167,7 @@ src_libcodes_la_SOURCES = \
src/networks/model-net/dragonfly-plus.C \
src/networks/model-net/slimfly.c \
src/networks/model-net/fattree.c \
src/networks/model-net/network-graph.c \
src/networks/model-net/loggp.c \
src/networks/model-net/simplep2p.c \
src/networks/model-net/model-net-lp.c \
......
......@@ -73,6 +73,9 @@ typedef struct model_net_base_state {
static void model_net_base_lp_init(
model_net_base_state * ns,
tw_lp * lp);
static void model_net_base_prerun(
model_net_base_state * ns,
tw_lp * lp);
static void model_net_base_event(
model_net_base_state * ns,
tw_bf * b,
......@@ -112,7 +115,7 @@ static void handle_sched_next_rc(
/* ROSS function pointer table for this LP */
tw_lptype model_net_base_lp = {
(init_f) model_net_base_lp_init,
(pre_run_f) NULL,
(pre_run_f) model_net_base_prerun,
(event_f) model_net_base_event,
(revent_f) model_net_base_event_rc,
(commit_f) NULL,
......@@ -385,6 +388,10 @@ void model_net_base_configure(){
offsetof(model_net_wrap_msg, msg.m_em);
msg_offsets[EXPRESS_MESH_ROUTER] =
offsetof(model_net_wrap_msg, msg.m_em);
msg_offsets[NETWORK_GRAPH] =
offsetof(model_net_wrap_msg, msg.m_ng);
msg_offsets[NETWORK_GRAPH_ROUTER] =
offsetof(model_net_wrap_msg, msg.m_ng);
// perform the configuration(s)
// This part is tricky, as we basically have to look up all annotations that
......@@ -406,7 +413,7 @@ void model_net_base_configure(){
}
}
if (a == num_params){
// found a new annotation
// found a new annotatio
annos[num_params++] = amap->annotations[n].ptr;
}
}
......@@ -525,6 +532,14 @@ void model_net_base_lp_init(
}
}
void model_net_base_prerun(
model_net_base_state * ns,
tw_lp * lp) {
if(ns->sub_type->pre_run != NULL) {
ns->sub_type->pre_run(ns->sub_state, lp);
}
}
void model_net_base_event(
model_net_base_state * ns,
tw_bf * b,
......
......@@ -32,6 +32,10 @@ extern struct model_net_method dragonfly_custom_router_method;
extern struct model_net_method loggp_method;
extern struct model_net_method express_mesh_method;
extern struct model_net_method express_mesh_router_method;
extern struct model_net_method express_mesh_method;
extern struct model_net_method express_mesh_router_method;
extern struct model_net_method network_graph_method;
extern struct model_net_method network_graph_router_method;
#define X(a,b,c,d) b,
char * model_net_lp_config_names[] = {
......
#include <ross.h>
#include "codes/jenkins-hash.h"
#include "codes/codes_mapping.h"
#include "codes/codes.h"
#include "codes/model-net.h"
#include "codes/model-net-method.h"
#include "codes/model-net-lp.h"
//CHANGE: use the network file created
#include "codes/net/network-graph.h"
#include "codes/net/common-net.h"
#include "sys/file.h"
#include "codes/quickhash.h"
#include "codes/rc-stack.h"
#include <vector>
#include <string>
#include <map>
#include <graphviz/cgraph.h>
#include <cstdio>
#include <search.h>
#define CREDIT_SZ 8
#define HASH_TABLE_SIZE 4999
#define DEBUG 0
#define MAX_STATS 65536
//CHANGE: define them for the local network
#define LOCAL_NETWORK_NAME NETWORK_GRAPH
#define LOCAL_NETWORK_ROUTER_NAME NETWORK_GRAPH_ROUTER
#define LOCAL_MSG_STRUCT network_graph_message
#define LOCAL_MSG_NAME_FROM_UNION net_graph_msg
#define LP_CONFIG_NM_TERM (model_net_lp_config_names[LOCAL_NETWORK_NAME])
#define LP_METHOD_NM_TERM (model_net_method_names[LOCAL_NETWORK_NAME])
#define LP_CONFIG_NM_ROUT (model_net_lp_config_names[LOCAL_NETWORK_ROUTER_NAME])
#define LP_METHOD_NM_ROUT (model_net_method_names[LOCAL_NETWORK_ROUTER_NAME])
static long packet_gen = 0, packet_fin = 0;
static double maxd(double a, double b) { return a < b ? b : a; }
typedef struct local_param local_param;
static uint64_t num_params = 0;
static local_param * all_params = NULL;
static const config_anno_map_t * anno_map = NULL;
/* global variables for codes mapping */
static char lp_group_name[MAX_NAME_LENGTH];
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;
/* router magic number */
static int router_magic_num = 0;
/* terminal magic number */
static int terminal_magic_num = 0;
static int sample_bytes_written = 0;
static int sample_rtr_bytes_written = 0;
static char local_cn_sample_file[MAX_NAME_LENGTH];
static char local_rtr_sample_file[MAX_NAME_LENGTH];
static char network_graph_file[MAX_NAME_LENGTH];
static void init_message_list(message_list *thism,
LOCAL_MSG_STRUCT *inmsg) {
thism->LOCAL_MSG_NAME_FROM_UNION = *inmsg;
thism->event_data = NULL;
thism->next = NULL;
thism->prev = NULL;
thism->in_alt_q = 0;
thism->altq_next = NULL;
thism->altq_prev = NULL;
}
class End {
public:
int port;
int id;
bool isTerm;
End() {}
End(int _port, int _id, bool _isTerm) {
port = _port;
id = _id;
isTerm = _isTerm;
}
~End() {}
};
class Switch_info {
public:
std::string name;
long long guid;
int radix;
std::vector<End> ports;
~Switch_info() {}
};
class Terminal_info {
public:
std::string name;
long long guid;
int radix;
std::vector<End> ports;
~Terminal_info() {}
};
struct local_param
{
double link_bandwidth;/* bandwidth of each link */
double cn_bandwidth;/* injection bandwidth */
int num_vcs; /* number of virtual channels */
int vc_size; /* buffer size of the router-router channels */
int cn_vc_size; /* buffer size of the compute node channels */
int chunk_size; /* full-sized packets are broken into smaller chunks.*/
int router_delay; /* delay at each router */
int routing; /* type of routing */
//derived param
double cn_delay; /* bandwidth based time for 1 byte */
double link_delay; /* bandwidth based time for 1 byte */
double credit_delay; /* how long for credit to arrive - all bytes */
//CHANGE: add network specific data here
int num_switches, num_terminals;
std::vector<Switch_info> switch_id_to_info;
std::vector<Terminal_info> term_id_to_info;
std::map<long long, int> switch_guid_to_id, term_guid_to_id;
std::map<std::string, int> switch_name_to_id, term_name_to_id;
};
struct local_router_sample
{
tw_lpid router_id;
tw_stime* busy_time;
int64_t* link_traffic_sample;
tw_stime end_time;
long fwd_events;
long rev_events;
};
struct local_cn_sample
{
tw_lpid terminal_id;
long fin_chunks_sample;
long data_size_sample;
double fin_hops_sample;
tw_stime fin_chunks_time;
tw_stime *busy_time_sample;
tw_stime end_time;
long fwd_events;
long rev_events;
};
/* handles terminal and router events like packet generate/send/receive/buffer */
typedef struct terminal_state terminal_state;
typedef struct router_state router_state;
/* compute node data (think NIC) structure */
struct terminal_state
{
unsigned int terminal_id; //what is my local id
const char * anno;
const local_param *params;
int radix; //my radix
//which router I am connected to
tw_lpid* conn_gids;
// Each terminal will have input/output channel(s) with the router
int** vc_occupancy; // NUM_VC
tw_stime *terminal_available_time;
//available messages to be sent
message_list ***terminal_msgs;
message_list ***terminal_msgs_tail;
int *total_terminal_length;
int **terminal_length;
int *in_send_loop;
int issueIdle;
//packet aggregation
struct qhash_table *rank_tbl;
//transient storage for reverse computation
struct rc_stack * st;
//stats collection
uint64_t packet_counter;
int packet_gen;
int packet_fin;
struct mn_stats local_stats_array[CATEGORY_MAX];
tw_stime total_time;
uint64_t total_msg_size;
double total_hops;
long finished_msgs;
long finished_chunks;
long finished_packets;
//sampling
tw_stime *last_buf_full;
tw_stime *busy_time;
char output_buf[4096];
long fin_chunks_sample;
long data_size_sample;
double fin_hops_sample;
tw_stime fin_chunks_time;
tw_stime *busy_time_sample;
char sample_buf[4096];
struct local_cn_sample * sample_stat;
int op_arr_size;
int max_arr_size;
/* for logging forward and reverse events */
long fwd_events;
long rev_events;
int *lft;
};
//CHANGE: may need to change if more functionality is desired
/* event types */
enum event_t
{
T_GENERATE=1,
T_ARRIVE,
T_SEND,
T_BUFFER,
R_SEND,
R_ARRIVE,
R_BUFFER,
};
typedef enum event_t event_t;
//CHANGE: may need to change if more functionality is desired
/* whether the last hop of a packet was global, local or a terminal */
enum last_hop
{
ROUTER=1,
TERMINAL
};
//CHANGE: may need to change if more functionality is desired
enum ROUTING_ALGO
{
STATIC = 0,
ADAPTIVE,
};
static char routing_folder[MAX_NAME_LENGTH];
struct router_state
{
//who am I
unsigned int router_id;
const char * anno;
const local_param *params;
int radix;
//CHANGE: may need to be changed if linear storage is not desired
//array/linked list based storage of info about ports/vcs
tw_lpid* conn_gids;
tw_stime* next_output_available_time;
message_list ***pending_msgs;
message_list ***pending_msgs_tail;
message_list ***queued_msgs;
message_list ***queued_msgs_tail;
int** vc_occupancy;
int *in_send_loop;
int *queued_count;
//for reverse computation
struct rc_stack * st;
//sampling and stats
int64_t* link_traffic;
tw_stime* last_buf_full;
tw_stime* busy_time;
tw_stime* busy_time_sample;
struct local_router_sample * rsamples;
int op_arr_size;
int max_arr_size;
long fwd_events, rev_events;
int64_t * link_traffic_sample;
char output_buf[4096];
char output_buf2[4096];
//CHANGE: add network specific data here
int *lft;
int unused;
};
struct VC_Entry {
int vc;
message_list* entry;
};
//global stats
static tw_stime local_total_time = 0;
static tw_stime local_max_latency = 0;
static long long total_hops = 0;
static long long N_finished_packets = 0;
static long long total_msg_sz = 0;
static long long N_finished_msgs = 0;
static long long N_finished_chunks = 0;
/* returns the message size */
static int local_get_msg_sz(void)
{
return sizeof(LOCAL_MSG_STRUCT);
}
//CHANGE: network specific params have to be read here
static void local_read_config(const char * anno, local_param *params){
local_param *p = params;
// general params - do not change unless you intent to modify them
int rc = configuration_get_value_double(&config, "PARAMS", "link_bandwidth",
anno, &p->link_bandwidth);
if(rc) {
p->link_bandwidth = 5.25;
fprintf(stderr, "Bandwidth of links not specified, setting to %lf\n",
p->link_bandwidth);
}
rc = configuration_get_value_double(&config, "PARAMS", "cn_bandwidth",
anno, &p->cn_bandwidth);
if(rc) {
p->cn_bandwidth = 5.25;
fprintf(stderr, "Bandwidth of compute node channels not specified, setting "
"to %lf\n", p->cn_bandwidth);
}
rc = configuration_get_value_int(&config, "PARAMS", "num_vcs", anno,
&p->num_vcs);
if(rc) {
p->num_vcs = 1;
}
rc = configuration_get_value_int(&config, "PARAMS", "chunk_size", anno,
&p->chunk_size);
if(rc) {
p->chunk_size = 512;
fprintf(stderr, "Chunk size for packets is not specified, setting to %d\n",
p->chunk_size);
}
rc = configuration_get_value_int(&config, "PARAMS", "vc_size", anno,
&p->vc_size);
if(rc) {
p->vc_size = 32768;
fprintf(stderr, "Buffer size of link channels not specified, setting to %d\n",
p->vc_size);
}
rc = configuration_get_value_int(&config, "PARAMS", "cn_vc_size", anno,
&p->cn_vc_size);
if(rc) {
p->cn_vc_size = 65536;
fprintf(stderr, "Buffer size of compute node channels not specified, "
"setting to %d\n", p->cn_vc_size);
}
char routing_str[MAX_NAME_LENGTH];
configuration_get_value(&config, "PARAMS", "routing", anno, routing_str,
MAX_NAME_LENGTH);
if(strcmp(routing_str, "static") == 0) {
p->routing = STATIC;
} else {
p->routing = STATIC;
}
if(!g_tw_mynode) printf("Routing is %d\n", p->routing);
routing_folder[0] = '\0';
rc = configuration_get_value(&config, "PARAMS", "routing_folder", anno, routing_folder,
MAX_NAME_LENGTH);
if(routing_folder[0] == '\0') {
if(p->routing == STATIC) {
tw_error(TW_LOC, "routing_folder has to be provided with static routing");
}
}
p->router_delay = 50;
configuration_get_value_int(&config, "PARAMS", "router_delay", anno,
&p->router_delay);
configuration_get_value(&config, "PARAMS", "cn_sample_file", anno,
local_cn_sample_file, MAX_NAME_LENGTH);
configuration_get_value(&config, "PARAMS", "rt_sample_file", anno,
local_rtr_sample_file, MAX_NAME_LENGTH);
//CHANGE: add network specific parameters here
rc = configuration_get_value_int(&config, "PARAMS", "num_switches", anno,
&p->num_switches);
rc = configuration_get_value_int(&config, "PARAMS", "num_terminals", anno,
&p->num_terminals);
p->switch_id_to_info.resize(p->num_switches);
p->term_id_to_info.resize(p->num_terminals);
configuration_get_value(&config, "PARAMS", "network_graph_file", anno,
network_graph_file, MAX_NAME_LENGTH);
FILE *input_file = fopen(network_graph_file, "r");
Agraph_t *input_graph = agread(input_file, NULL);
int num_switches = 0, num_terminals = 0;
Agnode_t *g_node;
Agedge_t *e;
for(g_node = agfstnode(input_graph); g_node; g_node = agnxtnode(input_graph, g_node)) {
char *name = agnameof(g_node);
if(name[0] == 'S') {
Switch_info next_switch;
next_switch.name = std::string(name);
p->switch_name_to_id[next_switch.name] = num_switches;
char * comment = agget(g_node, "comment");
std::string comment_str(comment);
if(comment_str.find("root_switch") == std::string::npos ||
(comment_str.find("root_switch") > comment_str.find("radix"))) {
sscanf(comment, "%llx,radix=%d", &next_switch.guid, &next_switch.radix);
} else {
sscanf(comment, "%llx,root_switch,radix=%d", &next_switch.guid, &next_switch.radix);
}
p->switch_id_to_info[num_switches] = next_switch;
p->switch_id_to_info[num_switches].ports.resize(next_switch.radix);
p->switch_guid_to_id[next_switch.guid] = num_switches;
num_switches++;
}
if(name[0] == 'H') {
Terminal_info next_term;
next_term.name = std::string(name);
int term_id;
sscanf(name, "H<%d", &term_id);
p->term_name_to_id[next_term.name] = term_id;
char * comment = agget(g_node, "comment");
std::string comment_str(comment);
if(comment_str.find("radix") == std::string::npos) {
sscanf(comment, "%x", &next_term.guid);
next_term.radix = 1;
} else {
sscanf(comment, "%x,radix=%d", &next_term.guid, &next_term.radix);
}
p->term_id_to_info[term_id] = next_term;
p->term_id_to_info[term_id].ports.resize(next_term.radix);
p->term_guid_to_id[next_term.guid] = term_id;
if(term_id + 1 > num_terminals) num_terminals = term_id + 1;
}
}
assert(num_terminals == p->num_terminals);
assert(num_switches == p->num_switches);
for(g_node = agfstnode(input_graph); g_node; g_node = agnxtnode(input_graph, g_node)) {
char *name = agnameof(g_node);
int index_in_info;
if(name[0] == 'S') {
index_in_info = p->switch_name_to_id[std::string(name)];
} else {
index_in_info = p->term_name_to_id[std::string(name)];
}
for (e = agfstout(input_graph,g_node); e; e = agnxtout(input_graph,e)) {
char *partner = agnameof(e->node);
char *edge_comment = agget(e, "comment");
int srcPort, dstPort;
sscanf(edge_comment, "P%d->P%d", &srcPort, &dstPort);
int dstId; bool isTerm;
if(partner[0] == 'S') {
dstId = p->switch_name_to_id[std::string(partner)];
isTerm = false;
} else {
dstId = p->term_name_to_id[std::string(partner)];
isTerm = true;
}
if(name[0] == 'S') {
p->switch_id_to_info[index_in_info].ports[srcPort-1] = End(dstPort-1, dstId, isTerm);
} else {
p->term_id_to_info[index_in_info].ports[srcPort-1] = End(dstPort-1, dstId, isTerm);
}
}
}
agclose(input_graph);
fclose(input_file);
//CHANGE: derived parameters often are computed based on network specifics
//general derived parameters
p->cn_delay = bytes_to_ns(1, p->cn_bandwidth);
p->link_delay = bytes_to_ns(1, p->link_bandwidth);
p->credit_delay = 50; //using a constant
uint32_t h1 = 0, h2 = 0;
bj_hashlittle2(LP_METHOD_NM_TERM, strlen(LP_METHOD_NM_TERM), &h1, &h2);
terminal_magic_num = h1 + h2;
bj_hashlittle2(LP_METHOD_NM_ROUT, strlen(LP_METHOD_NM_ROUT), &h1, &h2);
router_magic_num = h1 + h2;
}
static void local_configure(){
anno_map = codes_mapping_get_lp_anno_map(LP_CONFIG_NM_TERM);
assert(anno_map);
num_params = anno_map->num_annos + (anno_map->has_unanno_lp > 0);
//all_params = (local_param *)malloc(num_params * sizeof(*all_params));
all_params = new local_param[num_params];
for (int i = 0; i < anno_map->num_annos; i++){
const char * anno = anno_map->annotations[i].ptr;
local_read_config(anno, &all_params[i]);
}
if (anno_map->has_unanno_lp > 0){
local_read_config(NULL, &all_params[anno_map->num_annos]);
}
}
/* report statistics like average and maximum packet latency, average number of hops traversed */
static void local_report_stats()
{
long long avg_hops, total_finished_packets, total_finished_chunks;
long long total_finished_msgs, final_msg_sz;
tw_stime avg_time, max_time;
int total_minimal_packets, total_nonmin_packets;
long total_gen, total_fin;
MPI_Reduce( &total_hops, &avg_hops, 1, MPI_LONG_LONG, MPI_SUM, 0,
MPI_COMM_WORLD);
MPI_Reduce( &N_finished_packets, &total_finished_packets, 1, MPI_LONG_LONG,
MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &N_finished_msgs, &total_finished_msgs, 1, MPI_LONG_LONG, MPI_SUM,
0, MPI_COMM_WORLD);
MPI_Reduce( &N_finished_chunks, &total_finished_chunks, 1, MPI_LONG_LONG,
MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &total_msg_sz, &final_msg_sz, 1, MPI_LONG_LONG, MPI_SUM, 0,
MPI_COMM_WORLD);
MPI_Reduce( &local_total_time, &avg_time, 1,MPI_DOUBLE, MPI_SUM, 0,
MPI_COMM_WORLD);
MPI_Reduce( &local_max_latency, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0,
MPI_COMM_WORLD);
MPI_Reduce( &packet_gen, &total_gen, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &packet_fin, &total_fin, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
/* print statistics */
if(!g_tw_mynode)
{
printf(" Average number of hops traversed %f average chunk latency %lf us "
"maximum chunk latency %lf us avg message size %lf bytes finished "
"messages %lld finished chunks %lld \n",
(float)avg_hops/total_finished_chunks,
avg_time/(total_finished_chunks*1000), max_time/1000,
(float)final_msg_sz/total_finished_msgs, total_finished_msgs,
total_finished_chunks);
printf("\n Total packets generated %ld finished %ld \n", total_gen, total_fin);
}
return;
}
/* parse external file with give forwarding tables
* file name: 0x<guid>.lft
* content: '0x<terminal guid> <egress port>' per line, e.g.:
* `head 0x0000000100000000.lf`
* 0x0000000100000000 0
* 0x00000040000000ff 22
* 0x0000000100000001 19
*/
typedef struct {
uint64_t guid;
uint64_t port;
} guid_port_combi_t;
static int cmp_guids(const void *g1, const void *g2)
{
uint64_t guid1 = *((uint64_t *) g1);
uint64_t guid2 = ((guid_port_combi_t *) g2)->guid;
if (guid1 == guid2)
return 0;
else
return 1;
}
static int read_static_lft(long long guid, int * &lft, const local_param * params)
{
char dir_name[512];
char file_name[512];
char *io_dir = routing_folder;
sprintf(dir_name, "%s", io_dir);
sprintf(file_name, "%s/0x%016"PRIx64".lft", dir_name, guid);
FILE *file = NULL;
if (!(file = fopen(file_name, "r"))) {
printf("Coudn't open file %s\n", file_name);
fflush(stdout);
return -1;
}
char line[UINT8_MAX];
char *p = NULL, *e = NULL;
uint64_t dest_guid = 0, port = 0;
int num_added = 0;
//temp storage for the entire table
guid_port_combi_t *tmpLFT = (guid_port_combi_t*)calloc(params->num_terminals + params->num_switches, sizeof(guid_port_combi_t));
//storage for port to take for the destination terminal
lft = (int*)malloc(params->num_terminals * sizeof(int));
/* init all with -1 so that we find missing routing entries */
for (int i = 0; i < params->num_terminals; i++) lft[i] = -1;
while (fgets(line, sizeof(line), file)) {
p = line;
while (isspace(*p))
p++;
if (*p == '\0' || *p == '\n' || *p == '#')
continue;
dest_guid = strtoull(p, &e, 16);
if (e == p || (!isspace(*e) && *e != '#' && *e != '\0')) {
errno = EINVAL;
return -1;
}
p = e;
while (isspace(*p))
p++;
port = strtoull(p, &e, 0);
if (e == p || (!isspace(*e) && *e != '#' && *e != '\0')) {
errno = EINVAL;
return -1;
}
tmpLFT[num_added].guid = dest_guid;
tmpLFT[num_added].port = port;
num_added++;
}
/* we want a real LFT with terminal_id as lookup index, so have to
* convert the read lft, which might contain entries for switches
* or is permutated, into the correct format
*/
for (int dest_num = 0; dest_num < params->num_terminals; dest_num++) {
uint64_t key = params->term_id_to_info[dest_num].guid;
size_t size = params->num_terminals + params->num_switches;
guid_port_combi_t *elem = (guid_port_combi_t*)lfind(&key, tmpLFT, &size,
sizeof(guid_port_combi_t), cmp_guids);
assert(elem);
lft[dest_num] = elem->port - 1;
}
#if FATTREE_DEBUG
printf("I am guid=%016"PRIx64") and my LFT is:\n", guid);
for (int dest_num = 0; dest_num < params->num_terminals; dest_num++)
printf("\tdest %d -> egress port %d\n", dest_num, lft[dest_num]);
#endif
free(tmpLFT);
fclose(file);
return 0;
}
/* initialize a compute node terminal */
static void terminal_init( terminal_state * s, tw_lp * lp )
{
int i;
char anno[MAX_NAME_LENGTH];
s->packet_gen = 0;
s->packet_fin = 0;
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
&mapping_type_id, anno, &mapping_rep_id, &mapping_offset);
if (anno[0] == '\0') {
s->anno = NULL;
s->params = &all_params[num_params-1];
} else {
s->anno = strdup(anno);
int id = configuration_get_annotation_index(anno, anno_map);
s->params = &all_params[id];
}
int num_lps = codes_mapping_get_lp_count(lp_group_name, 0, LP_CONFIG_NM_TERM,
s->anno, 0);
s->terminal_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);
const Terminal_info &my_info = s->params->term_id_to_info[s->terminal_id];
s->radix = my_info.radix;
s->conn_gids = (tw_lpid *)malloc(s->radix * sizeof(tw_lpid));
for(int port = 0; port < s->radix; port++) {
const End &next_port = my_info.ports[port];
if(next_port.isTerm) {
s->conn_gids[port] = codes_mapping_get_lpid_from_relative(next_port.id,
NULL, LP_CONFIG_NM_TERM, s->anno, 1);
} else {
s->conn_gids[port] = codes_mapping_get_lpid_from_relative(next_port.id,
NULL, LP_CONFIG_NM_ROUT, s->anno, 1);
}
}
s->packet_counter = 0;
s->finished_msgs = 0;
s->finished_chunks = 0;
s->finished_packets = 0;
s->total_time = 0.0;
s->total_msg_size = 0;
s->fwd_events = 0;
s->rev_events = 0;
rc_stack_create(&s->st);
s->rank_tbl = qhash_init(mn_rank_hash_compare, mn_hash_func, HASH_TABLE_SIZE);
if(!s->rank_tbl)
tw_error(TW_LOC, "\n Hash table not initialized! ");
s->terminal_msgs = (message_list ***)malloc(s->radix * sizeof(message_list**));
s->terminal_msgs_tail = (message_list ***)malloc(s->radix * sizeof(message_list**));
s->vc_occupancy = (int **)malloc(s->radix * sizeof(int*));
s->total_terminal_length = (int *)malloc(s->radix * sizeof(int));
s->terminal_length = (int **)malloc(s->radix * sizeof(int*));
s->in_send_loop = (int *)malloc(s->radix * sizeof(int));
s->terminal_available_time = (tw_stime*)malloc(s->radix * sizeof(tw_stime));
s->last_buf_full = (tw_stime*)malloc(s->radix * sizeof(tw_stime));
s->busy_time = (tw_stime*)malloc(s->radix * sizeof(tw_stime));
s->busy_time_sample = (tw_stime*)malloc(s->radix * sizeof(tw_stime));
for(int port = 0; port < s->radix; port++) {
s->terminal_msgs[port] =
(message_list **)malloc(s->params->num_vcs * sizeof(message_list*));
s->terminal_msgs_tail[port] =
(message_list **)malloc(s->params->num_vcs * sizeof(message_list*));
s->vc_occupancy[port] = (int*)malloc(s->params->num_vcs * sizeof(int));
s->terminal_length[port] = (int*)malloc(s->params->num_vcs * sizeof(int));
for(i = 0; i < s->params->num_vcs; i++ ) {
s->terminal_msgs[port][i] = NULL;
s->terminal_msgs_tail[port][i] = NULL;
s->vc_occupancy[port][i] = 0;
s->terminal_length[port][i] = 0;
}
s->total_terminal_length[port] = 0;
s->in_send_loop[port] = 0;
s->last_buf_full[port] = 0.0;
s->busy_time[port] = 0.0;
s->busy_time_sample[port] = 0.0;
s->terminal_available_time[port] = 0.0;
}
s->issueIdle = 0;
return;
}
void post_terminal_init(terminal_state *s, tw_lp *lp)
{
/* read any LFTs which might have been generated by an external routing
* algorithm, e.g., through the use of opensm\
*/
if(s->params->routing == STATIC) {
if(0 != read_static_lft(s->params->term_id_to_info[s->terminal_id].guid, s->lft, s->params)) {
tw_error(TW_LOC, "Error while reading the routing table for terminal");
}
}
}
//CHANGE: fill this function to set up router's connectivity and network
//specific data structures
static void create_router_connections(router_state * r, tw_lp * lp) {
local_param *p = (local_param *)r->params;
Switch_info &my_info = p->switch_id_to_info[r->router_id];
r->conn_gids = (tw_lpid *)malloc(r->radix * sizeof(tw_lpid));
for(int port = 0; port < r->radix; port++) {
End &next_port = my_info.ports[port];
if(next_port.isTerm) {
r->conn_gids[port] = codes_mapping_get_lpid_from_relative(next_port.id,
NULL, LP_CONFIG_NM_TERM, r->anno, 1);
} else {
r->conn_gids[port] = codes_mapping_get_lpid_from_relative(next_port.id,
NULL, LP_CONFIG_NM_ROUT, r->anno, 1);
}
}
}
static void router_setup(router_state * r, tw_lp * lp)
{
char anno[MAX_NAME_LENGTH];
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
&mapping_type_id, anno, &mapping_rep_id, &mapping_offset);
if (anno[0] == '\0'){
r->anno = NULL;
r->params = &all_params[num_params-1];
} else{
r->anno = strdup(anno);
int id = configuration_get_annotation_index(anno, anno_map);
r->params = &all_params[id];
}
local_param *p = (local_param *)r->params;
r->router_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);
if(r->router_id >= p->num_switches) {
r->unused = 1;
return;
}
r->unused = 0;
Switch_info &my_info = p->switch_id_to_info[r->router_id];
r->radix = my_info.radix;
r->fwd_events = 0;
r->rev_events = 0;
r->next_output_available_time = (tw_stime*)malloc(r->radix * sizeof(tw_stime));
r->link_traffic = (int64_t*)malloc(r->radix * sizeof(int64_t));
r->link_traffic_sample = (int64_t*)malloc(r->radix * sizeof(int64_t));
r->vc_occupancy = (int**)malloc(r->radix * sizeof(int*));
r->in_send_loop = (int*)malloc(r->radix * sizeof(int));
r->pending_msgs =
(message_list ***)malloc(r->radix * sizeof(message_list**));
r->pending_msgs_tail =
(message_list ***)malloc(r->radix * sizeof(message_list**));
r->queued_msgs =
(message_list ***)malloc(r->radix * sizeof(message_list**));
r->queued_msgs_tail =
(message_list ***)malloc(r->radix * sizeof(message_list**));
r->queued_count = (int*)malloc(r->radix * sizeof(int));
r->last_buf_full = (tw_stime*)malloc(r->radix * sizeof(tw_stime));
r->busy_time = (tw_stime*)malloc(r->radix * sizeof(tw_stime));
r->busy_time_sample = (tw_stime*)malloc(r->radix * sizeof(tw_stime));
rc_stack_create(&r->st);
for(int i = 0; i < r->radix; i++)
{
// Set credit & router occupancy
r->last_buf_full[i] = 0.0;
r->busy_time[i] = 0.0;
r->busy_time_sample[i] = 0.0;
r->next_output_available_time[i] = 0;
r->link_traffic[i] = 0;
r->link_traffic_sample[i] = 0;
r->queued_count[i] = 0;
r->in_send_loop[i] = 0;
r->vc_occupancy[i] = (int *)malloc(p->num_vcs * sizeof(int));
r->pending_msgs[i] = (message_list **)malloc(p->num_vcs *
sizeof(message_list*));
r->pending_msgs_tail[i] = (message_list **)malloc(p->num_vcs *
sizeof(message_list*));
r->queued_msgs[i] = (message_list **)malloc(p->num_vcs *
sizeof(message_list*));
r->queued_msgs_tail[i] = (message_list **)malloc(p->num_vcs *
sizeof(message_list*));
for(int j = 0; j < p->num_vcs; j++) {
r->vc_occupancy[i][j] = 0;
r->pending_msgs[i][j] = NULL;
r->pending_msgs_tail[i][j] = NULL;
r->queued_msgs[i][j] = NULL;
r->queued_msgs_tail[i][j] = NULL;
}
}
create_router_connections(r, lp);
return;
}
void post_router_init(router_state *r, tw_lp *lp)
{
/* read any LFTs which might have been generated by an external routing
* algorithm, e.g., through the use of opensm\
*/
if(r->params->routing == STATIC && !r->unused) {
if(0 != read_static_lft(r->params->switch_id_to_info[r->router_id].guid, r->lft, r->params)) {
tw_error(TW_LOC, "Error while reading the routing table for switch");
}
}
}
/* packet event , generates a packet on the compute node */
static tw_stime local_packet_event(
model_net_request const * req,
uint64_t message_offset,
uint64_t packet_size,
tw_stime offset,
mn_sched_params const * sched_params,
void const * remote_event,
void const * self_event,
tw_lp *sender,
int is_last_pckt)
{
(void)message_offset;
(void)sched_params;
tw_event * e_new;
tw_stime xfer_to_nic_time;
LOCAL_MSG_STRUCT * msg;
char* tmp_ptr;
xfer_to_nic_time = codes_local_latency(sender);
e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
sender, LOCAL_NETWORK_NAME, (void**)&msg, (void**)&tmp_ptr);
strcpy(msg->category, req->category);
msg->final_dest_gid = req->final_dest_lp;
msg->total_size = req->msg_size;
msg->sender_lp = req->src_lp;
msg->sender_mn_lp = sender->gid;
msg->packet_size = packet_size;
msg->travel_start_time = tw_now(sender);
msg->remote_event_size_bytes = 0;
msg->local_event_size_bytes = 0;
msg->type = T_GENERATE;
msg->dest_terminal_id = req->dest_mn_lp;
msg->dest_terminal = codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0);
msg->message_id = req->msg_id;
msg->is_pull = req->is_pull;
msg->pull_size = req->pull_size;
msg->magic = terminal_magic_num;
msg->msg_start_time = req->msg_start_time;
if(is_last_pckt) /* Its the last packet so pass in remote and local event information*/
{
if(req->remote_event_size > 0)
{
msg->remote_event_size_bytes = req->remote_event_size;
memcpy(tmp_ptr, remote_event, req->remote_event_size);
tmp_ptr += req->remote_event_size;
}
if(req->self_event_size > 0)
{
msg->local_event_size_bytes = req->self_event_size;
memcpy(tmp_ptr, self_event, req->self_event_size);
tmp_ptr += req->self_event_size;
}
}
tw_event_send(e_new);
return xfer_to_nic_time;
}
/* packet event reverse handler */
static void local_packet_event_rc(tw_lp *sender)
{
codes_local_latency_reverse(sender);
return;
}
/* generates packet at the current compute node */
static void packet_generate(terminal_state * s, tw_bf * bf, LOCAL_MSG_STRUCT * msg,
tw_lp * lp) {
packet_gen++;
s->packet_gen++;
tw_stime ts, nic_ts;
assert(lp->gid != msg->dest_terminal_id);
const local_param *p = s->params;
int total_event_size;
uint64_t num_chunks = msg->packet_size / p->chunk_size;
if (msg->packet_size % s->params->chunk_size)
num_chunks++;
if(!num_chunks)
num_chunks = 1;
nic_ts = g_tw_lookahead + (msg->packet_size * s->params->cn_delay) +
tw_rand_unif(lp->rng);
msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter;
msg->my_N_hop = 0;
int output_port = -1, use_vc = 0;
//find the next port by looking up the lft
if(s->params->routing == STATIC) {
//TODO support routing tables from terminals also
output_port = 0; //s->lft[msg->dest_terminal];
/* assert should only fail if read LFT is incomplete -> broken routing */
assert(output_port >= 0);
} else {
tw_error(TW_LOC, "Only static routing supported in this topology\n");
}
/* TODO: adaptive greedy vc selection */
int min_vc_length = s->terminal_length[output_port][use_vc];
for(int vc = 1; vc < s->params->num_vcs; vc++) {
if(s->terminal_length[output_port][vc] < min_vc_length) {
min_vc_length = s->terminal_length[output_port][vc];
use_vc = vc;
}
}
msg->saved_port = output_port;
msg->saved_vc = use_vc;
for(uint64_t i = 0; i < num_chunks; i++)
{
message_list *cur_chunk = (message_list*)malloc(
sizeof(message_list));
init_message_list(cur_chunk, msg);
if(msg->remote_event_size_bytes + msg->local_event_size_bytes > 0) {
cur_chunk->event_data = (char*)malloc(
msg->remote_event_size_bytes + msg->local_event_size_bytes);
}
void * m_data_src = model_net_method_get_edata(LOCAL_NETWORK_NAME, msg);
if (msg->remote_event_size_bytes){
memcpy(cur_chunk->event_data, m_data_src, msg->remote_event_size_bytes);
}
if (msg->local_event_size_bytes){
m_data_src = (char*)m_data_src + msg->remote_event_size_bytes;
memcpy((char*)cur_chunk->event_data + msg->remote_event_size_bytes,
m_data_src, msg->local_event_size_bytes);
}
cur_chunk->LOCAL_MSG_NAME_FROM_UNION.chunk_id = i;
cur_chunk->port = output_port; cur_chunk->index = use_vc;
append_to_message_list(s->terminal_msgs[output_port], s->terminal_msgs_tail[output_port],
use_vc, cur_chunk);
s->total_terminal_length[output_port] += s->params->chunk_size;
s->terminal_length[output_port][use_vc] += s->params->chunk_size;
}
if(s->total_terminal_length[output_port] < s->params->num_vcs * s->params->cn_vc_size) {
model_net_method_idle_event(nic_ts, 0, lp);
} else {
bf->c11 = 1;
s->issueIdle = 1;
}
if(s->in_send_loop[output_port] == 0) {
bf->c5 = 1;
ts = codes_local_latency(lp);
LOCAL_MSG_STRUCT *m;
tw_event* e = model_net_method_event_new(lp->gid, ts, lp, LOCAL_NETWORK_NAME,
(void**)&m, NULL);
m->type = T_SEND;
m->magic = terminal_magic_num;
m->vc_index = output_port;
s->in_send_loop[output_port] = 1;
tw_event_send(e);
}
total_event_size = model_net_get_msg_sz(LOCAL_NETWORK_NAME) +
msg->remote_event_size_bytes + msg->local_event_size_bytes;
mn_stats* stat;
stat = model_net_find_stats(msg->category, s->local_stats_array);
stat->send_count++;
stat->send_bytes += msg->packet_size;
stat->send_time += p->cn_delay * msg->packet_size;
if(stat->max_event_size < total_event_size)
stat->max_event_size = total_event_size;
return;
}
static void packet_generate_rc(terminal_state * s, tw_bf * bf, LOCAL_MSG_STRUCT * msg, tw_lp * lp)
{
int output_port = msg->saved_port;
int vc = msg->saved_vc;
s->packet_gen--;
packet_gen--;
tw_rand_reverse_unif(lp->rng);
int num_chunks = msg->packet_size/s->params->chunk_size;
if(msg->packet_size % s->params->chunk_size)
num_chunks++;
if(!num_chunks)
num_chunks = 1;
int i;
for(i = 0; i < num_chunks; i++) {
delete_message_list(return_tail(s->terminal_msgs[output_port],
s->terminal_msgs_tail[output_port], vc));
s->total_terminal_length[output_port] -= s->params->chunk_size;
s->terminal_length[output_port][vc] -= s->params->chunk_size;
}
if(bf->c11) {
s->issueIdle = 0;
}
if(bf->c5) {
codes_local_latency_reverse(lp);
s->in_send_loop[output_port] = 0;
}
struct mn_stats* stat;
stat = model_net_find_stats(msg->category, s->local_stats_array);
stat->send_count--;
stat->send_bytes -= msg->packet_size;
stat->send_time -= s->params->cn_delay * msg->packet_size;
}
/* sends the packet from the current compute node to the attached router */
static void packet_send(terminal_state * s, tw_bf * bf, LOCAL_MSG_STRUCT * msg,
tw_lp * lp) {
tw_stime ts;
tw_event *e;
LOCAL_MSG_STRUCT *m;
tw_lpid router_id;
std::vector<VC_Entry> entries;
bool noEmptyVC = false;
int output_port = msg->vc_index;
for(int i = 0; i < s->params->num_vcs; i++) {
if(s->terminal_msgs[output_port][i] != NULL) {
if(s->vc_occupancy[output_port][i] + s->params->chunk_size <= s->params->cn_vc_size) {
VC_Entry tmp;
tmp.vc = i; tmp.entry = s->terminal_msgs[output_port][i];
entries.push_back(tmp);
} else {
noEmptyVC = true;
}
}
}
if(entries.size() == 0) {
bf->c1 = 1;
s->in_send_loop[output_port] = 0;
if(noEmptyVC && !s->last_buf_full[output_port]) {
bf->c3 = 1;
msg->saved_busy_time = s->last_buf_full[output_port];
s->last_buf_full[output_port] = tw_now(lp);
}
return;
}
if(s->last_buf_full[output_port] > 0.0)
{
bf->c4 = 1;
msg->saved_total_time = s->busy_time[output_port];
msg->saved_busy_time = s->last_buf_full[output_port];
msg->saved_sample_time = s->busy_time_sample[output_port];
s->busy_time[output_port] += (tw_now(lp) - s->last_buf_full[output_port]);
s->busy_time_sample[output_port] += (tw_now(lp) - s->last_buf_full[output_port]);
s->last_buf_full[output_port] = 0.0;
}
//CHANGE: randomly pick the VC to send on can be changed.
int pick = tw_rand_integer(lp->rng, 0, entries.size() - 1);
message_list* cur_entry = entries[pick].entry;
int use_vc = entries[pick].vc;
msg->saved_vc = use_vc;
uint64_t num_chunks = cur_entry->LOCAL_MSG_NAME_FROM_UNION.packet_size/s->params->chunk_size;
if(cur_entry->LOCAL_MSG_NAME_FROM_UNION.packet_size % s->params->chunk_size)
num_chunks++;
if(!num_chunks)
num_chunks = 1;
tw_stime delay;
if((cur_entry->LOCAL_MSG_NAME_FROM_UNION.packet_size % s->params->chunk_size)
&& (cur_entry->LOCAL_MSG_NAME_FROM_UNION.chunk_id == num_chunks - 1))
delay = (cur_entry->LOCAL_MSG_NAME_FROM_UNION.packet_size % s->params->chunk_size) *
s->params->cn_delay;
else
delay = s->params->chunk_size * s->params->cn_delay;
msg->saved_available_time = s->terminal_available_time[output_port];
ts = g_tw_lookahead + delay + tw_rand_unif(lp->rng);
s->terminal_available_time[output_port] = maxd(s->terminal_available_time[output_port], tw_now(lp));
s->terminal_available_time[output_port] += ts;
ts = s->terminal_available_time[output_port] - tw_now(lp);
void * remote_event;
e = model_net_method_event_new(s->conn_gids[output_port], ts, lp, LOCAL_NETWORK_ROUTER_NAME,
(void**)&m, &remote_event);
memcpy(m, &cur_entry->LOCAL_MSG_NAME_FROM_UNION, sizeof(LOCAL_MSG_STRUCT));
if (m->remote_event_size_bytes){
memcpy(remote_event, cur_entry->event_data, m->remote_event_size_bytes);
}
m->type = R_ARRIVE;
m->src_terminal_id = lp->gid;
m->vc_index = output_port;
m->output_chan = use_vc;
m->last_hop = TERMINAL;
m->magic = router_magic_num;
m->local_event_size_bytes = 0;
tw_event_send(e);
if(cur_entry->LOCAL_MSG_NAME_FROM_UNION.chunk_id == num_chunks - 1 &&
(cur_entry->LOCAL_MSG_NAME_FROM_UNION.local_event_size_bytes > 0)) {
bf->c2 = 1;
tw_stime local_ts = codes_local_latency(lp);
tw_event *e_new = tw_event_new(cur_entry->LOCAL_MSG_NAME_FROM_UNION.sender_lp, local_ts, lp);
void * m_new = tw_event_data(e_new);
void *local_event = (char*)cur_entry->event_data +
cur_entry->LOCAL_MSG_NAME_FROM_UNION.remote_event_size_bytes;
memcpy(m_new, local_event, cur_entry->LOCAL_MSG_NAME_FROM_UNION.local_event_size_bytes);
tw_event_send(e_new);
}
s->packet_counter++;
s->vc_occupancy[output_port][use_vc] += s->params->chunk_size;
cur_entry = return_head(s->terminal_msgs[output_port], s->terminal_msgs_tail[output_port], use_vc);
rc_stack_push(lp, cur_entry, delete_message_list, s->st);
s->total_terminal_length[output_port] -= s->params->chunk_size;
s->terminal_length[output_port][use_vc] -= s->params->chunk_size;
LOCAL_MSG_STRUCT *m_new;
ts += tw_rand_unif(lp->rng);
e = model_net_method_event_new(lp->gid, ts, lp, LOCAL_NETWORK_NAME,
(void**)&m_new, NULL);
m_new->type = T_SEND;
m_new->vc_index = output_port;
m_new->magic = terminal_magic_num;
tw_event_send(e);
if(s->issueIdle) {
bf->c5 = 1;
s->issueIdle = 0;
ts += tw_rand_unif(lp->rng);
model_net_method_idle_event(ts, 0, lp);
}
return;
}
static void packet_send_rc(terminal_state * s, tw_bf * bf, LOCAL_MSG_STRUCT * msg,
tw_lp * lp)
{
int output_port = msg->vc_index;
if(bf->c1) {
s->in_send_loop[output_port] = 1;
if(bf->c3) {
s->last_buf_full[output_port] = msg->saved_busy_time;
}
return;
}
if(bf->c4)
{
s->busy_time[output_port] = msg->saved_total_time;
s->last_buf_full[output_port] = msg->saved_busy_time;
s->busy_time_sample[output_port] = msg->saved_sample_time;
}
tw_rand_reverse_unif(lp->rng);
tw_rand_reverse_unif(lp->rng);
s->terminal_available_time[output_port] = msg->saved_available_time;
if(bf->c2) {
codes_local_latency_reverse(lp);
}
int use_vc = msg->saved_vc;
s->packet_counter--;
s->vc_occupancy[output_port][use_vc] -= s->params->chunk_size;
message_list* cur_entry = (message_list *)rc_stack_pop(s->st);
cur_entry->port = output_port; cur_entry->index = use_vc;
prepend_to_message_list(s->terminal_msgs[output_port], s->terminal_msgs_tail[output_port],
use_vc, cur_entry);
s->terminal_length[output_port][use_vc] += s->params->chunk_size;
s->total_terminal_length[output_port] += s->params->chunk_size;
tw_rand_reverse_unif(lp->rng);
if(bf->c5)
{
tw_rand_reverse_unif(lp->rng);
s->issueIdle = 1;
}
return;
}
static void send_remote_event(terminal_state * s, LOCAL_MSG_STRUCT * msg,
tw_lp * lp, tw_bf * bf, char * event_data, int remote_event_size)
{
void * tmp_ptr = model_net_method_get_edata(LOCAL_NETWORK_NAME, msg);
//tw_stime ts = g_tw_lookahead + s->params->cn_delay *
// msg->remote_event_size_bytes + tw_rand_unif(lp->rng);;
tw_stime ts = g_tw_lookahead + tw_rand_unif(lp->rng);
if (msg->is_pull){
bf->c4 = 1;
struct codes_mctx mc_dst =
codes_mctx_set_global_direct(msg->sender_mn_lp);
struct codes_mctx mc_src =
codes_mctx_set_global_direct(lp->gid);
int net_id = model_net_get_id(LP_METHOD_NM_TERM);
model_net_set_msg_param(MN_MSG_PARAM_START_TIME,
MN_MSG_PARAM_START_TIME_VAL, &(msg->msg_start_time));
msg->event_rc = model_net_event_mctx(net_id, &mc_src, &mc_dst, msg->category,
msg->sender_lp, msg->pull_size, ts,
remote_event_size, tmp_ptr, 0, NULL, lp);
} else {
tw_event * e = tw_event_new(msg->final_dest_gid, ts, lp);
void * m_remote = tw_event_data(e);
memcpy(m_remote, event_data, remote_event_size);
tw_event_send(e);
}
return;
}
/* packet arrives at the destination terminal */
static void packet_arrive(terminal_state * s, tw_bf * bf, LOCAL_MSG_STRUCT * msg,
tw_lp * lp) {
assert(lp->gid == msg->dest_terminal_id);
//total chunks expected in this message
uint64_t total_chunks = msg->total_size / s->params->chunk_size;
if(msg->total_size % s->params->chunk_size)
total_chunks++;
if(!total_chunks)
total_chunks = 1;
/* send credit back to router */
tw_stime ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng);
tw_event * buf_e;
LOCAL_MSG_STRUCT * buf_msg;
buf_e = model_net_method_event_new(msg->intm_lp_id, ts, lp,
LOCAL_NETWORK_ROUTER_NAME, (void**)&buf_msg, NULL);
buf_msg->magic = router_magic_num;