From 2a241e71e9ba0791fcd17dd878d8d5a71bc57214 Mon Sep 17 00:00:00 2001 From: mubarak Date: Fri, 31 Aug 2018 16:03:59 -0400 Subject: [PATCH] enabling qos for dragonfly-plus (wip) --- codes/net/dragonfly-plus.h | 10 + src/networks/model-net/dragonfly-plus.C | 1079 ++++++++++++++--- .../methods/codes-online-comm-wrkld.C | 1 - 3 files changed, 912 insertions(+), 178 deletions(-) diff --git a/codes/net/dragonfly-plus.h b/codes/net/dragonfly-plus.h index 602dbc7..797e3d8 100644 --- a/codes/net/dragonfly-plus.h +++ b/codes/net/dragonfly-plus.h @@ -83,6 +83,16 @@ struct terminal_plus_message int is_pull; uint32_t pull_size; + /* for counting reverse calls */ + short num_rngs; + short num_cll; + + /* qos related attributes */ + int qos_index; + short last_saved_qos; + short qos_reset1; + short qos_reset2; + /* for reverse computation */ int path_type; tw_stime saved_available_time; diff --git a/src/networks/model-net/dragonfly-plus.C b/src/networks/model-net/dragonfly-plus.C index 929c264..05bcad7 100644 --- a/src/networks/model-net/dragonfly-plus.C +++ b/src/networks/model-net/dragonfly-plus.C @@ -37,6 +37,7 @@ #define CREDIT_SIZE 8 #define DFLY_HASH_TABLE_SIZE 40000 #define SHOW_ADAPTIVE_STATS 1 +#define BW_MONITOR 1 // debugging parameters #define TRACK -1 @@ -83,10 +84,20 @@ extern "C" { } #endif +static tw_stime max_qos_monitor = 5000000000; static int debug_slot_count = 0; static long term_ecount, router_ecount, term_rev_ecount, router_rev_ecount; static long packet_gen = 0, packet_fin = 0; +/* bw monitoring time in nanosecs */ +static int bw_reset_window = 5000000; + +#define indexer3d(_ptr, _x, _y, _z, _maxx, _maxy, _maxz) \ + ((_ptr) + _z * (_maxx * _maxz) + _y * (_maxx) + _x) + +#define indexer2d(_ptr, _x, _y, _maxx, _maxy) \ + ((_ptr) + _y * (_maxx) + _x) + static double maxd(double a, double b) { return a < b ? b : a; @@ -202,6 +213,10 @@ struct dragonfly_plus_param int num_cn; int intra_grp_radix; + // qos params + int num_qos_levels; + int * qos_bandwidths; + // dfp params start int num_level_chans; // number of channels between levels of the group(?) int num_router_spine; // number of spine routers (top level) @@ -294,10 +309,21 @@ struct terminal_state terminal_plus_message_list **terminal_msgs_tail; int in_send_loop; struct mn_stats dragonfly_stats_array[CATEGORY_MAX]; + + int * qos_status; + int * qos_data; + + int num_term_rc_windows; + int rc_index; + int* last_qos_status; + int* last_qos_data; + + int last_qos_lvl; + int is_monitoring_bw; struct rc_stack *st; int issueIdle; - int terminal_length; + int * terminal_length; const char *anno; const dragonfly_plus_param *params; @@ -348,6 +374,8 @@ typedef enum event_t { R_SEND, R_ARRIVE, R_BUFFER, + R_BANDWIDTH, + T_BANDWIDTH, } event_t; /* whether the last hop of a packet was global, local or a terminal */ @@ -358,6 +386,20 @@ enum last_hop TERMINAL, }; +typedef enum qos_priority +{ + Q_HIGH =0, + Q_MEDIUM, + Q_LOW, + Q_UNKNOWN, +} qos_priority; + +typedef enum qos_status +{ + Q_ACTIVE = 1, + Q_OVERBW, +} qos_status; + // Used to denote whether a connection is one that would allow a packet to continue along a minimal path or not // Specifically used to clearly pass whether a connection is a minimal one through to the connection scoring function typedef enum conn_minimality_t @@ -451,7 +493,18 @@ struct router_state int *global_channel; tw_stime *next_output_available_time; - tw_stime **last_buf_full; + tw_stime *last_buf_full; + + /* qos related state variables */ + int rc_index; + int num_rtr_rc_windows; + int is_monitoring_bw; + int* last_qos_lvl; + int** qos_status; + int** qos_data; + /* for reverse handler of qos */ + int* last_qos_status; + int* last_qos_data; tw_stime *busy_time; tw_stime *busy_time_sample; @@ -687,6 +740,43 @@ static void dragonfly_read_config(const char *anno, dragonfly_plus_param *params p->global_vc_size = 2048; fprintf(stderr, "Buffer size of global channels not specified, setting to %d\n", p->global_vc_size); } + + rc = configuration_get_value_int(&config, "PARAMS", "qos_levels", anno, &p->num_qos_levels); + if(rc) { + p->num_qos_levels = 1; + fprintf(stderr, "Number of QOS levels not specified, setting to %d\n", p->num_qos_levels); + } + + char qos_levels_str[MAX_NAME_LENGTH]; + rc = configuration_get_value(&config, "PARAMS", "qos_bandwidth", anno, qos_levels_str, MAX_NAME_LENGTH); + p->qos_bandwidths = (int*)calloc(p->num_qos_levels, sizeof(int)); + + if(p->num_qos_levels > 1) + { + int total_bw = 0; + char * token; + token = strtok(qos_levels_str, ","); + int i = 0; + while(token != NULL) + { + sscanf(token, "%d", &p->qos_bandwidths[i]); + total_bw += p->qos_bandwidths[i]; + if(p->qos_bandwidths[i] <= 0) + { + tw_error(TW_LOC, "\n Invalid bandwidth levels"); + } + i++; + token = strtok(NULL,","); + } + assert(total_bw <= 100); + } + else + p->qos_bandwidths[0] = 100; + + rc = configuration_get_value_double(&config, "PARAMS", "max_qos_monitor", anno, &max_qos_monitor); + if(rc) { + printf("\n Setting adaptive threshold to %lf ", max_qos_monitor); + } rc = configuration_get_value_int(&config, "PARAMS", "cn_vc_size", anno, &p->cn_vc_size); if (rc) { @@ -1108,6 +1198,47 @@ int dragonfly_plus_get_assigned_router_id(int terminal_id, const dragonfly_plus_ return router_id; } +void reset_rtr_bw_counters(router_state * s, + tw_bf * bf, + terminal_plus_message * msg, + tw_lp * lp) +{ + int num_qos_levels = s->params->num_qos_levels; + if(msg->type == R_BANDWIDTH) + { + for(int k = 0; k < s->num_rtr_rc_windows; k++) + { + for(int i = 0; i < s->params->radix; i++) + { + for(int j = 0; j < num_qos_levels; j++) + { + *(indexer3d(s->last_qos_status, k, i, j, s->num_rtr_rc_windows, s->params->radix, num_qos_levels)) = 0; + *(indexer3d(s->last_qos_data, k, i, j, s->num_rtr_rc_windows, s->params->radix, num_qos_levels)) = 0; + } + } + } + s->rc_index = 0; + } +} +void reset_bw_counters(terminal_state * s, + tw_bf * bf, + terminal_plus_message * msg, + tw_lp * lp) +{ + int num_qos_levels = s->params->num_qos_levels; + if(msg->type == T_BANDWIDTH) + { + for(int i = 0; i < s->num_term_rc_windows; i++) + { + for(int j = 0; j < s->params->num_qos_levels; j++) + { + *(indexer2d(s->last_qos_status, i, j, s->num_term_rc_windows, num_qos_levels)) = 0; + *(indexer2d(s->last_qos_data, i, j, s->num_term_rc_windows, num_qos_levels)) = 0; + } + } + s->rc_index = 0; + } +} /* initialize a dragonfly compute node terminal */ void terminal_plus_init(terminal_state *s, tw_lp *lp) { @@ -1132,7 +1263,9 @@ void terminal_plus_init(terminal_state *s, tw_lp *lp) s->params = &all_params[id]; } + int num_qos_levels = s->params->num_qos_levels; int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM_TERM, s->anno, 0); + s->terminal_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0); s->router_id = dragonfly_plus_get_assigned_router_id(s->terminal_id, s->params); @@ -1157,8 +1290,29 @@ void terminal_plus_init(terminal_state *s, tw_lp *lp) s->rev_events = 0; rc_stack_create(&s->st); + s->num_vcs = 1; - s->vc_occupancy = (int *) calloc(s->num_vcs, sizeof(int)); + if(num_qos_levels > 1) + s->num_vcs *= num_qos_levels; + + /* Whether the virtual channel group is active or over-bw*/ + s->qos_status = (int*)calloc(num_qos_levels, sizeof(int)); + + /* How much data has been transmitted on the virtual channel group within + * the window */ + s->qos_data = (int*)calloc(num_qos_levels, sizeof(int)); + + /* for reverse handlers */ + s->last_qos_status = (int*)calloc(s->num_term_rc_windows * num_qos_levels, sizeof(int)); + s->last_qos_data = (int*)calloc(s->num_term_rc_windows * num_qos_levels, sizeof(int)); + + for(i = 0; i < num_qos_levels; i++) + { + s->qos_data[i] = 0; + s->qos_status[i] = Q_ACTIVE; + } + + s->last_qos_lvl = 0; s->last_buf_full = (tw_stime *) calloc(s->num_vcs, sizeof(tw_stime)); for (i = 0; i < s->num_vcs; i++) { @@ -1166,15 +1320,12 @@ void terminal_plus_init(terminal_state *s, tw_lp *lp) s->vc_occupancy[i] = 0; } - s->rank_tbl = NULL; s->terminal_msgs = (terminal_plus_message_list **) calloc(s->num_vcs, sizeof(terminal_plus_message_list *)); s->terminal_msgs_tail = (terminal_plus_message_list **) calloc(s->num_vcs, sizeof(terminal_plus_message_list *)); - s->terminal_msgs[0] = NULL; - s->terminal_msgs_tail[0] = NULL; - s->terminal_length = 0; + s->terminal_length = (int*)calloc(num_qos_levels, sizeof(int)); s->in_send_loop = 0; s->issueIdle = 0; @@ -1218,9 +1369,19 @@ void router_plus_setup(router_state *r, tw_lp *lp) // printf("\n Local router id %d global id %d ", r->router_id, lp->gid); + r->num_rtr_rc_windows = 100; + r->rc_index = 0; + r->is_monitoring_bw = 0; r->fwd_events = 0; r->rev_events = 0; + // QoS related variables + // for reverse computation of QoS + /* history window for bandwidth reverse computation */ + int num_qos_levels = p->num_qos_levels; + r->last_qos_status = (int*)calloc(r->num_rtr_rc_windows * r->params->radix * num_qos_levels, sizeof(int)); + r->last_qos_data = (int*)calloc(r->num_rtr_rc_windows * r->params->radix * num_qos_levels, sizeof(int)); + // Determine if router is a spine or a leaf int intra_group_id = r->router_id % p->num_routers; if (intra_group_id >= (p->num_routers / 2)) { //TODO this assumes symmetric spine and leafs @@ -1244,6 +1405,9 @@ void router_plus_setup(router_state *r, tw_lp *lp) r->link_traffic_sample = (int64_t *) calloc(p->radix, sizeof(int64_t)); r->vc_occupancy = (int **) calloc(p->radix, sizeof(int *)); + r->qos_data = (int**)calloc(p->radix, sizeof(int*)); + r->last_qos_lvl = (int*)calloc(p->radix, sizeof(int)); + r->qos_status = (int**)calloc(p->radix, sizeof(int*)); r->in_send_loop = (int *) calloc(p->radix, sizeof(int)); r->pending_msgs = (terminal_plus_message_list ***) calloc(p->radix, sizeof(terminal_plus_message_list **)); @@ -1254,7 +1418,7 @@ void router_plus_setup(router_state *r, tw_lp *lp) r->queued_msgs_tail = (terminal_plus_message_list ***) calloc(p->radix, sizeof(terminal_plus_message_list **)); r->queued_count = (int *) calloc(p->radix, sizeof(int)); - r->last_buf_full = (tw_stime **) calloc(p->radix, sizeof(tw_stime *)); + r->last_buf_full = (tw_stime*) calloc(p->radix, sizeof(tw_stime *)); r->busy_time = (tw_stime *) calloc(p->radix, sizeof(tw_stime)); r->busy_time_sample = (tw_stime *) calloc(p->radix, sizeof(tw_stime)); @@ -1262,9 +1426,11 @@ void router_plus_setup(router_state *r, tw_lp *lp) for (int i = 0; i < p->radix; i++) { // Set credit & router occupancy + r->last_buf_full[i] = 0.0; r->busy_time[i] = 0.0; r->busy_time_sample[i] = 0.0; r->next_output_available_time[i] = 0; + r->last_qos_lvl[i] = 0; r->link_traffic[i] = 0; r->link_traffic_sample[i] = 0; r->queued_count[i] = 0; @@ -1272,15 +1438,22 @@ void router_plus_setup(router_state *r, tw_lp *lp) r->vc_occupancy[i] = (int *) calloc(p->num_vcs, sizeof(int)); r->pending_msgs[i] = (terminal_plus_message_list **) calloc(p->num_vcs, sizeof(terminal_plus_message_list *)); - r->last_buf_full[i] = (tw_stime *) calloc(p->num_vcs, sizeof(tw_stime)); r->pending_msgs_tail[i] = (terminal_plus_message_list **) calloc(p->num_vcs, sizeof(terminal_plus_message_list *)); r->queued_msgs[i] = (terminal_plus_message_list **) calloc(p->num_vcs, sizeof(terminal_plus_message_list *)); r->queued_msgs_tail[i] = (terminal_plus_message_list **) calloc(p->num_vcs, sizeof(terminal_plus_message_list *)); + + r->qos_status[i] = (int*)calloc(num_qos_levels, sizeof(int)); + r->qos_data[i] = (int*)calloc(num_qos_levels, sizeof(int)); + + for(int j = 0; j < num_qos_levels; j++) + { + r->qos_status[i][j] = Q_ACTIVE; + r->qos_data[i][j] = 0; + } for (int j = 0; j < p->num_vcs; j++) { - r->last_buf_full[i][j] = 0.0; r->vc_occupancy[i][j] = 0; r->pending_msgs[i][j] = NULL; r->pending_msgs_tail[i][j] = NULL; @@ -1293,6 +1466,299 @@ void router_plus_setup(router_state *r, tw_lp *lp) return; } +int get_vcg_from_category(terminal_plus_message * msg) +{ + if(strcmp(msg->category, "high") == 0) + return Q_HIGH; + else if(strcmp(msg->category, "medium") == 0) + return Q_MEDIUM; + else + tw_error(TW_LOC, "\n priority needs to be specified with qos_levels>1 %d", msg->category); +} + +static int get_rtr_bandwidth_consumption(router_state * s, int qos_lvl, int output_port) +{ + assert(qos_lvl >= Q_HIGH && qos_lvl <= Q_LOW); + assert(output_port < s->params->intra_grp_radix + s->params->num_global_connections + s->params->num_cn); + + int bandwidth = s->params->cn_bandwidth; + if(output_port < s->params->intra_grp_radix) + bandwidth = s->params->local_bandwidth; + else if(output_port < s->params->intra_grp_radix + s->params->num_global_connections) + bandwidth = s->params->global_bandwidth; + + /* conversion into bytes/sec from GiB/sec */ + double max_bw = bandwidth * 1024.0 * 1024.0 * 1024.0; + /* conversion into bytes per one nanosecs */ + double max_bw_per_ns = max_bw / (1000.0 * 1000.0 * 1000.0); + /* derive maximum bytes that can be transferred during the window */ + double max_bytes_per_win = max_bw_per_ns * bw_reset_window; + + int percent_bw = (((double)s->qos_data[output_port][qos_lvl]) / max_bytes_per_win) * 100; +// printf("\n percent bw consumed by qos_lvl %d is %d bytes transferred %d max_bw %lf ", qos_lvl, percent_bw, s->qos_data[output_port][qos_lvl], max_bw_per_ns); + return percent_bw; + +} + +static int get_term_bandwidth_consumption(terminal_state * s, int qos_lvl) +{ + assert(qos_lvl >= Q_HIGH && qos_lvl <= Q_LOW); + + /* conversion into bytes/sec from GiB/sec */ + double max_bw = s->params->cn_bandwidth * 1024.0 * 1024.0 * 1024.0; + /* conversion into bytes per one nanosecs */ + double max_bw_per_ns = max_bw / (1000.0 * 1000.0 * 1000.0); + /* derive maximum bytes that can be transferred during the window */ + double max_bytes_per_win = max_bw_per_ns * bw_reset_window; + int percent_bw = (((double)s->qos_data[qos_lvl]) / max_bytes_per_win) * 100; +// printf("\n At terminal %lf max bytes %d percent %d ", max_bytes_per_win, s->qos_data[qos_lvl], percent_bw); + return percent_bw; +} +/* reverse handler for router BW monitor */ +void issue_rtr_bw_monitor_event_rc(router_state * s, tw_bf * bf, terminal_plus_message * msg, tw_lp * lp) +{ + int num_qos_levels = s->params->num_qos_levels; + int rc_index = msg->qos_index; + + for(int i = 0 ; i < msg->num_cll; i++) + codes_local_latency_reverse(lp); + + for(int j = 0; j < s->params->radix; j++) + { + for(int k = 0; k < num_qos_levels; k++) + { + s->qos_status[j][k] = *(indexer3d(s->last_qos_status, rc_index, j, k, s->num_rtr_rc_windows, s->params->radix, num_qos_levels)); + s->qos_data[j][k] = *(indexer3d(s->last_qos_data, rc_index, j, k, s->num_rtr_rc_windows, s->params->radix, num_qos_levels)); + *(indexer3d(s->last_qos_status, rc_index, j, k, s->num_rtr_rc_windows, s->params->radix, num_qos_levels)) = 0; + *(indexer3d(s->last_qos_data, rc_index, j, k, s->num_rtr_rc_windows, s->params->radix, num_qos_levels)) = 0; + } + } +} +void issue_rtr_bw_monitor_event(router_state * s, tw_bf * bf, terminal_plus_message * msg, tw_lp * lp) +{ + msg->num_cll = 0; + msg->num_rngs = 0; + + int num_qos_levels = s->params->num_qos_levels; + int rc_index = s->rc_index; + int num_rtr_rc_windows = s->num_rtr_rc_windows; + + /* dynamically reallocate the array.. */ + if(s->rc_index == s->num_rtr_rc_windows) + { + s->num_rtr_rc_windows *= 2; + int * tmp1 = (int*)calloc(s->num_rtr_rc_windows * s->params->radix * num_qos_levels, sizeof(int)); + int * tmp2 = (int*)calloc(s->num_rtr_rc_windows * s->params->radix * num_qos_levels, sizeof(int)); + /* now copy elements one by one. can't use memcpy with 2d array. */ + for(int i = 0; i < num_rtr_rc_windows; i++) + { + for(int j = 0; j < s->params->radix; j++) + { + for(int k = 0; k < num_qos_levels; k++) + { + *(indexer3d(tmp1, i, j, k, s->num_rtr_rc_windows, s->params->radix, num_qos_levels)) = *(indexer3d(s->last_qos_status, i, j, k, num_rtr_rc_windows, s->params->radix, num_qos_levels)); + *(indexer3d(tmp2, i, j, k, s->num_rtr_rc_windows, s->params->radix, num_qos_levels)) = *(indexer3d(s->last_qos_data, i, j, k, num_rtr_rc_windows, s->params->radix, num_qos_levels)); + } + } + } + free(s->last_qos_status); + free(s->last_qos_data); + + s->last_qos_status = tmp1; + s->last_qos_data = tmp2; + } + assert(rc_index < s->num_rtr_rc_windows && rc_index >= 0); + + for(int j = 0; j < s->params->radix; j++) + { + for(int k = 0; k < num_qos_levels; k++) + { + *(indexer3d(s->last_qos_status, rc_index, j, k, s->num_rtr_rc_windows, s->params->radix, num_qos_levels)) = s->qos_status[j][k]; + *(indexer3d(s->last_qos_data, rc_index, j, k, s->num_rtr_rc_windows, s->params->radix, num_qos_levels)) = s->qos_data[j][k]; + } + } + + msg->qos_index = s->rc_index; + s->rc_index++; + + /*for(int j = 0; j < s->params->radix; j++) + { + for(int k = 0; k < num_qos_levels; k++) + { + int bw_consumed = get_rtr_bandwidth_consumption(s, k, j); + if(s->router_id == 0) + { + //fprintf(dragonfly_rtr_bw_log, "\n %d %f %d %d %d %d %d %f", s->router_id, tw_now(lp), j, k, bw_consumed, s->qos_status[j][k], s->qos_data[j][k], s->busy_time_sample[j]); + + } + } + }*/ + for(int j = 0; j < s->params->radix; j++) + { + /* Reset the qos status and bandwidth consumption. */ + for(int k = 0; k < num_qos_levels; k++) + { + s->qos_status[j][k] = Q_ACTIVE; + s->qos_data[j][k] = 0; + } + //s->busy_time_sample[j] = 0; + } + + if(tw_now(lp) > max_qos_monitor) + return; + + msg->num_cll++; + tw_stime bw_ts = bw_reset_window + codes_local_latency(lp); + terminal_plus_message *m; + tw_event * e = model_net_method_event_new(lp->gid, bw_ts, lp, + DRAGONFLY_CUSTOM_ROUTER, (void**)&m, NULL); + m->type = R_BANDWIDTH; + m->magic = router_magic_num; + tw_event_send(e); +} + +void issue_bw_monitor_event_rc(terminal_state * s, tw_bf * bf, terminal_plus_message * msg, tw_lp * lp) +{ + for(int i = 0 ; i < msg->num_cll; i++) + codes_local_latency_reverse(lp); + + int num_qos_levels = s->params->num_qos_levels; + int num_term_rc_wins = s->num_term_rc_windows; + int rc_index = msg->qos_index; + + for(int k = 0; k < num_qos_levels; k++) + { + s->qos_status[k] = *(indexer2d(s->last_qos_status, rc_index, k, num_term_rc_wins, num_qos_levels)); + s->qos_data[k] = *(indexer2d(s->last_qos_data, rc_index, k, num_term_rc_wins, num_qos_levels)); + *(indexer2d(s->last_qos_status, rc_index, k, num_term_rc_wins, num_qos_levels)) = 0; + *(indexer2d(s->last_qos_data, rc_index, k, num_term_rc_wins, num_qos_levels)) = 0; + } +} +/* resets the bandwidth numbers recorded so far */ +void issue_bw_monitor_event(terminal_state * s, tw_bf * bf, terminal_plus_message * msg, tw_lp * lp) +{ + + msg->num_cll = 0; + msg->num_rngs = 0; + int num_qos_levels = s->params->num_qos_levels; + int rc_index = s->rc_index; + int num_term_rc_wins = s->num_term_rc_windows; + + /* dynamically reallocate array if index has reached max-size */ + if(s->rc_index == s->num_term_rc_windows) + { + s->num_term_rc_windows *= 2; + int * tmp1 = (int*)calloc(s->num_term_rc_windows * num_qos_levels, sizeof(int)); + int * tmp2 = (int*)calloc(s->num_term_rc_windows * num_qos_levels, sizeof(int)); + + /* now copy elements one by one. can't use memcpy with 2d array. */ + for(int i = 0; i < s->num_term_rc_windows; i++) + { + for(int j = 0; j < num_qos_levels; j++) + { + *(indexer2d(tmp1, i, j, s->num_term_rc_windows, num_qos_levels)) = *(indexer2d(s->last_qos_status, i, j, num_term_rc_wins, num_qos_levels)); + *(indexer2d(tmp2, i, j, s->num_term_rc_windows, num_qos_levels)) = *(indexer2d(s->last_qos_data, i, j, num_term_rc_wins, num_qos_levels)); + } + } + free(s->last_qos_status); + free(s->last_qos_data); + + s->last_qos_status = tmp1; + s->last_qos_data = tmp2; + } + /* Reset the qos status and bandwidth consumption. */ + for(int k = 0; k < num_qos_levels; k++) + { + *(indexer2d(s->last_qos_status, rc_index, k, num_term_rc_wins, num_qos_levels)) = s->qos_status[k]; + *(indexer2d(s->last_qos_data, rc_index, k, num_term_rc_wins, num_qos_levels)) = s->qos_data[k]; + s->qos_status[k] = Q_ACTIVE; + s->qos_data[k] = 0; + } + msg->qos_index = s->rc_index; + s->rc_index++; + assert(s->rc_index < s->num_term_rc_windows); + +/* if(s->router_id == 0) + { + fprintf(dragonfly_term_bw_log, "\n %d %lf %lf ", s->terminal_id, tw_now(lp), s->busy_time_sample); + s->busy_time_sample = 0; + } + */ + if(tw_now(lp) > max_qos_monitor) + return; + + msg->num_cll++; + terminal_plus_message * m; + tw_stime bw_ts = bw_reset_window + codes_local_latency(lp); + tw_event * e = model_net_method_event_new(lp->gid, bw_ts, lp, DRAGONFLY_CUSTOM, + (void**)&m, NULL); + m->type = T_BANDWIDTH; + m->magic = terminal_magic_num; + tw_event_send(e); +} + +static int get_next_vcg(terminal_state * s, tw_bf * bf, terminal_plus_message * msg, tw_lp * lp) +{ + int num_qos_levels = s->params->num_qos_levels; + + if(num_qos_levels == 1) + { + if(s->terminal_msgs[0] == NULL || s->vc_occupancy[0] + s->params->chunk_size > s->params->cn_vc_size) + return -1; + else + return 0; + } + + int bw_consumption[num_qos_levels]; + + /* First make sure the bandwidth consumptions are up to date. */ + for(int k = 0; k < num_qos_levels; k++) + { + if(s->qos_status[k] != Q_OVERBW) + { + bw_consumption[k] = get_term_bandwidth_consumption(s, k); + if(bw_consumption[k] > s->params->qos_bandwidths[k]) + { + if(k == 0) + msg->qos_reset1 = 1; + else if(k == 1) + msg->qos_reset2 = 1; + + s->qos_status[k] = Q_OVERBW; + } + } + } + if(BW_MONITOR == 1) + { + for(int i = 0; i < num_qos_levels; i++) + { + if(s->qos_status[i] == Q_ACTIVE) + { + if(s->terminal_msgs[i] != NULL && s->vc_occupancy[i] + s->params->chunk_size <= s->params->cn_vc_size) + return i; + } + } + } + + + int next_rr_vcg = (s->last_qos_lvl + 1) % num_qos_levels; + /* All vcgs are exceeding their bandwidth limits*/ + for(int i = 0; i < num_qos_levels; i++) + { + if(s->terminal_msgs[i] != NULL && s->vc_occupancy[i] + s->params->chunk_size <= s->params->cn_vc_size) + { + bf->c2 = 1; + + if(msg->last_saved_qos < 0) + msg->last_saved_qos = s->last_qos_lvl; + + s->last_qos_lvl = next_rr_vcg; + return i; + } + next_rr_vcg = (next_rr_vcg + 1) % num_qos_levels; + } + return -1; +} /* MM: These packet events (packet_send, packet_receive etc.) will be used as is, basically, the routing * functions will be changed only. */ @@ -1367,7 +1833,7 @@ static void dragonfly_plus_packet_event_rc(tw_lp *sender) * sending router. */ /*When a packet is sent from the current router and a buffer slot becomes available, a credit is sent back to * schedule another packet event*/ -static void router_credit_send(router_state *s, terminal_plus_message *msg, tw_lp *lp, int sq) +static void router_credit_send(router_state *s, terminal_plus_message *msg, tw_lp *lp, int sq, short* rng_counter) { tw_event *buf_e; tw_stime ts; @@ -1390,6 +1856,7 @@ static void router_credit_send(router_state *s, terminal_plus_message *msg, tw_l else printf("\n Invalid message type"); + (*rng_counter)++; ts = g_tw_lookahead + p->credit_delay + tw_rand_unif(lp->rng); if (is_terminal) { @@ -1421,27 +1888,37 @@ static void packet_generate_rc(terminal_state *s, tw_bf *bf, terminal_plus_messa s->packet_gen--; packet_gen--; s->packet_counter--; + + for(int i = 0; i < msg->num_rngs; i++) + tw_rand_reverse_unif(lp->rng); - tw_rand_reverse_unif(lp->rng); + for(int i = 0; i < msg->num_cll; i++) + codes_local_latency_reverse(lp); + int num_qos_levels = s->params->num_qos_levels; int num_chunks = msg->packet_size / s->params->chunk_size; if (msg->packet_size < s->params->chunk_size) num_chunks++; + + int vcg = 0; + if(num_qos_levels > 1) + { + vcg = get_vcg_from_category(msg); + assert(vcg == Q_HIGH || vcg == Q_MEDIUM); + } + assert(vcg < num_qos_levels); + int i; for (i = 0; i < num_chunks; i++) { - delete_terminal_plus_message_list(return_tail(s->terminal_msgs, s->terminal_msgs_tail, 0)); - s->terminal_length -= s->params->chunk_size; + delete_terminal_plus_message_list(return_tail(s->terminal_msgs, s->terminal_msgs_tail, vcg)); + s->terminal_length[vcg] -= s->params->chunk_size; } if (bf->c5) { - codes_local_latency_reverse(lp); s->in_send_loop = 0; } if (bf->c11) { s->issueIdle = 0; - if (bf->c8) { - s->last_buf_full[0] = msg->saved_busy_time; - } } struct mn_stats *stat; stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); @@ -1453,8 +1930,44 @@ static void packet_generate_rc(terminal_state *s, tw_bf *bf, terminal_plus_messa /* generates packet at the current dragonfly compute node */ static void packet_generate(terminal_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp) { + msg->num_rngs = 0; + msg->num_cll = 0; + packet_gen++; s->packet_gen++; + + int num_qos_levels = s->params->num_qos_levels; + + if(num_qos_levels > 1) + { + tw_lpid router_id; + codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL, + &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset); + codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM_ROUT, NULL, 0, + s->router_id / num_routers_per_mgrp, s->router_id % num_routers_per_mgrp, &router_id); + if(s->is_monitoring_bw == 0) + { + bf->c1 = 1; + /* Issue an event on both terminal and router to monitor bandwidth */ + msg->num_cll++; + tw_stime bw_ts = bw_reset_window + codes_local_latency(lp); + terminal_plus_message * m; + tw_event * e = model_net_method_event_new(lp->gid, bw_ts, lp, DRAGONFLY_CUSTOM, + (void**)&m, NULL); + m->type = T_BANDWIDTH; + m->magic = terminal_magic_num; + s->is_monitoring_bw = 1; + tw_event_send(e); + } + } + + int vcg = 0; + if(num_qos_levels > 1) + { + vcg = get_vcg_from_category(msg); + assert(vcg == Q_HIGH || vcg == Q_MEDIUM); + } + assert(vcg < num_qos_levels); tw_stime ts, nic_ts; @@ -1485,6 +1998,7 @@ static void packet_generate(terminal_state *s, tw_bf *bf, terminal_plus_message if (msg->packet_size < s->params->chunk_size) cn_delay = bytes_to_ns(msg->packet_size % s->params->chunk_size, s->params->cn_bandwidth); + msg->num_rngs++; nic_ts = g_tw_lookahead + (num_chunks * cn_delay) + tw_rand_unif(lp->rng); // msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter; @@ -1520,29 +2034,24 @@ static void packet_generate(terminal_state *s, tw_bf *bf, terminal_plus_message msg->local_event_size_bytes); } + cur_chunk->msg.output_chan = vcg; cur_chunk->msg.chunk_id = i; cur_chunk->msg.origin_router_id = s->router_id; - append_to_terminal_plus_message_list(s->terminal_msgs, s->terminal_msgs_tail, 0, cur_chunk); - s->terminal_length += s->params->chunk_size; + append_to_terminal_plus_message_list(s->terminal_msgs, s->terminal_msgs_tail, vcg, cur_chunk); + s->terminal_length[vcg] += s->params->chunk_size; } - if (s->terminal_length < 2 * s->params->cn_vc_size) { + if (s->terminal_length[vcg] < 2 * s->params->cn_vc_size) { model_net_method_idle_event(nic_ts, 0, lp); } else { bf->c11 = 1; s->issueIdle = 1; - - if (s->last_buf_full[0] == 0.0) { - bf->c8 = 1; - msg->saved_busy_time = s->last_buf_full[0]; - /* TODO: Assumes a single vc from terminal to router */ - s->last_buf_full[0] = tw_now(lp); - } } if (s->in_send_loop == 0) { bf->c5 = 1; + msg->num_cll++; ts = codes_local_latency(lp); terminal_plus_message *m; tw_event *e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY_PLUS, (void **) &m, NULL); @@ -1565,46 +2074,66 @@ static void packet_generate(terminal_state *s, tw_bf *bf, terminal_plus_message return; } -static void packet_send_rc(terminal_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp) +static void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_plus_message * msg, + tw_lp * lp) { - if (bf->c1) { + int num_qos_levels = s->params->num_qos_levels; + + if(msg->qos_reset1) + s->qos_status[0] = Q_ACTIVE; + if(msg->qos_reset2) + s->qos_status[1] = Q_ACTIVE; + + if(msg->last_saved_qos) + s->last_qos_lvl = msg->last_saved_qos; + + if(bf->c1) { s->in_send_loop = 1; - - if (bf->c10) - s->last_buf_full[0] = msg->saved_busy_time; - + if(bf->c3) + s->last_buf_full[0] = msg->saved_busy_time; + return; - } - - tw_rand_reverse_unif(lp->rng); - s->terminal_available_time = msg->saved_available_time; - if (bf->c2) { + } + + for(int i = 0; i < msg->num_cll; i++) { codes_local_latency_reverse(lp); - } - - s->terminal_length += s->params->chunk_size; - // s->packet_counter--; - s->vc_occupancy[0] -= s->params->chunk_size; - - terminal_plus_message_list *cur_entry = (terminal_plus_message_list *) rc_stack_pop(s->st); - - prepend_to_terminal_plus_message_list(s->terminal_msgs, s->terminal_msgs_tail, 0, cur_entry); - if (bf->c3) { + } + + for(int i = 0; i < msg->num_rngs; i++) + { tw_rand_reverse_unif(lp->rng); - } - if (bf->c4) { + } + int vcg = msg->saved_vc; + s->terminal_available_time = msg->saved_available_time; + + s->terminal_length[vcg] += s->params->chunk_size; + /*TODO: MM change this to the vcg */ + s->vc_occupancy[vcg] -= s->params->chunk_size; + + terminal_plus_message_list* cur_entry = (terminal_plus_message_list *)rc_stack_pop(s->st); + + int data_size = s->params->chunk_size; + if(cur_entry->msg.packet_size < s->params->chunk_size) + data_size = cur_entry->msg.packet_size % s->params->chunk_size; + + s->qos_data[vcg] -= data_size; + + prepend_to_terminal_plus_message_list(s->terminal_msgs, + s->terminal_msgs_tail, vcg, cur_entry); + if(bf->c4) { s->in_send_loop = 1; - } - if (bf->c5) { - tw_rand_reverse_unif(lp->rng); - s->issueIdle = 1; - if (bf->c6) { + } + if(bf->c5) + { + s->issueIdle = 1; + if(bf->c6) + { s->busy_time = msg->saved_total_time; s->last_buf_full[0] = msg->saved_busy_time; s->busy_time_sample = msg->saved_sample_time; - } - } - return; + } + } + return; } /* sends the packet from the current dragonfly compute node to the attached router */ static void packet_send(terminal_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp) @@ -1613,32 +2142,53 @@ static void packet_send(terminal_state *s, tw_bf *bf, terminal_plus_message *msg tw_event *e; terminal_plus_message *m; tw_lpid router_id; - - terminal_plus_message_list *cur_entry = s->terminal_msgs[0]; - - if (s->vc_occupancy[0] + s->params->chunk_size > s->params->cn_vc_size) { - if (s->last_buf_full[0] == 0.0) { - bf->c10 = 1; - msg->saved_busy_time = s->last_buf_full[0]; - s->last_buf_full[0] = tw_now(lp); - } - } - - if (s->vc_occupancy[0] + s->params->chunk_size > s->params->cn_vc_size || cur_entry == NULL) { - bf->c1 = 1; - s->in_send_loop = 0; - return; + + int vcg = 0; + int num_qos_levels = s->params->num_qos_levels; + + msg->last_saved_qos = -1; + msg->qos_reset1 = -1; + msg->qos_reset2 = -1; + msg->num_rngs = 0; + msg->num_cll = 0; + + if(num_qos_levels > 1) + vcg = get_next_vcg(s, bf, msg, lp); + + /* For a terminal to router connection, there would be as many VCGs as number + * of VCs*/ + + if(vcg == -1) { + bf->c1 = 1; + s->in_send_loop = 0; + if(!s->last_buf_full[0]) + { + bf->c3 = 1; + msg->saved_busy_time = s->last_buf_full[0]; + s->last_buf_full[0] = tw_now(lp); + } + return; } + + int data_size = s->params->chunk_size; + msg->saved_vc = vcg; + terminal_plus_message_list* cur_entry = s->terminal_msgs[vcg]; + uint64_t num_chunks = cur_entry->msg.packet_size / s->params->chunk_size; if (cur_entry->msg.packet_size < s->params->chunk_size) num_chunks++; tw_stime delay = s->params->cn_delay; if ((cur_entry->msg.packet_size < s->params->chunk_size) && (cur_entry->msg.chunk_id == num_chunks - 1)) + { + data_size = cur_entry->msg.packet_size % s->params->chunk_size; delay = bytes_to_ns(cur_entry->msg.packet_size % s->params->chunk_size, s->params->cn_bandwidth); + } + s->qos_data[vcg] += data_size; msg->saved_available_time = s->terminal_available_time; + msg->num_rngs++; ts = g_tw_lookahead + delay + tw_rand_unif(lp->rng); s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp)); s->terminal_available_time += ts; @@ -1673,6 +2223,7 @@ static void packet_send(terminal_state *s, tw_bf *bf, terminal_plus_message *msg if (cur_entry->msg.chunk_id == num_chunks - 1 && (cur_entry->msg.local_event_size_bytes > 0)) { bf->c2 = 1; + msg->num_cll++; tw_stime local_ts = codes_local_latency(lp); tw_event *e_new = tw_event_new(cur_entry->msg.sender_lp, local_ts, lp); void *m_new = tw_event_data(e_new); @@ -1680,18 +2231,25 @@ static void packet_send(terminal_state *s, tw_bf *bf, terminal_plus_message *msg memcpy(m_new, local_event, cur_entry->msg.local_event_size_bytes); tw_event_send(e_new); } + // s->packet_counter++; - s->vc_occupancy[0] += s->params->chunk_size; - cur_entry = return_head(s->terminal_msgs, s->terminal_msgs_tail, 0); + s->vc_occupancy[vcg] += s->params->chunk_size; + cur_entry = return_head(s->terminal_msgs, s->terminal_msgs_tail, vcg); rc_stack_push(lp, cur_entry, delete_terminal_plus_message_list, s->st); - s->terminal_length -= s->params->chunk_size; + s->terminal_length[vcg] -= s->params->chunk_size; + + int next_vcg = 0; + + if(num_qos_levels > 1) + next_vcg = get_next_vcg(s, bf, msg, lp); - cur_entry = s->terminal_msgs[0]; + cur_entry = s->terminal_msgs[next_vcg]; /* if there is another packet inline then schedule another send event */ - if (cur_entry != NULL && s->vc_occupancy[0] + s->params->chunk_size <= s->params->cn_vc_size) { + if (cur_entry != NULL && s->vc_occupancy[next_vcg] + s->params->chunk_size <= s->params->cn_vc_size) { bf->c3 = 1; terminal_plus_message *m_new; + msg->num_rngs++; ts += tw_rand_unif(lp->rng); e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY_PLUS, (void **) &m_new, NULL); m_new->type = T_SEND; @@ -1706,6 +2264,7 @@ static void packet_send(terminal_state *s, tw_bf *bf, terminal_plus_message *msg if (s->issueIdle) { bf->c5 = 1; s->issueIdle = 0; + msg->num_rngs++; ts += tw_rand_unif(lp->rng); model_net_method_idle_event(ts, 0, lp); @@ -1725,11 +2284,16 @@ static void packet_send(terminal_state *s, tw_bf *bf, terminal_plus_message *msg static void packet_arrive_rc(terminal_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp) { + for(int i = 0; i < msg->num_rngs; i++) + tw_rand_reverse_unif(lp->rng); + + for(int i = 0; i < msg->num_cll; i++) + codes_local_latency_reverse(lp); + if (bf->c31) { s->packet_fin--; packet_fin--; } - tw_rand_reverse_unif(lp->rng); if (msg->path_type == MINIMAL) minimal_count--; else @@ -1777,8 +2341,6 @@ static void packet_arrive_rc(terminal_state *s, tw_bf *bf, terminal_plus_message } if (bf->c7) { // assert(!hash_link); - if (bf->c8) - tw_rand_reverse_unif(lp->rng); N_finished_msgs--; s->finished_msgs--; total_msg_sz -= msg->total_size; @@ -1814,6 +2376,7 @@ static void send_remote_event(terminal_state *s, terminal_plus_message *msg, tw_ { void *tmp_ptr = model_net_method_get_edata(DRAGONFLY_PLUS, msg); // tw_stime ts = g_tw_lookahead + bytes_to_ns(msg->remote_event_size_bytes, (1/s->params->cn_bandwidth)); + msg->num_rngs++; tw_stime ts = g_tw_lookahead + mpi_soft_overhead + tw_rand_unif(lp->rng); if (msg->is_pull) { bf->c4 = 1; @@ -1844,6 +2407,9 @@ static void packet_arrive(terminal_state *s, tw_bf *bf, terminal_plus_message *m { printf("Terminal received a packet with %d hops! (Notify on > than %d)\n",msg->my_N_hop, s->params->max_hops_notify); } + + msg->num_rngs = 0; + msg->num_cll = 0; if (!s->rank_tbl) s->rank_tbl = qhash_init(dragonfly_rank_hash_compare, dragonfly_hash_func, DFLY_HASH_TABLE_SIZE); @@ -1882,6 +2448,7 @@ static void packet_arrive(terminal_state *s, tw_bf *bf, terminal_plus_message *m if (msg->packet_ID == LLU(TRACK_PKT) && msg->src_terminal_id == T_ID) printf("\n Packet %llu arrived at lp %llu hops %d ", msg->packet_ID, LLU(lp->gid), msg->my_N_hop); + msg->num_rngs++; tw_stime ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng); // no method_event here - message going to router @@ -2283,12 +2850,19 @@ void dragonfly_plus_sample_fin(terminal_state *s, tw_lp *lp) static void terminal_buf_update_rc(terminal_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp) { - s->vc_occupancy[0] += s->params->chunk_size; - codes_local_latency_reverse(lp); + int vcg = 0; + int num_qos_levels = s->params->num_qos_levels; + + for(int i = 0; i < msg->num_cll; i++) + codes_local_latency_reverse(lp); + + if(num_qos_levels > 1) + vcg = get_vcg_from_category(msg); + + s->vc_occupancy[vcg] += s->params->chunk_size; if (bf->c1) { s->in_send_loop = 0; } - return; } /* update the compute node-router channel buffer */ @@ -2298,6 +2872,7 @@ static void terminal_buf_update(terminal_state *s, tw_bf *bf, terminal_plus_mess bf->c2 = 0; bf->c3 = 0; + msg->num_cll++; tw_stime ts = codes_local_latency(lp); s->vc_occupancy[0] -= s->params->chunk_size; @@ -2593,9 +3168,10 @@ static int dfp_score_connection(router_state *s, tw_bf *bf, terminal_plus_messag return score; } -static vector< Connection > dfp_select_two_connections(router_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp, vector< Connection > conns) +static vector< Connection > dfp_select_two_connections(router_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp, vector< Connection > conns, short* rng_counter) { if(conns.size() < 2) { + (*rng_counter)+=2; tw_rand_integer(lp->rng,0,2); //ensure this function always uses two rngs tw_rand_integer(lp->rng,0,2); if(conns.size() == 1) @@ -2608,6 +3184,7 @@ static vector< Connection > dfp_select_two_connections(router_state *s, tw_bf *b int num_conns = conns.size(); + (*rng_counter)+=2; rand_sel_1 = tw_rand_integer(lp->rng, 0, num_conns-1); rand_sel_2_offset = tw_rand_integer(lp->rng, 0, num_conns-1); //number of indices to count up from the previous selected one. Avoids selecting same one twice int rand_sel_2 = (rand_sel_1 + rand_sel_2_offset) % num_conns; @@ -2621,6 +3198,7 @@ static vector< Connection > dfp_select_two_connections(router_state *s, tw_bf *b static Connection get_absolute_best_connection_from_conns(router_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp, vector conns) { + msg->num_rngs+=2; tw_rand_integer(lp->rng,0,1); tw_rand_integer(lp->rng,0,1); @@ -2673,6 +3251,7 @@ static Connection get_absolute_best_connection_from_conns(router_state *s, tw_bf static Connection get_best_connection_from_conns(router_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp, vector conns) { if (conns.size() == 0) { + msg->num_rngs+=2; tw_rand_integer(lp->rng, 0, 2); tw_rand_integer(lp->rng, 0, 2); Connection bad_conn; @@ -2681,12 +3260,13 @@ static Connection get_best_connection_from_conns(router_state *s, tw_bf *bf, ter return bad_conn; } if (conns.size() < 2) { + msg->num_rngs+=2; tw_rand_integer(lp->rng, 0, 2); tw_rand_integer(lp->rng, 0, 2); return conns[0]; } int num_to_compare = 2; //TODO make this a configurable - vector< Connection > selected_conns = dfp_select_two_connections(s, bf, msg, lp, conns); + vector< Connection > selected_conns = dfp_select_two_connections(s, bf, msg, lp, conns, &(msg->num_rngs)); int scores[num_to_compare]; int best_score_index = 0; @@ -3023,6 +3603,7 @@ static Connection do_dfp_routing(router_state *s, ConnectionType conn_type = poss_next_stops[0].conn_type; Connection best_min_conn; if (conn_type == CONN_GLOBAL) { + msg->num_rngs++; int rand_sel = tw_rand_integer(lp->rng, 0, poss_next_stops.size() -1); return poss_next_stops[rand_sel]; } @@ -3084,27 +3665,15 @@ static Connection do_dfp_routing(router_state *s, static void do_dfp_routing_rc(router_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp, int fdest_router_id) { + for(int i = 0; i < msg->num_cll; i++) { + codes_local_latency_reverse(lp); + } + + for(int i = 0; i < msg->num_rngs; i++) { + tw_rand_reverse_unif(lp->rng); + } int my_group_id = s->router_id / s->params->num_routers; int fdest_group_id = fdest_router_id / s->params->num_routers; - - - if (my_group_id == fdest_group_id) { - tw_rand_reverse_unif(lp->rng); - tw_rand_reverse_unif(lp->rng); - } - else if (isRoutingAdaptive(routing)) { - tw_rand_reverse_unif(lp->rng); - tw_rand_reverse_unif(lp->rng); - tw_rand_reverse_unif(lp->rng); - tw_rand_reverse_unif(lp->rng); - } - else if (isRoutingMinimal(routing)) { - tw_rand_reverse_unif(lp->rng); - } - else { - tw_rand_reverse_unif(lp->rng); - tw_rand_reverse_unif(lp->rng); - } } static void router_verify_valid_receipt(router_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp) @@ -3165,33 +3734,116 @@ static void router_verify_valid_receipt(router_state *s, tw_bf *bf, terminal_plu assert(has_valid_connection); } +static int get_next_router_vcg(router_state * s, tw_bf * bf, terminal_plus_message * msg, tw_lp * lp) +{ + int num_qos_levels = s->params->num_qos_levels; + + int vcs_per_qos = s->params->num_vcs / num_qos_levels; + int output_port = msg->vc_index; + int vcg = 0; + int base_limit = 0; + + int chunk_size = s->params->chunk_size; + int bw_consumption[num_qos_levels]; + /* First make sure the bandwidth consumptions are up to date. */ + if(BW_MONITOR == 1) + { + for(int k = 0; k < num_qos_levels; k++) + { + if(s->qos_status[output_port][k] != Q_OVERBW) + { + bw_consumption[k] = get_rtr_bandwidth_consumption(s, k, output_port); + if(bw_consumption[k] > s->params->qos_bandwidths[k]) + { +// printf("\n Router %d QoS %d exceeded allowed bandwidth %d ", s->router_id, k, bw_consumption[k]); + if(k == 0) + msg->qos_reset1 = 1; + else if(k == 1) + msg->qos_reset2 = 1; + + s->qos_status[output_port][k] = Q_OVERBW; + } + } + } + int vc_size = s->params->global_vc_size; + if(output_port < s->params->intra_grp_radix) + vc_size = s->params->local_vc_size; + + /* TODO: If none of the vcg is exceeding bandwidth limit then select high + * priority traffic first. */ + for(int i = 0; i < num_qos_levels; i++) + { + if(s->qos_status[output_port][i] == Q_ACTIVE) + { + int base_limit = i * vcs_per_qos; + for(int k = base_limit; k < base_limit + vcs_per_qos; k ++) + { + if(s->pending_msgs[output_port][k] != NULL) + return k; + } + } + } + } + + /* All vcgs are exceeding their bandwidth limits*/ + msg->last_saved_qos = s->last_qos_lvl[output_port]; + int next_rr_vcg = (s->last_qos_lvl[output_port] + 1) % num_qos_levels; + + for(int i = 0; i < num_qos_levels; i++) + { + base_limit = next_rr_vcg * vcs_per_qos; + for(int k = base_limit; k < base_limit + vcs_per_qos; k++) + { + if(s->pending_msgs[output_port][k] != NULL) + { + if(msg->last_saved_qos < 0) + msg->last_saved_qos = s->last_qos_lvl[output_port]; + + s->last_qos_lvl[output_port] = next_rr_vcg; + return k; + } + } + next_rr_vcg = (next_rr_vcg + 1) % num_qos_levels; + assert(next_rr_vcg < 2); + } + return -1; +} static void router_packet_receive_rc(router_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp) { router_rev_ecount++; router_ecount--; + msg->num_cll = 0; + msg->num_rngs = 0; + int output_port = msg->saved_vc; int output_chan = msg->saved_channel; + + for(int i = 0 ; i < msg->num_cll; i++) + codes_local_latency_reverse(lp); + + for(int i = 0; i < msg->num_rngs; i++) + tw_rand_reverse_unif(lp->rng); + + if(bf->c1) + s->is_monitoring_bw = 0; //do dfp routing reverse int dest_router_id = dragonfly_plus_get_assigned_router_id(msg->dfp_dest_terminal_id, s->params); do_dfp_routing_rc(s, bf, msg, lp, dest_router_id); - // tw_rand_reverse_unif(lp->rng); nm may14-2018, i think this was wrongly still here if (bf->c2) { - tw_rand_reverse_unif(lp->rng); terminal_plus_message_list *tail = return_tail(s->pending_msgs[output_port], s->pending_msgs_tail[output_port], output_chan); delete_terminal_plus_message_list(tail); s->vc_occupancy[output_port][output_chan] -= s->params->chunk_size; if (bf->c3) { - codes_local_latency_reverse(lp); s->in_send_loop[output_port] = 0; } } if (bf->c4) { if (bf->c22) { - s->last_buf_full[output_port][output_chan] = msg->saved_busy_time; + s->last_buf_full[output_port] = msg->saved_busy_time; } delete_terminal_plus_message_list( return_tail(s->queued_msgs[output_port], s->queued_msgs_tail[output_port], output_chan)); @@ -3204,10 +3856,38 @@ static void router_packet_receive_rc(router_state *s, tw_bf *bf, terminal_plus_m /* Packet arrives at the router and a credit is sent back to the sending terminal/router */ static void router_packet_receive(router_state *s, tw_bf *bf, terminal_plus_message *msg, tw_lp *lp) { + + msg->num_cll = 0; + msg->num_rngs = 0; + router_verify_valid_receipt(s, bf, msg, lp); router_ecount++; tw_stime ts; + + int num_qos_levels = s->params->num_qos_levels; + int vcs_per_qos = s->params->num_vcs / num_qos_levels; + + if(num_qos_levels > 1) + { + if(s->is_monitoring_bw == 0) + { + bf->c1 = 1; + msg->num_cll++; + tw_stime bw_ts = bw_reset_window + codes_local_latency(lp); + terminal_plus_message * m; + tw_event * e = model_net_method_event_new(lp->gid, bw_ts, lp, + DRAGONFLY_CUSTOM_ROUTER, (void**)&m, NULL); + m->type = R_BANDWIDTH; + m->magic = router_magic_num; + tw_event_send(e); + s->is_monitoring_bw = 1; + } + } + int vcg = 0; + if(num_qos_levels > 1) + vcg = get_vcg_from_category(msg); + int next_stop = -1, output_port = -1, output_chan = -1, adap_chan = -1; int dfp_dest_terminal_id = msg->dfp_dest_terminal_id; @@ -3261,6 +3941,9 @@ static void router_packet_receive(router_state *s, tw_bf *bf, terminal_plus_mess } else tw_error(TW_LOC, "upward channel flag has invalid value\n"); + + assert(output_chan < vcs_per_qos); + output_chan = output_chan + (vcg * vcs_per_qos); ConnectionType port_type = s->connMan->get_port_type(output_port); int max_vc_size = 0; @@ -3306,13 +3989,14 @@ static void router_packet_receive(router_state *s, tw_bf *bf, terminal_plus_mess if (s->vc_occupancy[output_port][output_chan] + s->params->chunk_size <= max_vc_size) { bf->c2 = 1; - router_credit_send(s, msg, lp, -1); + router_credit_send(s, msg, lp, -1, &(msg->num_rngs)); append_to_terminal_plus_message_list(s->pending_msgs[output_port], s->pending_msgs_tail[output_port], output_chan, cur_chunk); s->vc_occupancy[output_port][output_chan] += s->params->chunk_size; if (s->in_send_loop[output_port] == 0) { bf->c3 = 1; terminal_plus_message *m; + msg->num_cll++; ts = codes_local_latency(lp); tw_event *e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY_PLUS_ROUTER, (void **) &m, NULL); @@ -3332,16 +4016,6 @@ static void router_packet_receive(router_state *s, tw_bf *bf, terminal_plus_mess output_chan, cur_chunk); s->queued_count[output_port] += s->params->chunk_size; - /* a check for pending msgs is non-empty then we dont set anything. If - * that is empty then we check if last_buf_full is set or not. If already - * set then we don't overwrite it. If two packets arrive next to each other - * then the first person should be setting it. */ - if (s->pending_msgs[output_port][output_chan] == NULL && - s->last_buf_full[output_port][output_chan] == 0.0) { - bf->c22 = 1; - msg->saved_busy_time = s->last_buf_full[output_port][output_chan]; - s->last_buf_full[output_port][output_chan] = tw_now(lp); - } } msg->saved_vc = output_port; @@ -3353,15 +4027,39 @@ static void router_packet_send_rc(router_state *s, tw_bf *bf, terminal_plus_mess { router_ecount--; router_rev_ecount++; - + + int num_qos_levels = s->params->num_qos_levels; int output_port = msg->saved_vc; - int output_chan = msg->saved_channel; - if (bf->c1) { + + if(msg->qos_reset1) + s->qos_status[output_port][0] = Q_ACTIVE; + if(msg->qos_reset2) + s->qos_status[output_port][1] = Q_ACTIVE; + + if(msg->last_saved_qos) + s->last_qos_lvl[output_port] = msg->last_saved_qos; + + if(bf->c1) { s->in_send_loop[output_port] = 1; - return; + if(bf->c2) { + s->last_buf_full[output_port] = msg->saved_busy_time; + } + return; } + + for(int i = 0; i < msg->num_rngs; i++) + tw_rand_reverse_unif(lp->rng); - tw_rand_reverse_unif(lp->rng); + for(int i = 0; i < msg->num_cll; i++) + codes_local_latency_reverse(lp); + + int output_chan = msg->saved_channel; + if(bf->c8) + { + s->busy_time[output_port] = msg->saved_rcv_time; + s->busy_time_sample[output_port] = msg->saved_sample_time; + s->last_buf_full[output_port] = msg->saved_busy_time; + } terminal_plus_message_list *cur_entry = (terminal_plus_message_list *) rc_stack_pop(s->st); assert(cur_entry); @@ -3379,10 +4077,6 @@ static void router_packet_send_rc(router_state *s, tw_bf *bf, terminal_plus_mess prepend_to_terminal_plus_message_list(s->pending_msgs[output_port], s->pending_msgs_tail[output_port], output_chan, cur_entry); - if (bf->c3) { - tw_rand_reverse_unif(lp->rng); - } - if (bf->c4) { s->in_send_loop[output_port] = 1; } @@ -3401,24 +4095,48 @@ static void router_packet_send(router_state *s, tw_bf *bf, terminal_plus_message terminal_plus_message_list *cur_entry = NULL; - int output_chan = s->params->num_vcs - 1; - for (int k = s->params->num_vcs - 1; k >= 0; k--) { - cur_entry = s->pending_msgs[output_port][k]; - if (cur_entry != NULL) { - output_chan = k; - break; - } - } + /* reset qos rc handler before incrementing it */ + msg->last_saved_qos = -1; + msg->qos_reset1 = -1; + msg->qos_reset2 = -1; + msg->num_cll = 0; + msg->num_rngs = 0; + + int num_qos_levels = s->params->num_qos_levels; + int output_chan = get_next_router_vcg(s, bf, msg, lp); msg->saved_vc = output_port; msg->saved_channel = output_chan; - - if (cur_entry == NULL) { - bf->c1 = 1; - s->in_send_loop[output_port] = 0; - return; + + if(output_chan < 0) { + bf->c1 = 1; + s->in_send_loop[output_port] = 0; + if(s->queued_count[output_port] && !s->last_buf_full[output_port]) + { + bf->c2 = 1; + msg->saved_busy_time = s->last_buf_full[output_port]; + s->last_buf_full[output_port] = tw_now(lp); + } + return; + } + + cur_entry = s->pending_msgs[output_port][output_chan]; + + assert(cur_entry != NULL); + + if(s->last_buf_full[output_port]) + { + bf->c8 = 1; + msg->saved_rcv_time = s->busy_time[output_port]; + msg->saved_busy_time = s->last_buf_full[output_port]; + msg->saved_sample_time = s->busy_time_sample[output_port]; + s->busy_time[output_port] += (tw_now(lp) - s->last_buf_full[output_port]); + s->busy_time_sample[output_port] += (tw_now(lp) - s->last_buf_full[output_port]); + s->last_buf_full[output_port] = 0.0; } + int vcg = get_vcg_from_category(&(cur_entry->msg)); + int to_terminal = 1, global = 0; double delay = s->params->cn_delay; double bandwidth = s->params->cn_bandwidth; @@ -3447,6 +4165,7 @@ static void router_packet_send(router_state *s, tw_bf *bf, terminal_plus_message if ((cur_entry->msg.packet_size < s->params->chunk_size) && (cur_entry->msg.chunk_id == num_chunks - 1)) bytetime = bytes_to_ns(cur_entry->msg.packet_size % s->params->chunk_size, bandwidth); + msg->num_rngs++; ts = g_tw_lookahead + tw_rand_unif(lp->rng) + bytetime + s->params->router_delay; msg->saved_available_time = s->next_output_available_time[output_port]; @@ -3480,10 +4199,12 @@ static void router_packet_send(router_state *s, tw_bf *bf, terminal_plus_message m->intm_lp_id = lp->gid; m->magic = router_magic_num; + int msg_size = s->params->chunk_size; if ((cur_entry->msg.packet_size % s->params->chunk_size) && (cur_entry->msg.chunk_id == num_chunks - 1)) { bf->c11 = 1; s->link_traffic[output_port] += (cur_entry->msg.packet_size % s->params->chunk_size); s->link_traffic_sample[output_port] += (cur_entry->msg.packet_size % s->params->chunk_size); + msg_size = cur_entry->msg.packet_size % s->params->chunk_size; } else { bf->c12 = 1; @@ -3505,33 +4226,31 @@ static void router_packet_send(router_state *s, tw_bf *bf, terminal_plus_message } tw_event_send(e); + s->qos_data[output_port][vcg] += msg_size; cur_entry = return_head(s->pending_msgs[output_port], s->pending_msgs_tail[output_port], output_chan); rc_stack_push(lp, cur_entry, delete_terminal_plus_message_list, s->st); - s->next_output_available_time[output_port] -= s->params->router_delay; - ts -= s->params->router_delay; - - cur_entry = NULL; + + int next_output_chan = get_next_router_vcg(s, bf, msg, lp); - for (int k = s->params->num_vcs - 1; k >= 0; k--) { - cur_entry = s->pending_msgs[output_port][k]; - if (cur_entry != NULL) - break; - } - if (cur_entry != NULL) { - bf->c3 = 1; - terminal_plus_message *m_new; - ts += g_tw_lookahead + tw_rand_unif(lp->rng); - e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY_PLUS_ROUTER, (void **) &m_new, NULL); - m_new->type = R_SEND; - m_new->magic = router_magic_num; - m_new->vc_index = output_port; - tw_event_send(e); - } - else { - bf->c4 = 1; - s->in_send_loop[output_port] = 0; - } + if(next_output_chan < 0) + { + bf->c4 = 1; + s->in_send_loop[output_port] = 0; + return; + } + cur_entry = s->pending_msgs[output_port][next_output_chan]; + assert(cur_entry != NULL); + + terminal_plus_message *m_new; + msg->num_rngs++; + ts += g_tw_lookahead + tw_rand_unif(lp->rng); + e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY_PLUS_ROUTER, (void **) &m_new, NULL); + m_new->type = R_SEND; + m_new->magic = router_magic_num; + m_new->vc_index = output_port; + tw_event_send(e); + return; } @@ -3540,22 +4259,27 @@ static void router_buf_update_rc(router_state *s, tw_bf *bf, terminal_plus_messa int indx = msg->vc_index; int output_chan = msg->output_chan; s->vc_occupancy[indx][output_chan] += s->params->chunk_size; + + for(int i = 0; i < msg->num_rngs; i++) + tw_rand_reverse_unif(lp->rng); + + for(int i = 0; i < msg->num_cll; i++) + codes_local_latency_reverse(lp); + if (bf->c3) { s->busy_time[indx] = msg->saved_rcv_time; s->busy_time_sample[indx] = msg->saved_sample_time; - s->last_buf_full[indx][output_chan] = msg->saved_busy_time; + s->last_buf_full[indx] = msg->saved_busy_time; } if (bf->c1) { terminal_plus_message_list *head = return_tail(s->pending_msgs[indx], s->pending_msgs_tail[indx], output_chan); - tw_rand_reverse_unif(lp->rng); prepend_to_terminal_plus_message_list(s->queued_msgs[indx], s->queued_msgs_tail[indx], output_chan, head); s->vc_occupancy[indx][output_chan] -= s->params->chunk_size; s->queued_count[indx] += s->params->chunk_size; } if (bf->c2) { - codes_local_latency_reverse(lp); s->in_send_loop[indx] = 0; } } @@ -3566,20 +4290,20 @@ static void router_buf_update(router_state *s, tw_bf *bf, terminal_plus_message int output_chan = msg->output_chan; s->vc_occupancy[indx][output_chan] -= s->params->chunk_size; - if (s->last_buf_full[indx][output_chan] > 0.0) { + if (s->last_buf_full[indx] > 0.0) { bf->c3 = 1; msg->saved_rcv_time = s->busy_time[indx]; - msg->saved_busy_time = s->last_buf_full[indx][output_chan]; + msg->saved_busy_time = s->last_buf_full[indx]; msg->saved_sample_time = s->busy_time_sample[indx]; - s->busy_time[indx] += (tw_now(lp) - s->last_buf_full[indx][output_chan]); - s->busy_time_sample[indx] += (tw_now(lp) - s->last_buf_full[indx][output_chan]); - s->last_buf_full[indx][output_chan] = 0.0; + s->busy_time[indx] += (tw_now(lp) - s->last_buf_full[indx]); + s->busy_time_sample[indx] += (tw_now(lp) - s->last_buf_full[indx]); + s->last_buf_full[indx] = 0.0; } if (s->queued_msgs[indx][output_chan] != NULL) { bf->c1 = 1; terminal_plus_message_list *head = return_head(s->queued_msgs[indx], s->queued_msgs_tail[indx], output_chan); - router_credit_send(s, &head->msg, lp, 1); + router_credit_send(s, &head->msg, lp, 1, &(msg->num_rngs)); append_to_terminal_plus_message_list(s->pending_msgs[indx], s->pending_msgs_tail[indx], output_chan, head); s->vc_occupancy[indx][output_chan] += s->params->chunk_size; @@ -3588,6 +4312,7 @@ static void router_buf_update(router_state *s, tw_bf *bf, terminal_plus_message if (s->in_send_loop[indx] == 0 && s->pending_msgs[indx][output_chan] != NULL) { bf->c2 = 1; terminal_plus_message *m; + msg->num_cll++; tw_stime ts = codes_local_latency(lp); tw_event *e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY_PLUS_ROUTER, (void **) &m, NULL); m->type = R_SEND; @@ -3682,7 +4407,7 @@ tw_lptype dragonfly_plus_lps[] = { (pre_run_f) NULL, (event_f) terminal_plus_event, (revent_f) terminal_plus_rc_event_handler, - (commit_f) NULL, + (commit_f) reset_bw_counters, (final_f) dragonfly_plus_terminal_final, (map_f) codes_mapping, sizeof(terminal_state), @@ -3692,7 +4417,7 @@ tw_lptype dragonfly_plus_lps[] = { (pre_run_f) NULL, (event_f) router_plus_event, (revent_f) router_plus_rc_event_handler, - (commit_f) NULL, + (commit_f) reset_rtr_bw_counters, (final_f) dragonfly_plus_router_final, (map_f) codes_mapping, sizeof(router_state), diff --git a/src/workload/methods/codes-online-comm-wrkld.C b/src/workload/methods/codes-online-comm-wrkld.C index 40d9763..bbf15bf 100644 --- a/src/workload/methods/codes-online-comm-wrkld.C +++ b/src/workload/methods/codes-online-comm-wrkld.C @@ -827,7 +827,6 @@ static int comm_online_workload_load(const char * params, int app_id, int rank) else tw_error(TW_LOC, "\n Undefined workload type %s ", o_params->workload_name); - printf("\n file path %s ", path.c_str()); try { std::ifstream jsonFile(path.c_str()); boost::property_tree::json_parser::read_json(jsonFile, root); -- 2.26.2