Commit 562bdbbd authored by Noah Wolfe's avatar Noah Wolfe

Cleaned code. Removed unused collective functionality. Added ability to...

Cleaned code. Removed unused collective functionality. Added ability to compute router connections at runtime rather than load from file.
parent 77989e77
......@@ -198,14 +198,14 @@ static void issue_event(
*/
const char * anno;
configuration_get_value_int(&config, "PARAMS", "packet_size", anno, &this_packet_size);
configuration_get_value_int(&config, "PARAMS", "packet_size", NULL, &this_packet_size);
if(!this_packet_size) {
this_packet_size = 0;
fprintf(stderr, "Packet size not specified, setting to %d\n", this_packet_size);
exit(0);
}
configuration_get_value_double(&config, "PARAMS", "global_bandwidth", anno, &this_global_bandwidth);
configuration_get_value_double(&config, "PARAMS", "global_bandwidth", NULL, &this_global_bandwidth);
if(!this_global_bandwidth) {
this_global_bandwidth = 4.7;
fprintf(stderr, "Bandwidth of global channels not specified, setting to %lf\n", this_global_bandwidth);
......@@ -258,7 +258,7 @@ static void handle_kickoff_event(
svr_msg * m,
tw_lp * lp)
{
char* anno;
char anno[MAX_NAME_LENGTH];
tw_lpid local_dest = -1, global_dest = -1;
svr_msg * m_local = malloc(sizeof(svr_msg));
......@@ -270,11 +270,9 @@ static void handle_kickoff_event(
memcpy(m_remote, m_local, sizeof(svr_msg));
m_remote->svr_event_type = REMOTE;
//slimfly change
// assert(net_id == DRAGONFLY); /* only supported for dragonfly model right now. */
assert(net_id == SLIMFLY); /* only supported for slimfly model right now. */
ns->start_ts = tw_now(lp);
// codes_mapping_get_lp_info(lp->gid, group_name, &group_index, lp_type_name, &lp_type_index, anno, &rep_id, &offset);
codes_mapping_get_lp_info(lp->gid, group_name, &group_index, lp_type_name, &lp_type_index, anno, &rep_id, &offset);
/* in case of uniform random traffic, send to a random destination. */
if(traffic == UNIFORM)
......@@ -471,8 +469,8 @@ int main(
}
*/
num_servers_per_rep = codes_mapping_get_lp_count("MODELNET_GRP", 1, "server", NULL, 1);
configuration_get_value_int(&config, "PARAMS", "num_terminals", anno, &num_terminals);
configuration_get_value_int(&config, "PARAMS", "num_routers", anno, &num_routers_per_grp);
configuration_get_value_int(&config, "PARAMS", "num_terminals", NULL, &num_terminals);
configuration_get_value_int(&config, "PARAMS", "num_routers", NULL, &num_routers_per_grp);
num_groups = (num_routers_per_grp * 2);
num_nodes = num_groups * num_routers_per_grp * num_servers_per_rep;
num_nodes_per_grp = num_routers_per_grp * num_servers_per_rep;
......
......@@ -25,13 +25,6 @@
#define MEAN_PROCESS 1.0
/* collective specific parameters */
#define TREE_DEGREE 4
#define LEVEL_DELAY 1000
#define SLIMFLY_COLLECTIVE_DEBUG 0
#define NUM_COLLECTIVES 1
#define COLLECTIVE_COMPUTATION_DELAY 5700
#define SLIMFLY_FAN_OUT_DELAY 20.0
#define WINDOW_LENGTH 0
#define DFLY_HASH_TABLE_SIZE 65536
// debugging parameters
......@@ -39,10 +32,10 @@
//#define TRACK 100001
#define TRACK_MSG 0
#define TRACK_OUTPUT 1
#define PRINT_ROUTER_TABLE 1
#define DEBUG 1
#define DEBUG_ROUTING 0
#define USE_DIRECT_SCHEME 1
#define LOAD_FROM_FILE 1
#define LP_CONFIG_NM (model_net_lp_config_names[SLIMFLY])
#define LP_METHOD_NM (model_net_method_names[SLIMFLY])
......@@ -61,12 +54,12 @@
#define TEMP_RADIX 10
#define TEMP_NUM_VC 4
//MMS19
//#define TEMP_NUM_GROUPS 26
//#define TEMP_NUM_ROUTERS 338
//#define TEMP_NUM_TERMINALS 3042
//#define TEMP_RADIX 28
//#define TEMP_NUM_VC 4
//MMS43
/*#define TEMP_NUM_GROUPS 26
#define TEMP_NUM_ROUTERS 338
#define TEMP_NUM_TERMINALS 3042
#define TEMP_RADIX 28
#define TEMP_NUM_VC 4
*///MMS43
//#define TEMP_NUM_GROUPS 58
//#define TEMP_NUM_ROUTERS 1682
//#define TEMP_NUM_TERMINALS 18502
......@@ -112,9 +105,8 @@ FILE * MMS_input_file=NULL;
int csf_ratio = 1; //Constant selected to balance the ratio between minimal and indirect routes
int num_indirect_routes = 4; //Number of indirect (Valiant) routes to use in Adaptive routing methods
float adaptive_threshold = 0.1;
float throughput_total;
float temp_t = 0.0;
float temp_t2 = 0.0;
float pe_throughput_percent = 0.0;
float pe_throughput = 0.0;
// MMS7 q=5 Slimfly basic configuration parameters
static int X[] = {1,4}; // : Subgraph 0 generator set
static int X_prime[] = {2,3}; // : Subgraph 1 generator set
......@@ -145,8 +137,6 @@ static int X_prime[] = {2,3}; // : Subgraph 1 generator set
/*End Noah Misc*/
long term_ecount, router_ecount, term_rev_ecount, router_rev_ecount;
/* Larger of the two arguments. Whenever the comparison a < b is false
 * (equal values included) the first argument is the one returned. */
static double maxd(double a, double b)
{
    if (a < b) {
        return b;
    }
    return a;
}
/* minimal and non-minimal packet counts for adaptive routing*/
......@@ -215,7 +205,7 @@ struct slimfly_param
double credit_delay;
//slimfly added
double router_delay; /*Router processing delay moving packet from input port to output port*/
double link_delay; /*Network link latency*/
double link_delay; /*Network link latency. Currently incorporated into the arrival time*/
int num_local_channels;
};
......@@ -258,36 +248,12 @@ struct terminal_state
// Terminal generate, sends and arrival T_SEND, T_ARRIVAL, T_GENERATE
// Router-Router Intra-group sends and receives RR_LSEND, RR_LARRIVE
// Router-Router Inter-group sends and receives RR_GSEND, RR_GARRIVE
struct mn_stats slimfly_stats_array[CATEGORY_MAX];
/* collective init time */
tw_stime collective_init_time;
/* node ID in the tree */
tw_lpid node_id;
/* messages sent & received in collectives may get interchanged several times so we have to save the
origin server information in the node's state */
tw_lpid origin_svr;
/* parent node ID of the current node */
tw_lpid parent_node_id;
/* array of children to be allocated in slim_terminal_init*/
tw_lpid* children;
/* children of a node can be less than or equal to the tree degree */
int num_children;
short is_root;
short is_leaf;
struct mn_stats slimfly_stats_array[CATEGORY_MAX];
struct rc_stack * st;
int issueIdle;
int terminal_length;
/* to maintain a count of child nodes that have fanned in at the parent during the collective
fan-in phase*/
int num_fan_nodes;
const char * anno;
const slimfly_param *params;
......@@ -313,10 +279,7 @@ enum event_t
T_BUFFER,
R_SEND,
R_ARRIVE,
R_BUFFER,
D_COLLECTIVE_INIT,
D_COLLECTIVE_FAN_IN,
D_COLLECTIVE_FAN_OUT
R_BUFFER
};
/* status of a virtual channel can be idle, active, allocated or wait for credit */
enum vc_status
......@@ -342,8 +305,7 @@ enum ROUTING_ALGO
{
MINIMAL = 0,
NON_MINIMAL,
ADAPTIVE,
PROG_ADAPTIVE
ADAPTIVE
};
struct router_state
......@@ -355,7 +317,6 @@ struct router_state
int* local_channel;
tw_stime* next_output_available_time;
tw_stime* cur_hist_start_time;
slim_terminal_message_list ***pending_msgs;
slim_terminal_message_list ***pending_msgs_tail;
slim_terminal_message_list ***queued_msgs;
......@@ -363,21 +324,19 @@ struct router_state
int *in_send_loop;
int** vc_occupancy;
int* link_traffic;
int* link_traffic; //Aren't used
const char * anno;
const slimfly_param *params;
int* prev_hist_num;
int* cur_hist_num;
int* prev_hist_num; //Aren't used
int* cur_hist_num; //Aren't used
};
static short routing = MINIMAL;
static tw_stime slimfly_total_time = 0;
static tw_stime slimfly_max_latency = 0;
static tw_stime max_collective = 0;
static long long total_hops = 0;
static long long N_finished_packets = 0;
......@@ -436,6 +395,7 @@ static void free_tmp(void * ptr)
free(sfly->remote_event_data);
free(sfly);
}
static void append_to_terminal_message_list(
slim_terminal_message_list ** thisq,
slim_terminal_message_list ** thistail,
......@@ -636,11 +596,13 @@ static void slimfly_read_config(const char * anno, slimfly_param *params){
fprintf(stderr, "Number of terminals not specified, setting to %d\n", p->num_cn);
}
p->router_delay = 50;
configuration_get_value_double(&config, "PARAMS", "router_delay", anno,
&p->router_delay);
p->router_delay = -1;
configuration_get_value_double(&config, "PARAMS", "router_delay", anno, &p->router_delay);
if(p->router_delay < 0) {
p->router_delay = 0;
fprintf(stderr, "Router delay not specified, setting to %lf\n", p->router_delay);
}
//printf("num_global_channels:%d\n",p->num_global_channels);
char routing_str[MAX_NAME_LENGTH];
configuration_get_value(&config, "PARAMS", "routing", anno, routing_str,
MAX_NAME_LENGTH);
......@@ -651,8 +613,6 @@ static void slimfly_read_config(const char * anno, slimfly_param *params){
routing = NON_MINIMAL;
else if (strcmp(routing_str, "adaptive") == 0)
routing = ADAPTIVE;
else if (strcmp(routing_str, "prog-adaptive") == 0)
routing = PROG_ADAPTIVE;
else
{
fprintf(stderr,
......@@ -706,6 +666,8 @@ static void slimfly_report_stats()
int total_minimal_packets, total_nonmin_packets;
float throughput_avg = 0.0;
float throughput_avg2 = 0.0;
int i,j,k,t;
char log[300];
MPI_Reduce( &total_hops, &avg_hops, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &N_finished_packets, &total_finished_packets, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
......@@ -715,39 +677,32 @@ static void slimfly_report_stats()
MPI_Reduce( &slimfly_total_time, &avg_time, 1,MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &slimfly_max_latency, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
MPI_Reduce(&temp_t, &throughput_avg, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&temp_t2, &throughput_avg2, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);
// if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
{
MPI_Reduce(&minimal_count, &total_minimal_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&nonmin_count, &total_nonmin_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
}
MPI_Reduce(&pe_throughput_percent, &throughput_avg, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&pe_throughput, &throughput_avg2, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&minimal_count, &total_minimal_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&nonmin_count, &total_nonmin_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
/* print statistics */
if(!g_tw_mynode)
{
printf(" Average number of hops traversed %f average chunk latency %lf us maximum chunk latency %lf us avg message size %lf bytes finished messages %lld \n", (float)avg_hops/total_finished_chunks, avg_time/(total_finished_chunks*1000), max_time/1000, (float)final_msg_sz/total_finished_msgs, total_finished_msgs);
if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
printf("\n ADAPTIVE ROUTING STATS: %d percent chunks routed minimally %d percent chunks routed non-minimally completed packets %lld ", total_minimal_packets, total_nonmin_packets, total_finished_chunks);
#if PARAMS_LOG
throughput_avg = throughput_avg / (float)slim_total_terminals_noah;
throughput_avg2 = throughput_avg2 / (float)slim_total_terminals_noah;
// printf("\n\n\n#########################throughput_avg:%f##########################\n\n\n",throughput_avg);
//Open file to append simulation results
char log[200];
sprintf( log, "slimfly-results-log.txt");
slimfly_results_log=fopen(log, "a");
if(slimfly_results_log == NULL)
tw_error(TW_LOC, "\n Failed to open slimfly results log file \n");
printf("Printing Simulation Parameters/Results Log File\n");
fprintf(slimfly_results_log,"%10.3lf, %15.3lf, %11.3lf, %13.3d, %16.3d, %25.5f, %14.5f, ", (float)avg_hops/total_finished_packets, avg_time/(total_finished_packets),max_time,total_minimal_packets,total_nonmin_packets,throughput_avg*100,throughput_avg2);
fprintf(slimfly_results_log,"%10.3lf, %15.3lf, %11.3lf, %13.3d, %16.3d, %16.3d, %25.5f, %14.5f, ", (float)avg_hops/total_finished_packets, avg_time/(total_finished_packets),max_time,total_minimal_packets,total_nonmin_packets,total_finished_chunks,throughput_avg*100,throughput_avg2);
fclose(slimfly_results_log);
#endif
}
int i,j,k,t;
char log[300];
#if ROUTER_OCCUPANCY_LOG
if(tw_ismaster())
{
......@@ -865,64 +820,6 @@ static void slimfly_report_stats()
return;
}
/* Places a terminal in the TREE_DEGREE-ary tree used for collectives.
 *
 * Fills in s->node_id, s->parent_node_id, s->is_root, s->is_leaf,
 * s->num_children and allocates/fills s->children (TREE_DEGREE entries,
 * unused slots are set to -1).
 *
 * s  : terminal state being initialised
 * lp : the LP owning that state; its gid is used for the mapping lookup
 */
void slimfly_collective_init(terminal_state * s,
                             tw_lp * lp)
{
    int lps_per_rep;
    int rep_count;
    int slot;

    // TODO: be annotation-aware
    codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
            &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
    lps_per_rep = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM,
            NULL, 1);
    rep_count = codes_mapping_get_group_reps(lp_group_name);

    /* linear position of this terminal: repetition index times LPs per
       repetition, plus the offset inside the repetition */
    s->node_id = (mapping_rep_id * lps_per_rep) + mapping_offset;

    /* handle collective operations by forming a tree of all the LPs */
    if( s->node_id != 0)
    {
        s->parent_node_id = (s->node_id - ((s->node_id - 1) % TREE_DEGREE)) / TREE_DEGREE;
        s->is_root = 0;
    }
    else
    {
        /* node 0 is the root of the tree and has no parent */
        s->parent_node_id = -1;
        s->is_root = 1;
    }

    s->children = (tw_lpid*)malloc(TREE_DEGREE * sizeof(tw_lpid));

    /* start out as a leaf; the flag is cleared as soon as one child exists */
    s->is_leaf = 1;
    s->num_children = 0;

    /* candidate children are TREE_DEGREE*node_id + 1 .. + TREE_DEGREE; a
       candidate only counts when it is below the total LP count */
    for( slot = 0; slot < TREE_DEGREE; slot++ )
    {
        tw_lpid candidate = (TREE_DEGREE * s->node_id) + slot + 1;

        if(candidate >= (lps_per_rep * rep_count))
        {
            s->children[slot] = -1;
            continue;
        }
        s->children[slot] = candidate;
        s->is_leaf = 0;
        s->num_children++;
    }

#if SLIMFLY_COLLECTIVE_DEBUG == 1
    printf("\n LP %ld parent node id ", s->node_id);

    for( slot = 0; slot < TREE_DEGREE; slot++ )
        printf(" child node ID %ld ", s->children[slot]);
    printf("\n");

    if(s->is_leaf)
        printf("\n LP %ld is leaf ", s->node_id);
#endif
}
/* initialize a slimfly compute node terminal */
void
slim_terminal_init( terminal_state * s,
......@@ -989,7 +886,6 @@ slim_terminal_init( terminal_state * s,
s->in_send_loop = 0;
s->issueIdle = 0;
slimfly_collective_init(s, lp);
return;
}
......@@ -1024,7 +920,6 @@ void slim_router_setup(router_state * r, tw_lp * lp)
r->global_channel = (int*)malloc(p->num_global_channels * sizeof(int));
r->local_channel = (int*)malloc(p->num_local_channels * sizeof(int));
r->next_output_available_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
r->cur_hist_start_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
r->link_traffic = (int*)malloc(p->radix * sizeof(int));
r->cur_hist_num = (int*)malloc(p->radix * sizeof(int));
r->prev_hist_num = (int*)malloc(p->radix * sizeof(int));
......@@ -1044,7 +939,6 @@ void slim_router_setup(router_state * r, tw_lp * lp)
{
// Set credit & router occupancy
r->next_output_available_time[i]=0;
r->cur_hist_start_time[i] = 0;
r->link_traffic[i]=0;
r->cur_hist_num[i] = 0;
r->prev_hist_num[i] = 0;
......@@ -1068,8 +962,9 @@ void slim_router_setup(router_state * r, tw_lp * lp)
}
}
//slimfly added
//Load input MMS router and node layout/connection graph
#if LOAD_FROM_FILE
//slimfly added
//Load input MMS router and node layout/connection graph from file
char log[500];
sprintf( log, "simulation-input-files/MMS.%d/MMS.%d.%d.bsconf", p->num_global_channels+p->num_local_channels, p->num_global_channels+p->num_local_channels, p->num_cn);
MMS_input_file = fopen( log, "r");
......@@ -1115,6 +1010,127 @@ void slim_router_setup(router_state * r, tw_lp * lp)
}
fclose(MMS_input_file);
#else
//Compute MMS router layout/connection graph
int rid_s = (int)r->router_id; // ID for source router
int rid_d; // ID for dest. router
int s_s,s_d; // subgraph location for source and destination routers
int i_s,i_d; // x or m coordinates for source and destination routers
int j_s,j_d; // y or c coordinates for source and destination routers
int k;
int local_idx = 0;
int global_idx = 0;
int generator_size = sizeof(X)/sizeof(int);
printf("generator size:%d\n",generator_size);
for(rid_d=0;rid_d<r->params->slim_total_routers;rid_d++)
{
// Decompose source and destination Router IDs into 3D subgraph coordinates (subgraph,i,j)
if(rid_d >= r->params->slim_total_routers/2)
{
s_d = 1;
i_d = (rid_d - r->params->slim_total_routers/2) / r->params->num_global_channels;
j_d = (rid_d - r->params->slim_total_routers/2) % r->params->num_global_channels;
}
else
{
s_d = 0;
i_d = rid_d / r->params->num_global_channels;
j_d = rid_d % r->params->num_global_channels;
}
if(rid_s >= r->params->slim_total_routers/2)
{
s_s = 1;
i_s = (rid_s - r->params->slim_total_routers/2) / r->params->num_global_channels;
j_s = (rid_s - r->params->slim_total_routers/2) % r->params->num_global_channels;
}
else
{
s_s = 0;
i_s = rid_s / r->params->num_global_channels;
j_s = rid_s % r->params->num_global_channels;
}
// Check for subgraph 0 local connections
if(s_s==0 && s_d==0)
{
if(i_s==i_d) // equation (2) y-y' is in X'
{
for(k=0;k<generator_size;k++)
{
if(abs(j_s-j_d)==X[k])
{
r->local_channel[local_idx++] = rid_d;
printf("router%d,router%d\n",rid_s,rid_d);
}
}
}
}
// Check if global connections
if(s_s==0 && s_d==1)
{
if(j_s == (i_d*i_s + j_d) % r->params->num_routers) // equation (3) y=mx+c
{
r->global_channel[global_idx++] = rid_d;
printf("router%d,router%d\n",rid_s,rid_d);
}
}
}
// Loop over second subgraph source routers
for(rid_d==r->params->slim_total_routers-1;rid_d>=0;rid_d--)
{
// Decompose source and destination Router IDs into 3D subgraph coordinates (subgraph,i,j)
if(rid_d >= r->params->slim_total_routers/2)
{
s_d = 1;
i_d = (rid_d - r->params->slim_total_routers/2) / r->params->num_global_channels;
j_d = (rid_d - r->params->slim_total_routers/2) % r->params->num_global_channels;
}
else
{
s_d = 0;
i_d = rid_d / r->params->num_global_channels;
j_d = rid_d % r->params->num_global_channels;
}
if(rid_s >= r->params->slim_total_routers/2)
{
s_s = 1;
i_s = (rid_s - r->params->slim_total_routers/2) / r->params->num_global_channels;
j_s = (rid_s - r->params->slim_total_routers/2) % r->params->num_global_channels;
}
else
{
s_s = 0;
i_s = rid_s / r->params->num_global_channels;
j_s = rid_s % r->params->num_global_channels;
}
// Check for subgraph 1 local connections
if(s_s==1 && s_d==1)
{
if(i_s==i_d) // equation (2) c-c' is in X'
{
for(k=0;k<generator_size;k++)
{
if(abs(j_s-j_d)==X_prime[k])
{
r->local_channel[local_idx++] = rid_d;
printf("router%d,router%d\n",rid_s,rid_d);
}
}
}
}
// Check if global connections
if(s_s==1 && s_d==0)
{
if(j_d == (i_s*i_d + j_s) % r->params->num_routers) // equation (3) y=mx+c
{
r->global_channel[global_idx++] = rid_d;
printf("router%d,router%d\n",rid_s,rid_d);
}
}
}
#endif
#if DEBUG
// printf("\n LP ID %d VC occupancy radix %d Router %d group ID %d is connected to ", (int)lp->gid, p->radix, r->router_id, r->group_id);
#endif
......@@ -1926,274 +1942,6 @@ void slim_packet_arrive(terminal_state * s, tw_bf * bf, slim_terminal_message *
return;
}
/* Collective operation entry point for the slimfly network.
 *
 * Resolves the NIC LP that shares the sender's repetition/offset, creates a
 * model-net method event for it delayed by codes_local_latency(sender), marks
 * the message D_COLLECTIVE_INIT and sends it.
 *
 * category          : copied into msg->category
 * message_size      : initial value of msg->remote_event_size_bytes
 * remote_event_size : when > 0 it replaces msg->remote_event_size_bytes and
 *                     that many bytes of remote_event are copied behind the
 *                     message header
 * remote_event      : payload to attach (only read when remote_event_size > 0)
 * sender            : LP issuing the collective
 */
void slimfly_collective(char const * category, int message_size, int remote_event_size, const void* remote_event, tw_lp* sender)
{
    slim_terminal_message * msg;
    char * payload;
    tw_lpid nic_lpid;

    codes_mapping_get_lp_info(sender->gid, lp_group_name, &mapping_grp_id,
            NULL, &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
    codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, NULL, 1,
            mapping_rep_id, mapping_offset, &nic_lpid);

    tw_stime nic_delay = codes_local_latency(sender);
    tw_event * ev = model_net_method_event_new(nic_lpid, nic_delay,
            sender, SLIMFLY, (void**)&msg, (void**)&payload);

    msg->remote_event_size_bytes = message_size;
    strcpy(msg->category, category);
    msg->sender_svr = sender->gid;
    msg->type = D_COLLECTIVE_INIT;

    /* the attached event data lives directly behind the slimfly message header */
    payload = (char*)msg + slimfly_get_msg_sz();

    if(remote_event_size > 0)
    {
        msg->remote_event_size_bytes = remote_event_size;
        memcpy(payload, remote_event, remote_event_size);
    }

    tw_event_send(ev);
}
/* Reverse handler for slimfly_collective: hands the sender LP to
 * codes_local_latency_reverse(). message_size is accepted but not used. */
void slimfly_collective_rc(int message_size, tw_lp* sender)
{
    (void)message_size;
    codes_local_latency_reverse(sender);
}
/* Triggers the attached remote event on the origin server recorded in
 * s->origin_svr. Does nothing when the message carries no remote event
 * (remote_event_size_bytes == 0).
 *
 * s   : terminal state (supplies origin_svr and params->cn_bandwidth)
 * bf  : not used here
 * msg : message whose trailing payload is forwarded
 * lp  : LP sending the new event
 */
static void send_collective_remote_event(terminal_state * s,
        tw_bf * bf,
        slim_terminal_message * msg,
        tw_lp * lp)
{
    if(!msg->remote_event_size_bytes)
        return;

    /* delay is the payload size scaled by the inverse of cn_bandwidth */
    tw_stime delay = (1/s->params->cn_bandwidth) * msg->remote_event_size_bytes;

    tw_event * ev = codes_event_new(s->origin_svr, delay, lp);
    slim_terminal_message * dest = tw_event_data(ev);

    /* payload sits right after the slimfly message header */
    const char * payload = (char*)msg + slimfly_get_msg_sz();
    memcpy(dest, payload, msg->remote_event_size_bytes);

    tw_event_send(ev);
}
/* Handles a collective-init message at a terminal.
 *
 * Stores the previous s->collective_init_time in the message, stamps the
 * state with the current time and remembers the originating server. A leaf
 * node then forwards a D_COLLECTIVE_FAN_IN message (including any attached
 * remote event data) to its parent's NIC LP; non-leaf nodes stop after the
 * bookkeeping.
 *
 * s   : terminal state
 * bf  : not used here
 * msg : incoming collective-init message
 * lp  : LP handling the event
 */
static void node_collective_init(terminal_state * s,
        tw_bf * bf,
        slim_terminal_message * msg,
        tw_lp * lp)
{
    msg->saved_collective_init_time = s->collective_init_time;
    s->collective_init_time = tw_now(lp);
    s->origin_svr = msg->sender_svr;

    /* only leaves kick off the fan-in towards the root */
    if(!s->is_leaf)
        return;

    //printf("\n LP %ld sending message to parent %ld ", s->node_id, s->parent_node_id);
    /* get the global LP ID of the parent node */
    // TODO: be annotation-aware
    codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id,
            NULL, &mapping_type_id, NULL, &mapping_rep_id,
            &mapping_offset);
    int lps_per_rep = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM,
            s->anno, 0);

    tw_lpid parent_lpid;
    codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, s->anno, 0,
            s->parent_node_id/lps_per_rep, (s->parent_node_id % lps_per_rep),
            &parent_lpid);

    /* send a message to the parent that the LP has entered the collective operation */
    tw_stime delay = g_tw_lookahead + LEVEL_DELAY;

    slim_terminal_message * fwd;
    void * fwd_payload;
    tw_event * ev = model_net_method_event_new(parent_lpid, delay,
            lp, SLIMFLY, (void**)&fwd, (void**)&fwd_payload);

    memcpy(fwd, msg, sizeof(slim_terminal_message));
    if(msg->remote_event_size_bytes)
    {
        memcpy(fwd_payload, model_net_method_get_edata(SLIMFLY, msg),
                msg->remote_event_size_bytes);
    }

    fwd->type = D_COLLECTIVE_FAN_IN;
    fwd->sender_node = s->node_id;

    tw_event_send(ev);
}
static void node_collective_fan_in(terminal_state * s,
tw_bf * bf,
slim_terminal_message * msg,
tw_lp * lp)
{
int i;
s->num_fan_nodes++;
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id,
NULL, &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM,
s->anno, 0);
tw_event* e_new;
slim_terminal_message * msg_new;
tw_stime xfer_to_nic_time;
bf->c1 = 0;
bf->c2 = 0;
/* if the number of fanned in nodes have completed at the current node then signal the parent */
if((s->num_fan_nodes == s->num_children) && !s->is_root)
{
bf->c1 = 1;
msg->saved_fan_nodes = s->num_fan_nodes-1;
s->num_fan_nodes = 0;
tw_lpid parent_nic_id;
xfer_to_nic_time = g_tw_lookahead + LEVEL_DELAY;
/* get the global LP ID of the parent node */
codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, s->anno, 0,
s->parent_node_id/num_lps, (s->parent_node_id % num_lps),
&parent_nic_id);
/* send a message to the parent that the LP has entered the collective operation */
//e_new = codes_event_new(parent_nic_id, xfer_to_nic_time, lp);
//msg_new = tw_event_data(e_new);
void * m_data;
e_new = model_net_method_event_new(parent_nic_id,
xfer_to_nic_time,
lp, SLIMFLY, (void**)&msg_new, &m_data);
memcpy(msg_new, msg, sizeof(slim_terminal_message));
msg_new->type = D_COLLECTIVE_FAN_IN;
msg_new->sender_node = s->node_id;
if (msg->remote_event_size_bytes){
memcpy(m_data, model_net_method_get_edata(SLIMFLY, msg),
msg->remote_event_size_bytes);
}
tw_event_send(e_new);
}
/* root node starts off with the fan-out phase */
if(s->is_root && (s->num_fan_nodes == s->num_children))
{
bf->c2 = 1;
msg->saved_fan_nodes = s->num_fan_nodes-1;