Commit c017f396 authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

Merge branch 'rend-fix' into 'master'

Fixing early event triggers when using rendezvous protocol

See merge request !40
parents 3435735f ff9590ed
...@@ -294,10 +294,12 @@ struct nw_message ...@@ -294,10 +294,12 @@ struct nw_message
// for callbacks - time message was received // for callbacks - time message was received
double msg_send_time; double msg_send_time;
int req_id; int req_id;
int matched_req;
int tag; int tag;
int app_id; int app_id;
int found_match; int found_match;
short wait_completed; short wait_completed;
short rend_send;
} fwd; } fwd;
struct struct
{ {
...@@ -309,7 +311,7 @@ struct nw_message ...@@ -309,7 +311,7 @@ struct nw_message
} rc; } rc;
}; };
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op); static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op, int matched_req);
static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp); static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* executes MPI isend and send operations */ /* executes MPI isend and send operations */
...@@ -1056,6 +1058,7 @@ static int rm_matching_rcv(nw_state * ns, ...@@ -1056,6 +1058,7 @@ static int rm_matching_rcv(nw_state * ns,
{ {
int matched = 0; int matched = 0;
int index = 0; int index = 0;
int is_rend = 0;
struct qlist_head *ent = NULL; struct qlist_head *ent = NULL;
mpi_msgs_queue * qi = NULL; mpi_msgs_queue * qi = NULL;
...@@ -1084,21 +1087,28 @@ static int rm_matching_rcv(nw_state * ns, ...@@ -1084,21 +1087,28 @@ static int rm_matching_rcv(nw_state * ns,
/* Matching receive found, need to notify the sender to transmit /* Matching receive found, need to notify the sender to transmit
* the data * (only works in sequential mode)*/ * the data * (only works in sequential mode)*/
bf->c10 = 1; bf->c10 = 1;
send_ack_back(ns, bf, m, lp, qitem); is_rend = 1;
send_ack_back(ns, bf, m, lp, qitem, qi->req_id);
} }
m->rc.saved_recv_time = ns->recv_time; else
ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time); {
m->rc.saved_recv_time = ns->recv_time;
if(qi->op_type == CODES_WK_IRECV) ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
}
if(qi->op_type == CODES_WK_IRECV && !is_rend)
{ {
bf->c9 = 1;
/*if(ns->nw_id == (tw_lpid)TRACK_LP) /*if(ns->nw_id == (tw_lpid)TRACK_LP)
{ {
printf("\n Completed irecv req id %d ", qi->req_id); printf("\n Completed irecv req id %d ", qi->req_id);
}*/ }*/
update_completed_queue(ns, bf, m, lp, qi->req_id); update_completed_queue(ns, bf, m, lp, qi->req_id);
} }
else if(qi->op_type == CODES_WK_RECV) else if(qi->op_type == CODES_WK_RECV && !is_rend)
{
bf->c8 = 1;
codes_issue_next_event(lp); codes_issue_next_event(lp);
}
qlist_del(&qi->ql); qlist_del(&qi->ql);
...@@ -1137,14 +1147,16 @@ static int rm_matching_send(nw_state * ns, ...@@ -1137,14 +1147,16 @@ static int rm_matching_send(nw_state * ns,
if(enable_msg_tracking && (qi->num_bytes < EAGER_THRESHOLD)) if(enable_msg_tracking && (qi->num_bytes < EAGER_THRESHOLD))
update_message_size(ns, lp, bf, m, qi, 1, 0); update_message_size(ns, lp, bf, m, qi, 1, 0);
m->fwd.matched_req = qitem->req_id;
int is_rend = 0;
if(qitem->num_bytes >= EAGER_THRESHOLD) if(qitem->num_bytes >= EAGER_THRESHOLD)
{ {
/* Matching receive found, need to notify the sender to transmit /* Matching receive found, need to notify the sender to transmit
* the data */ * the data */
bf->c10 = 1; bf->c10 = 1;
send_ack_back(ns, bf, m, lp, qi); is_rend = 1;
send_ack_back(ns, bf, m, lp, qi, qitem->req_id);
} }
rc_stack_push(lp, qi, free, ns->processed_ops);
m->rc.saved_recv_time = ns->recv_time; m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - qitem->req_init_time); ns->recv_time += (tw_now(lp) - qitem->req_init_time);
...@@ -1155,11 +1167,16 @@ static int rm_matching_send(nw_state * ns, ...@@ -1155,11 +1167,16 @@ static int rm_matching_send(nw_state * ns,
print_completed_queue(lp, &ns->completed_reqs); print_completed_queue(lp, &ns->completed_reqs);
}*/ }*/
if(qitem->op_type == CODES_WK_IRECV) if(qitem->op_type == CODES_WK_IRECV && !is_rend)
{
bf->c9 = 1;
update_completed_queue(ns, bf, m, lp, qitem->req_id); update_completed_queue(ns, bf, m, lp, qitem->req_id);
}
qlist_del(&qi->ql); qlist_del(&qi->ql);
rc_stack_push(lp, qi, free, ns->processed_ops);
return index; return index;
} }
return -1; return -1;
...@@ -1245,11 +1262,12 @@ static void codes_exec_mpi_recv_rc( ...@@ -1245,11 +1262,12 @@ static void codes_exec_mpi_recv_rc(
index++; index++;
} }
} }
if(m->op_type == CODES_WK_IRECV) if(bf->c9)
{ {
update_completed_queue_rc(ns, bf, m, lp); update_completed_queue_rc(ns, bf, m, lp);
} }
codes_issue_next_event_rc(lp); if(bf->c6)
codes_issue_next_event_rc(lp);
} }
else if(m->fwd.found_match < 0) else if(m->fwd.found_match < 0)
{ {
...@@ -1303,12 +1321,14 @@ static void codes_exec_mpi_recv( ...@@ -1303,12 +1321,14 @@ static void codes_exec_mpi_recv(
/* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */ /* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */
if(mpi_op->op_type == CODES_WK_IRECV) if(mpi_op->op_type == CODES_WK_IRECV)
{ {
bf->c6 = 1;
codes_issue_next_event(lp); codes_issue_next_event(lp);
return; return;
} }
} }
else else
{ {
bf->c6 = 1;
m->fwd.found_match = found_matching_sends; m->fwd.found_match = found_matching_sends;
codes_issue_next_event(lp); codes_issue_next_event(lp);
} }
...@@ -1363,6 +1383,7 @@ static void codes_exec_mpi_send(nw_state* s, ...@@ -1363,6 +1383,7 @@ static void codes_exec_mpi_send(nw_state* s,
bf->c1 = 0; bf->c1 = 0;
bf->c4 = 0; bf->c4 = 0;
int is_eager = 0;
/* model-net event */ /* model-net event */
int global_dest_rank = mpi_op->u.send.dest_rank; int global_dest_rank = mpi_op->u.send.dest_rank;
...@@ -1415,16 +1436,18 @@ static void codes_exec_mpi_send(nw_state* s, ...@@ -1415,16 +1436,18 @@ static void codes_exec_mpi_send(nw_state* s,
local_m.op_type = mpi_op->op_type; local_m.op_type = mpi_op->op_type;
local_m.msg_type = MPI_SEND_POSTED; local_m.msg_type = MPI_SEND_POSTED;
local_m.fwd.tag = mpi_op->u.send.tag; local_m.fwd.tag = mpi_op->u.send.tag;
local_m.fwd.rend_send = 0;
local_m.fwd.num_bytes = mpi_op->u.send.num_bytes; local_m.fwd.num_bytes = mpi_op->u.send.num_bytes;
local_m.fwd.req_id = mpi_op->u.send.req_id; local_m.fwd.req_id = mpi_op->u.send.req_id;
local_m.fwd.app_id = s->app_id; local_m.fwd.app_id = s->app_id;
local_m.fwd.matched_req = m->fwd.matched_req;
if(mpi_op->u.send.num_bytes < EAGER_THRESHOLD) if(mpi_op->u.send.num_bytes < EAGER_THRESHOLD)
{ {
/* directly issue a model-net send */ /* directly issue a model-net send */
bf->c15 = 1; bf->c15 = 1;
is_eager = 1;
s->num_sends++; s->num_sends++;
tw_stime copy_overhead = copy_per_byte_eager * mpi_op->u.send.num_bytes; tw_stime copy_overhead = copy_per_byte_eager * mpi_op->u.send.num_bytes;
local_m.fwd.sim_start_time = tw_now(lp); local_m.fwd.sim_start_time = tw_now(lp);
...@@ -1460,6 +1483,7 @@ static void codes_exec_mpi_send(nw_state* s, ...@@ -1460,6 +1483,7 @@ static void codes_exec_mpi_send(nw_state* s,
/* initiate the actual data transfer, local completion message is sent /* initiate the actual data transfer, local completion message is sent
* for any blocking sends. */ * for any blocking sends. */
local_m.fwd.sim_start_time = mpi_op->sim_start_time; local_m.fwd.sim_start_time = mpi_op->sim_start_time;
local_m.fwd.rend_send = 1;
remote_m = local_m; remote_m = local_m;
remote_m.msg_type = MPI_REND_ARRIVED; remote_m.msg_type = MPI_REND_ARRIVED;
...@@ -1479,7 +1503,7 @@ static void codes_exec_mpi_send(nw_state* s, ...@@ -1479,7 +1503,7 @@ static void codes_exec_mpi_send(nw_state* s,
tw_now(lp), s->app_id, LLU(s->nw_id), global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes); tw_now(lp), s->app_id, LLU(s->nw_id), global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes);
} }
/* isend executed, now get next MPI operation from the queue */ /* isend executed, now get next MPI operation from the queue */
if(mpi_op->op_type == CODES_WK_ISEND && !is_rend) if(mpi_op->op_type == CODES_WK_ISEND && (!is_rend || is_eager))
{ {
bf->c4 = 1; bf->c4 = 1;
codes_issue_next_event(lp); codes_issue_next_event(lp);
...@@ -1556,6 +1580,7 @@ static void update_completed_queue(nw_state* s, ...@@ -1556,6 +1580,7 @@ static void update_completed_queue(nw_state* s,
struct pending_waits* wait_elem = s->wait_op; struct pending_waits* wait_elem = s->wait_op;
rc_stack_push(lp, wait_elem, free, s->processed_wait_op); rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
s->wait_op = NULL; s->wait_op = NULL;
codes_issue_next_event(lp); codes_issue_next_event(lp);
} }
} }
...@@ -1567,7 +1592,7 @@ static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp ...@@ -1567,7 +1592,7 @@ static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp
/* Send an ack back to the sender */ /* Send an ack back to the sender */
model_net_event_rc2(lp, &m->event_rc); model_net_event_rc2(lp, &m->event_rc);
} }
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op) static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op, int matched_req)
{ {
(void)bf; (void)bf;
...@@ -1589,6 +1614,7 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m ...@@ -1589,6 +1614,7 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m
remote_m.fwd.tag = mpi_op->tag; remote_m.fwd.tag = mpi_op->tag;
remote_m.fwd.num_bytes = mpi_op->num_bytes; remote_m.fwd.num_bytes = mpi_op->num_bytes;
remote_m.fwd.req_id = mpi_op->req_id; remote_m.fwd.req_id = mpi_op->req_id;
remote_m.fwd.matched_req = matched_req;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio, m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay), "test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
...@@ -1633,9 +1659,9 @@ static void update_arrival_queue_rc(nw_state* s, ...@@ -1633,9 +1659,9 @@ static void update_arrival_queue_rc(nw_state* s,
index++; index++;
} }
} }
if(qi->op_type == CODES_WK_IRECV) if(bf->c9)
update_completed_queue_rc(s, bf, m, lp); update_completed_queue_rc(s, bf, m, lp);
else if(qi->op_type == CODES_WK_RECV) else if(bf->c8)
codes_issue_next_event_rc(lp); codes_issue_next_event_rc(lp);
} }
else if(m->fwd.found_match < 0) else if(m->fwd.found_match < 0)
...@@ -1914,6 +1940,21 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) ...@@ -1914,6 +1940,21 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
m_callback->msg_type = MPI_SEND_ARRIVED_CB; m_callback->msg_type = MPI_SEND_ARRIVED_CB;
m_callback->fwd.msg_send_time = tw_now(lp) - m->fwd.sim_start_time; m_callback->fwd.msg_send_time = tw_now(lp) - m->fwd.sim_start_time;
tw_event_send(e_callback); tw_event_send(e_callback);
/* request id pending completion */
if(m->fwd.matched_req >= 0)
{
bf->c8 = 1;
update_completed_queue(s, bf, m, lp, m->fwd.matched_req);
}
else /* blocking receive pending completion*/
{
bf->c10 = 1;
codes_issue_next_event(lp);
}
m->rc.saved_recv_time = s->recv_time;
s->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
} }
break; break;
...@@ -1941,19 +1982,27 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) ...@@ -1941,19 +1982,27 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
case MPI_SEND_POSTED: case MPI_SEND_POSTED:
{ {
if(m->op_type == CODES_WK_SEND) int is_eager = 0;
if(m->fwd.num_bytes < EAGER_THRESHOLD)
is_eager = 1;
if(m->op_type == CODES_WK_SEND && (is_eager == 1 || m->fwd.rend_send == 1))
{
bf->c29 = 1;
codes_issue_next_event(lp); codes_issue_next_event(lp);
}
else else
if(m->op_type == CODES_WK_ISEND) if(m->op_type == CODES_WK_ISEND && (is_eager == 1 || m->fwd.rend_send == 1))
{ {
//tw_output(lp, "\n isend req id %llu ", m->fwd.req_id); //tw_output(lp, "\n isend req id %llu ", m->fwd.req_id);
update_completed_queue(s, bf, m, lp, m->fwd.req_id); bf->c28 = 1;
update_completed_queue(s, bf, m, lp, m->fwd.req_id);
} }
else
tw_error(TW_LOC, "\n Invalid op type ");
} }
break; break;
case MPI_OP_GET_NEXT:
case MPI_OP_GET_NEXT:
get_next_mpi_operation(s, bf, m, lp); get_next_mpi_operation(s, bf, m, lp);
break; break;
...@@ -2325,9 +2374,9 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l ...@@ -2325,9 +2374,9 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
case MPI_SEND_POSTED: case MPI_SEND_POSTED:
{ {
if(m->op_type == CODES_WK_SEND) if(bf->c29)
codes_issue_next_event_rc(lp); codes_issue_next_event_rc(lp);
else if(m->op_type == CODES_WK_ISEND) else if(bf->c28)
update_completed_queue_rc(s, bf, m, lp); update_completed_queue_rc(s, bf, m, lp);
} }
break; break;
...@@ -2339,7 +2388,17 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l ...@@ -2339,7 +2388,17 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
break; break;
case MPI_REND_ARRIVED: case MPI_REND_ARRIVED:
{
codes_local_latency_reverse(lp); codes_local_latency_reverse(lp);
if(bf->c10)
codes_issue_next_event_rc(lp);
if(bf->c8)
update_completed_queue_rc(s, bf, m, lp);
s->recv_time = m->rc.saved_recv_time;
}
break; break;
case MPI_OP_GET_NEXT: case MPI_OP_GET_NEXT:
......
...@@ -2166,7 +2166,7 @@ void dragonfly_custom_router_final(router_state * s, ...@@ -2166,7 +2166,7 @@ void dragonfly_custom_router_final(router_state * s,
written = 0; written = 0;
if(!s->router_id) if(!s->router_id)
{ {
written = sprintf(s->output_buf, "# Format <LP ID> <Group ID> <Router ID> <Busy time per router port(s)>"); written = sprintf(s->output_buf, "# Format <LP ID> <Group ID> <Router ID> <Link Traffic per router port(s)>");
written += sprintf(s->output_buf + written, "# Router ports in the order: %d green links, %d black links %d global channels \n", written += sprintf(s->output_buf + written, "# Router ports in the order: %d green links, %d black links %d global channels \n",
p->num_router_cols * p->num_row_chans, p->num_router_rows * p->num_col_chans, p->num_global_channels); p->num_router_cols * p->num_row_chans, p->num_router_rows * p->num_col_chans, p->num_global_channels);
} }
......
...@@ -422,6 +422,7 @@ int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time * ...@@ -422,6 +422,7 @@ int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time *
wrkld_per_rank.u.send.count = prm->count; wrkld_per_rank.u.send.count = prm->count;
wrkld_per_rank.u.send.data_type = prm->datatype; wrkld_per_rank.u.send.data_type = prm->datatype;
wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype); wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
assert(wrkld_per_rank.u.send.num_bytes >= 0); assert(wrkld_per_rank.u.send.num_bytes >= 0);
wrkld_per_rank.u.send.req_id = prm->request; wrkld_per_rank.u.send.req_id = prm->request;
wrkld_per_rank.u.send.dest_rank = prm->dest; wrkld_per_rank.u.send.dest_rank = prm->dest;
...@@ -506,6 +507,7 @@ int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread, ...@@ -506,6 +507,7 @@ int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread,
wrkld_per_rank.u.recv.data_type = prm->datatype; wrkld_per_rank.u.recv.data_type = prm->datatype;
wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype); wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
assert(wrkld_per_rank.u.recv.num_bytes >= 0); assert(wrkld_per_rank.u.recv.num_bytes >= 0);
wrkld_per_rank.u.recv.req_id = -1;
wrkld_per_rank.u.recv.source_rank = prm->source; wrkld_per_rank.u.recv.source_rank = prm->source;
wrkld_per_rank.u.recv.dest_rank = -1; wrkld_per_rank.u.recv.dest_rank = -1;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment