Commit e200280e authored by mubarak's avatar mubarak

Adding collective modeling functions in model-net (currently only...

Adding collective modeling functions in model-net (currently only dragonfly/torus models support it)
parent 784827fc
......@@ -32,6 +32,8 @@ struct model_net_method
int (*mn_get_msg_sz)();
void (*mn_report_stats)();
tw_lpid (*model_net_method_find_local_device)(tw_lp *sender);
void (*mn_collective_call)(char* category, int message_size, int remote_event_size, const void* remote_event, tw_lp* sender);
void (*mn_collective_call_rc)(int message_size, tw_lp* sender);
};
extern struct model_net_method * method_array[];
......
......@@ -153,6 +153,22 @@ int model_net_setup(char* net_name, uint64_t packet_size, const void* net_params
/* utility function to get the modelnet ID post-setup */
int model_net_get_id(char *net_name);
/* This event does a collective operation call for model-net */
void model_net_event_collective(
int net_id,
char* category,
int message_size,
int remote_event_size,
const void* remote_event,
tw_lp *sender);
/* reverse event of the collective operation call */
void model_net_event_collective_rc(
int net_id,
int message_size,
tw_lp *sender);
/* allocate and transmit a new event that will pass through model_net to
* arrive at its destination:
*
......
......@@ -44,15 +44,24 @@ struct terminal_message
int output_chan;
int is_pull;
uint64_t pull_size;
/* for reverse computation */
tw_stime saved_available_time;
tw_stime saved_credit_time;
tw_stime saved_collective_init_time;
int intm_group_id;
short chunk_id;
uint64_t packet_size;
int remote_event_size_bytes;
int local_event_size_bytes;
/* for reverse computation of a node's fan in*/
int saved_fan_nodes;
tw_lpid sender_svr;
/* LP ID of the sending node, has to be a network node in the dragonfly */
tw_lpid sender_node;
};
#endif /* end of include guard: DRAGONFLY_H */
......
......@@ -17,6 +17,9 @@ enum nodes_event_t
ARRIVAL,
SEND,
CREDIT,
T_COLLECTIVE_INIT,
T_COLLECTIVE_FAN_IN,
T_COLLECTIVE_FAN_OUT
};
struct nodes_message
......@@ -28,6 +31,9 @@ struct nodes_message
/* for reverse event computation*/
tw_stime saved_available_time;
/* message saved collective time */
tw_stime saved_collective_init_time;
/* packet ID */
unsigned long long packet_ID;
/* event type of the message */
......@@ -44,8 +50,12 @@ struct nodes_message
tw_lpid final_dest_gid;
/* destination torus node of the message */
tw_lpid dest_lp;
/* LP ID of the sender, comes from codes, can be a server or any other I/O LP type */
tw_lpid sender_lp;
/* LP ID of the sender, comes from codes, can be a server or any other I/O LP type. Should not change
during network operations. */
tw_lpid sender_svr;
/* LP ID of the sending node, has to be a network node in the torus */
tw_lpid sender_node;
/* number of hops traversed by the packet */
int my_N_hop;
......@@ -57,6 +67,10 @@ struct nodes_message
int next_stop;
/* size of the torus packet */
uint64_t packet_size;
/* for reverse computation of a node's fan in*/
int saved_fan_nodes;
/* chunk id of the flit (distinguishes flits) */
short chunk_id;
......
......@@ -17,11 +17,18 @@
#include "codes/model-net-lp.h"
#include "codes/net/dragonfly.h"
#define CHUNK_SIZE 32.0
#define CREDIT_SIZE 8
#define MEAN_PROCESS 1.0
/* collective specific parameters */
#define TREE_DEGREE 4
#define LEVEL_DELAY 1000
#define DRAGONFLY_COLLECTIVE_DEBUG 1
#define NUM_COLLECTIVES 1
#define COLLECTIVE_COMPUTATION_DELAY 5700
#define DRAGONFLY_FAN_OUT_DELAY 20.0
// debugging parameters
#define TRACK 235221
#define PRINT_ROUTER_TABLE 1
......@@ -80,7 +87,32 @@ struct terminal_state
// Router-Router Intra-group sends and receives RR_LSEND, RR_LARRIVE
// Router-Router Inter-group sends and receives RR_GSEND, RR_GARRIVE
struct mn_stats dragonfly_stats_array[CATEGORY_MAX];
/* collective init time */
tw_stime collective_init_time;
/* node ID in the tree */
tw_lpid node_id;
/* messages sent & received in collectives may get interchanged several times so we have to save the
origin server information in the node's state */
tw_lpid origin_svr;
/* parent node ID of the current node */
tw_lpid parent_node_id;
/* array of children to be allocated in terminal_init*/
tw_lpid* children;
/* children of a node can be less than or equal to the tree degree */
int num_children;
short is_root;
short is_leaf;
/* to maintain a count of child nodes that have fanned in at the parent during the collective
fan-in phase*/
int num_fan_nodes;
};
/* terminal event type (1-4) */
enum event_t
{
......@@ -90,7 +122,10 @@ enum event_t
T_BUFFER,
R_SEND,
R_ARRIVE,
R_BUFFER
R_BUFFER,
D_COLLECTIVE_INIT,
D_COLLECTIVE_FAN_IN,
D_COLLECTIVE_FAN_OUT
};
/* status of a virtual channel can be idle, active, allocated or wait for credit */
enum vc_status
......@@ -138,6 +173,7 @@ static uint64_t num_chunks;
static tw_stime dragonfly_total_time = 0;
static tw_stime dragonfly_max_latency = 0;
static tw_stime max_collective = 0;
static long long total_hops = 0;
......@@ -199,6 +235,61 @@ static void dragonfly_report_stats()
return;
}
void dragonfly_collective_init(terminal_state * s,
tw_lp * lp)
{
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
int num_lps = codes_mapping_get_lp_count(lp_group_name, LP_CONFIG_NM);
int num_reps = codes_mapping_get_group_reps(lp_group_name);
s->node_id = (mapping_rep_id * num_lps) + mapping_offset;
int i;
/* handle collective operations by forming a tree of all the LPs */
/* special condition for root of the tree */
if( s->node_id == 0)
{
s->parent_node_id = -1;
s->is_root = 1;
}
else
{
s->parent_node_id = (s->node_id - ((s->node_id - 1) % TREE_DEGREE)) / TREE_DEGREE;
s->is_root = 0;
}
s->children = (tw_lpid*)malloc(TREE_DEGREE * sizeof(tw_lpid));
/* set the isleaf to zero by default */
s->is_leaf = 1;
s->num_children = 0;
/* calculate the children of the current node. If its a leaf, no need to set children,
only set isleaf and break the loop*/
for( i = 0; i < TREE_DEGREE; i++ )
{
tw_lpid next_child = (TREE_DEGREE * s->node_id) + i + 1;
if(next_child < (num_lps * num_reps))
{
s->num_children++;
s->is_leaf = 0;
s->children[i] = next_child;
}
else
s->children[i] = -1;
}
#if DRAGONFLY_COLLECTIVE_DEBUG == 1
printf("\n LP %ld parent node id ", s->node_id);
for( i = 0; i < TREE_DEGREE; i++ )
printf(" child node ID %ld ", s->children[i]);
printf("\n");
if(s->is_leaf)
printf("\n LP %ld is leaf ", s->node_id);
#endif
}
/* dragonfly packet event , generates a dragonfly packet on the compute node */
static tw_stime dragonfly_packet_event(char* category, tw_lpid final_dest_lp, uint64_t packet_size, int is_pull, uint64_t pull_size, tw_stime offset, int remote_event_size, const void* remote_event, int self_event_size, const void* self_event, tw_lpid src_lp, tw_lp *sender, int is_last_pckt)
{
......@@ -596,9 +687,260 @@ terminal_init( terminal_state * s,
s->vc_occupancy[i]=0;
s->output_vc_state[i]=VC_IDLE;
}
dragonfly_collective_init(s, lp);
return;
}
/* collective operation for the torus network */
void dragonfly_collective(char* category, int message_size, int remote_event_size, const void* remote_event, tw_lp* sender)
{
tw_event * e_new;
tw_stime xfer_to_nic_time;
terminal_message * msg;
tw_lpid local_nic_id;
char* tmp_ptr;
codes_mapping_get_lp_info(sender->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, mapping_rep_id, mapping_offset, &local_nic_id);
xfer_to_nic_time = g_tw_lookahead + codes_local_latency(sender);
e_new = model_net_method_event_new(local_nic_id, xfer_to_nic_time,
sender, DRAGONFLY, (void**)&msg, (void**)&tmp_ptr);
msg->remote_event_size_bytes = message_size;
strcpy(msg->category, category);
msg->sender_svr=sender->gid;
msg->type = D_COLLECTIVE_INIT;
tmp_ptr = (char*)msg;
tmp_ptr += dragonfly_get_msg_sz();
if(remote_event_size > 0)
{
msg->remote_event_size_bytes = remote_event_size;
memcpy(tmp_ptr, remote_event, remote_event_size);
tmp_ptr += remote_event_size;
}
tw_event_send(e_new);
return;
}
/* reverse for collective operation of the dragonfly network */
void dragonfly_collective_rc(int message_size, tw_lp* sender)
{
codes_local_latency_reverse(sender);
return;
}
static void send_remote_event(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
// Trigger an event on receiving server
if(msg->remote_event_size_bytes)
{
tw_event* e;
tw_stime ts;
terminal_message * m;
ts = (1/cn_bandwidth) * msg->remote_event_size_bytes;
e = codes_event_new(s->origin_svr, ts, lp);
m = tw_event_data(e);
char* tmp_ptr = (char*)msg;
tmp_ptr += dragonfly_get_msg_sz();
memcpy(m, tmp_ptr, msg->remote_event_size_bytes);
tw_event_send(e);
}
}
static void node_collective_init(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
tw_event * e_new;
tw_lpid parent_nic_id;
tw_stime xfer_to_nic_time;
terminal_message * msg_new;
int num_lps;
msg->saved_collective_init_time = s->collective_init_time;
s->collective_init_time = tw_now(lp);
s->origin_svr = msg->sender_svr;
if(s->is_leaf)
{
//printf("\n LP %ld sending message to parent %ld ", s->node_id, s->parent_node_id);
/* get the global LP ID of the parent node */
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
num_lps = codes_mapping_get_lp_count(lp_group_name, LP_CONFIG_NM);
codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, s->parent_node_id/num_lps , (s->parent_node_id % num_lps), &parent_nic_id);
/* send a message to the parent that the LP has entered the collective operation */
xfer_to_nic_time = g_tw_lookahead + LEVEL_DELAY;
//e_new = codes_event_new(parent_nic_id, xfer_to_nic_time, lp);
void* m_data;
e_new = model_net_method_event_new(parent_nic_id, xfer_to_nic_time,
lp, DRAGONFLY, (void**)&msg_new, (void**)&m_data);
memcpy(msg_new, msg, sizeof(terminal_message));
if (msg->remote_event_size_bytes){
memcpy(m_data, model_net_method_get_edata(DRAGONFLY, msg),
msg->remote_event_size_bytes);
}
msg_new->type = D_COLLECTIVE_FAN_IN;
msg_new->sender_node = s->node_id;
tw_event_send(e_new);
}
return;
}
static void node_collective_fan_in(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
int i;
s->num_fan_nodes++;
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
int num_lps = codes_mapping_get_lp_count(lp_group_name, LP_CONFIG_NM);
tw_event* e_new;
terminal_message * msg_new;
tw_stime xfer_to_nic_time;
bf->c1 = 0;
bf->c2 = 0;
/* if the number of fanned in nodes have completed at the current node then signal the parent */
if((s->num_fan_nodes == s->num_children) && !s->is_root)
{
bf->c1 = 1;
msg->saved_fan_nodes = s->num_fan_nodes-1;
s->num_fan_nodes = 0;
tw_lpid parent_nic_id;
xfer_to_nic_time = g_tw_lookahead + LEVEL_DELAY;
/* get the global LP ID of the parent node */
codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, s->parent_node_id/num_lps , (s->parent_node_id % num_lps), &parent_nic_id);
/* send a message to the parent that the LP has entered the collective operation */
//e_new = codes_event_new(parent_nic_id, xfer_to_nic_time, lp);
//msg_new = tw_event_data(e_new);
void * m_data;
e_new = model_net_method_event_new(parent_nic_id,
xfer_to_nic_time,
lp, DRAGONFLY, (void**)&msg_new, &m_data);
memcpy(msg_new, msg, sizeof(terminal_message));
msg_new->type = D_COLLECTIVE_FAN_IN;
msg_new->sender_node = s->node_id;
if (msg->remote_event_size_bytes){
memcpy(m_data, model_net_method_get_edata(DRAGONFLY, msg),
msg->remote_event_size_bytes);
}
tw_event_send(e_new);
}
/* root node starts off with the fan-out phase */
if(s->is_root && (s->num_fan_nodes == s->num_children))
{
bf->c2 = 1;
msg->saved_fan_nodes = s->num_fan_nodes-1;
s->num_fan_nodes = 0;
send_remote_event(s, bf, msg, lp);
for( i = 0; i < s->num_children; i++ )
{
tw_lpid child_nic_id;
/* Do some computation and fan out immediate child nodes from the collective */
xfer_to_nic_time = g_tw_lookahead + COLLECTIVE_COMPUTATION_DELAY + LEVEL_DELAY + tw_rand_exponential(lp->rng, (double)LEVEL_DELAY/50);
/* get global LP ID of the child node */
codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, s->children[i]/num_lps , (s->children[i] % num_lps), &child_nic_id);
//e_new = codes_event_new(child_nic_id, xfer_to_nic_time, lp);
//msg_new = tw_event_data(e_new);
void * m_data;
e_new = model_net_method_event_new(child_nic_id,
xfer_to_nic_time,
lp, DRAGONFLY, (void**)&msg_new, &m_data);
memcpy(msg_new, msg, sizeof(terminal_message));
if (msg->remote_event_size_bytes){
memcpy(m_data, model_net_method_get_edata(TORUS, msg),
msg->remote_event_size_bytes);
}
msg_new->type = D_COLLECTIVE_FAN_OUT;
msg_new->sender_node = s->node_id;
tw_event_send(e_new);
}
}
}
static void node_collective_fan_out(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
int i;
int num_lps = codes_mapping_get_lp_count(lp_group_name, LP_CONFIG_NM);
bf->c1 = 0;
bf->c2 = 0;
send_remote_event(s, bf, msg, lp);
if(!s->is_leaf)
{
bf->c1 = 1;
tw_event* e_new;
nodes_message * msg_new;
tw_stime xfer_to_nic_time;
for( i = 0; i < s->num_children; i++ )
{
xfer_to_nic_time = g_tw_lookahead + DRAGONFLY_FAN_OUT_DELAY + tw_rand_exponential(lp->rng, (double)DRAGONFLY_FAN_OUT_DELAY/10);
if(s->children[i] > 0)
{
tw_lpid child_nic_id;
/* get global LP ID of the child node */
codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, s->children[i]/num_lps , (s->children[i] % num_lps), &child_nic_id);
//e_new = codes_event_new(child_nic_id, xfer_to_nic_time, lp);
//msg_new = tw_event_data(e_new);
//memcpy(msg_new, msg, sizeof(nodes_message) + msg->remote_event_size_bytes);
void* m_data;
e_new = model_net_method_event_new(child_nic_id,
xfer_to_nic_time,
lp, DRAGONFLY, (void**)&msg_new, &m_data);
memcpy(msg_new, msg, sizeof(nodes_message));
if (msg->remote_event_size_bytes){
memcpy(m_data, model_net_method_get_edata(DRAGONFLY, msg),
msg->remote_event_size_bytes);
}
msg_new->type = D_COLLECTIVE_FAN_OUT;
msg_new->sender_node = s->node_id;
tw_event_send(e_new);
}
}
}
//printf("\n Fan out phase completed %ld ", lp->gid);
if(max_collective < tw_now(lp) - s->collective_init_time )
{
bf->c2 = 1;
max_collective = tw_now(lp) - s->collective_init_time;
}
}
/* update the compute node-router channel buffer */
void
terminal_buf_update(terminal_state * s,
......@@ -639,7 +981,19 @@ terminal_event( terminal_state * s,
case T_BUFFER:
terminal_buf_update(s, bf, msg, lp);
break;
case D_COLLECTIVE_INIT:
node_collective_init(s, bf, msg, lp);
break;
case D_COLLECTIVE_FAN_IN:
node_collective_fan_in(s, bf, msg, lp);
break;
case D_COLLECTIVE_FAN_OUT:
node_collective_fan_out(s, bf, msg, lp);
break;
default:
printf("\n LP %d Terminal message type not supported %d ", (int)lp->gid, msg->type);
}
......@@ -681,13 +1035,14 @@ get_next_stop(router_state * s,
bf->c2 = 0;
/* If the packet has arrived at the destination router */
if(dest_router_id == local_router_id)
{
dest_lp = msg->dest_terminal_id;
return dest_lp;
}
// Generate inter-mediate destination
/* Generate inter-mediate destination for non-minimal routing (selecting a random group) */
if(msg->last_hop == TERMINAL && path == NON_MINIMAL)
{
if(dest_router_id / num_routers != s->group_id)
......@@ -697,25 +1052,29 @@ get_next_stop(router_state * s,
msg->intm_group_id = intm_grp_id;
}
}
/* It means that the packet has arrived at the inter-mediate group for non-minimal routing. Reset the group now. */
if(msg->intm_group_id == s->group_id)
{
msg->intm_group_id = -1;//no inter-mediate group
}
/* Intermediate group ID is set. Divert the packet to an intermediate group. */
if(msg->intm_group_id >= 0)
{
dest_group_id = msg->intm_group_id;
}
else
else /* direct the packet to the destination group */
{
dest_group_id = dest_router_id / num_routers;
}
/* It means the packet has arrived at the destination group. Now divert it to the destination router. */
if(s->group_id == dest_group_id)
{
dest_lp = dest_router_id;
}
else
{
/* Packet is at the source or intermediate group. Find a router that has a path to the destination group. */
dest_lp=getRouterFromGroupID(dest_group_id,s);
if(dest_lp == local_router_id)
......@@ -860,6 +1219,8 @@ if( msg->packet_ID == TRACK && next_stop != msg->dest_terminal_id && msg->chunk_
m->intm_lp_id = lp->gid;
s->vc_occupancy[output_chan]++;
/* Determine the event type. If the packet has arrived at the final destination
router then it should arrive at the destination terminal next. */
if(next_stop == msg->dest_terminal_id)
{
m->type = T_ARRIVE;
......@@ -869,8 +1230,10 @@ if( msg->packet_ID == TRACK && next_stop != msg->dest_terminal_id && msg->chunk_
}
else
{
/* The packet has to be sent to another router */
m->type = R_ARRIVE;
/* If this is a global channel then the buffer space is different */
if( global )
{
if(s->vc_occupancy[output_chan] >= global_vc_size * num_chunks )
......@@ -878,6 +1241,7 @@ if( msg->packet_ID == TRACK && next_stop != msg->dest_terminal_id && msg->chunk_
}
else
{
/* buffer space is less for local channels */
if( s->vc_occupancy[output_chan] >= local_vc_size * num_chunks )
s->output_vc_state[output_chan] = VC_CREDIT;
}
......@@ -1053,6 +1417,39 @@ void terminal_rc_event_handler(terminal_state * s, tw_bf * bf, terminal_message
s->output_vc_state[msg_indx] = VC_CREDIT;
}
break;
case D_COLLECTIVE_INIT:
{
s->collective_init_time = msg->saved_collective_init_time;
}
break;
case D_COLLECTIVE_FAN_IN:
{
int i;
s->num_fan_nodes--;
if(bf->c1)
{
s->num_fan_nodes = msg->saved_fan_nodes;
}
if(bf->c2)
{
s->num_fan_nodes = msg->saved_fan_nodes;
for( i = 0; i < s->num_children; i++ )
tw_rand_reverse_unif(lp->rng);
}
}
break;
case D_COLLECTIVE_FAN_OUT:
{
int i;
if(bf->c1)
{
for( i = 0; i < s->num_children; i++ )
tw_rand_reverse_unif(lp->rng);
}
}
}
}
......@@ -1171,6 +1568,8 @@ struct model_net_method dragonfly_method =
.mn_get_msg_sz = dragonfly_get_msg_sz,
.mn_report_stats = dragonfly_report_stats,
.model_net_method_find_local_device = dragonfly_find_local_device,
.mn_collective_call = dragonfly_collective,
.mn_collective_call_rc = dragonfly_collective_rc
};
......
......@@ -67,6 +67,12 @@ static int loggp_get_msg_sz(void);
/* Returns the loggp magic number */
static int loggp_get_magic();
/* collective network calls */
static void loggp_collective();
/* collective network calls-- rc */
static void loggp_collective_rc();
/* allocate a new event that will pass through loggp to arriave at its
* destination:
*
......@@ -118,6 +124,8 @@ struct model_net_method loggp_method =
.mn_get_msg_sz = loggp_get_msg_sz,
.mn_report_stats = loggp_report_stats,
.model_net_method_find_local_device = loggp_find_local_device,
.mn_collective_call = loggp_collective,
.mn_collective_call_rc = loggp_collective_rc
};
static void loggp_init(
......@@ -187,6 +195,20 @@ static void loggp_report_stats()
/* TODO: Do we have some loggp statistics to report like we have for torus and dragonfly? */
return;
}
/* collective network calls */
static void loggp_collective()
{
/* collectives not supported */
return;
}
static void loggp_collective_rc()
{
/* collectives not supported */
return;
}
static void loggp_init(
loggp_state * ns,
tw_lp * lp)
......
......@@ -522,6 +522,28 @@ uint64_t model_net_get_packet_size(int net_id)
return method_array[net_id]->packet_size; // TODO: where to set the packet size?
}
/* This event does a collective operation call for model-net */
void model_net_event_collective(int net_id, char* category, int message_size, int remote_event_size, const void* remote_event, tw_lp* sender)
{
if(net_id < 0 || net_id > MAX_NETS)
{
fprintf(stderr, "%s Error: Uninitializied modelnet network, call modelnet_init first\n", __FUNCTION__);
exit(-1);
}
return method_array[net_id]->mn_collective_call(category, message_size, remote_event_size, remote_event, sender);
}
/* reverse event of the collective operation call */
void model_net_event_collective_rc(int net_id, int message_size, tw_lp* sender)
{
if(net_id < 0 || net_id > MAX_NETS)
{
fprintf(stderr, "%s Error: Uninitializied modelnet network, call modelnet_init first\n", __FUNCTION__);
exit(-1);
}
return method_array[net_id]->mn_collective_call_rc(message_size, sender);
}
/* returns lp type for modelnet */
const tw_lptype* model_net_get_lp_type(int net_id)
{
......
......@@ -62,6 +62,12 @@ static int sn_get_magic();
static void print_msg(sn_message *m);
/* collective network calls */
static void simple_net_collective();
/* collective network calls-- rc */
static void simple_net_collective_rc();
/* allocate a new event that will pass through simplenet to arriave at its
* destination:
*
......@@ -111,6 +117,8 @@ struct model_net_method simplenet_method =
.mn_get_msg_sz = sn_get_msg_sz,
.mn_report_stats = sn_report_stats,
.model_net_method_find_local_device = sn_find_local_device,
.mn_collective_call = simple_net_collective,