Commit 35d787cd authored by Misbah Mubarak

changing event triggers for rendezvous protocol, next event is now issued when...

Change the event triggers for the rendezvous protocol: the next event is now issued when the actual data transfer completes.
parent 3435735f
......@@ -294,10 +294,12 @@ struct nw_message
// for callbacks - time message was received
double msg_send_time;
int req_id;
int matched_req;
int tag;
int app_id;
int found_match;
short wait_completed;
short rend_send;
} fwd;
struct
{
......@@ -309,7 +311,7 @@ struct nw_message
} rc;
};
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op);
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op, int matched_req);
static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* executes MPI isend and send operations */
......@@ -1056,6 +1058,7 @@ static int rm_matching_rcv(nw_state * ns,
{
int matched = 0;
int index = 0;
int is_rend = 0;
struct qlist_head *ent = NULL;
mpi_msgs_queue * qi = NULL;
......@@ -1084,21 +1087,28 @@ static int rm_matching_rcv(nw_state * ns,
/* Matching receive found, need to notify the sender to transmit
* the data * (only works in sequential mode)*/
bf->c10 = 1;
send_ack_back(ns, bf, m, lp, qitem);
is_rend = 1;
send_ack_back(ns, bf, m, lp, qitem, qi->req_id);
}
m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
if(qi->op_type == CODES_WK_IRECV)
else
{
m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
}
if(qi->op_type == CODES_WK_IRECV && !is_rend)
{
bf->c9 = 1;
/*if(ns->nw_id == (tw_lpid)TRACK_LP)
{
printf("\n Completed irecv req id %d ", qi->req_id);
}*/
update_completed_queue(ns, bf, m, lp, qi->req_id);
}
else if(qi->op_type == CODES_WK_RECV)
else if(qi->op_type == CODES_WK_RECV && !is_rend)
{
bf->c8 = 1;
codes_issue_next_event(lp);
}
qlist_del(&qi->ql);
......@@ -1137,14 +1147,16 @@ static int rm_matching_send(nw_state * ns,
if(enable_msg_tracking && (qi->num_bytes < EAGER_THRESHOLD))
update_message_size(ns, lp, bf, m, qi, 1, 0);
m->fwd.matched_req = qitem->req_id;
int is_rend = 0;
if(qitem->num_bytes >= EAGER_THRESHOLD)
{
/* Matching receive found, need to notify the sender to transmit
* the data */
bf->c10 = 1;
send_ack_back(ns, bf, m, lp, qi);
is_rend = 1;
send_ack_back(ns, bf, m, lp, qi, qitem->req_id);
}
rc_stack_push(lp, qi, free, ns->processed_ops);
m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - qitem->req_init_time);
......@@ -1155,11 +1167,16 @@ static int rm_matching_send(nw_state * ns,
print_completed_queue(lp, &ns->completed_reqs);
}*/
if(qitem->op_type == CODES_WK_IRECV)
if(qitem->op_type == CODES_WK_IRECV && !is_rend)
{
bf->c9 = 1;
update_completed_queue(ns, bf, m, lp, qitem->req_id);
}
qlist_del(&qi->ql);
rc_stack_push(lp, qi, free, ns->processed_ops);
return index;
}
return -1;
......@@ -1245,11 +1262,12 @@ static void codes_exec_mpi_recv_rc(
index++;
}
}
if(m->op_type == CODES_WK_IRECV)
if(bf->c9)
{
update_completed_queue_rc(ns, bf, m, lp);
}
codes_issue_next_event_rc(lp);
if(bf->c6)
codes_issue_next_event_rc(lp);
}
else if(m->fwd.found_match < 0)
{
......@@ -1303,12 +1321,14 @@ static void codes_exec_mpi_recv(
/* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */
if(mpi_op->op_type == CODES_WK_IRECV)
{
bf->c6 = 1;
codes_issue_next_event(lp);
return;
}
}
else
{
bf->c6 = 1;
m->fwd.found_match = found_matching_sends;
codes_issue_next_event(lp);
}
......@@ -1363,6 +1383,7 @@ static void codes_exec_mpi_send(nw_state* s,
bf->c1 = 0;
bf->c4 = 0;
int is_eager = 0;
/* model-net event */
int global_dest_rank = mpi_op->u.send.dest_rank;
......@@ -1415,16 +1436,18 @@ static void codes_exec_mpi_send(nw_state* s,
local_m.op_type = mpi_op->op_type;
local_m.msg_type = MPI_SEND_POSTED;
local_m.fwd.tag = mpi_op->u.send.tag;
local_m.fwd.rend_send = 0;
local_m.fwd.num_bytes = mpi_op->u.send.num_bytes;
local_m.fwd.req_id = mpi_op->u.send.req_id;
local_m.fwd.app_id = s->app_id;
local_m.fwd.matched_req = m->fwd.matched_req;
if(mpi_op->u.send.num_bytes < EAGER_THRESHOLD)
{
/* directly issue a model-net send */
bf->c15 = 1;
is_eager = 1;
s->num_sends++;
tw_stime copy_overhead = copy_per_byte_eager * mpi_op->u.send.num_bytes;
local_m.fwd.sim_start_time = tw_now(lp);
......@@ -1460,6 +1483,7 @@ static void codes_exec_mpi_send(nw_state* s,
/* initiate the actual data transfer, local completion message is sent
* for any blocking sends. */
local_m.fwd.sim_start_time = mpi_op->sim_start_time;
local_m.fwd.rend_send = 1;
remote_m = local_m;
remote_m.msg_type = MPI_REND_ARRIVED;
......@@ -1479,7 +1503,7 @@ static void codes_exec_mpi_send(nw_state* s,
tw_now(lp), s->app_id, LLU(s->nw_id), global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes);
}
/* isend executed, now get next MPI operation from the queue */
if(mpi_op->op_type == CODES_WK_ISEND && !is_rend)
if(mpi_op->op_type == CODES_WK_ISEND && (is_rend || is_eager))
{
bf->c4 = 1;
codes_issue_next_event(lp);
......@@ -1556,6 +1580,7 @@ static void update_completed_queue(nw_state* s,
struct pending_waits* wait_elem = s->wait_op;
rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
s->wait_op = NULL;
codes_issue_next_event(lp);
}
}
......@@ -1567,7 +1592,7 @@ static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp
/* Send an ack back to the sender */
model_net_event_rc2(lp, &m->event_rc);
}
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op)
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op, int matched_req)
{
(void)bf;
......@@ -1589,6 +1614,7 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m
remote_m.fwd.tag = mpi_op->tag;
remote_m.fwd.num_bytes = mpi_op->num_bytes;
remote_m.fwd.req_id = mpi_op->req_id;
remote_m.fwd.matched_req = matched_req;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
......@@ -1633,9 +1659,9 @@ static void update_arrival_queue_rc(nw_state* s,
index++;
}
}
if(qi->op_type == CODES_WK_IRECV)
if(bf->c9)
update_completed_queue_rc(s, bf, m, lp);
else if(qi->op_type == CODES_WK_RECV)
else if(bf->c8)
codes_issue_next_event_rc(lp);
}
else if(m->fwd.found_match < 0)
......@@ -1914,6 +1940,21 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
m_callback->msg_type = MPI_SEND_ARRIVED_CB;
m_callback->fwd.msg_send_time = tw_now(lp) - m->fwd.sim_start_time;
tw_event_send(e_callback);
/* request id pending completion */
if(m->fwd.matched_req >= 0)
{
bf->c8 = 1;
update_completed_queue(s, bf, m, lp, m->fwd.matched_req);
}
else /* blocking receive pending completion*/
{
bf->c10 = 1;
codes_issue_next_event(lp);
}
m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
}
break;
......@@ -1941,19 +1982,29 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
case MPI_SEND_POSTED:
{
if(m->op_type == CODES_WK_SEND)
int is_eager = 0;
if(m->fwd.num_bytes < EAGER_THRESHOLD)
is_eager = 1;
if(m->op_type == CODES_WK_SEND && (is_eager == 1 || m->fwd.rend_send == 1))
{
bf->c29 = 1;
codes_issue_next_event(lp);
}
else
if(m->op_type == CODES_WK_ISEND)
if(m->op_type == CODES_WK_ISEND && (is_eager == 1 || m->fwd.rend_send == 1))
{
//tw_output(lp, "\n isend req id %llu ", m->fwd.req_id);
update_completed_queue(s, bf, m, lp, m->fwd.req_id);
bf->c28 = 1;
update_completed_queue(s, bf, m, lp, m->fwd.req_id);
}
else
tw_error(TW_LOC, "\n Invalid op type ");
}
break;
case MPI_OP_GET_NEXT:
case MPI_OP_GET_NEXT:
get_next_mpi_operation(s, bf, m, lp);
break;
......@@ -2325,9 +2376,9 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
case MPI_SEND_POSTED:
{
if(m->op_type == CODES_WK_SEND)
if(bf->c29)
codes_issue_next_event_rc(lp);
else if(m->op_type == CODES_WK_ISEND)
else if(bf->c28)
update_completed_queue_rc(s, bf, m, lp);
}
break;
......@@ -2339,7 +2390,17 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
break;
case MPI_REND_ARRIVED:
{
codes_local_latency_reverse(lp);
if(bf->c10)
codes_issue_next_event_rc(lp);
if(bf->c8)
update_completed_queue_rc(s, bf, m, lp);
ns->recv_time = m->rc.saved_recv_time;
}
break;
case MPI_OP_GET_NEXT:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment