Commit 241452e7 authored by Xin Wang

adding collective overhead v1

parent da4b4dd7
@@ -35,6 +35,7 @@ static int msg_size_hash_compare(
 /* NOTE: Message tracking works in sequential mode only! */
 static int debug_cols = 0;
+static int enable_col_overhead = 0;
 /* Turning on this option slows down optimistic mode substantially. Only turn
  * on if you get issues with wait-all completion with traces. */
@@ -73,6 +74,7 @@ int num_traces_of_job[5];
 tw_stime soft_delay_mpi = 1250;
 tw_stime nic_delay = 250;
 tw_stime copy_per_byte_eager = 0.55;
+tw_stime col_overhead_per_byte = 1.5;
 char file_name_of_job[5][8192];
 struct codes_jobmap_ctx *jobmap_ctx;
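For scale (assuming, as the neighbouring soft_delay_mpi = 1250 and nic_delay = 250 values suggest, that these delays are expressed in nanoseconds): with the default col_overhead_per_byte = 1.5, a 64 KiB message sent while a collective is in flight would be charged an extra 1.5 * 65536 ≈ 98,304 ns, i.e. roughly 98 µs on top of the fixed ~1.5 µs of software and NIC delay.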
@@ -214,6 +216,7 @@ struct nw_state
     int app_id;
     int local_rank;
     int synthetic_pattern;
+    int is_collective;
     int is_finished;
     int neighbor_completed;
@@ -1501,7 +1504,10 @@ static void codes_exec_mpi_send(nw_state* s,
     local_m.fwd.num_bytes = mpi_op->u.send.num_bytes;
     local_m.fwd.req_id = mpi_op->u.send.req_id;
     local_m.fwd.app_id = s->app_id;
+    tw_stime collective_overhead = 0;
+    if(s->is_collective)
+        collective_overhead = col_overhead_per_byte * mpi_op->u.send.num_bytes;
     if(mpi_op->u.send.num_bytes < EAGER_THRESHOLD)
     {
@@ -1515,7 +1521,7 @@ static void codes_exec_mpi_send(nw_state* s,
         remote_m = local_m;
         remote_m.msg_type = MPI_SEND_ARRIVED;
         m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
-            "test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + copy_overhead + soft_delay_mpi + nic_delay),
+            "test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + collective_overhead + copy_overhead + soft_delay_mpi + nic_delay),
             sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
     }
     else if (is_rend == 0)
@@ -1547,7 +1553,7 @@ static void codes_exec_mpi_send(nw_state* s,
         remote_m.msg_type = MPI_REND_ARRIVED;
         m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
-            "test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + soft_delay_mpi + nic_delay),
+            "test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + collective_overhead + soft_delay_mpi + nic_delay),
             sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
     }
     if(enable_debug && !is_rend)
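To make the delay accounting above easier to follow, here is a minimal standalone sketch (send_overhead is a hypothetical helper, not code from this repository) of how the send-side latency is put together once the per-byte collective term is folded in. Parameter names mirror the diff; the time unit is whatever soft_delay_mpi / nic_delay use (nanoseconds with the defaults above).

#include <stdint.h>

typedef double tw_stime;   /* ROSS simulated-time type is a double */

/* Hypothetical helper: total extra latency charged for one outgoing message.
 * The eager path additionally pays the per-byte copy cost; the rendezvous
 * path only pays software/NIC delay. Both paths now add
 * col_overhead_per_byte * num_bytes when the sender is inside a collective. */
static tw_stime send_overhead(int is_collective, int is_eager, uint64_t num_bytes,
        tw_stime self_overhead, tw_stime copy_overhead,
        tw_stime soft_delay_mpi, tw_stime nic_delay,
        tw_stime col_overhead_per_byte)
{
    tw_stime collective_overhead = 0;
    if (is_collective)
        collective_overhead = col_overhead_per_byte * (tw_stime)num_bytes;

    tw_stime delay = self_overhead + collective_overhead + soft_delay_mpi + nic_delay;
    if (is_eager)
        delay += copy_overhead;   /* eager sends are first copied into an MPI buffer */
    return delay;
}

With the constants shown above, the new per-byte term (1.5) is roughly 2.7 times the eager copy cost per byte (0.55), so for large collective payloads it dominates the added overhead.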
@@ -2114,7 +2120,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
     {
         s->col_latency[s->num_msg_sizes] = 0;
         s->col_msizes[s->num_msg_sizes] = 0;
-        //todo: reverse handler for num_msg_sizes
+        //todo: reverse handler for num_msg_sizes
         s->num_all_reduce--;
         s->col_time = m->rc.saved_send_time;
         s->all_reduce_time = m->rc.saved_delay;
@@ -2132,7 +2138,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
     {
         s->col_latency[s->num_msg_sizes] = 0;
         s->col_msizes[s->num_msg_sizes] = 0;
-        //todo: reverse handler for num_msg_sizes
+        //todo: reverse handler for num_msg_sizes
         s->num_all_reduce--;
         s->col_time = m->rc.saved_send_time;
         s->all_reduce_time -= s->col_time;
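The two //todo comments above flag a known gap rather than new behaviour: get_next_mpi_operation_rc() is the reverse (rollback) handler used in optimistic mode, and while it already rolls back num_all_reduce, col_time and all_reduce_time, the num_msg_sizes counter (presumably incremented on the forward path) still lacks a matching decrement, so the per-message-size collective statistics may drift after a rollback.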
@@ -2289,11 +2295,14 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
         s->all_reduce_time += (tw_now(lp) - s->col_time);
         m->rc.saved_send_time = s->col_time;
         s->col_time = 0;
+        s->is_collective = 0;
         s->num_all_reduce++;
     }
     else
     {
         s->col_time = tw_now(lp);
+        if(enable_col_overhead)
+            s->is_collective = 1;
     }
     codes_issue_next_event(lp);
 }
@@ -2320,11 +2329,14 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
         s->all_reduce_time += (tw_now(lp) - s->col_time);
         m->rc.saved_send_time = s->col_time;
         s->col_time = 0;
+        s->is_collective = 0;
         s->num_all_reduce++;
     }
     else
     {
         s->col_time = tw_now(lp);
+        if(enable_col_overhead)
+            s->is_collective = 1;
     }
     codes_issue_next_event(lp);
 }
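Taken together, the two hunks above bracket each all-reduce: the first event for the collective (when no start time has been recorded yet) stamps col_time and, if enable_col_overhead is set, raises is_collective, so every point-to-point send the rank issues while the collective is in flight is charged the per-byte overhead in codes_exec_mpi_send(); the completion event then accumulates the elapsed time into all_reduce_time and lowers the flag again. The guard that distinguishes the two branches is outside the lines shown, so this reading is inferred from the surrounding context.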
@@ -2524,6 +2536,7 @@ const tw_optdef app_opt [] =
     TWOPT_UINT("disable_compute", disable_delay, "disable compute simulation"),
     TWOPT_UINT("preserve_wait_ordering", preserve_wait_ordering, "only enable when getting unmatched send/recv errors in optimistic mode (turning on slows down simulation)"),
     TWOPT_UINT("debug_cols", debug_cols, "completion time of collective operations (currently MPI_AllReduce)"),
+    TWOPT_UINT("enable_col_overhead", enable_col_overhead, "adding overhead to collective operations (currently MPI_AllReduce)"),
     TWOPT_UINT("enable_mpi_debug", enable_debug, "enable debugging of MPI sim layer (works with sync=1 only)"),
     TWOPT_UINT("sampling_interval", sampling_interval, "sampling interval for MPI operations"),
     TWOPT_UINT("enable_sampling", enable_sampling, "enable sampling (only works in sequential mode)"),