Commit 17989778 authored by Misbah Mubarak

adding time-stepped series data for mpi-replay and network models

parent bcff32e6
......@@ -4,7 +4,7 @@ LPGROUPS
{
repetitions="168";
# name of this lp changes according to the model
nw-lp="6";
nw-lp="12";
# these lp names will be the same for dragonfly-custom model
modelnet_dragonfly_plus="6";
modelnet_dragonfly_plus_router="1";
......@@ -35,11 +35,15 @@ PARAMS
# buffer size in bytes for compute node virtual channels
cn_vc_size="32768";
# bandwidth in GiB/s for local channels
local_bandwidth="25.0";
local_bandwidth="2.0";
# number of qos levels
num_qos_levels="2";
# bandwidth of each qos level
qos_bandwidth="10,90";
# bandwidth in GiB/s for global channels
global_bandwidth="25.0";
global_bandwidth="2.0";
# bandwidth in GiB/s for compute node-router channels
cn_bandwidth="25.0";
cn_bandwidth="2.0";
# ROSS message size
message_size="656";
# number of compute nodes connected to router, dictated by dragonfly config file
......@@ -51,7 +55,7 @@ PARAMS
# network config file for inter-group connections
inter-group-connections="../src/network-workloads/conf/dragonfly-plus/dfp_1k_inter";
# routing protocol to be used - 'minimal', 'non-minimal-spine', 'non-minimal-leaf', 'prog-adaptive'
routing="prog-adaptive";
routing="minimal";
# route scoring protocol to be used - options are 'alpha', 'beta', or 'delta' - 'gamma' has been deprecated
route_scoring_metric="delta";
# minimal route threshold before considering non-minimal paths
......
......@@ -107,6 +107,7 @@ static int num_syn_clients;
static int syn_type = 0;
FILE * workload_log = NULL;
FILE * data_log = NULL;
FILE * msg_size_log = NULL;
FILE * workload_agg_log = NULL;
FILE * workload_meta_log = NULL;
......@@ -125,8 +126,8 @@ double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, av
/* runtime option for disabling computation time simulation */
static int disable_delay = 0;
static int enable_sampling = 0;
static double sampling_interval = 5000000;
static int enable_sampling = 1;
static double sampling_interval = 1000000;
static double sampling_end_time = 3000000000;
static int enable_debug = 0;
......@@ -277,6 +278,9 @@ struct nw_state
/* Pending wait operation */
struct pending_waits * wait_op;
/* data sent per rank */
double * data_sent_per_rank;
/* Message size latency information */
struct qhash_table * msg_sz_table;
struct qlist_head msg_sz_list;
......@@ -296,7 +300,7 @@ struct nw_state
/* For sampling data */
int sampling_indx;
int max_arr_size;
struct mpi_workload_sample * mpi_wkld_samples;
struct mpi_workload_sample* mpi_wkld_samples;
char output_buf[512];
char col_stats[64];
};
......@@ -1580,6 +1584,10 @@ static void codes_exec_mpi_send(nw_state* s,
else
tw_error(TW_LOC, "\n Invalid priority type %d", priority_type);
struct codes_jobmap_id jid;
jid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx);
int num_ranks = codes_jobmap_get_num_ranks(jid.job, jobmap_ctx);
int is_eager = 0;
/* model-net event */
int global_dest_rank = mpi_op->u.send.dest_rank;
......@@ -1604,6 +1612,13 @@ static void codes_exec_mpi_send(nw_state* s,
s->mpi_wkld_samples[indx].nw_id = s->nw_id;
s->mpi_wkld_samples[indx].app_id = s->app_id;
s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
for(int i = 0; i < num_ranks; i++)
{
if(s->data_sent_per_rank[i] > 0)
fprintf(data_log, "\n %lf %d %d %d %lf", s->cur_interval_end, s->app_id, s->local_rank, i, s->data_sent_per_rank[i]);
s->data_sent_per_rank[i] = 0;
}
s->sampling_indx++;
s->cur_interval_end += sampling_interval;
}
......@@ -1618,6 +1633,7 @@ static void codes_exec_mpi_send(nw_state* s,
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_sends_sample++;
s->mpi_wkld_samples[indx].num_bytes_sample += mpi_op->u.send.num_bytes;
s->data_sent_per_rank[mpi_op->u.send.dest_rank] += mpi_op->u.send.num_bytes;
}
nw_message local_m;
nw_message remote_m;
......@@ -2056,11 +2072,11 @@ void nw_test_init(nw_state* s, tw_lp* lp)
params = (char*)&oc_params;
strcpy(type_name, "online_comm_workload");
}
s->data_sent_per_rank = (double*)calloc(num_traces_of_job[lid.job], sizeof(double));
s->app_id = lid.job;
s->local_rank = lid.rank;
double overhead;
int rc = configuration_get_value_double(&config, "PARAMS", "self_msg_overhead", NULL, &overhead);
......@@ -2126,11 +2142,11 @@ void nw_test_init(nw_state* s, tw_lp* lp)
{
s->max_arr_size = MAX_STATS;
s->cur_interval_end = sampling_interval;
if(!g_tw_mynode && !s->nw_id)
/*if(!g_tw_mynode && !s->nw_id)
{
fprintf(workload_meta_log, "\n mpi_proc_id app_id num_waits "
" num_sends num_bytes_sent sample_end_time");
}
}*/
}
return;
}
......@@ -2581,8 +2597,8 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
}
if(enable_sampling)
{
fseek(workload_agg_log, sample_bytes_written, SEEK_SET);
fwrite(s->mpi_wkld_samples, sizeof(struct mpi_workload_sample), s->sampling_indx + 1, workload_agg_log);
//fseek(workload_agg_log, sample_bytes_written, SEEK_SET);
//fwrite(s->mpi_wkld_samples, sizeof(struct mpi_workload_sample), s->sampling_indx + 1, workload_agg_log);
}
sample_bytes_written += (s->sampling_indx * sizeof(struct mpi_workload_sample));
if(s->wait_time > max_wait_time)
......@@ -2596,6 +2612,9 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
written = 0;
if(data_log != NULL)
fclose(data_log);
if(debug_cols)
written += sprintf(s->col_stats + written, "%llu \t %lf \n", LLU(s->nw_id), ns_to_s(s->all_reduce_time / s->num_all_reduce));
......@@ -2904,6 +2923,16 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
net_id = *net_ids;
free(net_ids);
if(enable_sampling)
{
data_log = fopen("data-exchange-log", "w+");
if(!data_log)
{
printf("\n Error! unable to open data log");
MPI_Finalize();
return -1;
}
}
if(enable_debug)
{
workload_log = fopen("mpi-op-logs", "w+");
......@@ -2949,8 +2978,8 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
group_ratio = codes_mctx_set_group_ratio(NULL, true);
if(enable_sampling)
model_net_enable_sampling(sampling_interval, sampling_end_time);
// if(enable_sampling)
// model_net_enable_sampling(sampling_interval, sampling_end_time);
codes_mapping_setup();
......
......@@ -25,7 +25,7 @@
#include <cortex/topology.h>
#endif
#define DUMP_CONNECTIONS 0
#define DUMP_CONNECTIONS 1
#define PRINT_CONFIG 1
#define CREDIT_SIZE 8
#define DFLY_HASH_TABLE_SIZE 4999
......@@ -55,13 +55,13 @@ static int BIAS_MIN = 1;
static int DF_DALLY = 0;
static int adaptive_threshold = 1024;
static tw_stime max_qos_monitor = 5000000000;
static tw_stime max_qos_monitor = 3000000000;
static long num_local_packets_sr = 0;
static long num_local_packets_sg = 0;
static long num_remote_packets = 0;
/* time in nanosecs */
static int bw_reset_window = 5000000;
static int bw_reset_window = 1000000;
#define indexer3d(_ptr, _x, _y, _z, _maxx, _maxy, _maxz) \
((_ptr) + _z * (_maxx * _maxz) + _y * (_maxx) + _x)
......@@ -135,7 +135,7 @@ static int num_intra_nonmin_hops = 4;
static int num_intra_min_hops = 2;
static FILE * dragonfly_rtr_bw_log = NULL;
//static FILE * dragonfly_term_bw_log = NULL;
static FILE * dragonfly_term_bw_log = NULL;
static int sample_bytes_written = 0;
static int sample_rtr_bytes_written = 0;
......@@ -1142,6 +1142,12 @@ void issue_bw_monitor_event(terminal_state * s, tw_bf * bf, terminal_custom_mess
int num_qos_levels = s->params->num_qos_levels;
int rc_index = s->rc_index;
int num_term_rc_wins = s->num_term_rc_windows;
if(s->qos_data[0] > 0)
fprintf(dragonfly_term_bw_log, "\n %lf %d %d %lf ", tw_now(lp), s->terminal_id, s->qos_data[0], s->busy_time_sample);
s->busy_time_sample = 0;
/* dynamically reallocate array if index has reached max-size */
if(s->rc_index >= s->num_term_rc_windows)
......@@ -1177,12 +1183,6 @@ void issue_bw_monitor_event(terminal_state * s, tw_bf * bf, terminal_custom_mess
s->rc_index++;
assert(s->rc_index < s->num_term_rc_windows);
/* if(s->router_id == 0)
{
fprintf(dragonfly_term_bw_log, "\n %d %lf %lf ", s->terminal_id, tw_now(lp), s->busy_time_sample);
s->busy_time_sample = 0;
}
*/
if(tw_now(lp) > max_qos_monitor)
return;
......@@ -1267,10 +1267,15 @@ void issue_rtr_bw_monitor_event(router_state * s, tw_bf * bf, terminal_custom_me
for(int k = 0; k < num_qos_levels; k++)
{
int bw_consumed = get_rtr_bandwidth_consumption(s, k, j);
if(s->router_id == 0)
/* if(s->router_id == 0)
{
fprintf(dragonfly_rtr_bw_log, "\n %d %f %d %d %d %d %d %f", s->router_id, tw_now(lp), j, k, bw_consumed, s->qos_status[j][k], s->qos_data[j][k], s->busy_time_sample[j]);
}*/
if(s->link_traffic_sample[j] > 0)
{
fprintf(dragonfly_rtr_bw_log, "\n %f %d %d %llu %lf ", tw_now(lp), s->router_id, j, s->link_traffic_sample[j], s->busy_time_sample[j]);
//printf("\n %f %d %d %llu %lf ", tw_now(lp), s->router_id, j, s->link_traffic_sample[j], s->busy_time_sample[j]);
}
}
}
......@@ -1282,7 +1287,8 @@ void issue_rtr_bw_monitor_event(router_state * s, tw_bf * bf, terminal_custom_me
s->qos_status[j][k] = Q_ACTIVE;
s->qos_data[j][k] = 0;
}
//s->busy_time_sample[j] = 0;
s->busy_time_sample[j] = 0;
s->link_traffic_sample[j] = 0;
}
if(tw_now(lp) > max_qos_monitor)
......@@ -1428,13 +1434,12 @@ terminal_custom_init( terminal_state * s,
s->in_send_loop = 0;
s->issueIdle = 0;
/*if(s->terminal_id == 0)
if(s->terminal_id == 0)
{
char term_bw_log[64];
sprintf(term_bw_log, "terminal-bw-tracker");
dragonfly_term_bw_log = fopen(term_bw_log, "w");
fprintf(dragonfly_term_bw_log, "\n term-id time-stamp port-id busy-time");
}*/
dragonfly_term_bw_log = fopen("terminal_bw_tracker", "w+");
if(dragonfly_term_bw_log == NULL)
tw_error(TW_LOC, "\n Unable to open file");
}
return;
}
......@@ -1767,7 +1772,7 @@ static void packet_generate(terminal_state * s, tw_bf * bf, terminal_custom_mess
packet_gen++;
int num_qos_levels = s->params->num_qos_levels;
if(num_qos_levels > 1)
// if(num_qos_levels > 1)
{
tw_lpid router_id;
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
......@@ -3112,7 +3117,7 @@ dragonfly_custom_terminal_final( terminal_state * s,
if(s->terminal_id == 0)
{
//fclose(dragonfly_term_bw_log);
fclose(dragonfly_term_bw_log);
char meta_filename[128];
sprintf(meta_filename, "dragonfly-cn-stats.meta");
......@@ -4003,7 +4008,7 @@ router_packet_receive( router_state * s,
int num_qos_levels = s->params->num_qos_levels;
int vcs_per_qos = s->params->num_vcs / num_qos_levels;
if(num_qos_levels > 1)
// if(num_qos_levels > 1)
{
if(s->is_monitoring_bw == 0)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment