Commit 4edd57c3 authored by Misbah Mubarak's avatar Misbah Mubarak

(i) Adding sampling for dragonfly compute node LPs (ii) Adding file I/O reader...

(i) Adding sampling for dragonfly compute node LPs (ii) Adding file I/O reader that translates the binary format to text
parent f0c850cc
......@@ -65,19 +65,18 @@ struct terminal_message
int sender_radix;
int output_chan;
model_net_event_return event_rc;
int is_pull;
uint64_t pull_size;
int is_pull;
uint64_t pull_size;
/* for reverse computation */
int path_type;
tw_stime saved_available_time;
tw_stime saved_avg_time;
tw_stime saved_rcv_time;
tw_stime saved_busy_time;
tw_stime saved_total_time;
tw_stime saved_hist_start_time;
tw_stime saved_sample_time;
tw_stime msg_start_time;
int saved_hist_num;
......@@ -91,8 +90,6 @@ struct terminal_message
/* LP ID of the sending node, has to be a network node in the dragonfly */
tw_lpid sender_node;
tw_lpid next_stop;
struct dfly_qhash_entry * saved_hash;
};
#endif /* end of include guard: DRAGONFLY_H */
......
......@@ -23,6 +23,6 @@ PARAMS
local_bandwidth="5.25";
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="552";
message_size="560";
routing="adaptive";
}
......@@ -157,7 +157,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
params = (char*)&params_d;
}
/* In this case, the LP will not generate any workload related events*/
if(s->nw_id >= params_d.num_net_traces)
if(s->nw_id >= (tw_lpid)params_d.num_net_traces)
{
//printf("\n network LP not generating events %d ", (int)s->nw_id);
return;
......
......@@ -1179,7 +1179,7 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
int written = 0;
if(!s->nw_id)
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time>");
if(s->nw_id < num_net_traces)
if(s->nw_id < (tw_lpid)num_net_traces)
{
int count_irecv = qlist_count(&s->pending_recvs_queue);
int count_isend = qlist_count(&s->arrival_queue);
......@@ -1314,6 +1314,7 @@ int main( int argc, char** argv )
net_id = *net_ids;
free(net_ids);
//model_net_enable_sampling(5000000, g_tw_ts_end);
codes_mapping_setup();
......
......@@ -343,8 +343,6 @@ int main(
tw_opt_add(app_opt);
tw_init(&argc, &argv);
g_tw_ts_end = s_to_ns(60*60*24*365); /* one year, in nsecs */
if(argc < 2)
{
printf("\n Usage: mpirun <args> --sync=2/3 mapping_file_name.conf (optional --nkp) ");
......@@ -367,6 +365,10 @@ int main(
net_id = *net_ids;
free(net_ids);
/* 5 days of simulation time */
g_tw_ts_end = s_to_ns(5 * 24 * 60 * 60);
//model_net_enable_sampling(8000, 10000);
if(net_id != DRAGONFLY)
{
printf("\n The test works with dragonfly model configuration only! ");
......
......@@ -42,10 +42,12 @@
#define PRINT_ROUTER_TABLE 1
#define DEBUG 0
#define USE_DIRECT_SCHEME 1
#define MAX_STATS 65536
#define LP_CONFIG_NM (model_net_lp_config_names[DRAGONFLY])
#define LP_METHOD_NM (model_net_method_names[DRAGONFLY])
int debug_slot_count = 0;
long term_ecount, router_ecount, term_rev_ecount, router_rev_ecount;
long packet_gen = 0, packet_fin = 0;
......@@ -73,6 +75,8 @@ int terminal_magic_num = 0;
FILE * dragonfly_log = NULL;
int sample_bytes_written = 0;
typedef struct terminal_message_list terminal_message_list;
struct terminal_message_list {
terminal_message msg;
......@@ -126,6 +130,17 @@ struct dfly_hash_key
tw_lpid sender_id;
};
struct dfly_sample_stats
{
tw_lpid terminal_id;
long fin_chunks_sample;
long data_size_sample;
double fin_hops_sample;
tw_stime fin_chunks_time;
tw_stime busy_time_sample;
tw_stime end_time;
};
struct dfly_qhash_entry
{
struct dfly_hash_key key;
......@@ -202,16 +217,26 @@ struct terminal_state
long total_msg_size;
double total_hops;
long finished_msgs;
double finished_chunks;
long finished_chunks;
long finished_packets;
tw_stime last_buf_full;
tw_stime busy_time;
char output_buf[4096];
/* For LP suspend functionality */
int error_ct;
/* For sampling */
long fin_chunks_sample;
long data_size_sample;
double fin_hops_sample;
tw_stime fin_chunks_time;
tw_stime busy_time_sample;
char sample_buf[4096];
struct dfly_sample_stats * sample_stat;
int op_arr_size;
int max_arr_size;
};
/* terminal event type (1-4) */
......@@ -319,13 +344,14 @@ static int dragonfly_rank_hash_compare(
static int dragonfly_hash_func(void *k, int table_size)
{
struct dfly_hash_key *tmp = (struct dfly_hash_key *)k;
//uint32_t pc = 0, pb = 0;
//bj_hashlittle2(tmp, sizeof(*tmp), &pc, &pb);
uint64_t key = (~tmp->message_id) + (tmp->message_id << 18);
uint32_t pc = 0, pb = 0;
bj_hashlittle2(tmp, sizeof(*tmp), &pc, &pb);
/*uint64_t key = (~tmp->message_id) + (tmp->message_id << 18);
key = key * 21;
key = ~key ^ (tmp->sender_id >> 4);
key = key * tmp->sender_id;
return (int)(key & (table_size - 1));
return (int)(key & (table_size - 1));*/
return (int)(pc % (table_size - 1));
}
/* convert GiB/s and bytes to ns */
......@@ -561,7 +587,8 @@ static void dragonfly_report_stats()
/* print statistics */
if(!g_tw_mynode)
{
printf(" Average number of hops traversed %f average chunk latency %lf us maximum chunk latency %lf us avg message size %lf bytes finished messages %lld finished chunks %lld \n", (float)avg_hops/total_finished_chunks, avg_time/(total_finished_chunks*1000), max_time/1000, (float)final_msg_sz/total_finished_msgs, total_finished_msgs, total_finished_chunks);
printf(" Average number of hops traversed %f average chunk latency %lf us maximum chunk latency %lf us avg message size %lf bytes finished messages %lld finished chunks %lld \n",
(float)avg_hops/total_finished_chunks, avg_time/(total_finished_chunks*1000), max_time/1000, (float)final_msg_sz/total_finished_msgs, total_finished_msgs, total_finished_chunks);
if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
printf("\n ADAPTIVE ROUTING STATS: %d chunks routed minimally %d chunks routed non-minimally completed packets %lld \n",
total_minimal_packets, total_nonmin_packets, total_finished_chunks);
......@@ -902,10 +929,10 @@ static void dragonfly_packet_event_rc(tw_lp *sender)
}
/* given two group IDs, find the router of the src_gid that connects to the dest_gid*/
int getRouterFromGroupID(int dest_gid,
tw_lpid getRouterFromGroupID(int dest_gid,
int src_gid,
int num_routers,
int total_groups)
int total_groups)
{
#if USE_DIRECT_SCHEME
int dest = dest_gid;
......@@ -927,7 +954,7 @@ int getRouterFromGroupID(int dest_gid,
offset=(offset - 1) % (half_channel * num_routers);
// If the destination router is in the same group
int router_id;
tw_lpid router_id;
if(index % 2 != 0)
router_id = group_end - (offset / half_channel); // start from the end
......@@ -1142,8 +1169,6 @@ void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg,
terminal_message_list* cur_entry = rc_stack_pop(s->st);
// create_prepend_to_terminal_message_list(s->terminal_msgs,
// s->terminal_msgs_tail, 0, cur_entry);
prepend_to_terminal_message_list(s->terminal_msgs,
s->terminal_msgs_tail, 0, cur_entry);
if(bf->c3) {
......@@ -1152,11 +1177,11 @@ void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg,
if(bf->c4) {
s->in_send_loop = 1;
}
if(bf->c5)
/*if(bf->c5)
{
codes_local_latency_reverse(lp);
s->issueIdle = 1;
}
}*/
return;
}
/* sends the packet from the current dragonfly compute node to the attached router */
......@@ -1251,11 +1276,11 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg,
bf->c4 = 1;
s->in_send_loop = 0;
}
if(s->issueIdle) {
/* if(s->issueIdle) {
bf->c5 = 1;
s->issueIdle = 0;
model_net_method_idle_event(codes_local_latency(lp), 0, lp);
}
}*/
return;
}
......@@ -1274,10 +1299,13 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
N_finished_chunks--;
s->finished_chunks--;
s->fin_chunks_sample--;
total_hops -= msg->my_N_hop;
s->total_hops -= msg->my_N_hop;
s->fin_hops_sample -= msg->my_N_hop;
dragonfly_total_time = msg->saved_total_time;
s->fin_chunks_time = msg->saved_sample_time;
s->total_time = msg->saved_avg_time;
struct qhash_head * hash_link = NULL;
......@@ -1311,6 +1339,7 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
s->finished_msgs--;
total_msg_sz -= msg->total_size;
s->total_msg_size -= msg->total_size;
s->data_size_sample -= msg->total_size;
struct dfly_qhash_entry * d_entry_pop = rc_stack_pop(s->st);
qhash_add(s->rank_tbl, &key, &(d_entry_pop->hash_link));
......@@ -1331,7 +1360,8 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
assert(hash_link);
qhash_del(hash_link);
free(tmp->remote_event_data);
free(tmp);
free(tmp);
s->rank_tbl_pop--;
}
return;
}
......@@ -1421,8 +1451,12 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
bf->c4 = 0;
bf->c7 = 0;
/* Total overall finished chunks in simulation */
N_finished_chunks++;
/* Finished chunks on a LP basis */
s->finished_chunks++;
/* Finished chunks per sample */
s->fin_chunks_sample++;
/* WE do not allow self messages through dragonfly */
assert(lp->gid != msg->src_terminal_id);
......@@ -1449,6 +1483,11 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
if(msg->path_type != MINIMAL && msg->path_type != NON_MINIMAL)
printf("\n Wrong message path type %d ", msg->path_type);
/* save the sample time */
msg->saved_sample_time = s->fin_chunks_time;
s->fin_chunks_time += (tw_now(lp) - msg->travel_start_time);
/* save the total time per LP */
msg->saved_avg_time = s->total_time;
s->total_time += (tw_now(lp) - msg->travel_start_time);
......@@ -1456,6 +1495,7 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
dragonfly_total_time += tw_now( lp ) - msg->travel_start_time;
total_hops += msg->my_N_hop;
s->total_hops += msg->my_N_hop;
s->fin_hops_sample += msg->my_N_hop;
mn_stats* stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
msg->saved_rcv_time = stat->recv_time;
......@@ -1533,6 +1573,7 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
N_finished_msgs++;
total_msg_sz += msg->total_size;
s->total_msg_size += msg->total_size;
s->data_size_sample += msg->total_size;
s->finished_msgs++;
assert(tmp->remote_event_data && tmp->remote_event_size > 0);
......@@ -1812,6 +1853,135 @@ static void node_collective_fan_out(terminal_state * s,
}
}
void dragonfly_sample_init(terminal_state * s,
tw_lp * lp)
{
(void)lp;
s->fin_chunks_sample = 0;
s->data_size_sample = 0;
s->fin_hops_sample = 0;
s->fin_chunks_time = 0;
s->busy_time_sample = 0;
s->op_arr_size = 0;
s->max_arr_size = MAX_STATS;
s->sample_stat = malloc(MAX_STATS * sizeof(struct dfly_sample_stats));
/*char buf[1024];
int written = 0;
if(!s->terminal_id)
{
written = sprintf(buf, "# Format <LP ID> <Terminal ID> <Data size> <Avg packet latency> <#flits/packets finished> <Avg hops> <Busy Time>");
lp_io_write(lp->gid, "dragonfly-sampling-stats", written, buf);
}*/
}
void dragonfly_sample_rc_fn(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
(void)lp;
(void)bf;
(void)msg;
s->op_arr_size--;
int cur_indx = s->op_arr_size;
struct dfly_sample_stats stat = s->sample_stat[cur_indx];
s->busy_time_sample = stat.busy_time_sample;
s->fin_chunks_time = stat.fin_chunks_time;
s->fin_hops_sample = stat.fin_hops_sample;
s->data_size_sample = stat.data_size_sample;
s->fin_chunks_sample = stat.fin_chunks_sample;
stat.busy_time_sample = 0;
stat.fin_chunks_time = 0;
stat.fin_hops_sample = 0;
stat.data_size_sample = 0;
stat.fin_chunks_sample = 0;
stat.end_time = 0;
stat.terminal_id = 0;
}
void dragonfly_sample_fn(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
tw_lp * lp)
{
(void)lp;
(void)msg;
(void)bf;
//int i = 0, free_slots = 0;
/* checkout which samples are past the GVT, write them to the file */
/*for(i = 0; i < s->op_arr_size; i++)
{
if(s->sample_stat[i].end_time == 0 || s->sample_stat[i].end_time >= lp->pe->GVT)
break;
free_slots++;
}
if(free_slots > 0)
{
fwrite(s->sample_stat, sizeof(struct dfly_sample_stats), free_slots, fp);
s->op_arr_size -= free_slots;
memmove(s->sample_stat, &(s->sample_stat[free_slots]), sizeof(struct dfly_sample_stats) * s->op_arr_size);
}*/
if(s->op_arr_size >= s->max_arr_size)
{
/* In the worst case, copy array to a new memory location, its very
* expensive operation though */
struct dfly_sample_stats * tmp = malloc((MAX_STATS + s->max_arr_size) * sizeof(struct dfly_sample_stats));
memcpy(tmp, s->sample_stat, s->op_arr_size * sizeof(struct dfly_sample_stats));
free(s->sample_stat);
s->sample_stat = tmp;
s->max_arr_size += MAX_STATS;
}
int cur_indx = s->op_arr_size;
s->sample_stat[cur_indx].terminal_id = s->terminal_id;
s->sample_stat[cur_indx].fin_chunks_sample = s->fin_chunks_sample;
s->sample_stat[cur_indx].data_size_sample = s->data_size_sample;
s->sample_stat[cur_indx].fin_hops_sample = s->fin_hops_sample;
s->sample_stat[cur_indx].fin_chunks_time = s->fin_chunks_time;
s->sample_stat[cur_indx].busy_time_sample = s->busy_time_sample;
s->sample_stat[cur_indx].end_time = tw_now(lp);
s->op_arr_size++;
s->fin_chunks_sample = 0;
s->data_size_sample = 0;
s->fin_hops_sample = 0;
s->fin_chunks_time = 0;
s->busy_time_sample = 0;
}
void dragonfly_sample_fin(terminal_state * s,
tw_lp * lp)
{
(void)lp;
/* int i = 0;
* for(; i < s->op_arr_size; i++)
{
printf("\n Terminal id %ld data size sample %ld fin chunks %ld end time %lf ",
s->terminal_id, s->sample_stat[i].data_size_sample, s->sample_stat[i].fin_chunks_sample, s->sample_stat[i].end_time);
}*/
char file_name[64];
sprintf(file_name, "dragonfly-sampling-%ld.bin", g_tw_mynode);
FILE * fp = fopen(file_name, "a");
fseek(fp, sample_bytes_written, SEEK_SET);
fwrite(s->sample_stat, sizeof(struct dfly_sample_stats), s->op_arr_size, fp);
fclose(fp);
sample_bytes_written += (s->op_arr_size * sizeof(struct dfly_sample_stats));
//printf("\n Bytes written %ld ", sample_bytes_written);
}
void terminal_buf_update_rc(terminal_state * s,
tw_bf * bf,
terminal_message * msg,
......@@ -1823,6 +1993,7 @@ void terminal_buf_update_rc(terminal_state * s,
{
s->busy_time = msg->saved_total_time;
s->last_buf_full = msg->saved_busy_time;
s->busy_time_sample = msg->saved_sample_time;
}
if(bf->c1) {
s->in_send_loop = 0;
......@@ -1850,8 +2021,10 @@ terminal_buf_update(terminal_state * s,
bf->c3 = 1;
msg->saved_total_time = s->busy_time;
msg->saved_busy_time = s->last_buf_full;
msg->saved_sample_time = s->busy_time_sample;
s->busy_time += (tw_now(lp) - s->last_buf_full);
s->busy_time_sample += (tw_now(lp) - s->last_buf_full);
s->last_buf_full = 0.0;
}
......@@ -1927,7 +2100,7 @@ dragonfly_terminal_final( terminal_state * s,
int written = 0;
if(!s->terminal_id)
written = sprintf(s->output_buf, "# Format <LP id> <Terminal ID> <Total Data Size> <Total Time Spent> <# Packets finished> <Avg hops> <Busy Time>\n");
written = sprintf(s->output_buf, "# Format <LP id> <Terminal ID> <Total Data Size> <Avg packet latency> <# Flits/Packets finished> <Avg hops> <Busy Time>\n");
written += sprintf(s->output_buf + written, "%llu %u %ld %lf %ld %lf %lf\n",
LLU(lp->gid), s->terminal_id, s->total_msg_size, s->total_time,
......@@ -1936,17 +2109,6 @@ dragonfly_terminal_final( terminal_state * s,
lp_io_write(lp->gid, "dragonfly-msg-stats", written, s->output_buf);
/*char log[64];
sprintf(log, "dragonfly-msg-stats-%d", routing);
dragonfly_log = fopen(log, "w+");
assert(dragonfly_log);
if(s->finished_chunks)
{
fprintf(dragonfly_log, "%u %ld %lf %lf %lf\n", s->terminal_id, s->total_msg_size, s->total_time, s->finished_chunks, s->total_hops/s->finished_chunks);
printf("%u %ld %lf %lf %lf\n", s->terminal_id, s->total_msg_size, s->total_time, s->finished_chunks, s->total_hops/s->finished_chunks);
}
fclose(dragonfly_log);
*/
if(s->terminal_msgs[0] != NULL)
printf("[%llu] leftover terminal messages \n", LLU(lp->gid));
......@@ -2839,6 +3001,10 @@ struct model_net_method dragonfly_method =
.mn_get_msg_sz = dragonfly_get_msg_sz,
.mn_report_stats = dragonfly_report_stats,
.mn_collective_call = dragonfly_collective,
.mn_collective_call_rc = dragonfly_collective_rc
.mn_collective_call_rc = dragonfly_collective_rc,
.mn_sample_fn = (void*)dragonfly_sample_fn,
.mn_sample_rc_fn = (void*)dragonfly_sample_rc_fn,
.mn_sample_init_fn = (void*)dragonfly_sample_init,
.mn_sample_fini_fn = (void*)dragonfly_sample_fin
};
/* usage mpirun -np n ./read_file_io
n is the number of input bgp-log files */
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <mpi.h>
struct dfly_samples
{
uint64_t terminal_id;
long fin_chunks_sample;
long data_size_sample;
double fin_hops_sample;
double fin_chunks_time;
double busy_time_sample;
double end_time;
};
static struct dfly_samples * event_array = NULL;
int main( int argc, char** argv )
{
int my_rank;
int size;
int i = 0;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
FILE* pFile;
FILE* writeFile;
char buffer_read[64];
char buffer_write[64];
sprintf(buffer_read, "dragonfly-sampling-%d.bin", my_rank);
pFile = fopen(buffer_read, "r+");
struct stat st;
stat(buffer_read, &st);
long in_sz = st.st_size;
event_array = malloc(in_sz);
sprintf(buffer_write, "dragonfly-write-log.%d", my_rank);
writeFile = fopen(buffer_write, "w+");
if(pFile == NULL || writeFile == NULL)
{
fputs("\n File error ", stderr);
return -1;
}
fseek(pFile, 0L, SEEK_SET);
fread(event_array, sizeof(struct dfly_samples), in_sz / sizeof(struct dfly_samples), pFile);
fprintf(writeFile, " Rank ID \t Finished chunks \t Data size \t Finished hops \t Time spent \t Busy time \t Sample end time");
for(i = 0; i < in_sz / sizeof(struct dfly_samples); i++)
{
fprintf(writeFile, "\n %ld \t %ld \t %ld \t %lf \t %lf \t %lf \t %lf ", event_array[i].terminal_id,
event_array[i].fin_chunks_sample,
event_array[i].data_size_sample,
event_array[i].fin_hops_sample,
event_array[i].fin_chunks_time,
event_array[i].busy_time_sample,
event_array[i].end_time);
}
fclose(pFile);
fclose(writeFile);
MPI_Finalize();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment