From a349938c4478f496a770e0b220ece3155eb5fa3b Mon Sep 17 00:00:00 2001 From: mubarak Date: Mon, 14 Sep 2015 15:11:51 -0400 Subject: [PATCH] Adding congestion control in dragonfly (mostly comes from uiuc_tracer) --- codes/net/dragonfly.h | 4 + src/models/networks/model-net/dragonfly.c | 2493 +++++++++++---------- src/models/networks/model-net/torus.c | 2 +- tests/conf/modelnet-test-dragonfly.conf | 5 +- tests/modelnet-test.c | 8 +- 5 files changed, 1342 insertions(+), 1170 deletions(-) diff --git a/codes/net/dragonfly.h b/codes/net/dragonfly.h index d8a7e74..b7e4f53 100644 --- a/codes/net/dragonfly.h +++ b/codes/net/dragonfly.h @@ -40,6 +40,9 @@ struct terminal_message /* number of hops traversed by the packet */ short my_N_hop; + short my_l_hop, my_g_hop; + short saved_channel; + /* Intermediate LP ID from which this message is coming */ unsigned int intm_lp_id; short new_vc; @@ -77,6 +80,7 @@ struct terminal_message /* LP ID of the sending node, has to be a network node in the dragonfly */ tw_lpid sender_node; + tw_lpid next_stop; /* for reverse computation */ struct pending_router_msgs * saved_elem; diff --git a/src/models/networks/model-net/dragonfly.c b/src/models/networks/model-net/dragonfly.c index 5712c0d..1077802 100644 --- a/src/models/networks/model-net/dragonfly.c +++ b/src/models/networks/model-net/dragonfly.c @@ -11,12 +11,12 @@ #include #include "codes/codes_mapping.h" -#include "codes/jenkins-hash.h" #include "codes/codes.h" #include "codes/model-net.h" #include "codes/model-net-method.h" #include "codes/model-net-lp.h" #include "codes/net/dragonfly.h" +#include "sys/file.h" #define CREDIT_SIZE 8 #define MEAN_PROCESS 1.0 @@ -34,14 +34,11 @@ #define TRACK -1 #define PRINT_ROUTER_TABLE 1 #define DEBUG 0 +#define USE_DIRECT_SCHEME 1 #define LP_CONFIG_NM (model_net_lp_config_names[DRAGONFLY]) #define LP_METHOD_NM (model_net_method_names[DRAGONFLY]) -#define DRAGONFLY_DBG 0 -#define dprintf(_fmt, ...) \ - do {if (CLIENT_DBG) printf(_fmt, __VA_ARGS__);} while (0) - long term_ecount, router_ecount, term_rev_ecount, router_rev_ecount; static double maxd(double a, double b) { return a < b ? b : a; } @@ -71,14 +68,26 @@ int router_magic_num = 0; /* terminal magic number */ int terminal_magic_num = 0; -/* number of routers in a mapping group */ -static int num_routers_per_mgrp = 0; +typedef struct terminal_message_list terminal_message_list; +struct terminal_message_list { + terminal_message msg; + char* event_data; + terminal_message_list *next; + terminal_message_list *prev; +}; -/* maximum number of terminals and routers */ -int max_term_occupancy, max_router_occupancy; +void init_terminal_message_list(terminal_message_list *this, + terminal_message *inmsg) { + this->msg = *inmsg; + this->event_data = NULL; + this->next = NULL; + this->prev = NULL; +} -/* noise of 1ns */ -double noise = 1.0; +void delete_terminal_message_list(terminal_message_list *this) { + if(this->event_data != NULL) free(this->event_data); + free(this); +} struct dragonfly_param { @@ -92,7 +101,6 @@ struct dragonfly_param int global_vc_size; /* buffer size of the global channels */ int cn_vc_size; /* buffer size of the compute node channels */ int chunk_size; /* full-sized packets are broken into smaller chunks.*/ - // derived parameters int num_cn; int num_groups; @@ -100,20 +108,14 @@ struct dragonfly_param int total_routers; int total_terminals; int num_global_channels; + double cn_delay; + double local_delay; + double global_delay; + double credit_delay; }; -struct pending_router_msgs -{ - struct pending_router_msgs * next; - struct pending_router_msgs * prev; - char * event_data; - terminal_message msg; - int output_chan; - int next_stop; -}; /* handles terminal and router events like packet generate/send/receive/buffer */ typedef enum event_t event_t; - typedef struct terminal_state terminal_state; typedef struct router_state router_state; @@ -123,14 +125,17 @@ struct terminal_state unsigned long long packet_counter; // Dragonfly specific parameters - tw_lpid router_id; - tw_lpid terminal_id; + unsigned int router_id; + unsigned int terminal_id; // Each terminal will have an input and output channel with the router int* vc_occupancy; // NUM_VC - int* output_vc_state; + int num_vcs; tw_stime terminal_available_time; tw_stime next_credit_available_time; + terminal_message_list **terminal_msgs; + terminal_message_list **terminal_msgs_tail; + int in_send_loop; // Terminal generate, sends and arrival T_SEND, T_ARRIVAL, T_GENERATE // Router-Router Intra-group sends and receives RR_LSEND, RR_LARRIVE // Router-Router Inter-group sends and receives RR_GSEND, RR_GARRIVE @@ -159,7 +164,6 @@ struct terminal_state /* to maintain a count of child nodes that have fanned in at the parent during the collective fan-in phase*/ int num_fan_nodes; - int max_term_vc_occupancy; const char * anno; const dragonfly_param *params; @@ -170,8 +174,10 @@ enum event_t { T_GENERATE=1, T_ARRIVE, + T_SEND, T_BUFFER, - R_FORWARD, + R_SEND, + R_ARRIVE, R_BUFFER, D_COLLECTIVE_INIT, D_COLLECTIVE_FAN_IN, @@ -209,25 +215,26 @@ struct router_state { unsigned int router_id; unsigned int group_id; + + int* global_channel; tw_stime* next_output_available_time; tw_stime* next_credit_available_time; tw_stime* cur_hist_start_time; + terminal_message_list ***pending_msgs; + terminal_message_list ***pending_msgs_tail; + terminal_message_list ***queued_msgs; + terminal_message_list ***queued_msgs_tail; + int *in_send_loop; - int* vc_occupancy; - int* output_vc_state; - int * global_channel; - int max_router_vc_occupancy; + int** vc_occupancy; + int* link_traffic; const char * anno; const dragonfly_param *params; int* prev_hist_num; int* cur_hist_num; - - struct pending_router_msgs * head; - struct pending_router_msgs * tail; - int num_waiting; }; static short routing = MINIMAL; @@ -240,118 +247,146 @@ static tw_stime max_collective = 0; static long long total_hops = 0; static long long N_finished_packets = 0; -/* function definitions */ -static void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp); +/* convert GiB/s and bytes to ns */ +static tw_stime bytes_to_ns(uint64_t bytes, double GB_p_s) +{ + tw_stime time; + + /* bytes to GB */ + time = ((double)bytes)/(1024.0*1024.0*1024.0); + /* MB to s */ + time = time / GB_p_s; + /* s to ns */ + time = time * 1000.0 * 1000.0 * 1000.0; + + return(time); +} /* returns the dragonfly router lp type for lp registration */ static const tw_lptype* dragonfly_get_router_lp_type(void); -/* remove head of the queue */ -void remove_pending_list_head(struct pending_router_msgs** head, - struct pending_router_msgs** tail, - terminal_message * msg) +/* returns the dragonfly message size */ +static int dragonfly_get_msg_sz(void) { - struct pending_router_msgs * elem = *head; - - if(*head == *tail) - *tail = NULL; + return sizeof(terminal_message); +} - *head = (*head)->next; - - if(*head) - (*head)->prev = NULL; - - free(elem); +static void append_to_terminal_message_list( + terminal_message_list ** thisq, + terminal_message_list ** thistail, + int index, + terminal_message_list *msg) { + if(thisq[index] == NULL) { + thisq[index] = msg; + } else { + thistail[index]->next = msg; + msg->prev = thistail[index]; + } + thistail[index] = msg; } -/* pending router messages */ -void add_pending_router_message(struct pending_router_msgs** head, - struct pending_router_msgs** tail, - terminal_message * msg, - int chan, - int next_stop) -{ - struct pending_router_msgs * elem = malloc(sizeof(struct pending_router_msgs)); - memcpy(&(elem->msg), msg, sizeof(terminal_message)); - elem->prev = NULL; - elem->next_stop = next_stop; - elem->output_chan = chan; +static void prepend_to_terminal_message_list( + terminal_message_list ** thisq, + terminal_message_list ** thistail, + int index, + terminal_message_list *msg) { + if(thisq[index] == NULL) { + thistail[index] = msg; + } else { + thisq[index]->prev = msg; + msg->next = thisq[index]; + } + thisq[index] = msg; +} - if(msg->remote_event_size_bytes) - { - void * m_data = msg+1; - elem->event_data = (void*)malloc(msg->remote_event_size_bytes); - memcpy(elem->event_data, m_data, msg->remote_event_size_bytes); +static void create_prepend_to_terminal_message_list( + terminal_message_list ** thisq, + terminal_message_list ** thistail, + int index, + terminal_message *msg) { + terminal_message_list* new_entry = (terminal_message_list*)malloc( + sizeof(terminal_message_list)); + init_terminal_message_list(new_entry, msg); + if(msg->remote_event_size_bytes) { + void *m_data = model_net_method_get_edata(DRAGONFLY, msg); + new_entry->event_data = (void*)malloc(msg->remote_event_size_bytes); + memcpy(new_entry->event_data, m_data, msg->remote_event_size_bytes); } - elem->next = *head; - - if(*head) - (*head)->prev = elem; - - if(!(*head)) - *tail = elem; - - *head = elem; - return; + prepend_to_terminal_message_list( thisq, thistail, index, new_entry); } -/* remove message from the queue */ -struct pending_router_msgs * remove_pending_router_msgs(struct pending_router_msgs** head, - struct pending_router_msgs** tail, - int chan) -{ - struct pending_router_msgs * elem = *head; - - if(!elem) - return NULL; - - while(elem != NULL) - { - if(elem->output_chan == chan) - { - /* Remove elemt from the list */ - /* if there is just one element */ - if(elem == *head && elem == *tail) - { - *head = NULL; - *tail = NULL; - } - /* if element if at the head */ - if(elem == *head && elem != *tail) - { - *head = elem->next; - elem->next->prev = NULL; - } - - /* if element is at the tail */ - if(elem == *tail && elem != *head) - { - *tail = elem->prev; - elem->prev->next = NULL; - } - - /* if element is in the middle */ - if(elem->prev) - elem->prev->next = elem->next; - - if(elem->next) - elem->next->prev = elem->prev; - - - //printf("\n Returned element %d %d %d", elem->msg.dest_terminal_id, chan, elem->next_stop); - return elem; - } - elem = elem->next; - } - return NULL; +static terminal_message_list* return_head( + terminal_message_list ** thisq, + terminal_message_list ** thistail, + int index) { + terminal_message_list *head = thisq[index]; + if(head != NULL) { + thisq[index] = head->next; + if(head->next != NULL) { + head->next->prev = NULL; + head->next = NULL; + } else { + thistail[index] = NULL; + } + } + return head; } -/* returns the dragonfly message size */ -static int dragonfly_get_msg_sz(void) -{ - return sizeof(terminal_message); +static terminal_message_list* return_tail( + terminal_message_list ** thisq, + terminal_message_list ** thistail, + int index) { + terminal_message_list *tail = thistail[index]; + if(tail->prev != NULL) { + tail->prev->next = NULL; + thistail[index] = tail->prev; + tail->prev = NULL; + } else { + thistail[index] = NULL; + thisq[index] = NULL; + } + return tail; } +static void copy_terminal_list_entry( terminal_message_list *cur_entry, + terminal_message *msg) { + terminal_message *cur_msg = &cur_entry->msg; + msg->travel_start_time = cur_msg->travel_start_time; + msg->packet_ID = cur_msg->packet_ID; + strcpy(msg->category, cur_msg->category); + msg->final_dest_gid = cur_msg->final_dest_gid; + msg->sender_lp = cur_msg->sender_lp; + msg->dest_terminal_id = cur_msg->dest_terminal_id; + msg->src_terminal_id = cur_msg->src_terminal_id; + msg->local_id = cur_msg->local_id; + msg->origin_router_id = cur_msg->origin_router_id; + msg->my_N_hop = cur_msg->my_N_hop; + msg->my_l_hop = cur_msg->my_l_hop; + msg->my_g_hop = cur_msg->my_g_hop; + msg->intm_lp_id = cur_msg->intm_lp_id; + msg->saved_channel = cur_msg->saved_channel; + msg->saved_vc = cur_msg->saved_vc; + msg->last_hop = cur_msg->last_hop; + msg->path_type = cur_msg->path_type; + msg->vc_index = cur_msg->vc_index; + msg->output_chan = cur_msg->output_chan; + msg->is_pull = cur_msg->is_pull; + msg->pull_size = cur_msg->pull_size; + msg->intm_group_id = cur_msg->intm_group_id; + msg->chunk_id = cur_msg->chunk_id; + msg->packet_size = cur_msg->packet_size; + msg->local_event_size_bytes = cur_msg->local_event_size_bytes; + msg->remote_event_size_bytes = cur_msg->remote_event_size_bytes; + msg->sender_node = cur_msg->sender_node; + msg->next_stop = cur_msg->next_stop; + msg->magic = cur_msg->magic; + + if(msg->local_event_size_bytes + msg->remote_event_size_bytes > 0) { + void *m_data = model_net_method_get_edata(DRAGONFLY, msg); + memcpy(m_data, cur_entry->event_data, + msg->local_event_size_bytes + msg->remote_event_size_bytes); + } +} static void dragonfly_read_config(const char * anno, dragonfly_param *params){ // shorthand dragonfly_param *p = params; @@ -364,63 +399,64 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){ p->num_routers); } - configuration_get_value_int(&config, "PARAMS", "num_vcs", anno, + /*configuration_get_value_int(&config, "PARAMS", "num_vcs", anno, &p->num_vcs); - if(!p->num_vcs) { + if(p->num_vcs <= 0) { p->num_vcs = 1; fprintf(stderr, "Number of virtual channels not specified, setting to %d\n", p->num_vcs); - } + }*/ + + p->num_vcs = 3; configuration_get_value_int(&config, "PARAMS", "local_vc_size", anno, &p->local_vc_size); - if(p->local_vc_size <= 0) { + if(!p->local_vc_size) { p->local_vc_size = 1024; fprintf(stderr, "Buffer size of local channels not specified, setting to %d\n", p->local_vc_size); } configuration_get_value_int(&config, "PARAMS", "global_vc_size", anno, &p->global_vc_size); - if(p->global_vc_size <= 0) { + if(!p->global_vc_size) { p->global_vc_size = 2048; fprintf(stderr, "Buffer size of global channels not specified, setting to %d\n", p->global_vc_size); } configuration_get_value_int(&config, "PARAMS", "cn_vc_size", anno, &p->cn_vc_size); - if(p->cn_vc_size <= 0) { + if(!p->cn_vc_size) { p->cn_vc_size = 1024; fprintf(stderr, "Buffer size of compute node channels not specified, setting to %d\n", p->cn_vc_size); } configuration_get_value_int(&config, "PARAMS", "chunk_size", anno, &p->chunk_size); - if(p->chunk_size <= 0) { + if(!p->chunk_size) { p->chunk_size = 64; - fprintf(stderr, "Chunk size for packets is not specified, setting to %d\n", p->chunk_size); + fprintf(stderr, "Chunk size for packets is specified, setting to %d\n", p->chunk_size); } configuration_get_value_double(&config, "PARAMS", "local_bandwidth", anno, &p->local_bandwidth); - if(p->local_bandwidth <= 0) { + if(!p->local_bandwidth) { p->local_bandwidth = 5.25; fprintf(stderr, "Bandwidth of local channels not specified, setting to %lf\n", p->local_bandwidth); } configuration_get_value_double(&config, "PARAMS", "global_bandwidth", anno, &p->global_bandwidth); - if(p->global_bandwidth <= 0) { + if(!p->global_bandwidth) { p->global_bandwidth = 4.7; fprintf(stderr, "Bandwidth of global channels not specified, setting to %lf\n", p->global_bandwidth); } configuration_get_value_double(&config, "PARAMS", "cn_bandwidth", anno, &p->cn_bandwidth); - if(p->cn_bandwidth <= 0) { + if(!p->cn_bandwidth) { p->cn_bandwidth = 5.25; fprintf(stderr, "Bandwidth of compute node channels not specified, setting to %lf\n", p->cn_bandwidth); } - - char routing_str[MAX_NAME_LENGTH]; configuration_get_value(&config, "PARAMS", "routing", anno, routing_str, MAX_NAME_LENGTH); if(strcmp(routing_str, "minimal") == 0) routing = MINIMAL; - else if(strcmp(routing_str, "nonminimal")==0 || strcmp(routing_str,"non-minimal")==0) + else if(strcmp(routing_str, "nonminimal")==0 || + strcmp(routing_str,"non-minimal")==0) routing = NON_MINIMAL; else if (strcmp(routing_str, "adaptive") == 0) routing = ADAPTIVE; @@ -437,40 +473,29 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){ p->num_cn = p->num_routers/2; p->num_global_channels = p->num_routers/2; p->num_groups = p->num_routers * p->num_cn + 1; - p->radix = p->num_vcs * - (p->num_routers + p->num_global_channels + p->num_cn); + p->radix = (p->num_cn + p->num_global_channels + p->num_routers); p->total_routers = p->num_groups * p->num_routers; p->total_terminals = p->total_routers * p->num_cn; + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + if(!rank) { + printf("\n Total nodes %d routers %d groups %d radix %d \n", + p->num_cn * p->total_routers, p->total_routers, p->num_groups, + p->radix); + } - if(!g_tw_mynode) - printf("\n Total nodes %d routers %d groups %d radix %d num_vc %d ", p->num_cn * p->total_routers, - p->total_routers, - p->num_groups, - p->radix, - p->num_vcs); -} - -/* convert GiB/s and bytes to ns */ -static tw_stime bytes_to_ns(uint64_t bytes, double GB_p_s) -{ - tw_stime time; - - /* bytes to GB */ - time = ((double)bytes)/(1024.0*1024.0*1024.0); - /* MB to s */ - time = time / GB_p_s; - /* s to ns */ - time = time * 1000.0 * 1000.0 * 1000.0; + p->cn_delay = (1.0 / p->cn_bandwidth) * p->chunk_size; + p->local_delay = (1.0 / p->local_bandwidth) * p->chunk_size; + p->global_delay = (1.0 / p->global_bandwidth) * p->chunk_size; + p->credit_delay = (1.0 / p->local_bandwidth) * 8; //assume 8 bytes packet - return(time); } -/* reverse computation for msg ready event */ static void dragonfly_configure(){ anno_map = codes_mapping_get_lp_anno_map(LP_CONFIG_NM); assert(anno_map); num_params = anno_map->num_annos + (anno_map->has_unanno_lp > 0); - all_params = calloc(num_params, sizeof(*all_params)); + all_params = malloc(num_params * sizeof(*all_params)); for (uint64_t i = 0; i < anno_map->num_annos; i++){ const char * anno = anno_map->annotations[i].ptr; @@ -486,32 +511,26 @@ static void dragonfly_report_stats() { long long avg_hops, total_finished_packets; tw_stime avg_time, max_time; - long total_term_events, total_router_events; int total_minimal_packets, total_nonmin_packets, total_completed_packets; MPI_Reduce( &total_hops, &avg_hops, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); MPI_Reduce( &N_finished_packets, &total_finished_packets, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); MPI_Reduce( &dragonfly_total_time, &avg_time, 1,MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); MPI_Reduce( &dragonfly_max_latency, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD); - MPI_Reduce( &term_ecount, &total_term_events, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD); - MPI_Reduce( &router_ecount, &total_router_events, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD); - if(routing == ADAPTIVE || routing == PROG_ADAPTIVE) { MPI_Reduce(&minimal_count, &total_minimal_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD); MPI_Reduce(&nonmin_count, &total_nonmin_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD); MPI_Reduce(&completed_packets, &total_completed_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD); } + /* print statistics */ if(!g_tw_mynode) { - printf("\n total finished packets %lld ", total_finished_packets); - printf(" Average number of hops traversed %f average message latency %lf us maximum message latency %lf us\n", (float)avg_hops/total_finished_packets, avg_time/(total_finished_packets*1000), max_time/1000); + printf(" Average number of hops traversed %f average message latency %lf us maximum message latency %lf us \n", (float)avg_hops/total_finished_packets, avg_time/(total_finished_packets*1000), max_time/1000); if(routing == ADAPTIVE || routing == PROG_ADAPTIVE) - printf("\n ADAPTIVE ROUTING STATS: %d packets routed minimally %d packets routed non-minimally completed packets %d ", total_minimal_packets, total_nonmin_packets, total_completed_packets); + printf("\n ADAPTIVE ROUTING STATS: %d percent packets routed minimally %d percent packets routed non-minimally completed packets %d ", total_minimal_packets, total_nonmin_packets, total_completed_packets); - printf("\n Max terminal occupancy %d max router occupancy %d ", max_term_occupancy, max_router_occupancy); - printf("\n Event population: total committed terminal events:%ld router events: %ld ", total_term_events, total_router_events); } return; } @@ -574,31 +593,216 @@ void dragonfly_collective_init(terminal_state * s, #endif } -/* dragonfly packet event , generates a dragonfly packet on the compute node */ -static tw_stime dragonfly_packet_event(char const * category, tw_lpid final_dest_lp, tw_lpid dest_mn_lp, uint64_t packet_size, int is_pull, uint64_t pull_size, tw_stime offset, const mn_sched_params *sched_params, int remote_event_size, const void* remote_event, int self_event_size, const void* self_event, tw_lpid src_lp, tw_lp *sender, int is_last_pckt) +/* initialize a dragonfly compute node terminal */ +void +terminal_init( terminal_state * s, + tw_lp * lp ) +{ + uint32_t h1 = 0, h2 = 0; + bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2); + terminal_magic_num = h1 + h2; + + int i; + char anno[MAX_NAME_LENGTH]; + + // Assign the global router ID + // TODO: be annotation-aware + codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL, + &mapping_type_id, anno, &mapping_rep_id, &mapping_offset); + if (anno[0] == '\0'){ + s->anno = NULL; + s->params = &all_params[num_params-1]; + } + else{ + s->anno = strdup(anno); + int id = configuration_get_annotation_index(anno, anno_map); + s->params = &all_params[id]; + } + + int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM, + s->anno, 0); + + s->terminal_id = (mapping_rep_id * num_lps) + mapping_offset; + s->router_id=(int)s->terminal_id / (s->params->num_routers/2); + s->terminal_available_time = 0.0; + s->packet_counter = 0; + + s->num_vcs = 1; + s->vc_occupancy = (int*)malloc(s->num_vcs * sizeof(int)); + + for( i = 0; i < s->num_vcs; i++ ) + { + s->vc_occupancy[i]=0; + } + + s->terminal_msgs = + (terminal_message_list**)malloc(1*sizeof(terminal_message_list*)); + s->terminal_msgs_tail = + (terminal_message_list**)malloc(1*sizeof(terminal_message_list*)); + s->terminal_msgs[0] = NULL; + s->terminal_msgs_tail[0] = NULL; + s->in_send_loop = 0; + + dragonfly_collective_init(s, lp); + return; +} + + +/* sets up the router virtual channels, global channels, + * local channels, compute node channels */ +void router_setup(router_state * r, tw_lp * lp) { + uint32_t h1 = 0, h2 = 0; + bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2); + router_magic_num = h1 + h2; + + char anno[MAX_NAME_LENGTH]; + codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL, + &mapping_type_id, anno, &mapping_rep_id, &mapping_offset); + + if (anno[0] == '\0'){ + r->anno = NULL; + r->params = &all_params[num_params-1]; + } else{ + r->anno = strdup(anno); + int id = configuration_get_annotation_index(anno, anno_map); + r->params = &all_params[id]; + } + + // shorthand + const dragonfly_param *p = r->params; + + r->router_id=mapping_rep_id + mapping_offset; + r->group_id=r->router_id/p->num_routers; + + int i,j; + int router_offset = (r->router_id % p->num_routers) * + (p->num_global_channels / 2) + 1; + + r->global_channel = (int*)malloc(p->num_global_channels * sizeof(int)); + r->next_output_available_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime)); + r->next_credit_available_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime)); + r->cur_hist_start_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime)); + r->link_traffic = (int*)malloc(p->radix * sizeof(int)); + r->cur_hist_num = (int*)malloc(p->radix * sizeof(int)); + r->prev_hist_num = (int*)malloc(p->radix * sizeof(int)); + + r->vc_occupancy = (int**)malloc(p->radix * sizeof(int*)); + r->in_send_loop = (int*)malloc(p->radix * sizeof(int)); + r->pending_msgs = + (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**)); + r->pending_msgs_tail = + (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**)); + r->queued_msgs = + (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**)); + r->queued_msgs_tail = + (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**)); + + for(i=0; i < p->radix; i++) + { + // Set credit & router occupancy + r->next_output_available_time[i]=0; + r->next_credit_available_time[i]=0; + r->cur_hist_start_time[i] = 0; + r->link_traffic[i]=0; + r->cur_hist_num[i] = 0; + r->prev_hist_num[i] = 0; + + r->in_send_loop[i] = 0; + r->vc_occupancy[i] = (int*)malloc(p->num_vcs * sizeof(int)); + r->pending_msgs[i] = (terminal_message_list**)malloc(p->num_vcs * + sizeof(terminal_message_list*)); + r->pending_msgs_tail[i] = (terminal_message_list**)malloc(p->num_vcs * + sizeof(terminal_message_list*)); + r->queued_msgs[i] = (terminal_message_list**)malloc(p->num_vcs * + sizeof(terminal_message_list*)); + r->queued_msgs_tail[i] = (terminal_message_list**)malloc(p->num_vcs * + sizeof(terminal_message_list*)); + for(j = 0; j < p->num_vcs; j++) { + r->vc_occupancy[i][j] = 0; + r->pending_msgs[i][j] = NULL; + r->pending_msgs_tail[i][j] = NULL; + r->queued_msgs[i][j] = NULL; + r->queued_msgs_tail[i][j] = NULL; + } + } + +#if DEBUG == 1 + printf("\n LP ID %d VC occupancy radix %d Router %d is connected to ", lp->gid, p->radix, r->router_id); +#endif + //round the number of global channels to the nearest even number +#if USE_DIRECT_SCHEME + int first = r->router_id % p->num_routers; + for(i=0; i < p->num_global_channels; i++) + { + int target_grp = first; + if(target_grp == r->group_id) { + target_grp = p->num_groups - 1; + } + int my_pos = r->group_id % p->num_routers; + if(r->group_id == p->num_groups - 1) { + my_pos = target_grp % p->num_routers; + } + r->global_channel[i] = target_grp * p->num_routers + my_pos; + first += p->num_routers; + } +#else + for(i=0; i < p->num_global_channels; i++) + { + if(i % 2 != 0) + { + r->global_channel[i]=(r->router_id + (router_offset * p->num_routers))%p->total_routers; + router_offset++; + } + else + { + r->global_channel[i]=r->router_id - ((router_offset) * p->num_routers); + } + if(r->global_channel[i]<0) + { + r->global_channel[i]=p->total_routers+r->global_channel[i]; + } +#if DEBUG == 1 + printf("\n channel %d ", r->global_channel[i]); +#endif + } +#endif + +#if DEBUG == 1 + printf("\n"); +#endif + return; +} + + +/* dragonfly packet event , generates a dragonfly packet on the compute node */ +static tw_stime dragonfly_packet_event(const char* category, + tw_lpid final_dest_lp, tw_lpid dest_mn_lp, + uint64_t packet_size, int is_pull, + uint64_t pull_size, tw_stime offset, const mn_sched_params *sched_params, + int remote_event_size, const void* remote_event, int self_event_size, + const void* self_event, tw_lpid src_lp, tw_lp *sender, int is_last_pckt) { tw_event * e_new; tw_stime xfer_to_nic_time; terminal_message * msg; char* tmp_ptr; - xfer_to_nic_time = codes_local_latency(sender); /* Throws an error of found last KP time > current event time otherwise when LPs of one type are placed together*/ + xfer_to_nic_time = codes_local_latency(sender); + //printf("\n transfer in time %f %f ", xfer_to_nic_time+offset, tw_now(sender)); + //e_new = tw_event_new(sender->gid, xfer_to_nic_time+offset, sender); + //msg = tw_event_data(e_new); e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset, sender, DRAGONFLY, (void**)&msg, (void**)&tmp_ptr); strcpy(msg->category, category); msg->final_dest_gid = final_dest_lp; - msg->dest_terminal_id = dest_mn_lp; msg->sender_lp=src_lp; - msg->sender_mn_lp = sender->gid; msg->packet_size = packet_size; msg->remote_event_size_bytes = 0; msg->local_event_size_bytes = 0; msg->type = T_GENERATE; - msg->magic = terminal_magic_num; msg->is_pull = is_pull; msg->pull_size = pull_size; - msg->chunk_id = 0; - msg->packet_ID = 0; + msg->magic = terminal_magic_num; if(is_last_pckt) /* Its the last packet so pass in remote and local event information*/ { @@ -615,6 +819,7 @@ static tw_stime dragonfly_packet_event(char const * category, tw_lpid final_dest tmp_ptr += self_event_size; } } + //printf("\n dragonfly remote event %d local event %d last packet %d %lf ", msg->remote_event_size_bytes, msg->local_event_size_bytes, is_last_pckt, xfer_to_nic_time); tw_event_send(e_new); return xfer_to_nic_time; } @@ -626,130 +831,19 @@ static void dragonfly_packet_event_rc(tw_lp *sender) return; } -void send_packet_from_router(router_state * s, - tw_bf * bf, - terminal_message * msg, - tw_lp * lp, - int output_chan, - int next_stop, - char * event_data) -{ - router_credit_send(s, bf, msg, lp); - - tw_stime ts; - tw_event *e; - terminal_message *m; - - int global=0; - int output_port = output_chan / s->params->num_vcs; - double bandwidth = s->params->local_bandwidth; - // Allocate output Virtual Channel - if(output_port >= s->params->num_routers && - output_port < s->params->num_routers + s->params->num_global_channels) - { - global = 1; - bandwidth = s->params->global_bandwidth; - } - - // If source router doesn't have global channel and buffer space is available, then assign to appropriate intra-group virtual channel - msg->saved_available_time = s->next_output_available_time[output_port]; - ts = g_tw_lookahead + bytes_to_ns(s->params->chunk_size, bandwidth) + tw_rand_exponential(lp->rng, noise); - - s->next_output_available_time[output_port] = maxd(s->next_output_available_time[output_port], tw_now(lp)); - s->next_output_available_time[output_port] += ts; - // dest can be a router or a terminal, so we must check - void * m_data; - if (next_stop == msg->dest_terminal_id){ - e = model_net_method_event_new(next_stop, - s->next_output_available_time[output_port] - tw_now(lp), lp, - DRAGONFLY, (void**)&m, &m_data); - } - else{ - e = tw_event_new(next_stop, s->next_output_available_time[output_port] - tw_now(lp), lp); - m = tw_event_data(e); - m_data = m+1; - } - memcpy(m, msg, sizeof(terminal_message)); - if (msg->remote_event_size_bytes){ - if(!event_data) - memcpy(m_data, msg+1, msg->remote_event_size_bytes); - else - memcpy(m_data, event_data, msg->remote_event_size_bytes); - } - if(global) - m->last_hop=GLOBAL; - else - m->last_hop = LOCAL; - - m->local_id = s->router_id; - /* for reverse computation */ - msg->new_vc = output_chan; - /* for sending back credit in forward event handler */ - m->saved_vc = output_chan; - m->intm_lp_id = lp->gid; - s->vc_occupancy[output_chan]++; - - if(s->vc_occupancy[output_chan] > s->max_router_vc_occupancy) - { - bf->c3 = 1; - msg->saved_occupancy = s->max_router_vc_occupancy; - s->max_router_vc_occupancy = s->vc_occupancy[output_chan]; - } - if(routing == PROG_ADAPTIVE) - { - if(tw_now(lp) - s->cur_hist_start_time[output_chan] >= WINDOW_LENGTH) - { - bf->c2 = 1; - msg->saved_hist_num = s->prev_hist_num[output_chan]; - msg->saved_hist_start_time = s->cur_hist_start_time[output_chan]; - - s->prev_hist_num[output_chan] = s->cur_hist_num[output_chan]; - - s->cur_hist_start_time[output_chan] = tw_now(lp); - s->cur_hist_num[output_chan] = 1; - } - else - { - s->cur_hist_num[output_chan]++; - } - } - /* Determine the event type. If the packet has arrived at the final destination - router then it should arrive at the destination terminal next. */ - if(next_stop == msg->dest_terminal_id) - { - m->type = T_ARRIVE; - m->magic = terminal_magic_num; - - if(s->vc_occupancy[output_chan] >= s->params->cn_vc_size) - s->output_vc_state[output_chan] = VC_CREDIT; - } - else - { - /* The packet has to be sent to another router */ - m->type = R_FORWARD; - m->magic = router_magic_num; - /* If this is a global channel then the buffer space is different */ - if( global ) - { - if(s->vc_occupancy[output_chan] >= s->params->global_vc_size) - s->output_vc_state[output_chan] = VC_CREDIT; - } - else - { - /* buffer space is less for local channels */ - if( s->vc_occupancy[output_chan] >= s->params->local_vc_size) - s->output_vc_state[output_chan] = VC_CREDIT; - } - } - tw_event_send(e); - return; - -} /* given two group IDs, find the router of the src_gid that connects to the dest_gid*/ tw_lpid getRouterFromGroupID(int dest_gid, int src_gid, - int num_routers) + int num_routers, + int total_groups) { +#if USE_DIRECT_SCHEME + int dest = dest_gid; + if(dest == total_groups - 1) { + dest = src_gid; + } + return src_gid * num_routers + (dest % num_routers); +#else int group_begin = src_gid * num_routers; int group_end = (src_gid * num_routers) + num_routers-1; int offset = (dest_gid * num_routers - group_begin) / num_routers; @@ -771,402 +865,389 @@ tw_lpid getRouterFromGroupID(int dest_gid, router_id = group_begin + (offset / half_channel); return router_id; +#endif } /*When a packet is sent from the current router and a buffer slot becomes available, a credit is sent back to schedule another packet event*/ -static void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) -{ +void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, + tw_lp * lp, int sq) { tw_event * buf_e; tw_stime ts; terminal_message * buf_msg; - int dest=0, type = R_BUFFER; + int dest = 0, type = R_BUFFER; int is_terminal = 0; - int found_magic = router_magic_num; - tw_stime credit_delay = 0.0; const dragonfly_param *p = s->params; - int sender_radix; - // Notify sender terminal about available buffer space - if(msg->last_hop == TERMINAL) - { - dest = msg->src_terminal_id; - sender_radix = msg->local_id % p->num_cn; - //determine the time in ns to transfer the credit - credit_delay = bytes_to_ns(CREDIT_SIZE, p->cn_bandwidth); - type = T_BUFFER; - is_terminal = 1; - found_magic = terminal_magic_num; - } - else if(msg->last_hop == GLOBAL) - { - dest = msg->intm_lp_id; - sender_radix = p->num_cn + (msg->local_id % p->num_global_channels); - credit_delay = bytes_to_ns(CREDIT_SIZE, p->global_bandwidth); - } - else if(msg->last_hop == LOCAL) - { - dest = msg->intm_lp_id; - sender_radix = p->num_cn + p->num_global_channels + (msg->local_id % p->num_routers); - credit_delay = bytes_to_ns(CREDIT_SIZE, p->local_bandwidth) * CREDIT_SIZE; - } - else - printf("\n Invalid message type"); - - msg->sender_radix = sender_radix; - - assert(sender_radix < s->params->radix ); - - msg->saved_credit_time = s->next_credit_available_time[sender_radix]; - s->next_credit_available_time[sender_radix] = maxd(tw_now(lp), s->next_credit_available_time[sender_radix]); - ts = credit_delay + 0.1 + tw_rand_exponential(lp->rng, (double)credit_delay/1000); + + // Notify sender terminal about available buffer space + if(msg->last_hop == TERMINAL) { + dest = msg->src_terminal_id; + type = T_BUFFER; + is_terminal = 1; + } else if(msg->last_hop == GLOBAL) { + dest = msg->intm_lp_id; + } else if(msg->last_hop == LOCAL) { + dest = msg->intm_lp_id; + } else + printf("\n Invalid message type"); + + ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng); - s->next_credit_available_time[sender_radix]+=ts; - if (is_terminal){ - buf_e = model_net_method_event_new(dest, - s->next_credit_available_time[sender_radix] - tw_now(lp), lp, - DRAGONFLY, (void**)&buf_msg, NULL); - } - else{ - buf_e = tw_event_new(dest, s->next_credit_available_time[sender_radix] - tw_now(lp) , lp); - buf_msg = tw_event_data(buf_e); - } - buf_msg->origin_router_id = s->router_id; + if (is_terminal) { + buf_e = model_net_method_event_new(dest, ts, lp, DRAGONFLY, + (void**)&buf_msg, NULL); + buf_msg->magic = terminal_magic_num; + } else { + buf_e = tw_event_new(dest, ts , lp); + buf_msg = tw_event_data(buf_e); + buf_msg->magic = router_magic_num; + } + + if(sq == -1) { + buf_msg->vc_index = msg->vc_index; + buf_msg->output_chan = msg->output_chan; + } else { buf_msg->vc_index = msg->saved_vc; - buf_msg->type=type; - buf_msg->magic = found_magic; - buf_msg->last_hop = msg->last_hop; - buf_msg->packet_ID=msg->packet_ID; - - tw_event_send(buf_e); + buf_msg->output_chan = msg->saved_channel; + } + + buf_msg->type = type; - return; + tw_event_send(buf_e); + return; } -static void packet_generate_send_rc(terminal_state * s, - tw_bf * bf, - terminal_message * msg, - tw_lp * lp) +void packet_generate_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) { term_rev_ecount++; term_ecount--; tw_rand_reverse_unif(lp->rng); - - s->terminal_available_time = msg->saved_available_time; - tw_rand_reverse_unif(lp->rng); - int vc = msg->saved_vc; - s->vc_occupancy[vc]--; - s->packet_counter--; - s->output_vc_state[vc] = VC_IDLE; - - //if(bf->c2) - // s->max_term_vc_occupancy = msg->saved_occupancy; - if (msg->chunk_id == (msg->num_chunks-1)){ - codes_local_latency_reverse(lp); - } + int num_chunks = msg->packet_size/s->params->chunk_size; + if(msg->packet_size % s->params->chunk_size) + num_chunks++; - if(bf->c1) - codes_local_latency_reverse(lp); + if(!num_chunks) + num_chunks++; - struct mn_stats* stat; - stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); + int i; + for(i = 0; i < num_chunks; i++) { + delete_terminal_message_list(return_tail(s->terminal_msgs, + s->terminal_msgs_tail, 0)); + } + if(bf->c5) { + tw_rand_reverse_unif(lp->rng); + s->in_send_loop = 0; + } + struct mn_stats* stat; + stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); stat->send_count--; stat->send_bytes -= msg->packet_size; stat->send_time -= (1/s->params->cn_bandwidth) * msg->packet_size; } /* generates packet at the current dragonfly compute node */ -static void packet_generate_send(terminal_state * s, - tw_bf * bf, - terminal_message * msg, - tw_lp * lp) -{ - bf->c1 = 0; - term_ecount++; - term_rev_ecount++; - - const dragonfly_param *p = s->params; +void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg, + tw_lp * lp) { + term_ecount++; - tw_stime ts, travel_start_time; - tw_event *e; - tw_lpid router_id; - terminal_message *m; - int i, total_event_size; + tw_stime ts; + tw_lpid dest_terminal_id; + dest_terminal_id = model_net_find_local_device(DRAGONFLY, s->anno, 0, + msg->final_dest_gid); + msg->dest_terminal_id = dest_terminal_id; - uint64_t num_chunks = msg->packet_size / p->chunk_size; - if (msg->packet_size % s->params->chunk_size) - num_chunks++; + const dragonfly_param *p = s->params; - if(!num_chunks) - num_chunks = 1; + ts = g_tw_lookahead + s->params->cn_delay + tw_rand_unif(lp->rng); + model_net_method_idle_event(ts, 0, lp); - if(!msg->packet_ID) - msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter + tw_rand_integer(lp->rng, 0, lp->gid + g_tw_nlp * s->packet_counter); + tw_event *e; + terminal_message *m; - // Each packet is broken into chunks and then sent over the channel - msg->saved_available_time = s->terminal_available_time; - tw_stime head_delay = bytes_to_ns(s->params->chunk_size, s->params->cn_bandwidth); - ts = head_delay + tw_rand_exponential(lp->rng, noise); - //printf("\n ts %f calculated %f ", ts, s->params->chunk_size * (1/s->params->cn_bandwidth)); - s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp)); - s->terminal_available_time += ts; + int i, total_event_size; + int num_chunks = msg->packet_size / p->chunk_size; + if (msg->packet_size % s->params->chunk_size) num_chunks++; + msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter; + msg->travel_start_time = tw_now(lp); + msg->my_N_hop = 0; + msg->my_l_hop = 0; + msg->my_g_hop = 0; + msg->intm_group_id = -1; - - int chan = -1, j; - for(j = 0; j < p->num_vcs; j++) - { - if(s->vc_occupancy[j] < p->cn_vc_size) - { - chan=j; - break; - } - } - - /* for reverse computation */ - msg->saved_vc = chan; - - /* simulation should exit */ - if(chan == -1) - tw_error(TW_LOC, "\n No terminal buffers available, increase buffer size"); - - //TODO: be annotation-aware - codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL, - &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset); - - codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", NULL, 1, - s->router_id/num_routers_per_mgrp, - s->router_id % num_routers_per_mgrp, &router_id); - - e = tw_event_new(router_id, s->terminal_available_time - tw_now(lp), lp); - m = tw_event_data(e); - memcpy(m, msg, sizeof(terminal_message)); - - if(msg->chunk_id == 0) - travel_start_time = tw_now(lp); - else - travel_start_time = msg->travel_start_time; - - m->num_chunks = num_chunks; - m->magic = router_magic_num; - m->origin_router_id = s->router_id; - m->type = R_FORWARD; - m->src_terminal_id = lp->gid; - m->chunk_id = msg->chunk_id; - m->last_hop = TERMINAL; - m->intm_group_id = -1; - m->travel_start_time = travel_start_time; - m->path_type = -1; - m->local_event_size_bytes = 0; - m->local_id = s->terminal_id; - - if (msg->remote_event_size_bytes){ - memcpy(m+1, model_net_method_get_edata(DRAGONFLY, msg), - msg->remote_event_size_bytes); - } - - tw_event_send(e); + for(i = 0; i < num_chunks; i++) + { + terminal_message_list *cur_chunk = (terminal_message_list*)malloc( + sizeof(terminal_message_list)); + init_terminal_message_list(cur_chunk, msg); - if(msg->packet_ID == TRACK && msg->chunk_id == num_chunks - 1) - printf("\n packet %d generated chunk id %d reach at time %lf ", - msg->packet_ID, - msg->chunk_id, - s->terminal_available_time - tw_now(lp)); + if(msg->remote_event_size_bytes + msg->local_event_size_bytes > 0) { + cur_chunk->event_data = (char*)malloc( + msg->remote_event_size_bytes + msg->local_event_size_bytes); + } + + void * m_data_src = model_net_method_get_edata(DRAGONFLY, msg); + if (msg->remote_event_size_bytes){ + memcpy(cur_chunk->event_data, m_data_src, msg->remote_event_size_bytes); + } + if (msg->local_event_size_bytes){ + m_data_src = (char*)m_data_src + msg->remote_event_size_bytes; + memcpy((char*)cur_chunk->event_data + msg->remote_event_size_bytes, + m_data_src, msg->local_event_size_bytes); + } - if(msg->chunk_id == num_chunks - 1) - { - // now that message is sent, issue an "idle" event to tell the scheduler - // when I'm next available - model_net_method_idle_event(codes_local_latency(lp) + - s->terminal_available_time - tw_now(lp), 0, lp); - - /* local completion message */ - if(msg->local_event_size_bytes > 0) - { - tw_event* e_new; - terminal_message* m_new; - void* local_event = - (char*)model_net_method_get_edata(DRAGONFLY, msg) + - msg->remote_event_size_bytes; - ts = g_tw_lookahead + (1/s->params->cn_bandwidth) * msg->local_event_size_bytes; - e_new = tw_event_new(msg->sender_lp, ts, lp); - m_new = tw_event_data(e_new); - memcpy(m_new, local_event, msg->local_event_size_bytes); - tw_event_send(e_new); - } - } - s->packet_counter++; - s->vc_occupancy[chan]++; + cur_chunk->msg.chunk_id = i; + append_to_terminal_message_list(s->terminal_msgs, s->terminal_msgs_tail, + 0, cur_chunk); + } - if(s->vc_occupancy[chan] > s->max_term_vc_occupancy) - { - bf->c2 = 1; - //msg->saved_occupancy = s->max_term_vc_occupancy; - s->max_term_vc_occupancy = s->vc_occupancy[chan]; - } + if(s->in_send_loop == 0) { + bf->c5 = 1; + terminal_message *m; + ts = g_tw_lookahead + s->params->cn_delay + tw_rand_unif(lp->rng); + tw_event* e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY, + (void**)&m, NULL); + m->type = T_SEND; + m->magic = terminal_magic_num; + s->in_send_loop = 1; + tw_event_send(e); + } - if(s->vc_occupancy[chan] >= s->params->cn_vc_size) - s->output_vc_state[chan] = VC_CREDIT; - - /* calculate statistics */ - total_event_size = model_net_get_msg_sz(DRAGONFLY) + - msg->remote_event_size_bytes + msg->local_event_size_bytes; - struct mn_stats* stat; - stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); - stat->send_count++; - stat->send_bytes += msg->packet_size; - stat->send_time += (s->terminal_available_time - tw_now(lp)); - if(stat->max_event_size < total_event_size) + total_event_size = model_net_get_msg_sz(DRAGONFLY) + + msg->remote_event_size_bytes + msg->local_event_size_bytes; + mn_stats* stat; + stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); + stat->send_count++; + stat->send_bytes += msg->packet_size; + stat->send_time += (1/p->cn_bandwidth) * msg->packet_size; + if(stat->max_event_size < total_event_size) stat->max_event_size = total_event_size; - /* Now schedule another packet generate event */ - if(msg->chunk_id < num_chunks - 1) - { - bf->c1 = 1; - /* Issue another packet generate event */ - tw_event * e_gen; - terminal_message * m_gen; - void * m_gen_data; - - /* Keep the packet generate event a little behind packet send */ - e_gen = model_net_method_event_new(lp->gid, codes_local_latency(lp), lp, DRAGONFLY, (void**)&m_gen,(void**)&m_gen_data); - - void *m_gen_data_src = model_net_method_get_edata(DRAGONFLY, msg); - memcpy(m_gen, msg, sizeof(terminal_message)); - - m_gen->chunk_id = msg->chunk_id + 1; - m_gen->type = T_GENERATE; - m_gen->travel_start_time = travel_start_time; - - if (msg->remote_event_size_bytes){ - memcpy(m_gen_data, m_gen_data_src, - msg->remote_event_size_bytes); - m_gen_data = (char*)m_gen_data + msg->remote_event_size_bytes; - m_gen_data_src = (char*)m_gen_data_src + msg->remote_event_size_bytes; - } - if (msg->local_event_size_bytes) - memcpy(m_gen_data, m_gen_data_src, msg->local_event_size_bytes); - - tw_event_send(e_gen); - } - return; } -static void packet_arrive_rc(terminal_state * s, - tw_bf * bf, - terminal_message * msg, - tw_lp * lp) +void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, + tw_lp * lp) { - term_ecount--; - term_rev_ecount++; - - uint64_t num_chunks = msg->num_chunks; - - if(msg->chunk_id == num_chunks - 1) - completed_packets--; + term_ecount--; + term_rev_ecount++; - if(msg->path_type == MINIMAL && msg->chunk_id == num_chunks - 1) - minimal_count--; + if(bf->c1) { + s->in_send_loop = 1; + return; + } + + s->terminal_available_time = msg->saved_available_time; + tw_rand_reverse_unif(lp->rng); + if(bf->c2) { + codes_local_latency_reverse(lp); + } + + s->packet_counter--; + s->vc_occupancy[0] -= s->params->chunk_size; - if(msg->path_type == NON_MINIMAL && msg->chunk_id == num_chunks - 1) - nonmin_count--; + create_prepend_to_terminal_message_list(s->terminal_msgs, + s->terminal_msgs_tail, 0, msg); + if(bf->c3) { + tw_rand_reverse_unif(lp->rng); + } + if(bf->c4) { + s->in_send_loop = 1; + } + return; +} +/* sends the packet from the current dragonfly compute node to the attached router */ +void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg, + tw_lp * lp) { + + term_ecount++; + tw_stime ts; + tw_event *e; + terminal_message *m; + tw_lpid router_id; - tw_rand_reverse_unif(lp->rng); - s->next_credit_available_time = msg->saved_credit_time; - if(msg->chunk_id == num_chunks-1) - { - mn_stats* stat; - stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); - stat->recv_count--; - stat->recv_bytes -= msg->packet_size; - stat->recv_time -= tw_now(lp) - msg->travel_start_time; - - N_finished_packets--; - - dragonfly_total_time -= (tw_now(lp) - msg->travel_start_time); - if(bf->c3) - dragonfly_max_latency = msg->saved_available_time; - } - - if (msg->chunk_id == num_chunks-1 && - msg->remote_event_size_bytes && - msg->is_pull) - { - int net_id = model_net_get_id(LP_METHOD_NM); - model_net_event_rc(net_id, lp, msg->pull_size); - } + terminal_message_list* cur_entry = s->terminal_msgs[0]; + + if(s->vc_occupancy[0] + s->params->chunk_size > s->params->cn_vc_size + || cur_entry == NULL) { + bf->c1 = 1; + s->in_send_loop = 0; + //printf("[%d] Skipping send %d %d\n", lp->gid, cur_entry == NULL, + // (s->vc_occupancy[0] + s->params->chunk_size > s->params->cn_vc_size)); + return; + } + + msg->saved_available_time = s->terminal_available_time; + ts = g_tw_lookahead + s->params->cn_delay + tw_rand_unif(lp->rng); + s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp)); + s->terminal_available_time += ts; + + //TODO: be annotation-aware + codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL, + &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset); + codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", NULL, 1, + s->router_id, 0, &router_id); + // we are sending an event to the router, so no method_event here + e = tw_event_new(router_id, s->terminal_available_time - tw_now(lp), lp); + m = tw_event_data(e); + memcpy(m, &cur_entry->msg, sizeof(terminal_message)); + if (m->remote_event_size_bytes){ + memcpy(model_net_method_get_edata(DRAGONFLY, m), cur_entry->event_data, + m->remote_event_size_bytes); + } + m->origin_router_id = s->router_id; + m->type = R_ARRIVE; + m->src_terminal_id = lp->gid; + m->vc_index = 0; + m->last_hop = TERMINAL; + m->intm_group_id = -1; + m->magic = router_magic_num; + m->path_type = -1; + m->local_event_size_bytes = 0; + m->local_id = s->terminal_id; + tw_event_send(e); + + int num_chunks = cur_entry->msg.packet_size/s->params->chunk_size; + if(cur_entry->msg.packet_size % s->params->chunk_size) + num_chunks++; + + if(cur_entry->msg.chunk_id == num_chunks - 1 && + (cur_entry->msg.local_event_size_bytes > 0)) { + bf->c2 = 1; + ts = codes_local_latency(lp); + tw_event *e_new = tw_event_new(cur_entry->msg.sender_lp, ts, lp); + terminal_message* m_new = tw_event_data(e_new); + void *local_event = (char*)cur_entry->event_data + + cur_entry->msg.remote_event_size_bytes; + memcpy(m_new, local_event, cur_entry->msg.local_event_size_bytes); + tw_event_send(e_new); + } + s->packet_counter++; + s->vc_occupancy[0] += s->params->chunk_size; + cur_entry = return_head(s->terminal_msgs, s->terminal_msgs_tail, 0); + copy_terminal_list_entry(cur_entry, msg); + delete_terminal_message_list(cur_entry); + + cur_entry = s->terminal_msgs[0]; + + if(cur_entry != NULL && + s->vc_occupancy[0] + s->params->chunk_size <= s->params->cn_vc_size) { + bf->c3 = 1; + terminal_message *m; + ts = g_tw_lookahead + s->params->cn_delay + tw_rand_unif(lp->rng); + tw_event* e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY, + (void**)&m, NULL); + m->type = T_SEND; + m->magic = terminal_magic_num; + tw_event_send(e); + } else { + bf->c4 = 1; + s->in_send_loop = 0; + } + return; } -/* packet arrives at the destination terminal */ -static void packet_arrive(terminal_state * s, - tw_bf * bf, - terminal_message * msg, - tw_lp * lp) + +void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) { - term_ecount++; - term_rev_ecount++; - - uint64_t num_chunks = msg->num_chunks; + term_ecount--; + term_rev_ecount++; - if(msg->chunk_id == num_chunks - 1) - completed_packets++; + completed_packets--; + if(msg->path_type == MINIMAL) + minimal_count--; + if(msg->path_type == NON_MINIMAL) + nonmin_count--; + + if(bf->c2) { + mn_stats* stat; + stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); + stat->recv_count--; + stat->recv_bytes -= msg->packet_size; + stat->recv_time -= tw_now(lp) - msg->travel_start_time; + N_finished_packets--; + dragonfly_total_time -= (tw_now(lp) - msg->travel_start_time); + total_hops -= msg->my_N_hop; + if(bf->c3) + dragonfly_max_latency = msg->saved_available_time; + + if(bf->c4) + { + int net_id = model_net_get_id(LP_METHOD_NM); + model_net_event_rc(net_id, lp, msg->pull_size); - if(msg->path_type == MINIMAL && msg->chunk_id == num_chunks - 1) - minimal_count++; + } + } + msg->my_N_hop--; + tw_rand_reverse_unif(lp->rng); - if(msg->path_type == NON_MINIMAL && msg->chunk_id == num_chunks - 1) - nonmin_count++; - - if(msg->path_type != MINIMAL && msg->path_type != NON_MINIMAL) - printf("\n Wrong message path type %d ", msg->path_type); + return; +} +/* packet arrives at the destination terminal */ +void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, + tw_lp * lp) { + + term_ecount++; + + bf->c2 = 0; + bf->c3 = 0; + bf->c4 = 0; + int num_chunks = msg->packet_size / s->params->chunk_size; + if (msg->packet_size % s->params->chunk_size) + num_chunks++; + + completed_packets++; + + if(msg->path_type == MINIMAL) + minimal_count++; + + if(msg->path_type == NON_MINIMAL) + nonmin_count++; + + if(msg->path_type != MINIMAL && msg->path_type != NON_MINIMAL) + printf("\n Wrong message path type %d ", msg->path_type); #if DEBUG == 1 -if(msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1) - { - printf( "(%lf) [Terminal %d] packet %lld has arrived \n", - tw_now(lp), (int)lp->gid, msg->packet_ID); + if( msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1) + { + printf( "(%lf) [Terminal %d] packet %lld has arrived \n", + tw_now(lp), (int)lp->gid, msg->packet_ID); - printf("travel start time is %f\n", - msg->travel_start_time); + printf("travel start time is %f\n", + msg->travel_start_time); - } + printf("My hop now is %d\n",msg->my_N_hop); + } #endif - // Packet arrives and accumulate # queued - // Find a queue with an empty buffer slot - tw_event * e, * buf_e; - terminal_message * m, * buf_msg; tw_stime ts; - bf->c3 = 0; - bf->c2 = 0; + msg->my_N_hop++; if(msg->chunk_id == num_chunks-1) { - bf->c2 = 1; - mn_stats* stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); - stat->recv_count++; - stat->recv_bytes += msg->packet_size; - stat->recv_time += tw_now(lp) - msg->travel_start_time; - - N_finished_packets++; - dragonfly_total_time += tw_now( lp ) - msg->travel_start_time; - - if (dragonfly_max_latency < tw_now( lp ) - msg->travel_start_time) - { - bf->c3 = 1; - msg->saved_available_time = dragonfly_max_latency; - dragonfly_max_latency=tw_now( lp ) - msg->travel_start_time; - } - // Trigger an event on receiving server + bf->c2 = 1; + mn_stats* stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); + stat->recv_count++; + stat->recv_bytes += msg->packet_size; + stat->recv_time += tw_now(lp) - msg->travel_start_time; + + N_finished_packets++; + dragonfly_total_time += tw_now( lp ) - msg->travel_start_time; + total_hops += msg->my_N_hop; + + if (dragonfly_max_latency < tw_now( lp ) - msg->travel_start_time) { + bf->c3 = 1; + msg->saved_available_time = dragonfly_max_latency; + dragonfly_max_latency = tw_now( lp ) - msg->travel_start_time; + } if(msg->remote_event_size_bytes) { - if(msg->packet_ID == TRACK) - printf("\n completed at %lf ", tw_now(lp)); void * tmp_ptr = model_net_method_get_edata(DRAGONFLY, msg); ts = g_tw_lookahead + bytes_to_ns(msg->remote_event_size_bytes, (1/s->params->cn_bandwidth)); if (msg->is_pull){ + bf->c4 = 0; struct codes_mctx mc_dst = codes_mctx_set_global_direct(msg->sender_mn_lp); struct codes_mctx mc_src = @@ -1177,90 +1258,38 @@ if(msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1) msg->remote_event_size_bytes, tmp_ptr, 0, NULL, lp); } else{ - e = tw_event_new(msg->final_dest_gid, ts, lp); - m = tw_event_data(e); - memcpy(m, tmp_ptr, msg->remote_event_size_bytes); + tw_event * e = tw_event_new(msg->final_dest_gid, ts, lp); + void * m_remote = tw_event_data(e); + memcpy(m_remote, tmp_ptr, msg->remote_event_size_bytes); tw_event_send(e); } } } - tw_stime credit_delay = bytes_to_ns(CREDIT_SIZE, s->params->cn_bandwidth); - ts = credit_delay + 0.1 + tw_rand_exponential(lp->rng, noise); - - msg->saved_credit_time = s->next_credit_available_time; - s->next_credit_available_time = maxd(s->next_credit_available_time, tw_now(lp)); - s->next_credit_available_time += ts; + // NIC aggregation - should this be a separate function? + // Trigger an event on receiving server + ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng); + if(msg->intm_lp_id == TRACK) printf("\n terminal sending credit at chan %d ", msg->saved_vc); - //TODO: be annotation-aware - // no method_event here - message going to router -// printf("\n current time %lf scheduling in time %lf ", tw_now(lp), s->next_credit_available_time - tw_now(lp)); - buf_e = tw_event_new(msg->intm_lp_id, s->next_credit_available_time - tw_now(lp), lp); + // no method_event here - message going to router + tw_event * buf_e; + terminal_message * buf_msg; + buf_e = tw_event_new(msg->intm_lp_id, ts, lp); buf_msg = tw_event_data(buf_e); buf_msg->magic = router_magic_num; - buf_msg->vc_index = msg->saved_vc; - buf_msg->type=R_BUFFER; - buf_msg->packet_ID=msg->packet_ID; - buf_msg->last_hop = TERMINAL; + buf_msg->vc_index = msg->vc_index; + buf_msg->output_chan = msg->output_chan; + buf_msg->type = R_BUFFER; tw_event_send(buf_e); return; } -/* initialize a dragonfly compute node terminal */ -void -terminal_init( terminal_state * s, - tw_lp * lp ) -{ -// printf("\n terminal ID %ld ", lp->gid); - uint32_t h1 = 0, h2 = 0; - bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2); - terminal_magic_num = h1 + h2; - - int i; - char anno[MAX_NAME_LENGTH]; - - // Assign the global router ID - // TODO: be annotation-aware - codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL, - &mapping_type_id, anno, &mapping_rep_id, &mapping_offset); - if (anno[0] == '\0'){ - s->anno = NULL; - s->params = &all_params[num_params-1]; - } - else{ - s->anno = strdup(anno); - int id = configuration_get_annotation_index(anno, anno_map); - s->params = &all_params[id]; - } - - int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM, - s->anno, 0); - - s->terminal_id = (mapping_rep_id * num_lps) + mapping_offset; - s->router_id=(int)s->terminal_id / (s->params->num_routers/2); - s->terminal_available_time = 0.0; - s->packet_counter = 0; - - s->vc_occupancy = (int*)malloc(s->params->num_vcs * sizeof(int)); - s->output_vc_state = (int*)malloc(s->params->num_vcs * sizeof(int)); - s->max_term_vc_occupancy = 0; - - for( i = 0; i < s->params->num_vcs; i++ ) - { - s->vc_occupancy[i]=0; - s->output_vc_state[i]=VC_IDLE; - } -// printf("\n Terminal ID %d Router ID %d ", s->terminal_id, s->router_id); - dragonfly_collective_init(s, lp); - return; -} - /* collective operation for the torus network */ -void dragonfly_collective(char const * category, int message_size, int remote_event_size, const void* remote_event, tw_lp* sender) +void dragonfly_collective(char* category, int message_size, int remote_event_size, const void* remote_event, tw_lp* sender) { tw_event * e_new; tw_stime xfer_to_nic_time; @@ -1484,7 +1513,7 @@ static void node_collective_fan_out(terminal_state * s, { bf->c1 = 1; tw_event* e_new; - terminal_message * msg_new; + nodes_message * msg_new; tw_stime xfer_to_nic_time; for( i = 0; i < s->num_children; i++ ) @@ -1506,7 +1535,7 @@ static void node_collective_fan_out(terminal_state * s, e_new = model_net_method_event_new(child_nic_id, xfer_to_nic_time, lp, DRAGONFLY, (void**)&msg_new, &m_data); - memcpy(msg_new, msg, sizeof(terminal_message)); + memcpy(msg_new, msg, sizeof(nodes_message)); if (msg->remote_event_size_bytes){ memcpy(m_data, model_net_method_get_edata(DRAGONFLY, msg), msg->remote_event_size_bytes); @@ -1527,46 +1556,40 @@ static void node_collective_fan_out(terminal_state * s, } } -static void -terminal_buf_update_rc(terminal_state * s, +void terminal_buf_update_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) { - term_ecount--; - term_rev_ecount++; + s->vc_occupancy[0] += s->params->chunk_size; + if(bf->c1) { + codes_local_latency_reverse(lp); + s->in_send_loop = 0; + } - int msg_indx = msg->vc_index; - s->vc_occupancy[msg_indx]++; - if(s->vc_occupancy[msg_indx] == s->params->cn_vc_size) - s->output_vc_state[msg_indx] = VC_CREDIT; + return; } - /* update the compute node-router channel buffer */ -static void +void terminal_buf_update(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) { - term_ecount++; - term_rev_ecount++; - - // Update the buffer space associated with this router LP - int msg_indx = msg->vc_index; - s->vc_occupancy[msg_indx]--; - s->output_vc_state[msg_indx] = VC_IDLE; - - if(s->vc_occupancy[msg_indx] < 0) - { - char buf[64]; - int written = sprintf(buf, - "terminal %d: error vc occupancy \n", - lp->gid); + s->vc_occupancy[0] -= s->params->chunk_size; + if(s->in_send_loop == 0 && s->terminal_msgs[0] != NULL) { + terminal_message *m; + bf->c1 = 1; + tw_stime ts = codes_local_latency(lp); + tw_event* e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY, + (void**)&m, NULL); + m->type = T_SEND; + m->magic = terminal_magic_num; + s->in_send_loop = 1; + tw_event_send(e); + } - lp_io_write(lp->gid, "dragonfly-vc-errors", written, buf); - } - return; + return; } void @@ -1575,18 +1598,22 @@ terminal_event( terminal_state * s, terminal_message * msg, tw_lp * lp ) { - assert(msg->magic == terminal_magic_num); *(int *)bf = (int)0; + assert(msg->magic == terminal_magic_num); switch(msg->type) { case T_GENERATE: - packet_generate_send(s,bf,msg,lp); + packet_generate(s,bf,msg,lp); break; case T_ARRIVE: packet_arrive(s,bf,msg,lp); break; + case T_SEND: + packet_send(s,bf,msg,lp); + break; + case T_BUFFER: terminal_buf_update(s, bf, msg, lp); break; @@ -1605,6 +1632,7 @@ terminal_event( terminal_state * s, default: printf("\n LP %d Terminal message type not supported %d ", (int)lp->gid, msg->type); + tw_error(TW_LOC, "Msg type not supported"); } } @@ -1612,22 +1640,50 @@ void dragonfly_terminal_final( terminal_state * s, tw_lp * lp ) { - MPI_Reduce( &s->max_term_vc_occupancy, &max_term_occupancy, 1, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD); model_net_print_stats(lp->gid, s->dragonfly_stats_array); + + if(s->terminal_msgs[0] != NULL) + printf("[%d] leftover terminal messages \n", lp->gid); } void dragonfly_router_final(router_state * s, tw_lp * lp) { - MPI_Reduce( &s->max_router_vc_occupancy, &max_router_occupancy, 1, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD); free(s->global_channel); + char *stats_file = getenv("TRACER_LINK_FILE"); + if(stats_file != NULL) { + FILE *fout = fopen(stats_file, "a"); + const dragonfly_param *p = s->params; + int result = flock(fileno(fout), LOCK_EX); + fprintf(fout, "%d %d ", s->router_id / p->num_routers, + s->router_id % p->num_routers); + for(int d = 0; d < p->num_routers + p->num_global_channels; d++) { + fprintf(fout, "%d ", s->link_traffic[d]); + } + fprintf(fout, "\n"); + result = flock(fileno(fout), LOCK_UN); + fclose(fout); + } + int i, j; + for(i = 0; i < s->params->radix; i++) { + for(j = 0; j < 3; j++) { + if(s->queued_msgs[i][j] != NULL) { + printf("[%d] leftover queued messages %d %d %d\n", lp->gid, i, j, + s->vc_occupancy[i][j]); + } + if(s->pending_msgs[i][j] != NULL) { + printf("[%d] lefover pending messages %d %d\n", lp->gid, i, j); + } + } + } } /* Get the number of hops for this particular path source and destination groups */ int get_num_hops(int local_router_id, int dest_router_id, int num_routers, - int non_min) + int non_min, + int total_groups) { int local_grp_id = local_router_id / num_routers; int dest_group_id = dest_router_id / num_routers; @@ -1644,12 +1700,14 @@ int get_num_hops(int local_router_id, } /* if the router in the source group has direct connection to the destination group */ - tw_lpid src_connecting_router = getRouterFromGroupID(dest_group_id, local_grp_id, num_routers); + tw_lpid src_connecting_router = getRouterFromGroupID(dest_group_id, + local_grp_id, num_routers, total_groups); if(src_connecting_router == local_router_id) num_hops--; - tw_lpid dest_connecting_router = getRouterFromGroupID(local_grp_id, dest_group_id, num_routers); + tw_lpid dest_connecting_router = getRouterFromGroupID(local_grp_id, + dest_group_id, num_routers, total_groups); if(dest_connecting_router == dest_router_id) num_hops--; @@ -1676,9 +1734,9 @@ get_next_stop(router_state * s, codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL, &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset); - int local_router_id = mapping_offset + (mapping_rep_id * num_routers_per_mgrp); + int local_router_id = (mapping_offset + mapping_rep_id); - bf->c2 = 0; + dest_group_id = dest_router_id / s->params->num_routers; /* If the packet has arrived at the destination router */ if(dest_router_id == local_router_id) @@ -1689,9 +1747,8 @@ get_next_stop(router_state * s, /* Generate inter-mediate destination for non-minimal routing (selecting a random group) */ if(msg->last_hop == TERMINAL && path == NON_MINIMAL) { - if(dest_router_id / s->params->num_routers != s->group_id) + if(dest_group_id != s->group_id) { - bf->c2 = 1; msg->intm_group_id = intm_id; } } @@ -1702,14 +1759,11 @@ get_next_stop(router_state * s, msg->intm_group_id = -1;//no inter-mediate group } /* Intermediate group ID is set. Divert the packet to the intermediate group. */ - if(path == NON_MINIMAL && msg->intm_group_id >= 0) + if(path == NON_MINIMAL && msg->intm_group_id >= 0 && + (dest_group_id != s->group_id)) { dest_group_id = msg->intm_group_id; } - else /* direct the packet to the destination group */ - { - dest_group_id = dest_router_id / s->params->num_routers; - } /********************** DECIDE THE ROUTER IN THE DESTINATION GROUP ***************/ /* It means the packet has arrived at the destination group. Now divert it to the destination router. */ @@ -1720,19 +1774,30 @@ get_next_stop(router_state * s, else { /* Packet is at the source or intermediate group. Find a router that has a path to the destination group. */ - dest_lp=getRouterFromGroupID(dest_group_id,s->router_id/s->params->num_routers, s->params->num_routers); + dest_lp=getRouterFromGroupID(dest_group_id, + s->router_id/s->params->num_routers, s->params->num_routers, + s->params->num_groups); if(dest_lp == local_router_id) { +#if USE_DIRECT_SCHEME + // printf("[%d] tg %d orig \n", lp->gid, target_grp, dest_group_id); + int my_pos = s->group_id % s->params->num_routers; + if(s->group_id == s->params->num_groups - 1) { + my_pos = dest_group_id % s->params->num_routers; + } + dest_lp = dest_group_id * s->params->num_routers + my_pos; +#else for(i=0; i < s->params->num_global_channels; i++) { if(s->global_channel[i] / s->params->num_routers == dest_group_id) dest_lp=s->global_channel[i]; } +#endif } } - codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", s->anno, 0, dest_lp/num_routers_per_mgrp, - dest_lp % num_routers_per_mgrp, &router_dest_id); + codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", s->anno, 0, dest_lp, + 0, &router_dest_id); return router_dest_id; } /* gets the output port corresponding to the next stop of the message */ @@ -1759,23 +1824,30 @@ get_output_port( router_state * s, { codes_mapping_get_lp_info(next_stop, lp_group_name, &mapping_grp_id, NULL, &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset); - int local_router_id = mapping_rep_id * num_routers_per_mgrp + mapping_offset; + int local_router_id = mapping_rep_id + mapping_offset; int intm_grp_id = local_router_id / s->params->num_routers; if(intm_grp_id != s->group_id) { +#if USE_DIRECT_SCHEME + int target_grp = intm_grp_id; + if(target_grp == s->params->num_groups - 1) { + target_grp = s->group_id; + } + output_port = s->params->num_routers + (target_grp) / + s->params->num_routers; +#else for(i=0; i < s->params->num_global_channels; i++) { if(s->global_channel[i] == local_router_id) output_port = s->params->num_routers + i; } - assert(output_port != -1); +#endif } else { output_port = local_router_id % s->params->num_routers; } - //printf("\n Router id %d Next stop %d local router id %d output port %d ", s->router_id, next_stop, local_router_id, output_port); // printf("\n output port not found %d next stop %d local router id %d group id %d intm grp id %d %d", output_port, next_stop, local_router_id, s->group_id, intm_grp_id, local_router_id%num_routers); } return output_port; @@ -1788,111 +1860,266 @@ static int do_adaptive_routing( router_state * s, terminal_message * msg, tw_lp * lp, int dest_router_id, - int intm_id) -{ - int next_stop; - int minimal_out_port = -1, nonmin_out_port = -1; - // decide which routing to take - // get the queue occupancy of both the minimal and non-minimal output ports - int minimal_next_stop=get_next_stop(s, bf, msg, lp, MINIMAL, dest_router_id, -1); - minimal_out_port = get_output_port(s, bf, msg, lp, minimal_next_stop); - int nonmin_next_stop = get_next_stop(s, bf, msg, lp, NON_MINIMAL, dest_router_id, intm_id); - nonmin_out_port = get_output_port(s, bf, msg, lp, nonmin_next_stop); - int nonmin_port_count = s->vc_occupancy[nonmin_out_port]; - int min_port_count = s->vc_occupancy[minimal_out_port]; - //int nonmin_vc = s->vc_occupancy[nonmin_out_port * s->params->num_vcs + 2]; - //int min_vc = s->vc_occupancy[minimal_out_port * s->params->num_vcs + 1]; - -// printf("\n min output port %d nonmin output port %d ", minimal_next_stop, nonmin_next_stop); - // Now get the expected number of hops to be traversed for both routes - int dest_group_id = dest_router_id / s->params->num_routers; - int num_min_hops = get_num_hops(s->router_id, dest_router_id, s->params->num_routers, 0); - - int intm_router_id = getRouterFromGroupID(intm_id, s->router_id / s->params->num_routers, s->params->num_routers); - - //printf("\n source %d Intm router id is %d dest router id %d ", s->router_id, intm_router_id, dest_router_id); - int num_nonmin_hops = get_num_hops(s->router_id, intm_router_id, s->params->num_routers, 1) + get_num_hops(intm_router_id, dest_router_id, s->params->num_routers, 1); - - assert(num_nonmin_hops <= 6); - - /* average the local queues of the router */ - unsigned int q_avg = 0; - int i; - for( i = 0; i < s->params->radix; i++) - { - if( i != minimal_out_port) - q_avg += s->vc_occupancy[i]; - } - q_avg = q_avg / (s->params->radix - 1); + int intm_id) { + int next_stop; + int minimal_out_port = -1, nonmin_out_port = -1; + // decide which routing to take + // get the queue occupancy of both the minimal and non-minimal output ports + int minimal_next_stop=get_next_stop(s, bf, msg, lp, MINIMAL, dest_router_id, -1); + minimal_out_port = get_output_port(s, bf, msg, lp, minimal_next_stop); + int nonmin_next_stop = get_next_stop(s, bf, msg, lp, NON_MINIMAL, dest_router_id, intm_id); + nonmin_out_port = get_output_port(s, bf, msg, lp, nonmin_next_stop); + int nomin_vc = 0; + if(nonmin_out_port < s->params->num_routers) { + nomin_vc = msg->my_l_hop; + } else if(nonmin_out_port < (s->params->num_routers + + s->params->num_global_channels)) { + nomin_vc = msg->my_g_hop; + } + int nonmin_port_count = s->vc_occupancy[nonmin_out_port][nomin_vc]; + int min_vc = 0; + if(minimal_out_port < s->params->num_routers) { + min_vc = msg->my_l_hop; + } else if(minimal_out_port < (s->params->num_routers + + s->params->num_global_channels)) { + min_vc = msg->my_g_hop; + } + int min_port_count = s->vc_occupancy[minimal_out_port][min_vc]; - int min_out_chan = minimal_out_port * s->params->num_vcs; - int nonmin_out_chan = nonmin_out_port * s->params->num_vcs; + // Now get the expected number of hops to be traversed for both routes + int dest_group_id = dest_router_id / s->params->num_routers; + int num_min_hops = get_num_hops(s->router_id, dest_router_id, + s->params->num_routers, 0, s->params->num_groups); - /* Adding history window approach, not taking the queue status at every simulation time thats why, we are maintaining the current history window number and an average of the previous history window number. */ - int min_hist_count = s->cur_hist_num[min_out_chan] + (s->prev_hist_num[min_out_chan]/2); - int nonmin_hist_count = s->cur_hist_num[nonmin_out_chan] + (s->prev_hist_num[min_out_chan]/2); + int intm_router_id = getRouterFromGroupID(intm_id, + s->router_id / s->params->num_routers, s->params->num_routers, + s->params->num_groups); - /*printf("\n min hist count %d chan %d nonmin hist count %d %d ", min_hist_count, - min_out_chan, - nonmin_hist_count, - nonmin_out_chan); - */ - if(num_min_hops * (min_port_count - min_hist_count) <= (num_nonmin_hops * ((q_avg + 1) - nonmin_hist_count))) - { - msg->path_type = MINIMAL; - next_stop = minimal_next_stop; - msg->intm_group_id = -1; + int num_nonmin_hops = get_num_hops(s->router_id, intm_router_id, + s->params->num_routers, 1, s->params->num_groups) + + get_num_hops(intm_router_id, dest_router_id, s->params->num_routers, 1, + s->params->num_groups); - if(msg->packet_ID == TRACK) - printf("\n (%lf) [Router %d] Packet %d routing minimally dest router id %d ", tw_now(lp), (int)lp->gid, (int)msg->packet_ID, dest_router_id); - } - else - { - msg->path_type = NON_MINIMAL; - next_stop = nonmin_next_stop; - msg->intm_group_id = intm_id; + assert(num_nonmin_hops <= 6); - if(msg->packet_ID == TRACK) - printf("\n (%lf) [Router %d] Packet %d routing non-minimally ", tw_now(lp), (int)lp->gid, (int)msg->packet_ID); + /* average the local queues of the router */ + unsigned int q_avg = 0; + int i; + for( i = 0; i < s->params->radix; i++) + { + if( i != minimal_out_port) + q_avg += s->vc_occupancy[i][0] + s->vc_occupancy[i][1] + + s->vc_occupancy[i][2]; + } + q_avg = q_avg / (s->params->radix - 1); + + int min_out_chan = minimal_out_port; + int nonmin_out_chan = nonmin_out_port; + + /* Adding history window approach, not taking the queue status at every + * simulation time thats why, we are maintaining the current history + * window number and an average of the previous history window number. */ + int min_hist_count = s->cur_hist_num[min_out_chan] + + (s->prev_hist_num[min_out_chan]/2); + int nonmin_hist_count = s->cur_hist_num[nonmin_out_chan] + + (s->prev_hist_num[min_out_chan]/2); + + if(num_min_hops * (min_port_count - min_hist_count) <= (num_nonmin_hops * ((q_avg + 1) - nonmin_hist_count))) { + msg->path_type = MINIMAL; + next_stop = minimal_next_stop; + msg->intm_group_id = -1; + + if(msg->packet_ID == TRACK) + printf("\n (%lf) [Router %d] Packet %d routing minimally ", tw_now(lp), (int)lp->gid, (int)msg->packet_ID); + } + else + { + msg->path_type = NON_MINIMAL; + next_stop = nonmin_next_stop; + msg->intm_group_id = intm_id; - } - return next_stop; + if(msg->packet_ID == TRACK) + printf("\n (%lf) [Router %d] Packet %d routing non-minimally ", tw_now(lp), (int)lp->gid, (int)msg->packet_ID); + + } + return next_stop; } -static void router_packet_forward_rc( router_state * s, - tw_bf * bf, - terminal_message * msg, - tw_lp * lp) +void router_packet_receive_rc(router_state * s, + tw_bf * bf, + terminal_message * msg, + tw_lp * lp) { router_rev_ecount++; router_ecount--; + + int output_port = msg->saved_vc; + int output_chan = msg->saved_channel; + tw_rand_reverse_unif(lp->rng); + + if(bf->c2) { + tw_rand_reverse_unif(lp->rng); + delete_terminal_message_list(return_tail(s->pending_msgs[output_port], + s->pending_msgs_tail[output_port], output_chan)); + s->vc_occupancy[output_port][output_chan] -= s->params->chunk_size; + if(bf->c3) { + codes_local_latency_reverse(lp); + s->in_send_loop[output_port] = 0; + } + } + if(bf->c4) { + delete_terminal_message_list(return_tail(s->queued_msgs[output_port], + s->queued_msgs_tail[output_port], output_chan)); + } +} - uint64_t num_chunks = msg->num_chunks; - if(msg->chunk_id == num_chunks - 1) - total_hops--; +/* Packet arrives at the router and a credit is sent back to the sending terminal/router */ +void +router_packet_receive( router_state * s, + tw_bf * bf, + terminal_message * msg, + tw_lp * lp ) +{ + router_ecount++; - tw_rand_reverse_unif(lp->rng); - - if(bf->c1) - { - s->num_waiting--; - remove_pending_list_head(&(s->head), &(s->tail), msg); + bf->c2 = 0; + bf->c3 = 0; + bf->c4 = 0; + + tw_event *e; + terminal_message *m; + tw_stime ts; + + int next_stop = -1, output_port = -1, output_chan = -1; - return; + codes_mapping_get_lp_info(msg->dest_terminal_id, lp_group_name, + &mapping_grp_id, NULL, &mapping_type_id, NULL, &mapping_rep_id, + &mapping_offset); + int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM, + s->anno, 0); + int dest_router_id = (mapping_offset + (mapping_rep_id * num_lps)) / + s->params->num_cn; + int intm_id = tw_rand_integer(lp->rng, 0, s->params->num_groups - 1); + int local_grp_id = s->router_id / s->params->num_routers; + if(intm_id == local_grp_id) + intm_id = (local_grp_id + 2) % s->params->num_groups; + + /* progressive adaptive routing makes a check at every node/router at the + * source group to sense congestion. Once it does and decides on taking + * non-minimal path, it does not check any longer. */ + if(routing == PROG_ADAPTIVE + && msg->path_type != NON_MINIMAL + && local_grp_id == ( msg->origin_router_id / s->params->num_routers)) { + next_stop = do_adaptive_routing(s, bf, msg, lp, dest_router_id, intm_id); + } else if(msg->last_hop == TERMINAL && routing == ADAPTIVE) { + next_stop = do_adaptive_routing(s, bf, msg, lp, dest_router_id, intm_id); + } else { + if(routing == ADAPTIVE || routing == PROG_ADAPTIVE) + assert(msg->path_type == MINIMAL || msg->path_type == NON_MINIMAL); + + if(routing == MINIMAL || routing == NON_MINIMAL) + msg->path_type = routing; /*defaults to the routing algorithm if we + don't have adaptive routing here*/ + next_stop = get_next_stop(s, bf, msg, lp, msg->path_type, dest_router_id, + intm_id); + } + terminal_message_list * cur_chunk = (terminal_message_list *)malloc( + sizeof(terminal_message_list)); + init_terminal_message_list(cur_chunk, msg); + if(msg->remote_event_size_bytes > 0) { + void *m_data_src = model_net_method_get_edata(DRAGONFLY, msg); + cur_chunk->event_data = (char*)malloc(msg->remote_event_size_bytes); + memcpy(cur_chunk->event_data, m_data_src, msg->remote_event_size_bytes); + } + + output_port = get_output_port(s, bf, msg, lp, next_stop); + output_chan = 0; + int max_vc_size = s->params->cn_vc_size; + + cur_chunk->msg.vc_index = output_port; + cur_chunk->msg.next_stop = next_stop; + + if(output_port < s->params->num_routers) { + output_chan = msg->my_l_hop; + if(msg->my_g_hop == 1) output_chan = 1; + if(msg->my_g_hop == 2) output_chan = 2; + max_vc_size = s->params->local_vc_size; + cur_chunk->msg.my_l_hop++; + } else if(output_port < (s->params->num_routers + + s->params->num_global_channels)) { + output_chan = msg->my_g_hop; + max_vc_size = s->params->global_vc_size; + cur_chunk->msg.my_g_hop++; + } + + cur_chunk->msg.output_chan = output_chan; + cur_chunk->msg.my_N_hop++; + + assert(output_chan < 3); + assert(output_port < s->params->radix); + + if(s->vc_occupancy[output_port][output_chan] + s->params->chunk_size + <= max_vc_size) { + bf->c2 = 1; + router_credit_send(s, bf, msg, lp, -1); + append_to_terminal_message_list( s->pending_msgs[output_port], + s->pending_msgs_tail[output_port], output_chan, cur_chunk); + s->vc_occupancy[output_port][output_chan] += s->params->chunk_size; + if(s->in_send_loop[output_port] == 0) { + bf->c3 = 1; + terminal_message *m; + ts = codes_local_latency(lp); + tw_event *e = tw_event_new(lp->gid, ts, lp); + m = tw_event_data(e); + m->type = R_SEND; + m->magic = router_magic_num; + m->vc_index = output_port; + tw_event_send(e); + s->in_send_loop[output_port] = 1; } - tw_rand_reverse_unif(lp->rng); - int old_port = msg->sender_radix; - s->next_credit_available_time[old_port] = msg->saved_credit_time; - - - tw_rand_reverse_unif(lp->rng); - int output_chan = msg->new_vc; - int output_port = output_chan / s->params->num_vcs; + } else { + bf->c4 = 1; + cur_chunk->msg.saved_vc = msg->vc_index; + cur_chunk->msg.saved_channel = msg->output_chan; + append_to_terminal_message_list( s->queued_msgs[output_port], + s->queued_msgs_tail[output_port], output_chan, cur_chunk); + } + + msg->saved_vc = output_port; + msg->saved_channel = output_chan; + return; +} - if(bf->c3) - s->max_router_vc_occupancy = msg->saved_occupancy; +void router_packet_send_rc(router_state * s, + tw_bf * bf, + terminal_message * msg, tw_lp * lp) +{ + router_ecount--; + router_rev_ecount++; + + int output_port = msg->vc_index; + int output_chan = msg->output_chan; + if(bf->c1) { + s->in_send_loop[output_port] = 1; + return; + } + + tw_rand_reverse_unif(lp->rng); + + s->next_output_available_time[output_port] = msg->saved_available_time; + s->link_traffic[output_port] -= s->params->chunk_size; + create_prepend_to_terminal_message_list(s->pending_msgs[output_port], + s->pending_msgs_tail[output_port], output_chan, msg); + + if(bf->c3) { + tw_rand_reverse_unif(lp->rng); + } + + if(bf->c4) { + s->in_send_loop[output_port] = 1; + } - if(routing == PROG_ADAPTIVE) + if(routing == PROG_ADAPTIVE) { if(bf->c2) { @@ -1903,358 +2130,293 @@ static void router_packet_forward_rc( router_state * s, else s->cur_hist_num[output_chan]--; } - - s->next_output_available_time[output_port] = msg->saved_available_time; - s->vc_occupancy[output_chan]--; - s->output_vc_state[output_chan]=VC_IDLE; } - /* routes the current packet to the next stop */ -static void router_packet_forward( router_state * s, - tw_bf * bf, - terminal_message * msg, - tw_lp * lp) +void +router_packet_send( router_state * s, + tw_bf * bf, + terminal_message * msg, tw_lp * lp) { - router_ecount++; - router_rev_ecount++; - - bf->c1 = 0; - bf->c2 = 0; - bf->c3 = 0; + router_ecount++; + tw_stime ts; + tw_event *e; + terminal_message *m; + int output_port = msg->vc_index; + int output_chan = 2; + + terminal_message_list *cur_entry = s->pending_msgs[output_port][2]; + if(cur_entry == NULL) { + cur_entry = s->pending_msgs[output_port][1]; + output_chan = 1; + if(cur_entry == NULL) { + cur_entry = s->pending_msgs[output_port][0]; + output_chan = 0; + } + } - int next_stop = -1, output_port = -1, output_chan = -1; - int i; - int buf_size = s->params->local_vc_size; + if(cur_entry == NULL) { + bf->c1 = 1; + s->in_send_loop[output_port] = 0; + //printf("[%d] Router skipping send at begin %d \n", lp->gid, output_port); + return; + } - const dragonfly_param *p = s->params; - uint64_t num_chunks = msg->num_chunks; - - if(msg->chunk_id == num_chunks - 1) - total_hops++; + int to_terminal = 1, global = 0; + double delay = s->params->cn_delay; + + if(output_port < s->params->num_routers) { + to_terminal = 0; + delay = s->params->global_delay; + } else if(output_port < s->params->num_routers + + s->params->num_global_channels) { + to_terminal = 0; + global = 1; + delay = s->params->global_delay; + } - - codes_mapping_get_lp_info(msg->dest_terminal_id, lp_group_name, - &mapping_grp_id, NULL, &mapping_type_id, NULL, &mapping_rep_id, - &mapping_offset); - int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM, - s->anno, 0); - int dest_router_id = (mapping_offset + (mapping_rep_id * num_lps)) / s->params->num_routers; - int intm_id = tw_rand_integer(lp->rng, 0, s->params->num_groups - 1); - int local_grp_id = s->router_id / s->params->num_routers; - if(intm_id == local_grp_id) - intm_id = (local_grp_id + 2) % s->params->num_groups; + ts = g_tw_lookahead + delay + tw_rand_unif(lp->rng); + + msg->saved_available_time = s->next_output_available_time[output_port]; + s->next_output_available_time[output_port] = + maxd(s->next_output_available_time[output_port], tw_now(lp)); + s->next_output_available_time[output_port] += ts; + + // dest can be a router or a terminal, so we must check + void * m_data; + if (to_terminal) { + assert(cur_entry->msg.next_stop == cur_entry->msg.dest_terminal_id); + e = model_net_method_event_new(cur_entry->msg.next_stop, + s->next_output_available_time[output_port] - tw_now(lp), lp, + DRAGONFLY, (void**)&m, &m_data); + } else { + e = tw_event_new(cur_entry->msg.next_stop, + s->next_output_available_time[output_port] - tw_now(lp), lp); + m = tw_event_data(e); + m_data = model_net_method_get_edata(DRAGONFLY, m); + } + memcpy(m, &cur_entry->msg, sizeof(terminal_message)); + if (m->remote_event_size_bytes){ + memcpy(m_data, cur_entry->event_data, m->remote_event_size_bytes); + } - if(DEBUG && lp->gid == TRACK) - { - printf("\n Router %d dest router id %d ", s->router_id, dest_router_id); - int i; - /*for (i = 0; i < s->params->radix; i++) - printf("\n vc occupancy %d ", s->vc_occupancy[i]);*/ - } -/* progressive adaptive routing makes a check at every node/router at the source group to sense congestion. Once it does and decides on taking non-minimal path, it does not check any longer. */ - if(routing == PROG_ADAPTIVE - && msg->path_type != NON_MINIMAL - && local_grp_id == ( msg->origin_router_id / s->params->num_routers)) - { - next_stop = do_adaptive_routing(s, bf, msg, lp, dest_router_id, intm_id); - } - else if(msg->last_hop == TERMINAL && routing == ADAPTIVE) - { - next_stop = do_adaptive_routing(s, bf, msg, lp, dest_router_id, intm_id); - } + if(global) + m->last_hop = GLOBAL; else - { - if(routing == ADAPTIVE || routing == PROG_ADAPTIVE) - assert(msg->path_type == MINIMAL || msg->path_type == NON_MINIMAL); + m->last_hop = LOCAL; - if(routing == MINIMAL || routing == NON_MINIMAL) - msg->path_type = routing; /*defaults to the routing algorithm if we don't have adaptive routing here*/ - next_stop = get_next_stop(s, bf, msg, lp, msg->path_type, dest_router_id, intm_id); - } - output_port = get_output_port(s, bf, msg, lp, next_stop); - int begin_chan = output_port * s->params->num_vcs; + m->local_id = s->router_id; + m->intm_lp_id = lp->gid; + m->magic = router_magic_num; - assert(output_port != -1 && begin_chan != -1 && output_port < s->params->radix); + s->link_traffic[output_port] += s->params->chunk_size; - if(output_port >= s->params->num_routers && - output_port < s->params->num_routers + s->params->num_global_channels) + if(routing == PROG_ADAPTIVE) { - buf_size = s->params->global_vc_size; + if(tw_now(lp) - s->cur_hist_start_time[output_port] >= WINDOW_LENGTH) { + s->prev_hist_num[output_port] = s->cur_hist_num[output_port]; + s->cur_hist_start_time[output_port] = tw_now(lp); + s->cur_hist_num[output_port] = 1; + } else { + s->cur_hist_num[output_port]++; + } } - if(output_port >= s->params->num_routers + s->params->num_global_channels) - buf_size = s->params->cn_vc_size; - - for(i = begin_chan; i < begin_chan + p->num_vcs; i++) - { - if(s->vc_occupancy[i] < buf_size) - { - output_chan = i; - break; - } + /* Determine the event type. If the packet has arrived at the final + * destination router then it should arrive at the destination terminal + * next.*/ + if(to_terminal) { + m->type = T_ARRIVE; + m->magic = terminal_magic_num; + } else { + /* The packet has to be sent to another router */ + m->magic = router_magic_num; + m->type = R_ARRIVE; } - if(output_chan == -1) - { - /* Here we want to add the packets in the waiting queue instead of terminating - * the simulation */ - //printf("\n %lf Router %ld buffers overflowed from incoming terminals channel %d occupancy %d radix %d next_stop %d buffer size %d ", tw_now(lp),(long int) lp->gid, begin_chan, s->vc_occupancy[output_chan], s->params->radix, next_stop, buf_size); - bf->c1 = 1; - s->num_waiting++; - add_pending_router_message(&(s->head), &(s->tail), msg, begin_chan, next_stop); - return; - } - - send_packet_from_router(s, bf, msg, lp, output_chan, next_stop, NULL); + tw_event_send(e); + + cur_entry = return_head(s->pending_msgs[output_port], + s->pending_msgs_tail[output_port], output_chan); + copy_terminal_list_entry(cur_entry, msg); + delete_terminal_message_list(cur_entry); + + cur_entry = s->pending_msgs[output_port][2]; + if(cur_entry == NULL) cur_entry = s->pending_msgs[output_port][1]; + if(cur_entry == NULL) cur_entry = s->pending_msgs[output_port][0]; + if(cur_entry != NULL) { + bf->c3 = 1; + terminal_message *m; + ts = g_tw_lookahead + delay + tw_rand_unif(lp->rng); + tw_event *e = tw_event_new(lp->gid, ts, lp); + m = tw_event_data(e); + m->type = R_SEND; + m->magic = router_magic_num; + m->vc_index = output_port; + tw_event_send(e); + } else { + bf->c4 = 1; + s->in_send_loop[output_port] = 0; + //printf("[%d] Router skipping send at end %d\n", lp->gid, output_port); + } + + return; } -/* sets up the router virtual channels, global channels, local channels, compute node channels */ -void router_setup(router_state * r, tw_lp * lp) +void router_buf_update_rc(router_state * s, + tw_bf * bf, + terminal_message * msg, + tw_lp * lp) { - //printf("\n Router ID %ld ", lp->gid); - uint32_t h1 = 0, h2 = 0; - bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2); - router_magic_num = h1 + h2; - - char anno[MAX_NAME_LENGTH]; - codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL, - &mapping_type_id, anno, &mapping_rep_id, &mapping_offset); - num_routers_per_mgrp = codes_mapping_get_lp_count (lp_group_name, 1, "dragonfly_router", - NULL, 0); - - int num_grp_reps = codes_mapping_get_group_reps(lp_group_name); - - if (anno[0] == '\0'){ - r->anno = NULL; - r->params = &all_params[num_params-1]; - } - else{ - r->anno = strdup(anno); - int id = configuration_get_annotation_index(anno, anno_map); - r->params = &all_params[id]; - } - - // shorthand - const dragonfly_param *p = r->params; - - /* Checking for consistency of configuration */ - if(p->total_routers != num_grp_reps * num_routers_per_mgrp) - tw_error(TW_LOC, - "\n Config error: num_routers specified %d total routers computed in the network %d does not match with repetitions * dragonfly_router %d ", - p->num_routers, p->total_routers, num_grp_reps * num_routers_per_mgrp); - - r->router_id=mapping_rep_id * num_routers_per_mgrp + mapping_offset; - r->group_id=r->router_id/p->num_routers; - - int i; - int router_offset=(r->router_id % p->num_routers) * (p->num_global_channels / 2) + 1; - - r->global_channel = (int*)malloc(p->num_global_channels * sizeof(int)); - r->next_output_available_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime)); - r->next_credit_available_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime)); - r->cur_hist_start_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime)); - r->vc_occupancy = (int*)malloc(p->radix * sizeof(int)); - r->output_vc_state = (int*)malloc(p->radix * sizeof(int)); - r->cur_hist_num = (int*)malloc(p->radix * sizeof(int)); - r->prev_hist_num = (int*)malloc(p->radix * sizeof(int)); - r->max_router_vc_occupancy = 0; - r->head = NULL; - r->tail = NULL; - - for(i=0; i < p->radix; i++) - { - // Set credit & router occupancy - r->next_output_available_time[i]=0; - r->next_credit_available_time[i]=0; - r->cur_hist_start_time[i] = 0; - r->vc_occupancy[i]=0; - r->cur_hist_num[i] = 0; - r->prev_hist_num[i] = 0; - r->output_vc_state[i]= VC_IDLE; - } - -#if DEBUG == 1 - printf("\n LP ID %d VC occupancy radix %d Router %d is connected to ", lp->gid, p->radix, r->router_id); -#endif - //round the number of global channels to the nearest even number - for(i=0; i < p->num_global_channels; i++) - { - if(i % 2 != 0) - { - r->global_channel[i]=(r->router_id + (router_offset * p->num_routers))%p->total_routers; - router_offset++; - } - else - { - r->global_channel[i]=r->router_id - ((router_offset) * p->num_routers); - } - if(r->global_channel[i]<0) - { - r->global_channel[i]=p->total_routers+r->global_channel[i]; - } -#if DEBUG == 1 - printf("\n channel %d ", r->global_channel[i]); -#endif - } - -#if DEBUG == 1 - printf("\n"); -#endif - return; -} - -void router_buf_update_rc( - router_state * s, - tw_bf * bf, - terminal_message * msg, - tw_lp * lp) -{ - router_ecount--; - router_rev_ecount++; - - uint64_t num_chunks = msg->num_chunks; - int msg_indx = msg->vc_index; - s->vc_occupancy[msg_indx]++; - - int buf = s->params->local_vc_size; - - if(msg->last_hop == GLOBAL) - buf = s->params->global_vc_size; - else if(msg->last_hop == TERMINAL) - buf = s->params->cn_vc_size; - - if(s->vc_occupancy[msg_indx] >= buf * num_chunks) - s->output_vc_state[msg_indx] = VC_CREDIT; - - if(bf->c1) - { - struct pending_router_msgs * elem = msg->saved_elem; - add_pending_router_message(&(s->head), &(s->tail), elem, elem->output_chan, elem->next_stop); - } + int indx = msg->vc_index; + int output_chan = msg->output_chan; + s->vc_occupancy[indx][output_chan] += s->params->chunk_size; + if(bf->c1) { + terminal_message_list* head = return_tail(s->pending_msgs[indx], + s->pending_msgs_tail[indx], output_chan); + tw_rand_reverse_unif(lp->rng); + prepend_to_terminal_message_list(s->queued_msgs[indx], + s->queued_msgs_tail[indx], output_chan, head); + s->vc_occupancy[indx][output_chan] -= s->params->chunk_size; + } + if(bf->c2) { + codes_local_latency_reverse(lp); + s->in_send_loop[indx] = 0; + } } /* Update the buffer space associated with this router LP */ void router_buf_update(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) { - router_ecount++; - router_rev_ecount++; - - bf->c1 = 0; - - int msg_indx = msg->vc_index; - - if(TRACK == lp->gid) - { - /*printf("\n router %d lp id %d msg indx %d vc_occupancy %d from %d last hop %d ", s->router_id, lp->gid, msg_indx, s->vc_occupancy[msg_indx], msg->origin_router_id, msg->last_hop); */ - int i; - printf("\n channel occupancy "); - for(i = 0; i < s->params->radix; i++) - printf(" %d ", s->vc_occupancy[i]); - } - //assert(s->vc_occupancy[msg_indx] > 0); - s->vc_occupancy[msg_indx]--; - s->output_vc_state[msg_indx] = VC_IDLE; - - - struct pending_router_msgs * elem = - remove_pending_router_msgs(&(s->head), &(s->tail), msg_indx); - - if(elem) - { - bf->c1 = 1; - msg->saved_elem = elem; - s->num_waiting--; - send_packet_from_router(s, bf, &(elem->msg), lp, elem->output_chan, elem->next_stop, elem->event_data); - } - /* now traverse the waiting list and find the message waiting for this - * channel */ - return; + int indx = msg->vc_index; + int output_chan = msg->output_chan; + s->vc_occupancy[indx][output_chan] -= s->params->chunk_size; + if(TRACK == lp->gid) + { + int i; + printf("\n channel occupancy "); + for(i = 0; i < s->params->radix; i++) + printf(" %d ", s->vc_occupancy[i]); + } + if(s->queued_msgs[indx][output_chan] != NULL) { + bf->c1 = 1; + terminal_message_list *head = return_head(s->queued_msgs[indx], + s->queued_msgs_tail[indx], output_chan); + router_credit_send(s, bf, &head->msg, lp, 1); + append_to_terminal_message_list(s->pending_msgs[indx], + s->pending_msgs_tail[indx], output_chan, head); + s->vc_occupancy[indx][output_chan] += s->params->chunk_size; + } + if(s->in_send_loop[indx] == 0 && s->pending_msgs[indx][output_chan] != NULL) { + bf->c2 = 1; + terminal_message *m; + tw_stime ts = codes_local_latency(lp); + tw_event *e = tw_event_new(lp->gid, ts, lp); + m = tw_event_data(e); + m->type = R_SEND; + m->vc_index = indx; + m->magic = router_magic_num; + s->in_send_loop[indx] = 1; + tw_event_send(e); + } + + return; } -void router_event(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) -{ +void router_event(router_state * s, tw_bf * bf, terminal_message * msg, + tw_lp * lp) { assert(msg->magic == router_magic_num); switch(msg->type) - { - case R_FORWARD: // Router has sent a packet to an intra-group router (local channel) - router_packet_forward(s, bf, msg, lp); - break; - - case R_BUFFER: - router_buf_update(s, bf, msg, lp); - break; - - default: - printf("\n (%lf) [Router %d] Router Message type not supported %d dest terminal id %d packet ID %d ", tw_now(lp), (int)lp->gid, msg->type, (int)msg->dest_terminal_id, (int)msg->packet_ID); - break; - } + { + case R_SEND: // Router has sent a packet to an intra-group router (local channel) + router_packet_send(s, bf, msg, lp); + break; + + case R_ARRIVE: // Router has received a packet from an intra-group router (local channel) + router_packet_receive(s, bf, msg, lp); + break; + + case R_BUFFER: + router_buf_update(s, bf, msg, lp); + break; + + default: + printf("\n (%lf) [Router %d] Router Message type not supported %d dest " + "terminal id %d packet ID %d ", tw_now(lp), (int)lp->gid, msg->type, + (int)msg->dest_terminal_id, (int)msg->packet_ID); + tw_error(TW_LOC, "Msg type not supported"); + break; + } } /* Reverse computation handler for a terminal event */ -void terminal_rc_event_handler(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) -{ +void terminal_rc_event_handler(terminal_state * s, tw_bf * bf, + terminal_message * msg, tw_lp * lp) { + switch(msg->type) { - case T_GENERATE: - packet_generate_send_rc(s, bf, msg, lp); - break; - - case T_ARRIVE: - packet_arrive_rc(s, bf, msg, lp); - break; - - case T_BUFFER: - terminal_buf_update_rc(s, bf, msg, lp); - break; - - case D_COLLECTIVE_INIT: - { - s->collective_init_time = msg->saved_collective_init_time; - } - break; + case T_GENERATE: + packet_generate_rc(s, bf, msg, lp); + break; - case D_COLLECTIVE_FAN_IN: - { - int i; - s->num_fan_nodes--; - if(bf->c1) - { - s->num_fan_nodes = msg->saved_fan_nodes; - } - if(bf->c2) - { - s->num_fan_nodes = msg->saved_fan_nodes; - for( i = 0; i < s->num_children; i++ ) - tw_rand_reverse_unif(lp->rng); - } - } + case T_SEND: + packet_send_rc(s, bf, msg, lp); break; - case D_COLLECTIVE_FAN_OUT: - { - int i; - if(bf->c1) - { - for( i = 0; i < s->num_children; i++ ) - tw_rand_reverse_unif(lp->rng); - } - } - } + case T_ARRIVE: + packet_arrive_rc(s, bf, msg, lp); + break; + + case T_BUFFER: + terminal_buf_update_rc(s, bf, msg, lp); + break; + + case D_COLLECTIVE_INIT: + { + s->collective_init_time = msg->saved_collective_init_time; + } + break; + case D_COLLECTIVE_FAN_IN: { + int i; + s->num_fan_nodes--; + if(bf->c1) + { + s->num_fan_nodes = msg->saved_fan_nodes; + } + if(bf->c2) + { + s->num_fan_nodes = msg->saved_fan_nodes; + for( i = 0; i < s->num_children; i++ ) + tw_rand_reverse_unif(lp->rng); + } + } + break; + + case D_COLLECTIVE_FAN_OUT: { + int i; + if(bf->c1) + { + for( i = 0; i < s->num_children; i++ ) + tw_rand_reverse_unif(lp->rng); + } + } + } } /* Reverse computation handler for a router event */ -void router_rc_event_handler(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) -{ - switch(msg->type) - { - case R_FORWARD: - router_packet_forward_rc(s, bf, msg, lp); - break; - - case R_BUFFER: - router_buf_update_rc(s, bf, msg, lp); - break; - - } +void router_rc_event_handler(router_state * s, tw_bf * bf, + terminal_message * msg, tw_lp * lp) { + switch(msg->type) { + case R_SEND: + router_packet_send_rc(s, bf, msg, lp); + break; + case R_ARRIVE: + router_packet_receive_rc(s, bf, msg, lp); + break; + + case R_BUFFER: + router_buf_update_rc(s, bf, msg, lp); + break; + } } + /* dragonfly compute node and router LP types */ tw_lptype dragonfly_lps[] = { @@ -2290,6 +2452,22 @@ static const tw_lptype* dragonfly_get_router_lp_type(void) return(&dragonfly_lps[1]); } +static tw_lpid dragonfly_find_local_device( + const char * annotation, + int ignore_annotations, + tw_lp * sender) +{ + int mapping_grp_id, mapping_rep_id, mapping_type_id, mapping_offset; + tw_lpid dest_id; + + codes_mapping_get_lp_info(sender->gid, lp_group_name, &mapping_grp_id, + NULL, &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset); + codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, annotation, + ignore_annotations, mapping_rep_id, mapping_offset, &dest_id); + + return(dest_id); +} + static void dragonfly_register(tw_lptype *base_type) { lp_type_register(LP_CONFIG_NM, base_type); lp_type_register("dragonfly_router", &dragonfly_lps[1]); @@ -2311,12 +2489,3 @@ struct model_net_method dragonfly_method = .mn_collective_call_rc = dragonfly_collective_rc }; - -/* - * Local variables: - * c-indent-level: 4 - * c-basic-offset: 4 - * End: - * - * vim: ts=8 sts=4 sw=4 expandtab - */ diff --git a/src/models/networks/model-net/torus.c b/src/models/networks/model-net/torus.c index f4e440c..9b922fc 100644 --- a/src/models/networks/model-net/torus.c +++ b/src/models/networks/model-net/torus.c @@ -915,7 +915,7 @@ static void packet_generate( nodes_state * s, m_gen_data_src = (char*)m_gen_data_src + msg->remote_event_size_bytes; } if (msg->local_event_size_bytes) - memcpy(m_gen_data, m_gen_data_src, msg->local_event_size_bytes); + memcpy(m_gen_data, m_gen_data_src, msg->local_event_size_bytes); tw_event_send(e_gen); } diff --git a/tests/conf/modelnet-test-dragonfly.conf b/tests/conf/modelnet-test-dragonfly.conf index 109b9da..a41c20c 100644 --- a/tests/conf/modelnet-test-dragonfly.conf +++ b/tests/conf/modelnet-test-dragonfly.conf @@ -16,14 +16,13 @@ PARAMS modelnet_scheduler="fcfs"; chunk_size="32"; # modelnet_scheduler="round-robin"; - num_vcs="1"; num_routers="4"; local_vc_size="2048"; global_vc_size="8192"; - cn_vc_size="512"; + cn_vc_size="1024"; local_bandwidth="5.25"; global_bandwidth="4.7"; cn_bandwidth="5.25"; - message_size="312"; + message_size="320"; routing="minimal"; } diff --git a/tests/modelnet-test.c b/tests/modelnet-test.c index 33eca62..f93a968 100644 --- a/tests/modelnet-test.c +++ b/tests/modelnet-test.c @@ -24,8 +24,8 @@ #include "codes/configuration.h" #include "codes/lp-type-lookup.h" -#define NUM_REQS 1 /* number of requests sent by each server */ -#define PAYLOAD_SZ 2048 /* size of simulated data payload, bytes */ +#define NUM_REQS 5 /* number of requests sent by each server */ +#define PAYLOAD_SZ 4096 /* size of simulated data payload, bytes */ static int net_id = 0; static int num_routers = 0; @@ -455,7 +455,7 @@ static void handle_ack_event( memcpy(m_remote, m_local, sizeof(svr_msg)); m_remote->svr_event_type = (do_pull) ? ACK : REQ; -// printf("handle_ack_event(), lp %llu.\n", (unsigned long long)lp->gid); + printf("handle_ack_event(), lp %llu.\n", (unsigned long long)lp->gid); /* safety check that this request got to the right server */ // printf("\n m->src %d lp->gid %d ", m->src, lp->gid); @@ -514,7 +514,7 @@ static void handle_req_event( memcpy(m_remote, m_local, sizeof(svr_msg)); m_remote->svr_event_type = ACK; - //printf("handle_req_event(), lp %llu src %llu .\n", (unsigned long long)lp->gid, (unsigned long long) m->src); + printf("handle_req_event(), lp %llu src %llu .\n", (unsigned long long)lp->gid, (unsigned long long) m->src); /* safety check that this request got to the right server */ // printf("\n m->src %d lp->gid %d ", m->src, lp->gid); -- 2.26.2