Commit bf8c37d2 authored by Misbah Mubarak

Adding changes to support reverse handlers in model-net-mpi-replay

parent 2a7724c1
......@@ -24,7 +24,7 @@
#define TRACE -1
#define MAX_WAIT_REQS 512
#define CS_LP_DBG 0
#define EAGER_THRESHOLD 81920000
#define EAGER_THRESHOLD 8192
#define RANK_HASH_TABLE_SZ 2000
#define NOISE 3.0
#define NW_LP_NM "nw-lp"
......@@ -293,6 +293,8 @@ struct nw_message
};
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op);
static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* executes MPI isend and send operations */
static void codes_exec_mpi_send(
nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op, int is_rend);
......@@ -343,6 +345,14 @@ static void update_message_time_rc(
/* conversion from seconds to nanoseconds */
static tw_stime s_to_ns(tw_stime ns);
/* Reverse-handler stub paired by name with update_message_size().
 * The body is empty, so calling it changes no state and none of the
 * parameters are read.
 * NOTE(review): the visible parts of update_message_size() insert into
 * ns->msg_sz_table / ns->msg_sz_list and update num_msgs, agg_latency and
 * avg_latency; none of that is undone here -- confirm whether leaving these
 * statistics un-rolled-back is acceptable. */
static void update_message_size_rc(
struct nw_state * ns,
tw_lp * lp,
tw_bf * bf,
struct nw_message * m)
{
/* no rollback is performed */
}
/* update the message size */
static void update_message_size(
struct nw_state * ns,
......@@ -354,15 +364,12 @@ static void update_message_size(
int is_send)
{
struct qhash_head * hash_link = NULL;
tw_stime copy_overhead = 0;
tw_stime msg_init_time = qitem->req_init_time;
if(!ns->msg_sz_table)
ns->msg_sz_table = qhash_init(msg_size_hash_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SZ);
hash_link = qhash_search(ns->msg_sz_table, &(qitem->num_bytes));
if(is_eager)
copy_overhead = copy_per_byte_eager * qitem->num_bytes;
if(is_send)
msg_init_time = m->fwd.sim_start_time;
......@@ -373,7 +380,7 @@ static void update_message_size(
struct msg_size_info * msg_info = malloc(sizeof(struct msg_size_info));
msg_info->msg_size = qitem->num_bytes;
msg_info->num_msgs = 1;
msg_info->agg_latency = tw_now(lp) + copy_overhead - msg_init_time;
msg_info->agg_latency = tw_now(lp) - msg_init_time;
msg_info->avg_latency = msg_info->agg_latency;
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link));
qlist_add(&msg_info->ql, &ns->msg_sz_list);
......@@ -383,7 +390,7 @@ static void update_message_size(
{
struct msg_size_info * tmp = qhash_entry(hash_link, struct msg_size_info, hash_link);
tmp->num_msgs++;
tmp->agg_latency += tw_now(lp) + copy_overhead - msg_init_time;
tmp->agg_latency += tw_now(lp) - msg_init_time;
tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs);
// printf("\n Msg size %d aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
}
......@@ -963,7 +970,8 @@ static int rm_matching_rcv(nw_state * ns,
if(qitem->num_bytes >= EAGER_THRESHOLD)
{
/* Matching receive found, need to notify the sender to transmit
* the data */
* the data (only works in sequential mode) */
bf->c10 = 1;
send_ack_back(ns, bf, m, lp, qitem);
}
m->rc.saved_recv_time = ns->recv_time;
......@@ -1015,6 +1023,7 @@ static int rm_matching_send(nw_state * ns,
{
/* Matching receive found, need to notify the sender to transmit
* the data */
bf->c10 = 1;
send_ack_back(ns, bf, m, lp, qi);
}
m->rc.saved_recv_time = ns->recv_time;
......@@ -1079,6 +1088,10 @@ static void codes_exec_mpi_recv_rc(
tw_lp* lp)
{
ns->recv_time = m->rc.saved_recv_time;
if(bf->c10)
send_ack_back_rc(ns, bf, m, lp);
if(m->fwd.found_match >= 0)
{
ns->recv_time = m->rc.saved_recv_time;
......@@ -1184,6 +1197,28 @@ int get_global_id_of_job_rank(tw_lpid job_rank, int app_id)
int global_rank = codes_jobmap_to_global_id(lid, jobmap_ctx);
return global_rank;
}
/*
 * Reverse handler for an MPI send / isend.
 *
 * Rolls back, in this order: the per-interval sampling counters (only when
 * enable_sampling is set), the model-net event recorded in m->event_rc, the
 * "issue next event" step for CODES_WK_ISEND operations, and finally the
 * per-LP and global send statistics. Byte counts are restored from
 * m->rc.saved_num_bytes.
 */
static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(enable_sampling)
    {
        const int sample_slot = s->sampling_indx;

        s->mpi_wkld_samples[sample_slot].num_sends_sample -= 1;
        s->mpi_wkld_samples[sample_slot].num_bytes_sample -= m->rc.saved_num_bytes;

        /* bf->c1 set: also step back one sampling interval
         * (presumably the forward path advanced it -- confirm against caller) */
        if(bf->c1)
        {
            s->sampling_indx -= 1;
            s->cur_interval_end -= sampling_interval;
        }
    }

    /* undo the network event before touching the send statistics */
    model_net_event_rc2(lp, &m->event_rc);

    if(m->op_type == CODES_WK_ISEND)
    {
        codes_issue_next_event_rc(lp);
    }

    s->num_sends -= 1;
    s->num_bytes_sent -= m->rc.saved_num_bytes;
    num_bytes_sent -= m->rc.saved_num_bytes;
}
/* executes MPI send and isend operations */
static void codes_exec_mpi_send(nw_state* s,
tw_bf * bf,
......@@ -1245,16 +1280,19 @@ static void codes_exec_mpi_send(nw_state* s,
local_m.fwd.num_bytes = mpi_op->u.send.num_bytes;
local_m.fwd.req_id = mpi_op->u.send.req_id;
local_m.fwd.app_id = s->app_id;
if(mpi_op->u.send.num_bytes < EAGER_THRESHOLD)
{
/* directly issue a model-net send */
tw_stime copy_overhead = copy_per_byte_eager * mpi_op->u.send.num_bytes;
local_m.fwd.sim_start_time = tw_now(lp);
remote_m = local_m;
remote_m.msg_type = MPI_SEND_ARRIVED;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + soft_delay_mpi + nic_delay),
"test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + copy_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
}
else if (is_rend == 0)
......@@ -1417,7 +1455,11 @@ static void update_arrival_queue_rc(nw_state* s,
s->num_bytes_recvd -= m->fwd.num_bytes;
num_bytes_recvd -= m->fwd.num_bytes;
codes_local_latency_reverse(lp);
if(bf->c1)
codes_local_latency_reverse(lp);
if(bf->c10)
send_ack_back_rc(s, bf, m, lp);
if(m->fwd.found_match >= 0)
{
......@@ -1483,6 +1525,7 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
if(m->fwd.num_bytes < EAGER_THRESHOLD)
{
bf->c1 = 1;
tw_event *e_callback =
tw_event_new(rank_to_lpid(global_src_id),
codes_local_latency(lp), lp);
......@@ -1770,25 +1813,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
if(enable_sampling)
{
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_sends_sample--;
s->mpi_wkld_samples[indx].num_bytes_sample -= m->rc.saved_num_bytes;
if(bf->c1)
{
s->sampling_indx--;
s->cur_interval_end -= sampling_interval;
}
}
model_net_event_rc2(lp, &m->event_rc);
if(m->op_type == CODES_WK_ISEND)
codes_issue_next_event_rc(lp);
s->num_sends--;
s->num_bytes_sent -= m->rc.saved_num_bytes;
num_bytes_sent -= m->rc.saved_num_bytes;
codes_exec_mpi_send_rc(s, bf, m, lp);
}
break;
......@@ -2077,6 +2102,16 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
}
break;
case MPI_REND_ACK_ARRIVED:
{
codes_exec_mpi_send_rc(s, bf, m, lp);
}
break;
case MPI_REND_ARRIVED:
codes_local_latency_reverse(lp);
break;
case MPI_OP_GET_NEXT:
get_next_mpi_operation_rc(s, bf, m, lp);
break;
......@@ -2110,7 +2145,7 @@ const tw_optdef app_opt [] =
TWOPT_UINT("disable_compute", disable_delay, "disable compute simulation"),
TWOPT_UINT("enable_mpi_debug", enable_debug, "enable debugging of MPI sim layer (works with sync=1 only)"),
TWOPT_UINT("sampling_interval", sampling_interval, "sampling interval for MPI operations"),
TWOPT_UINT("enable_sampling", enable_sampling, "enable sampling"),
TWOPT_UINT("enable_sampling", enable_sampling, "enable sampling (only works in sequential mode)"),
TWOPT_STIME("sampling_end_time", sampling_end_time, "sampling_end_time"),
TWOPT_CHAR("lp-io-dir", lp_io_dir, "Where to place io output (unspecified -> no output"),
TWOPT_UINT("lp-io-use-suffix", lp_io_use_suffix, "Whether to append uniq suffix to lp-io directory (default 0)"),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment