Commit a5e8b1c7 authored by Misbah Mubarak
Browse files

Updating congestion control with dragonfly model

parents 9c4e2fad f18e903f
......@@ -40,6 +40,9 @@ struct terminal_message
/* number of hops traversed by the packet */
short my_N_hop;
short my_l_hop, my_g_hop;
short saved_channel;
/* Intermediate LP ID from which this message is coming */
unsigned int intm_lp_id;
short new_vc;
......@@ -77,6 +80,7 @@ struct terminal_message
/* LP ID of the sending node, has to be a network node in the dragonfly */
tw_lpid sender_node;
tw_lpid next_stop;
/* for reverse computation */
struct pending_router_msgs * saved_elem;
......
......@@ -10,13 +10,14 @@
#include <ross.h>
#include "codes/codes_mapping.h"
#include "codes/jenkins-hash.h"
#include "codes/codes_mapping.h"
#include "codes/codes.h"
#include "codes/model-net.h"
#include "codes/model-net-method.h"
#include "codes/model-net-lp.h"
#include "codes/net/dragonfly.h"
#include "sys/file.h"
#define CREDIT_SIZE 8
#define MEAN_PROCESS 1.0
......@@ -34,25 +35,17 @@
#define TRACK -1
#define PRINT_ROUTER_TABLE 1
#define DEBUG 0
#define USE_DIRECT_SCHEME 1
#define LP_CONFIG_NM (model_net_lp_config_names[DRAGONFLY])
#define LP_METHOD_NM (model_net_method_names[DRAGONFLY])
/* Set to a non-zero value to enable debug output through dprintf(). */
#define DRAGONFLY_DBG 0
/* Debug printf: forwards to printf() only when DRAGONFLY_DBG is non-zero.
 * At least one argument must follow the format string, because
 * __VA_ARGS__ is expanded directly after a comma. */
#define dprintf(_fmt, ...) \
    do {if (DRAGONFLY_DBG) printf(_fmt, __VA_ARGS__);} while (0)
long term_ecount, router_ecount, term_rev_ecount, router_rev_ecount;
/* Return the larger of a and b; a is returned when the two compare equal
 * or when the comparison a < b is false for any other reason. */
static double maxd(double a, double b)
{
    if (a < b) {
        return b;
    }
    return a;
}
// arrival rate
static double MEAN_INTERVAL=200.0;
// threshold for adaptive routing
static int adaptive_threshold = 10;
/* minimal and non-minimal packet counts for adaptive routing*/
unsigned int minimal_count=0, nonmin_count=0, completed_packets = 0;
static unsigned int minimal_count=0, nonmin_count=0, completed_packets = 0;
typedef struct dragonfly_param dragonfly_param;
/* annotation-specific parameters (unannotated entry occurs at the
......@@ -71,14 +64,26 @@ int router_magic_num = 0;
/* terminal magic number */
int terminal_magic_num = 0;
/* number of routers in a mapping group */
static int num_routers_per_mgrp = 0;
/* Node of a doubly linked queue of terminal messages. */
typedef struct terminal_message_list terminal_message_list;
struct terminal_message_list {
/* by-value copy of the queued message (filled in by init_terminal_message_list) */
terminal_message msg;
/* heap copy of the event payload that accompanies the message, or NULL when
 * none was saved; released by delete_terminal_message_list() */
char* event_data;
/* next node towards the tail, NULL at the tail */
terminal_message_list *next;
/* previous node towards the head, NULL at the head */
terminal_message_list *prev;
};
/* maximum number of terminals and routers */
int max_term_occupancy, max_router_occupancy;
/* Initialise a queue node: leave it unlinked with no saved event payload
 * and store a by-value copy of *inmsg in it. */
void init_terminal_message_list(terminal_message_list *this,
        terminal_message *inmsg) {
    /* start out detached from any queue */
    this->prev = NULL;
    this->next = NULL;
    /* no payload attached yet */
    this->event_data = NULL;
    /* struct assignment copies the whole message */
    this->msg = *inmsg;
}
/* noise of 1ns */
double noise = 1.0;
/* Release a queue node together with its saved event payload, if any. */
void delete_terminal_message_list(terminal_message_list *this) {
    /* free(NULL) is a no-op, so the payload needs no explicit guard */
    free(this->event_data);
    free(this);
}
struct dragonfly_param
{
......@@ -92,7 +97,6 @@ struct dragonfly_param
int global_vc_size; /* buffer size of the global channels */
int cn_vc_size; /* buffer size of the compute node channels */
int chunk_size; /* full-sized packets are broken into smaller chunks.*/
// derived parameters
int num_cn;
int num_groups;
......@@ -100,20 +104,14 @@ struct dragonfly_param
int total_routers;
int total_terminals;
int num_global_channels;
double cn_delay;
double local_delay;
double global_delay;
double credit_delay;
};
/* Node of the doubly linked list of messages waiting at a router. */
struct pending_router_msgs
{
/* neighbouring list nodes; NULL at the respective end of the list */
struct pending_router_msgs * next;
struct pending_router_msgs * prev;
/* heap copy of the remote event payload; only allocated by
 * add_pending_router_message() when msg->remote_event_size_bytes is non-zero */
char * event_data;
/* by-value copy of the pending message */
terminal_message msg;
/* output channel the message waits for; remove_pending_router_msgs()
 * selects nodes by this value */
int output_chan;
/* next stop recorded for the message when it was queued */
int next_stop;
};
/* handles terminal and router events like packet generate/send/receive/buffer */
typedef enum event_t event_t;
typedef struct terminal_state terminal_state;
typedef struct router_state router_state;
......@@ -123,14 +121,17 @@ struct terminal_state
unsigned long long packet_counter;
// Dragonfly specific parameters
tw_lpid router_id;
tw_lpid terminal_id;
unsigned int router_id;
unsigned int terminal_id;
// Each terminal will have an input and output channel with the router
int* vc_occupancy; // NUM_VC
int* output_vc_state;
int num_vcs;
tw_stime terminal_available_time;
tw_stime next_credit_available_time;
terminal_message_list **terminal_msgs;
terminal_message_list **terminal_msgs_tail;
int in_send_loop;
// Terminal generate, sends and arrival T_SEND, T_ARRIVAL, T_GENERATE
// Router-Router Intra-group sends and receives RR_LSEND, RR_LARRIVE
// Router-Router Inter-group sends and receives RR_GSEND, RR_GARRIVE
......@@ -159,7 +160,6 @@ struct terminal_state
/* to maintain a count of child nodes that have fanned in at the parent during the collective
fan-in phase*/
int num_fan_nodes;
int max_term_vc_occupancy;
const char * anno;
const dragonfly_param *params;
......@@ -170,8 +170,10 @@ enum event_t
{
T_GENERATE=1,
T_ARRIVE,
T_SEND,
T_BUFFER,
R_FORWARD,
R_SEND,
R_ARRIVE,
R_BUFFER,
D_COLLECTIVE_INIT,
D_COLLECTIVE_FAN_IN,
......@@ -209,25 +211,26 @@ struct router_state
{
unsigned int router_id;
unsigned int group_id;
int* global_channel;
tw_stime* next_output_available_time;
tw_stime* next_credit_available_time;
tw_stime* cur_hist_start_time;
terminal_message_list ***pending_msgs;
terminal_message_list ***pending_msgs_tail;
terminal_message_list ***queued_msgs;
terminal_message_list ***queued_msgs_tail;
int *in_send_loop;
int* vc_occupancy;
int* output_vc_state;
int * global_channel;
int max_router_vc_occupancy;
int** vc_occupancy;
int* link_traffic;
const char * anno;
const dragonfly_param *params;
int* prev_hist_num;
int* cur_hist_num;
struct pending_router_msgs * head;
struct pending_router_msgs * tail;
int num_waiting;
};
static short routing = MINIMAL;
......@@ -240,118 +243,144 @@ static tw_stime max_collective = 0;
static long long total_hops = 0;
static long long N_finished_packets = 0;
/* function definitions */
static void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp);
/* returns the dragonfly router lp type for lp registration */
static const tw_lptype* dragonfly_get_router_lp_type(void);
/* remove head of the queue */
void remove_pending_list_head(struct pending_router_msgs** head,
struct pending_router_msgs** tail,
terminal_message * msg)
/* convert GiB/s and bytes to ns */
static tw_stime bytes_to_ns(uint64_t bytes, double GB_p_s)
{
struct pending_router_msgs * elem = *head;
tw_stime time;
if(*head == *tail)
*tail = NULL;
/* bytes to GB */
time = ((double)bytes)/(1024.0*1024.0*1024.0);
/* GB to s */
time = time / GB_p_s;
/* s to ns */
time = time * 1000.0 * 1000.0 * 1000.0;
*head = (*head)->next;
if(*head)
(*head)->prev = NULL;
free(elem);
return(time);
}
/* pending router messages */
void add_pending_router_message(struct pending_router_msgs** head,
struct pending_router_msgs** tail,
terminal_message * msg,
int chan,
int next_stop)
/* returns the dragonfly message size */
static int dragonfly_get_msg_sz(void)
{
struct pending_router_msgs * elem = malloc(sizeof(struct pending_router_msgs));
memcpy(&(elem->msg), msg, sizeof(terminal_message));
elem->prev = NULL;
elem->next_stop = next_stop;
elem->output_chan = chan;
if(msg->remote_event_size_bytes)
{
void * m_data = msg+1;
elem->event_data = (void*)malloc(msg->remote_event_size_bytes);
memcpy(elem->event_data, m_data, msg->remote_event_size_bytes);
}
elem->next = *head;
if(*head)
(*head)->prev = elem;
if(!(*head))
*tail = elem;
*head = elem;
return;
return sizeof(terminal_message);
}
/* remove message from the queue */
struct pending_router_msgs * remove_pending_router_msgs(struct pending_router_msgs** head,
struct pending_router_msgs** tail,
int chan)
{
struct pending_router_msgs * elem = *head;
if(!elem)
return NULL;
while(elem != NULL)
{
if(elem->output_chan == chan)
{
/* Remove element from the list */
/* if there is just one element */
if(elem == *head && elem == *tail)
{
*head = NULL;
*tail = NULL;
}
/* if element is at the head */
if(elem == *head && elem != *tail)
{
*head = elem->next;
elem->next->prev = NULL;
}
/* Append msg at the tail of queue `index`.
 * thisq[index] and thistail[index] hold the head and tail pointers of that
 * queue; both are updated as needed. msg's own next pointer is not touched. */
static void append_to_terminal_message_list(
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index,
        terminal_message_list *msg) {
    if(thisq[index] != NULL) {
        /* non-empty queue: hook msg behind the current tail */
        terminal_message_list *old_tail = thistail[index];
        old_tail->next = msg;
        msg->prev = old_tail;
    } else {
        /* empty queue: msg becomes the head as well */
        thisq[index] = msg;
    }
    thistail[index] = msg;
}
/* if element is at the tail */
if(elem == *tail && elem != *head)
{
*tail = elem->prev;
elem->prev->next = NULL;
}
/* Push msg onto the head of queue `index`.
 * thisq[index] and thistail[index] hold the head and tail pointers of that
 * queue; both are updated as needed. msg's own prev pointer is not touched. */
static void prepend_to_terminal_message_list(
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index,
        terminal_message_list *msg) {
    if(thisq[index] != NULL) {
        /* non-empty queue: hook msg in front of the current head */
        terminal_message_list *old_head = thisq[index];
        old_head->prev = msg;
        msg->next = old_head;
    } else {
        /* empty queue: msg becomes the tail as well */
        thistail[index] = msg;
    }
    thisq[index] = msg;
}
/* if element is in the middle */
if(elem->prev)
elem->prev->next = elem->next;
if(elem->next)
elem->next->prev = elem->prev;
/* Allocate a new queue node holding a copy of msg, plus a copy of the event
 * payload returned by model_net_method_get_edata(), and push the node onto
 * the head of queue `index`.
 *
 * thisq / thistail: arrays of head / tail pointers, indexed by `index`.
 * msg:              message to copy; it is not modified.
 *
 * The node is owned by the queue afterwards and is released with
 * delete_terminal_message_list(). */
static void create_prepend_to_terminal_message_list(
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index,
        terminal_message *msg) {
    terminal_message_list *new_entry = malloc(sizeof(*new_entry));
    assert(new_entry != NULL);
    init_terminal_message_list(new_entry, msg);
    /* Save the payload whenever either part is present. Testing only the
     * remote size would leave event_data NULL for a local-only payload,
     * while copy_terminal_list_entry() copies the payload back whenever the
     * combined size is positive. */
    if(msg->remote_event_size_bytes + msg->local_event_size_bytes > 0) {
        void *m_data = model_net_method_get_edata(DRAGONFLY, msg);
        size_t s = msg->remote_event_size_bytes + msg->local_event_size_bytes;
        new_entry->event_data = malloc(s);
        assert(new_entry->event_data != NULL);
        memcpy(new_entry->event_data, m_data, s);
    }
    prepend_to_terminal_message_list(thisq, thistail, index, new_entry);
}
//printf("\n Returned element %d %d %d", elem->msg.dest_terminal_id, chan, elem->next_stop);
return elem;
}
elem = elem->next;
}
return NULL;
/* Detach and return the head node of queue `index`, or NULL when the queue
 * is empty. thisq[index] / thistail[index] are updated to reflect the
 * removal. */
static terminal_message_list* return_head(
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index) {
    terminal_message_list *first = thisq[index];
    if(first == NULL) {
        return NULL;
    }

    terminal_message_list *second = first->next;
    thisq[index] = second;
    if(second == NULL) {
        /* removed the only element: the queue is now empty */
        thistail[index] = NULL;
        return first;
    }

    /* the successor becomes the new head; unlink the removed node */
    second->prev = NULL;
    first->next = NULL;
    return first;
}
/* returns the dragonfly message size */
static int dragonfly_get_msg_sz(void)
{
return sizeof(terminal_message);
/* Detach and return the tail node of queue `index`, or NULL when the queue
 * is empty (mirroring return_head). thisq[index] / thistail[index] are
 * updated to reflect the removal. */
static terminal_message_list* return_tail(
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index) {
    terminal_message_list *tail = thistail[index];
    /* empty queue: nothing to detach, and tail must not be dereferenced */
    if(tail == NULL) {
        return NULL;
    }
    if(tail->prev != NULL) {
        /* the predecessor becomes the new tail; unlink the removed node */
        tail->prev->next = NULL;
        thistail[index] = tail->prev;
        tail->prev = NULL;
    } else {
        /* removed the only element: the queue is now empty */
        thistail[index] = NULL;
        thisq[index] = NULL;
    }
    return tail;
}
/* Copy the message saved in a queue node back into msg, field by field,
 * and then copy the node's saved event payload into the payload area
 * returned by model_net_method_get_edata() for msg.
 *
 * NOTE(review): when the combined event size is positive this reads
 * cur_entry->event_data unconditionally, so it assumes the node was given a
 * payload buffer — confirm against the code that builds the node. */
static void copy_terminal_list_entry( terminal_message_list *cur_entry,
        terminal_message *msg) {
    terminal_message *src = &cur_entry->msg;

    /* packet identity and bookkeeping */
    msg->magic = src->magic;
    msg->packet_ID = src->packet_ID;
    msg->chunk_id = src->chunk_id;
    msg->packet_size = src->packet_size;
    msg->travel_start_time = src->travel_start_time;
    strcpy(msg->category, src->category);

    /* endpoints */
    msg->final_dest_gid = src->final_dest_gid;
    msg->sender_lp = src->sender_lp;
    msg->sender_node = src->sender_node;
    msg->dest_terminal_id = src->dest_terminal_id;
    msg->src_terminal_id = src->src_terminal_id;
    msg->local_id = src->local_id;
    msg->origin_router_id = src->origin_router_id;

    /* hop counters and routing state */
    msg->my_N_hop = src->my_N_hop;
    msg->my_l_hop = src->my_l_hop;
    msg->my_g_hop = src->my_g_hop;
    msg->intm_lp_id = src->intm_lp_id;
    msg->intm_group_id = src->intm_group_id;
    msg->last_hop = src->last_hop;
    msg->path_type = src->path_type;
    msg->next_stop = src->next_stop;

    /* channel / virtual channel state */
    msg->saved_channel = src->saved_channel;
    msg->saved_vc = src->saved_vc;
    msg->vc_index = src->vc_index;
    msg->output_chan = src->output_chan;

    /* pull request information */
    msg->is_pull = src->is_pull;
    msg->pull_size = src->pull_size;

    /* event payload sizes */
    msg->local_event_size_bytes = src->local_event_size_bytes;
    msg->remote_event_size_bytes = src->remote_event_size_bytes;

    /* restore the payload only after every header field has been copied */
    if(src->local_event_size_bytes + src->remote_event_size_bytes > 0) {
        void *payload = model_net_method_get_edata(DRAGONFLY, msg);
        memcpy(payload, cur_entry->event_data,
                src->local_event_size_bytes + src->remote_event_size_bytes);
    }
}
static void dragonfly_read_config(const char * anno, dragonfly_param *params){
// shorthand
dragonfly_param *p = params;
......@@ -364,63 +393,64 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){
p->num_routers);
}
configuration_get_value_int(&config, "PARAMS", "num_vcs", anno,
/*configuration_get_value_int(&config, "PARAMS", "num_vcs", anno,
&p->num_vcs);
if(!p->num_vcs) {
if(p->num_vcs <= 0) {
p->num_vcs = 1;
fprintf(stderr, "Number of virtual channels not specified, setting to %d\n", p->num_vcs);
}
}*/
p->num_vcs = 3;
configuration_get_value_int(&config, "PARAMS", "local_vc_size", anno, &p->local_vc_size);
if(p->local_vc_size <= 0) {
if(!p->local_vc_size) {
p->local_vc_size = 1024;
fprintf(stderr, "Buffer size of local channels not specified, setting to %d\n", p->local_vc_size);
}
configuration_get_value_int(&config, "PARAMS", "global_vc_size", anno, &p->global_vc_size);
if(p->global_vc_size <= 0) {
if(!p->global_vc_size) {
p->global_vc_size = 2048;
fprintf(stderr, "Buffer size of global channels not specified, setting to %d\n", p->global_vc_size);
}
configuration_get_value_int(&config, "PARAMS", "cn_vc_size", anno, &p->cn_vc_size);
if(p->cn_vc_size <= 0) {
if(!p->cn_vc_size) {
p->cn_vc_size = 1024;
fprintf(stderr, "Buffer size of compute node channels not specified, setting to %d\n", p->cn_vc_size);
}
configuration_get_value_int(&config, "PARAMS", "chunk_size", anno, &p->chunk_size);
if(p->chunk_size <= 0) {
if(!p->chunk_size) {
p->chunk_size = 64;
fprintf(stderr, "Chunk size for packets is not specified, setting to %d\n", p->chunk_size);
fprintf(stderr, "Chunk size for packets is specified, setting to %d\n", p->chunk_size);
}
configuration_get_value_double(&config, "PARAMS", "local_bandwidth", anno, &p->local_bandwidth);
if(p->local_bandwidth <= 0) {
if(!p->local_bandwidth) {
p->local_bandwidth = 5.25;
fprintf(stderr, "Bandwidth of local channels not specified, setting to %lf\n", p->local_bandwidth);
}
configuration_get_value_double(&config, "PARAMS", "global_bandwidth", anno, &p->global_bandwidth);
if(p->global_bandwidth <= 0) {
if(!p->global_bandwidth) {
p->global_bandwidth = 4.7;
fprintf(stderr, "Bandwidth of global channels not specified, setting to %lf\n", p->global_bandwidth);
}
configuration_get_value_double(&config, "PARAMS", "cn_bandwidth", anno, &p->cn_bandwidth);
if(p->cn_bandwidth <= 0) {
if(!p->cn_bandwidth) {
p->cn_bandwidth = 5.25;
fprintf(stderr, "Bandwidth of compute node channels not specified, setting to %lf\n", p->cn_bandwidth);
}
char routing_str[MAX_NAME_LENGTH];
configuration_get_value(&config, "PARAMS", "routing", anno, routing_str,
MAX_NAME_LENGTH);
if(strcmp(routing_str, "minimal") == 0)
routing = MINIMAL;
else if(strcmp(routing_str, "nonminimal")==0 || strcmp(routing_str,"non-minimal")==0)
else if(strcmp(routing_str, "nonminimal")==0 ||
strcmp(routing_str,"non-minimal")==0)
routing = NON_MINIMAL;
else if (strcmp(routing_str, "adaptive") == 0)
routing = ADAPTIVE;
......@@ -437,40 +467,29 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){
p->num_cn = p->num_routers/2;
p->num_global_channels = p->num_routers/2;
p->num_groups = p->num_routers * p->num_cn + 1;
p->radix = p->num_vcs *
(p->num_routers + p->num_global_channels + p->num_cn);
p->radix = (p->num_cn + p->num_global_channels + p->num_routers);
p->total_routers = p->num_groups * p->num_routers;
p->total_terminals = p->total_routers * p->num_cn;
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if(!rank) {
printf("\n Total nodes %d routers %d groups %d radix %d \n",
p->num_cn * p->total_routers, p->total_routers, p->num_groups,
p->radix);
}
if(!g_tw_mynode)
printf("\n Total nodes %d routers %d groups %d radix %d num_vc %d ", p->num_cn * p->total_routers,
p->total_routers,
p->num_groups,
p->radix,
p->num_vcs);
}
/* convert GiB/s and bytes to ns */
static tw_stime bytes_to_ns(uint64_t bytes, double GB_p_s)
{
tw_stime time;
/* bytes to GB */
time = ((double)bytes)/(1024.0*1024.0*1024.0);
/* GB to s */
time = time / GB_p_s;
/* s to ns */
time = time * 1000.0 * 1000.0 * 1000.0;
p->cn_delay = (1.0 / p->cn_bandwidth) * p->chunk_size;
p->local_delay = (1.0 / p->local_bandwidth) * p->chunk_size;
p->global_delay = (1.0 / p->global_bandwidth) * p->chunk_size;
p->credit_delay = (1.0 / p->local_bandwidth) * 8; //assume 8 bytes packet
return(time);
}
/* reverse computation for msg ready event */
static void dragonfly_configure(){
anno_map = codes_mapping_get_lp_anno_map(LP_CONFIG_NM);
assert(anno_map);
num_params = anno_map->num_annos + (anno_map->has_unanno_lp > 0);
all_params = calloc(num_params, sizeof(*all_params));
all_params = malloc(num_params * sizeof(*all_params));
for (uint64_t i = 0; i < anno_map->num_annos; i++){
const char * anno = anno_map->annotations[i].ptr;
......@@ -486,32 +505,26 @@ static void dragonfly_report_stats()
{
long long avg_hops, total_finished_packets;
tw_stime avg_time, max_time;
long total_term_events, total_router_events;
int total_minimal_packets, total_nonmin_packets, total_completed_packets;
MPI_Reduce( &total_hops, &avg_hops, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &N_finished_packets, &total_finished_packets, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &dragonfly_total_time, &avg_time, 1,MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &dragonfly_max_latency, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
MPI_Reduce( &term_ecount, &total_term_events, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce( &router_ecount, &total_router_events, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
{
MPI_Reduce(&minimal_count, &total_minimal_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&nonmin_count, &total_nonmin_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&completed_packets, &total_completed_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
}
/* print statistics */
if(!g_tw_mynode)
{
printf("\n total finished packets %lld ", total_finished_packets);
printf(" Average number of hops traversed %f average message latency %lf us maximum message latency %lf us\n", (float)avg_hops/total_finished_packets, avg_time/(total_finished_packets*1000), max_time/1000);
printf(" Average number of hops traversed %f average message latency %lf us maximum message latency %lf us \n", (float)avg_hops/total_finished_packets, avg_time/(total_finished_packets*1000), max_time/1000);
if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
printf("\n ADAPTIVE ROUTING STATS: %d packets routed minimally %d packets routed non-minimally completed packets %d ", total_minimal_packets, total_nonmin_packets, total_completed_packets);