Commit 8de53c54 authored by Misbah Mubarak's avatar Misbah Mubarak

updates to mpi-sim-layer: displaying message latency by size

parent 086a61de
......@@ -281,7 +281,7 @@ struct nw_state
int max_arr_size;
struct mpi_workload_sample * mpi_wkld_samples;
char output_buf[512];
char output_buf2[512];
//char output_buf2[512];
char col_stats[64];
};
......@@ -424,7 +424,7 @@ static void update_message_size(
msg_info->agg_latency = tw_now(lp) - msg_init_time;
msg_info->avg_latency = msg_info->agg_latency;
assert(ns->msg_sz_table);
printf("\n Msg size %d aggregate latency %f num messages %d ", msg_info->msg_size, msg_info->agg_latency, msg_info->num_msgs);
printf("\n Msg size %lld aggregate latency %f num messages %d ", msg_info->msg_size, msg_info->agg_latency, msg_info->num_msgs);
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link));
qlist_add(&msg_info->ql, &ns->msg_sz_list);
......@@ -435,7 +435,7 @@ static void update_message_size(
tmp->num_msgs++;
tmp->agg_latency += tw_now(lp) - msg_init_time;
tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs);
printf("\n Msg size %d aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
printf("\n Msg size %lld aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
}
}
static void notify_background_traffic_rc(
......@@ -2425,7 +2425,8 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
struct qlist_head * ent = NULL;
if(s->local_rank == 0 && enable_msg_tracking)
written += sprintf(s->output_buf2, "\n rank_id message_size num_messages avg_latency");
// written += sprintf(s->output_buf2, "\n rank_id message_size num_messages avg_latency");
fprintf(msg_size_log, "\n rank_id message_size num_messages avg_latency");
if(enable_msg_tracking)
{
......@@ -2433,19 +2434,21 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
{
tmp_msg = qlist_entry(ent, struct msg_size_info, ql);
if(s->local_rank == 0)
written += sprintf(s->output_buf2 + written, "\n Rank %d Msg size %"PRId64" num_msgs %d agg_latency %f avg_latency %f",
/*if(s->local_rank == 0)
{
fprintf(msg_size_log, "\n Rank %d Msg size %lld num_msgs %d agg_latency %f avg_latency %f",
s->local_rank, tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->agg_latency, tmp_msg->avg_latency);
//fprintf(msg_size_log, "\n Rank %d Msg size %d num_msgs %d agg_latency %f avg_latency %f",
}*/
// written += sprintf(s->output_buf2 + written, "\n Rank %d Msg size %"PRId64" num_msgs %d agg_latency %f avg_latency %f",
// s->local_rank, tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->agg_latency, tmp_msg->avg_latency);
//if(s->local_rank == 0)
{
written += sprintf(s->output_buf2 + written, "\n %llu %"PRId64" %d %f",
fprintf(msg_size_log, "\n %llu %"PRId64" %d %f",
LLU(s->nw_id), tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->avg_latency);
}
}
}
lp_io_write(lp->gid, (char*)"mpi-msg-sz-log", written, s->output_buf2);
//lp_io_write(lp->gid, (char*)"mpi-msg-sz-log", written, s->output_buf2);
int count_irecv = 0, count_isend = 0;
count_irecv = qlist_count(&s->pending_recvs_queue);
count_isend = qlist_count(&s->arrival_queue);
......@@ -2493,10 +2496,11 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
written = 0;
if(debug_cols)
{
written += sprintf(s->col_stats + written, "%llu \t %lf \n", LLU(s->nw_id), ns_to_s(s->all_reduce_time / s->num_all_reduce));
lp_io_write(lp->gid, (char*)"avg-all-reduce-time", written, s->col_stats);
lp_io_write(lp->gid, (char*)"avg-all-reduce-time", written, s->col_stats);
}
avg_time += s->elapsed_time;
avg_comm_time += (s->elapsed_time - s->compute_time);
avg_wait_time += s->wait_time;
......@@ -2802,9 +2806,16 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
return -1;
}
}
/* if(enable_msg_tracking)
if(enable_msg_tracking)
{
msg_size_log = fopen("mpi-msg-sz-logs", "w+");
char log_name[64];
sprintf(log_name, "mpi-msg-sz-logs-%s-syn-sz-%d-mean-%f",
file_name_of_job[0],
payload_sz,
mean_interval);
msg_size_log = fopen(log_name, "w+");
if(!msg_size_log)
{
......@@ -2812,7 +2823,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
MPI_Finalize();
return -1;
}
}*/
}
char agg_log_name[512];
sprintf(agg_log_name, "%s/mpi-aggregate-logs-%d.bin", sampling_dir, rank);
workload_agg_log = fopen(agg_log_name, "w+");
......@@ -2851,6 +2862,9 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
if(enable_debug)
fclose(workload_log);
if(enable_msg_tracking)
fclose(msg_size_log);
long long total_bytes_sent, total_bytes_recvd;
double max_run_time, avg_run_time;
double max_comm_run_time, avg_comm_run_time;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment