Commit 086a61de authored by Misbah Mubarak's avatar Misbah Mubarak

Adding changes to message tracking

parent 7a62429f
......@@ -23,7 +23,7 @@
#define TRACE -1
#define MAX_WAIT_REQS 512
#define CS_LP_DBG 1
#define RANK_HASH_TABLE_SZ 2000
#define RANK_HASH_TABLE_SZ 512
#define NW_LP_NM "nw-lp"
#define lprintf(_fmt, ...) \
do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
......@@ -37,7 +37,7 @@ static int debug_cols = 0;
/* Turning on this option slows down optimistic mode substantially. Only turn
* on if you get issues with wait-all completion with traces. */
static int preserve_wait_ordering = 0;
static int enable_msg_tracking = 0;
static int enable_msg_tracking = 1;
static int is_synthetic = 0;
tw_lpid TRACK_LP = -1;
int nprocs = 0;
......@@ -197,7 +197,7 @@ struct msg_size_info
int num_msgs;
tw_stime agg_latency;
tw_stime avg_latency;
struct qhash_head * hash_link;
struct qhash_head hash_link;
struct qlist_head ql;
};
typedef struct mpi_msgs_queue mpi_msgs_queue;
......@@ -281,6 +281,7 @@ struct nw_state
int max_arr_size;
struct mpi_workload_sample * mpi_wkld_samples;
char output_buf[512];
char output_buf2[512];
char col_stats[64];
};
......@@ -406,7 +407,7 @@ static void update_message_size(
struct qhash_head * hash_link = NULL;
tw_stime msg_init_time = qitem->req_init_time;
if(!ns->msg_sz_table)
if(ns->msg_sz_table == NULL)
ns->msg_sz_table = qhash_init(msg_size_hash_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SZ);
hash_link = qhash_search(ns->msg_sz_table, &(qitem->num_bytes));
......@@ -417,14 +418,16 @@ static void update_message_size(
/* update hash table */
if(!hash_link)
{
struct msg_size_info * msg_info = (struct msg_size_info*)calloc(1, sizeof(struct msg_size_info));
struct msg_size_info * msg_info = (struct msg_size_info*)malloc(sizeof(struct msg_size_info));
msg_info->msg_size = qitem->num_bytes;
msg_info->num_msgs = 1;
msg_info->agg_latency = tw_now(lp) - msg_init_time;
msg_info->avg_latency = msg_info->agg_latency;
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), msg_info->hash_link);
assert(ns->msg_sz_table);
printf("\n Msg size %d aggregate latency %f num messages %d ", msg_info->msg_size, msg_info->agg_latency, msg_info->num_msgs);
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link));
qlist_add(&msg_info->ql, &ns->msg_sz_list);
//printf("\n Msg size %d aggregate latency %f num messages %d ", m->fwd.num_bytes, msg_info->agg_latency, msg_info->num_msgs);
}
else
{
......@@ -432,7 +435,7 @@ static void update_message_size(
tmp->num_msgs++;
tmp->agg_latency += tw_now(lp) - msg_init_time;
tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs);
// printf("\n Msg size %d aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
printf("\n Msg size %d aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
}
}
static void notify_background_traffic_rc(
......@@ -1526,7 +1529,7 @@ static void codes_exec_mpi_send(nw_state* s,
remote_m = local_m;
remote_m.msg_type = MPI_SEND_ARRIVED;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + copy_overhead + soft_delay_mpi + nic_delay),
"mpi-workload", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + copy_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
}
else if (is_rend == 0)
......@@ -1546,7 +1549,7 @@ static void codes_exec_mpi_send(nw_state* s,
remote_m.fwd.app_id = s->app_id;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
"mpi-workload", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp);
}
else if(is_rend == 1)
......@@ -1559,7 +1562,7 @@ static void codes_exec_mpi_send(nw_state* s,
remote_m.msg_type = MPI_REND_ARRIVED;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + soft_delay_mpi + nic_delay),
"mpi-workload", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
}
if(enable_debug && !is_rend)
......@@ -1697,7 +1700,7 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m
remote_m.fwd.matched_req = matched_req;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
"mpi-workload", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp);
}
......@@ -2391,9 +2394,6 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
total_syn_data += s->syn_data;
int written = 0;
if(!s->nw_id)
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time> <Job ID>");
/*if(s->wait_op)
{
lprintf("\n Incomplete wait operation Rank %llu ", s->nw_id);
......@@ -2425,24 +2425,27 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
struct qlist_head * ent = NULL;
if(s->local_rank == 0 && enable_msg_tracking)
fprintf(msg_size_log, "\n rank_id message_size num_messages avg_latency");
written += sprintf(s->output_buf2, "\n rank_id message_size num_messages avg_latency");
if(enable_msg_tracking)
{
qlist_for_each(ent, &s->msg_sz_list)
{
tmp_msg = qlist_entry(ent, struct msg_size_info, ql);
printf("\n Rank %d Msg size %"PRId64" num_msgs %d agg_latency %f avg_latency %f",
if(s->local_rank == 0)
written += sprintf(s->output_buf2 + written, "\n Rank %d Msg size %"PRId64" num_msgs %d agg_latency %f avg_latency %f",
s->local_rank, tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->agg_latency, tmp_msg->avg_latency);
//fprintf(msg_size_log, "\n Rank %d Msg size %d num_msgs %d agg_latency %f avg_latency %f",
// s->local_rank, tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->agg_latency, tmp_msg->avg_latency);
if(s->local_rank == 0)
//if(s->local_rank == 0)
{
fprintf(msg_size_log, "\n %llu %"PRId64" %d %f",
written += sprintf(s->output_buf2 + written, "\n %llu %"PRId64" %d %f",
LLU(s->nw_id), tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->avg_latency);
}
}
}
lp_io_write(lp->gid, (char*)"mpi-msg-sz-log", written, s->output_buf2);
int count_irecv = 0, count_isend = 0;
count_irecv = qlist_count(&s->pending_recvs_queue);
count_isend = qlist_count(&s->arrival_queue);
......@@ -2452,6 +2455,11 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
printf("\n LP %llu unmatched irecvs %d unmatched sends %d Total sends %ld receives %ld collectives %ld delays %ld wait alls %ld waits %ld send time %lf wait %lf",
LLU(lp->gid), count_irecv, count_isend, s->num_sends, s->num_recvs, s->num_cols, s->num_delays, s->num_waitall, s->num_wait, s->send_time, s->wait_time);
}
written = 0;
if(!s->nw_id)
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time> <Job ID>");
written += sprintf(s->output_buf + written, "\n %llu %llu %ld %ld %ld %ld %lf %lf %lf %d", LLU(lp->gid), LLU(s->nw_id), s->num_sends, s->num_recvs, s->num_bytes_sent,
s->num_bytes_recvd, s->send_time, s->elapsed_time - s->compute_time, s->compute_time, s->app_id);
lp_io_write(lp->gid, (char*)"mpi-replay-stats", written, s->output_buf);
......@@ -2794,7 +2802,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
return -1;
}
}
if(enable_msg_tracking)
/* if(enable_msg_tracking)
{
msg_size_log = fopen("mpi-msg-sz-logs", "w+");
......@@ -2804,7 +2812,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
MPI_Finalize();
return -1;
}
}
}*/
char agg_log_name[512];
sprintf(agg_log_name, "%s/mpi-aggregate-logs-%d.bin", sampling_dir, rank);
workload_agg_log = fopen(agg_log_name, "w+");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment