diff --git a/src/network-workloads/model-net-mpi-replay.c b/src/network-workloads/model-net-mpi-replay.c index 83d3d22e89b118444d930950c5d3eb19bd13cff7..e76e772070ef07d099ede48d641493c7c40ca10d 100644 --- a/src/network-workloads/model-net-mpi-replay.c +++ b/src/network-workloads/model-net-mpi-replay.c @@ -294,10 +294,12 @@ struct nw_message // for callbacks - time message was received double msg_send_time; int req_id; + int matched_req; int tag; int app_id; int found_match; short wait_completed; + short rend_send; } fwd; struct { @@ -309,7 +311,7 @@ struct nw_message } rc; }; -static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op); +static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op, int matched_req); static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp); /* executes MPI isend and send operations */ @@ -1056,6 +1058,7 @@ static int rm_matching_rcv(nw_state * ns, { int matched = 0; int index = 0; + int is_rend = 0; struct qlist_head *ent = NULL; mpi_msgs_queue * qi = NULL; @@ -1084,21 +1087,28 @@ static int rm_matching_rcv(nw_state * ns, /* Matching receive found, need to notify the sender to transmit * the data * (only works in sequential mode)*/ bf->c10 = 1; - send_ack_back(ns, bf, m, lp, qitem); + is_rend = 1; + send_ack_back(ns, bf, m, lp, qitem, qi->req_id); } - m->rc.saved_recv_time = ns->recv_time; - ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time); - - if(qi->op_type == CODES_WK_IRECV) + else + { + m->rc.saved_recv_time = ns->recv_time; + ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time); + } + if(qi->op_type == CODES_WK_IRECV && !is_rend) { + bf->c9 = 1; /*if(ns->nw_id == (tw_lpid)TRACK_LP) { printf("\n Completed irecv req id %d ", qi->req_id); }*/ update_completed_queue(ns, bf, m, lp, qi->req_id); } - else if(qi->op_type == CODES_WK_RECV) + else if(qi->op_type == CODES_WK_RECV && !is_rend) + { + bf->c8 = 1; codes_issue_next_event(lp); + } qlist_del(&qi->ql); @@ -1137,14 +1147,16 @@ static int rm_matching_send(nw_state * ns, if(enable_msg_tracking && (qi->num_bytes < EAGER_THRESHOLD)) update_message_size(ns, lp, bf, m, qi, 1, 0); + m->fwd.matched_req = qitem->req_id; + int is_rend = 0; if(qitem->num_bytes >= EAGER_THRESHOLD) { /* Matching receive found, need to notify the sender to transmit * the data */ bf->c10 = 1; - send_ack_back(ns, bf, m, lp, qi); + is_rend = 1; + send_ack_back(ns, bf, m, lp, qi, qitem->req_id); } - rc_stack_push(lp, qi, free, ns->processed_ops); m->rc.saved_recv_time = ns->recv_time; ns->recv_time += (tw_now(lp) - qitem->req_init_time); @@ -1155,11 +1167,16 @@ static int rm_matching_send(nw_state * ns, print_completed_queue(lp, &ns->completed_reqs); }*/ - if(qitem->op_type == CODES_WK_IRECV) + if(qitem->op_type == CODES_WK_IRECV && !is_rend) + { + bf->c9 = 1; update_completed_queue(ns, bf, m, lp, qitem->req_id); + } + qlist_del(&qi->ql); + rc_stack_push(lp, qi, free, ns->processed_ops); return index; } return -1; @@ -1245,11 +1262,12 @@ static void codes_exec_mpi_recv_rc( index++; } } - if(m->op_type == CODES_WK_IRECV) + if(bf->c9) { update_completed_queue_rc(ns, bf, m, lp); } - codes_issue_next_event_rc(lp); + if(bf->c6) + codes_issue_next_event_rc(lp); } else if(m->fwd.found_match < 0) { @@ -1303,12 +1321,14 @@ static void codes_exec_mpi_recv( /* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */ if(mpi_op->op_type == CODES_WK_IRECV) { + bf->c6 = 1; codes_issue_next_event(lp); return; } } else { + bf->c6 = 1; m->fwd.found_match = found_matching_sends; codes_issue_next_event(lp); } @@ -1363,6 +1383,7 @@ static void codes_exec_mpi_send(nw_state* s, bf->c1 = 0; bf->c4 = 0; + int is_eager = 0; /* model-net event */ int global_dest_rank = mpi_op->u.send.dest_rank; @@ -1415,16 +1436,18 @@ static void codes_exec_mpi_send(nw_state* s, local_m.op_type = mpi_op->op_type; local_m.msg_type = MPI_SEND_POSTED; local_m.fwd.tag = mpi_op->u.send.tag; + local_m.fwd.rend_send = 0; local_m.fwd.num_bytes = mpi_op->u.send.num_bytes; local_m.fwd.req_id = mpi_op->u.send.req_id; local_m.fwd.app_id = s->app_id; - + local_m.fwd.matched_req = m->fwd.matched_req; if(mpi_op->u.send.num_bytes < EAGER_THRESHOLD) { /* directly issue a model-net send */ bf->c15 = 1; + is_eager = 1; s->num_sends++; tw_stime copy_overhead = copy_per_byte_eager * mpi_op->u.send.num_bytes; local_m.fwd.sim_start_time = tw_now(lp); @@ -1460,6 +1483,7 @@ static void codes_exec_mpi_send(nw_state* s, /* initiate the actual data transfer, local completion message is sent * for any blocking sends. */ local_m.fwd.sim_start_time = mpi_op->sim_start_time; + local_m.fwd.rend_send = 1; remote_m = local_m; remote_m.msg_type = MPI_REND_ARRIVED; @@ -1479,7 +1503,7 @@ static void codes_exec_mpi_send(nw_state* s, tw_now(lp), s->app_id, LLU(s->nw_id), global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes); } /* isend executed, now get next MPI operation from the queue */ - if(mpi_op->op_type == CODES_WK_ISEND && !is_rend) + if(mpi_op->op_type == CODES_WK_ISEND && (is_rend || is_eager)) { bf->c4 = 1; codes_issue_next_event(lp); @@ -1556,6 +1580,7 @@ static void update_completed_queue(nw_state* s, struct pending_waits* wait_elem = s->wait_op; rc_stack_push(lp, wait_elem, free, s->processed_wait_op); s->wait_op = NULL; + codes_issue_next_event(lp); } } @@ -1567,7 +1592,7 @@ static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp /* Send an ack back to the sender */ model_net_event_rc2(lp, &m->event_rc); } -static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op) +static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op, int matched_req) { (void)bf; @@ -1589,6 +1614,7 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m remote_m.fwd.tag = mpi_op->tag; remote_m.fwd.num_bytes = mpi_op->num_bytes; remote_m.fwd.req_id = mpi_op->req_id; + remote_m.fwd.matched_req = matched_req; m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio, "test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay), @@ -1633,9 +1659,9 @@ static void update_arrival_queue_rc(nw_state* s, index++; } } - if(qi->op_type == CODES_WK_IRECV) + if(bf->c9) update_completed_queue_rc(s, bf, m, lp); - else if(qi->op_type == CODES_WK_RECV) + else if(bf->c8) codes_issue_next_event_rc(lp); } else if(m->fwd.found_match < 0) @@ -1914,6 +1940,21 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) m_callback->msg_type = MPI_SEND_ARRIVED_CB; m_callback->fwd.msg_send_time = tw_now(lp) - m->fwd.sim_start_time; tw_event_send(e_callback); + + /* request id pending completion */ + if(m->fwd.matched_req >= 0) + { + bf->c8 = 1; + update_completed_queue(s, bf, m, lp, m->fwd.matched_req); + } + else /* blocking receive pending completion*/ + { + bf->c10 = 1; + codes_issue_next_event(lp); + } + + m->rc.saved_recv_time = ns->recv_time; + ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time); } break; @@ -1941,19 +1982,29 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) case MPI_SEND_POSTED: { - if(m->op_type == CODES_WK_SEND) + int is_eager = 0; + + if(m->fwd.num_bytes < EAGER_THRESHOLD) + is_eager = 1; + + if(m->op_type == CODES_WK_SEND && (is_eager == 1 || m->fwd.rend_send == 1)) + { + bf->c29 = 1; codes_issue_next_event(lp); + } else - if(m->op_type == CODES_WK_ISEND) + if(m->op_type == CODES_WK_ISEND && (is_eager == 1 || m->fwd.rend_send == 1)) { //tw_output(lp, "\n isend req id %llu ", m->fwd.req_id); - update_completed_queue(s, bf, m, lp, m->fwd.req_id); + bf->c28 = 1; + update_completed_queue(s, bf, m, lp, m->fwd.req_id); } else tw_error(TW_LOC, "\n Invalid op type "); } break; - case MPI_OP_GET_NEXT: + + case MPI_OP_GET_NEXT: get_next_mpi_operation(s, bf, m, lp); break; @@ -2325,9 +2376,9 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l case MPI_SEND_POSTED: { - if(m->op_type == CODES_WK_SEND) + if(bf->c29) codes_issue_next_event_rc(lp); - else if(m->op_type == CODES_WK_ISEND) + else if(bf->c28) update_completed_queue_rc(s, bf, m, lp); } break; @@ -2339,7 +2390,17 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l break; case MPI_REND_ARRIVED: + { codes_local_latency_reverse(lp); + + if(bf->c10) + codes_issue_next_event_rc(lp); + + if(bf->c8) + update_completed_queue_rc(s, bf, m, lp); + + ns->recv_time = m->rc.saved_recv_time; + } break; case MPI_OP_GET_NEXT: