Commit 8bb120e6 authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

Fixing some type conversion and message type issues

parent aa1d0d73
...@@ -135,40 +135,27 @@ struct nw_state ...@@ -135,40 +135,27 @@ struct nw_state
* network event being sent. op is the MPI operation issued by the network workloads API. rv_data holds the data for reverse computation (TODO: Fill this data structure only when the simulation runs in optimistic mode). */ * network event being sent. op is the MPI operation issued by the network workloads API. rv_data holds the data for reverse computation (TODO: Fill this data structure only when the simulation runs in optimistic mode). */
struct nw_message struct nw_message
{ {
int msg_type; int msg_type;
int op_type;
struct tw_lpid src_rank;
{ tw_lpid dest_rank;
/* forward event handler */ int num_bytes;
struct int data_type;
{ double sim_start_time;
int op_type; // for callbacks - time message was received
tw_lpid src_rank; double msg_send_time;
tw_lpid dest_rank; int16_t req_id;
int num_bytes; int tag;
int data_type; dumpi_req_id saved_matched_req;
double sim_start_time; struct codes_workload_op* ptr_match_op;
// for callbacks - time message was received int found_match;
double msg_send_time; short matched_op;
int16_t req_id; struct codes_workload_op* saved_op;
int tag; struct pending_waits* saved_pending_wait;
} msg_info;
double saved_send_time;
/* required for reverse computation*/ double saved_recv_time;
struct double saved_wait_time;
{
int found_match;
short matched_op;
dumpi_req_id saved_matched_req;
struct codes_workload_op* ptr_match_op;
struct codes_workload_op* saved_op;
struct pending_waits* saved_pending_wait;
double saved_send_time;
double saved_recv_time;
double saved_wait_time;
} rc;
} u;
}; };
/* executes MPI wait operation */ /* executes MPI wait operation */
...@@ -289,11 +276,11 @@ static void printQueue(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, char* msg ...@@ -289,11 +276,11 @@ static void printQueue(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, char* msg
while(tmp) while(tmp)
{ {
if(tmp->mpi_op->op_type == CODES_WK_SEND || tmp->mpi_op->op_type == CODES_WK_ISEND) if(tmp->mpi_op->op_type == CODES_WK_SEND || tmp->mpi_op->op_type == CODES_WK_ISEND)
printf("\n lpid %ld send operation data type %d count %d tag %d source %d", printf("\n lpid %llu send operation data type %d count %d tag %d source %d",
lpid, tmp->mpi_op->u.send.data_type, tmp->mpi_op->u.send.count, lpid, tmp->mpi_op->u.send.data_type, tmp->mpi_op->u.send.count,
tmp->mpi_op->u.send.tag, tmp->mpi_op->u.send.source_rank); tmp->mpi_op->u.send.tag, tmp->mpi_op->u.send.source_rank);
else if(tmp->mpi_op->op_type == CODES_WK_IRECV || tmp->mpi_op->op_type == CODES_WK_RECV) else if(tmp->mpi_op->op_type == CODES_WK_IRECV || tmp->mpi_op->op_type == CODES_WK_RECV)
printf("\n lpid %ld recv operation data type %d count %d tag %d source %d", printf("\n lpid %llu recv operation data type %d count %d tag %d source %d",
lpid, tmp->mpi_op->u.recv.data_type, tmp->mpi_op->u.recv.count, lpid, tmp->mpi_op->u.recv.data_type, tmp->mpi_op->u.recv.count,
tmp->mpi_op->u.recv.tag, tmp->mpi_op->u.recv.source_rank ); tmp->mpi_op->u.recv.tag, tmp->mpi_op->u.recv.source_rank );
else else
...@@ -375,18 +362,18 @@ static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, du ...@@ -375,18 +362,18 @@ static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, du
printCompletedQueue(s, lp); printCompletedQueue(s, lp);
if(m->u.rc.matched_op == 1) if(m->matched_op == 1)
s->pending_waits->num_completed--; s->pending_waits->num_completed--;
/* if a wait-elem exists, it means the request ID has been matched*/ /* if a wait-elem exists, it means the request ID has been matched*/
if(m->u.rc.matched_op == 2) if(m->matched_op == 2)
{ {
if(s->nw_id == TRACE) if(s->nw_id == TRACE)
{ {
printf("\n %lf matched req id %d ", tw_now(lp), completed_req); printf("\n %lf matched req id %d ", tw_now(lp), completed_req);
printCompletedQueue(s, lp); printCompletedQueue(s, lp);
} }
struct pending_waits* wait_elem = m->u.rc.saved_pending_wait; struct pending_waits* wait_elem = m->saved_pending_wait;
s->wait_time = m->u.rc.saved_wait_time; s->wait_time = m->saved_wait_time;
int count = wait_elem->mpi_op->u.waits.count; int count = wait_elem->mpi_op->u.waits.count;
for( i = 0; i < count; i++ ) for( i = 0; i < count; i++ )
...@@ -407,7 +394,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_ ...@@ -407,7 +394,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
then the network node LP can go on with fetching the next operation from the log. then the network node LP can go on with fetching the next operation from the log.
If its waitall then wait for all pending requests to complete and then proceed. */ If its waitall then wait for all pending requests to complete and then proceed. */
struct pending_waits* wait_elem = s->pending_waits; struct pending_waits* wait_elem = s->pending_waits;
m->u.rc.matched_op = 0; m->matched_op = 0;
if(s->nw_id == TRACE) if(s->nw_id == TRACE)
printf("\n %lf notify waits req id %d ", tw_now(lp), completed_req); printf("\n %lf notify waits req id %d ", tw_now(lp), completed_req);
...@@ -421,11 +408,11 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_ ...@@ -421,11 +408,11 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
{ {
if(wait_elem->mpi_op->u.wait.req_id == completed_req) if(wait_elem->mpi_op->u.wait.req_id == completed_req)
{ {
m->u.rc.saved_wait_time = s->wait_time; m->saved_wait_time = s->wait_time;
s->wait_time += (tw_now(lp) - wait_elem->start_time); s->wait_time += (tw_now(lp) - wait_elem->start_time);
remove_req_id(&s->completed_reqs, completed_req); remove_req_id(&s->completed_reqs, completed_req);
m->u.rc.saved_pending_wait = wait_elem; m->saved_pending_wait = wait_elem;
s->pending_waits = NULL; s->pending_waits = NULL;
codes_issue_next_event(lp); codes_issue_next_event(lp);
return 0; return 0;
...@@ -440,7 +427,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_ ...@@ -440,7 +427,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
{ {
if(s->nw_id == TRACE) if(s->nw_id == TRACE)
printCompletedQueue(s, lp); printCompletedQueue(s, lp);
m->u.rc.matched_op = 1; m->matched_op = 1;
wait_elem->num_completed++; wait_elem->num_completed++;
} }
} }
...@@ -452,10 +439,10 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_ ...@@ -452,10 +439,10 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
printf("\n %lf req %d completed %d", tw_now(lp), completed_req, wait_elem->num_completed); printf("\n %lf req %d completed %d", tw_now(lp), completed_req, wait_elem->num_completed);
printCompletedQueue(s, lp); printCompletedQueue(s, lp);
} }
m->u.rc.matched_op = 2; m->matched_op = 2;
m->u.rc.saved_wait_time = s->wait_time; m->saved_wait_time = s->wait_time;
s->wait_time += (tw_now(lp) - wait_elem->start_time); s->wait_time += (tw_now(lp) - wait_elem->start_time);
m->u.rc.saved_pending_wait = wait_elem; m->saved_pending_wait = wait_elem;
s->pending_waits = NULL; s->pending_waits = NULL;
for(i = 0; i < required_count; i++) for(i = 0; i < required_count; i++)
...@@ -494,7 +481,7 @@ static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, nw_message * m, struct c ...@@ -494,7 +481,7 @@ static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, nw_message * m, struct c
while(current) { while(current) {
if(current->req_id == req_id) { if(current->req_id == req_id) {
remove_req_id(&s->completed_reqs, req_id); remove_req_id(&s->completed_reqs, req_id);
m->u.rc.saved_wait_time = s->wait_time; m->saved_wait_time = s->wait_time;
codes_issue_next_event(lp); codes_issue_next_event(lp);
return; return;
} }
...@@ -515,10 +502,10 @@ static void codes_exec_mpi_wait_all_rc(nw_state* s, nw_message* m, tw_lp* lp, st ...@@ -515,10 +502,10 @@ static void codes_exec_mpi_wait_all_rc(nw_state* s, nw_message* m, tw_lp* lp, st
{ {
if(s->nw_id == TRACE) if(s->nw_id == TRACE)
{ {
printf("\n %lf codes exec mpi waitall reverse %d ", tw_now(lp), m->u.rc.found_match); printf("\n %lf codes exec mpi waitall reverse %d ", tw_now(lp), m->found_match);
printCompletedQueue(s, lp); printCompletedQueue(s, lp);
} }
if(m->u.rc.found_match) if(m->found_match)
{ {
int i; int i;
int count = mpi_op->u.waits.count; int count = mpi_op->u.waits.count;
...@@ -533,8 +520,6 @@ static void codes_exec_mpi_wait_all_rc(nw_state* s, nw_message* m, tw_lp* lp, st ...@@ -533,8 +520,6 @@ static void codes_exec_mpi_wait_all_rc(nw_state* s, nw_message* m, tw_lp* lp, st
} }
else else
{ {
struct pending_waits* wait_op = s->pending_waits;
//rc_stack_pop(s->st);
s->pending_waits = NULL; s->pending_waits = NULL;
assert(!s->pending_waits); assert(!s->pending_waits);
if(lp->gid == TRACE) if(lp->gid == TRACE)
...@@ -572,10 +557,10 @@ static void codes_exec_mpi_wait_all( ...@@ -572,10 +557,10 @@ static void codes_exec_mpi_wait_all(
if(TRACE== lp->gid) if(TRACE== lp->gid)
printf("\n %lf Num completed %d count %d ", tw_now(lp), num_completed, count); printf("\n %lf Num completed %d count %d ", tw_now(lp), num_completed, count);
m->u.rc.found_match = 0; m->found_match = 0;
if(count == num_completed) if(count == num_completed)
{ {
m->u.rc.found_match = 1; m->found_match = 1;
for( i = 0; i < count; i++) for( i = 0; i < count; i++)
remove_req_id(&s->completed_reqs, req_id[i]); remove_req_id(&s->completed_reqs, req_id[i]);
...@@ -737,17 +722,17 @@ static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, nw_message * m, ...@@ -737,17 +722,17 @@ static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, nw_message * m,
if(mpi_op->op_type == CODES_WK_SEND || mpi_op->op_type == CODES_WK_ISEND) if(mpi_op->op_type == CODES_WK_SEND || mpi_op->op_type == CODES_WK_ISEND)
{ {
rcv_val = match_receive(s, lp, lp->gid, tmp->mpi_op, mpi_op); rcv_val = match_receive(s, lp, lp->gid, tmp->mpi_op, mpi_op);
m->u.rc.saved_matched_req = tmp->mpi_op->u.recv.req_id; m->saved_matched_req = tmp->mpi_op->u.recv.req_id;
} }
else if(mpi_op->op_type == CODES_WK_RECV || mpi_op->op_type == CODES_WK_IRECV) else if(mpi_op->op_type == CODES_WK_RECV || mpi_op->op_type == CODES_WK_IRECV)
{ {
rcv_val = match_receive(s, lp, lp->gid, mpi_op, tmp->mpi_op); rcv_val = match_receive(s, lp, lp->gid, mpi_op, tmp->mpi_op);
m->u.rc.saved_matched_req = mpi_op->u.recv.req_id; m->saved_matched_req = mpi_op->u.recv.req_id;
} }
if(rcv_val >= 0) if(rcv_val >= 0)
{ {
/* TODO: fix RC */ /* TODO: fix RC */
m->u.rc.ptr_match_op = tmp->mpi_op; m->ptr_match_op = tmp->mpi_op;
if(mpi_queue->queue_head == mpi_queue->queue_tail) if(mpi_queue->queue_head == mpi_queue->queue_tail)
{ {
mpi_queue->queue_tail = NULL; mpi_queue->queue_tail = NULL;
...@@ -774,16 +759,16 @@ static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, nw_message * m, ...@@ -774,16 +759,16 @@ static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, nw_message * m,
if(mpi_op->op_type == CODES_WK_SEND || mpi_op->op_type == CODES_WK_ISEND) if(mpi_op->op_type == CODES_WK_SEND || mpi_op->op_type == CODES_WK_ISEND)
{ {
rcv_val = match_receive(s, lp, lp->gid, elem->mpi_op, mpi_op); rcv_val = match_receive(s, lp, lp->gid, elem->mpi_op, mpi_op);
m->u.rc.saved_matched_req = elem->mpi_op->u.recv.req_id; m->saved_matched_req = elem->mpi_op->u.recv.req_id;
} }
else if(mpi_op->op_type == CODES_WK_RECV || mpi_op->op_type == CODES_WK_IRECV) else if(mpi_op->op_type == CODES_WK_RECV || mpi_op->op_type == CODES_WK_IRECV)
{ {
rcv_val = match_receive(s, lp, lp->gid, mpi_op, elem->mpi_op); rcv_val = match_receive(s, lp, lp->gid, mpi_op, elem->mpi_op);
m->u.rc.saved_matched_req = mpi_op->u.recv.req_id; m->saved_matched_req = mpi_op->u.recv.req_id;
} }
if(rcv_val >= 0) if(rcv_val >= 0)
{ {
m->u.rc.ptr_match_op = elem->mpi_op; m->ptr_match_op = elem->mpi_op;
if(elem == mpi_queue->queue_tail) if(elem == mpi_queue->queue_tail)
mpi_queue->queue_tail = tmp; mpi_queue->queue_tail = tmp;
...@@ -837,15 +822,15 @@ static void codes_exec_comp_delay( ...@@ -837,15 +822,15 @@ static void codes_exec_comp_delay(
static void codes_exec_mpi_recv_rc(nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op) static void codes_exec_mpi_recv_rc(nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op)
{ {
num_bytes_recvd -= mpi_op->u.recv.num_bytes; num_bytes_recvd -= mpi_op->u.recv.num_bytes;
s->recv_time = m->u.rc.saved_recv_time; s->recv_time = m->saved_recv_time;
if(m->u.rc.found_match >= 0) if(m->found_match >= 0)
{ {
s->recv_time = m->u.rc.saved_recv_time; s->recv_time = m->saved_recv_time;
mpi_queue_update(s->arrival_queue, m->u.rc.ptr_match_op, m->u.rc.found_match); mpi_queue_update(s->arrival_queue, m->ptr_match_op, m->found_match);
remove_req_id(&s->completed_reqs, mpi_op->u.recv.req_id); remove_req_id(&s->completed_reqs, mpi_op->u.recv.req_id);
tw_rand_reverse_unif(lp->rng); tw_rand_reverse_unif(lp->rng);
} }
else if(m->u.rc.found_match < 0) else if(m->found_match < 0)
{ {
mpi_queue_remove_tail(lp->gid, s->pending_recvs_queue); mpi_queue_remove_tail(lp->gid, s->pending_recvs_queue);
if(mpi_op->op_type == CODES_WK_IRECV) if(mpi_op->op_type == CODES_WK_IRECV)
...@@ -860,14 +845,13 @@ static void codes_exec_mpi_recv(nw_state* s, tw_lp* lp, nw_message * m, struct c ...@@ -860,14 +845,13 @@ static void codes_exec_mpi_recv(nw_state* s, tw_lp* lp, nw_message * m, struct c
If no matching isend is found, the receive operation is queued in the pending queue of If no matching isend is found, the receive operation is queued in the pending queue of
receive operations. */ receive operations. */
m->u.rc.saved_recv_time = s->recv_time; m->saved_recv_time = s->recv_time;
mpi_op->sim_start_time = tw_now(lp); mpi_op->sim_start_time = tw_now(lp);
num_bytes_recvd += mpi_op->u.recv.num_bytes; num_bytes_recvd += mpi_op->u.recv.num_bytes;
if(lp->gid == TRACE) if(lp->gid == TRACE)
printf("\n %lf codes exec mpi recv req id %d", tw_now(lp), (int)mpi_op->u.recv.req_id); printf("\n %lf codes exec mpi recv req id %d", tw_now(lp), (int)mpi_op->u.recv.req_id);
dumpi_req_id req_id;
int found_matching_sends = mpi_queue_remove_matching_op(s, lp, m, s->arrival_queue, mpi_op); int found_matching_sends = mpi_queue_remove_matching_op(s, lp, m, s->arrival_queue, mpi_op);
/* save the req id inserted in the completed queue for reverse computation. */ /* save the req id inserted in the completed queue for reverse computation. */
...@@ -875,7 +859,7 @@ static void codes_exec_mpi_recv(nw_state* s, tw_lp* lp, nw_message * m, struct c ...@@ -875,7 +859,7 @@ static void codes_exec_mpi_recv(nw_state* s, tw_lp* lp, nw_message * m, struct c
if(found_matching_sends < 0) if(found_matching_sends < 0)
{ {
m->u.rc.found_match = -1; m->found_match = -1;
mpi_pending_queue_insert_op(s->pending_recvs_queue, mpi_op); mpi_pending_queue_insert_op(s->pending_recvs_queue, mpi_op);
/* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */ /* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */
...@@ -889,7 +873,7 @@ static void codes_exec_mpi_recv(nw_state* s, tw_lp* lp, nw_message * m, struct c ...@@ -889,7 +873,7 @@ static void codes_exec_mpi_recv(nw_state* s, tw_lp* lp, nw_message * m, struct c
} }
else else
{ {
m->u.rc.found_match = found_matching_sends; m->found_match = found_matching_sends;
codes_issue_next_event(lp); codes_issue_next_event(lp);
} }
} }
...@@ -924,14 +908,14 @@ static void codes_exec_mpi_send(nw_state* s, tw_lp* lp, struct codes_workload_op ...@@ -924,14 +908,14 @@ static void codes_exec_mpi_send(nw_state* s, tw_lp* lp, struct codes_workload_op
nw_message local_m; nw_message local_m;
nw_message remote_m; nw_message remote_m;
local_m.u.msg_info.sim_start_time = tw_now(lp); local_m.sim_start_time = tw_now(lp);
local_m.u.msg_info.dest_rank = mpi_op->u.send.dest_rank; local_m.dest_rank = mpi_op->u.send.dest_rank;
local_m.u.msg_info.src_rank = mpi_op->u.send.source_rank; local_m.src_rank = mpi_op->u.send.source_rank;
local_m.u.msg_info.op_type = mpi_op->op_type; local_m.op_type = mpi_op->op_type;
local_m.msg_type = MPI_SEND_POSTED; local_m.msg_type = MPI_SEND_POSTED;
local_m.u.msg_info.tag = mpi_op->u.send.tag; local_m.tag = mpi_op->u.send.tag;
local_m.u.msg_info.num_bytes = mpi_op->u.send.num_bytes; local_m.num_bytes = mpi_op->u.send.num_bytes;
local_m.u.msg_info.req_id = mpi_op->u.send.req_id; local_m.req_id = mpi_op->u.send.req_id;
remote_m = local_m; remote_m = local_m;
remote_m.msg_type = MPI_SEND_ARRIVED; remote_m.msg_type = MPI_SEND_ARRIVED;
...@@ -961,13 +945,13 @@ static tw_stime s_to_ns(tw_stime ns) ...@@ -961,13 +945,13 @@ static tw_stime s_to_ns(tw_stime ns)
static void update_send_completion_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) static void update_send_completion_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{ {
//mpi_queue_remove_matching_op(&s->completed_isend_queue_head, &s->completed_isend_queue_tail, &m->op, SEND); //mpi_queue_remove_matching_op(&s->completed_isend_queue_head, &s->completed_isend_queue_tail, &m->op, SEND);
if(m->u.msg_info.op_type == CODES_WK_SEND) if(m->op_type == CODES_WK_SEND)
tw_rand_reverse_unif(lp->rng); tw_rand_reverse_unif(lp->rng);
if(m->u.msg_info.op_type == CODES_WK_ISEND) if(m->op_type == CODES_WK_ISEND)
{ {
notify_waits_rc(s, bf, lp, m, m->u.msg_info.req_id); notify_waits_rc(s, bf, lp, m, m->req_id);
remove_req_id(&s->completed_reqs, m->u.msg_info.req_id); remove_req_id(&s->completed_reqs, m->req_id);
} }
} }
...@@ -975,15 +959,15 @@ static void update_send_completion_queue_rc(nw_state* s, tw_bf * bf, nw_message ...@@ -975,15 +959,15 @@ static void update_send_completion_queue_rc(nw_state* s, tw_bf * bf, nw_message
static void update_send_completion_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) static void update_send_completion_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{ {
if(TRACE == lp->gid) if(TRACE == lp->gid)
printf("\n %lf isend operation completed req id %d ", tw_now(lp), m->u.msg_info.req_id); printf("\n %lf isend operation completed req id %d ", tw_now(lp), m->req_id);
if(m->u.msg_info.op_type == CODES_WK_ISEND) if(m->op_type == CODES_WK_ISEND)
{ {
mpi_completed_queue_insert_op(&s->completed_reqs, m->u.msg_info.req_id); mpi_completed_queue_insert_op(&s->completed_reqs, m->req_id);
notify_waits(s, bf, lp, m, m->u.msg_info.req_id); notify_waits(s, bf, lp, m, m->req_id);
} }
/* blocking send operation */ /* blocking send operation */
if(m->u.msg_info.op_type == CODES_WK_SEND) if(m->op_type == CODES_WK_SEND)
codes_issue_next_event(lp); codes_issue_next_event(lp);
return; return;
...@@ -992,26 +976,25 @@ static void update_send_completion_queue(nw_state* s, tw_bf * bf, nw_message * m ...@@ -992,26 +976,25 @@ static void update_send_completion_queue(nw_state* s, tw_bf * bf, nw_message * m
/* reverse handler for updating arrival queue function */ /* reverse handler for updating arrival queue function */
static void update_arrival_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) static void update_arrival_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{ {
s->recv_time = m->u.rc.saved_recv_time; s->recv_time = m->saved_recv_time;
codes_local_latency_reverse(lp); codes_local_latency_reverse(lp);
//rc_stack_pop(s->st); //rc_stack_pop(s->st);
if(m->u.rc.found_match >= 0) if(m->found_match >= 0)
{ {
// TODO: Modify for recvs // TODO: Modify for recvs
if(lp->gid == TRACE) if(lp->gid == TRACE)
printf("\n %lf reverse-- update arrival queue req ID %d", tw_now(lp), (int) m->u.rc.saved_matched_req); printf("\n %lf reverse-- update arrival queue req ID %d", tw_now(lp), (int) m->saved_matched_req);
dumpi_req_id req_id = m->u.rc.saved_matched_req; notify_waits_rc(s, bf, lp, m, m->saved_matched_req);
notify_waits_rc(s, bf, lp, m, m->u.rc.saved_matched_req);
//int count = numQueue(s->pending_recvs_queue); //int count = numQueue(s->pending_recvs_queue);
mpi_queue_update(s->pending_recvs_queue, m->u.rc.ptr_match_op, m->u.rc.found_match); mpi_queue_update(s->pending_recvs_queue, m->ptr_match_op, m->found_match);
remove_req_id(&s->completed_reqs, m->u.rc.saved_matched_req); remove_req_id(&s->completed_reqs, m->saved_matched_req);
/*if(lp->gid == TRACE) /*if(lp->gid == TRACE)
printf("\n Reverse: after adding pending recvs queue %d ", s->pending_recvs_queue->num_elems);*/ printf("\n Reverse: after adding pending recvs queue %d ", s->pending_recvs_queue->num_elems);*/
} }
else if(m->u.rc.found_match < 0) else if(m->found_match < 0)
{ {
mpi_queue_remove_tail(lp->gid, s->arrival_queue); mpi_queue_remove_tail(lp->gid, s->arrival_queue);
/*if(lp->gid == TRACE) /*if(lp->gid == TRACE)
...@@ -1023,33 +1006,29 @@ static void update_arrival_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_ ...@@ -1023,33 +1006,29 @@ static void update_arrival_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_
static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{ {
//int count_before = numQueue(s->pending_recvs_queue); //int count_before = numQueue(s->pending_recvs_queue);
int is_blocking = 0; /* checks if the recv operation was blocking or not */ m->saved_recv_time = s->recv_time;
m->u.rc.saved_recv_time = s->recv_time;
// send a callback to the sender to increment times // send a callback to the sender to increment times
tw_event *e_callback = tw_event *e_callback =
tw_event_new(rank_to_lpid(m->u.msg_info.src_rank), tw_event_new(rank_to_lpid(m->src_rank),
codes_local_latency(lp), lp); codes_local_latency(lp), lp);
nw_message *m_callback = tw_event_data(e_callback); nw_message *m_callback = tw_event_data(e_callback);
m_callback->msg_type = MPI_SEND_ARRIVED_CB; m_callback->msg_type = MPI_SEND_ARRIVED_CB;
m_callback->u.msg_info.msg_send_time = tw_now(lp) - m->u.msg_info.sim_start_time; m_callback->msg_send_time = tw_now(lp) - m->sim_start_time;
tw_event_send(e_callback); tw_event_send(e_callback);
/*NOTE: this computes send time with respect to the receiver, not the /*NOTE: this computes send time with respect to the receiver, not the
* sender * sender
* s->send_time += tw_now(lp) - m->u.msg_info.sim_start_time; */ * s->send_time += tw_now(lp) - m->u.msg_info.sim_start_time; */
dumpi_req_id req_id = -1;
/* Now reconstruct the mpi op */ /* Now reconstruct the mpi op */
struct codes_workload_op * arrived_op = (struct codes_workload_op *) malloc(sizeof(struct codes_workload_op)); struct codes_workload_op * arrived_op = (struct codes_workload_op *) malloc(sizeof(struct codes_workload_op));
arrived_op->sim_start_time = m->u.msg_info.sim_start_time; arrived_op->sim_start_time = m->sim_start_time;
arrived_op->op_type = m->u.msg_info.op_type; arrived_op->op_type = m->op_type;
arrived_op->u.send.source_rank = m->u.msg_info.src_rank; arrived_op->u.send.source_rank = m->src_rank;
arrived_op->u.send.dest_rank = m->u.msg_info.dest_rank; arrived_op->u.send.dest_rank = m->dest_rank;
arrived_op->u.send.num_bytes = m->u.msg_info.num_bytes; arrived_op->u.send.num_bytes = m->num_bytes;
arrived_op->u.send.tag = m->u.msg_info.tag; arrived_op->u.send.tag = m->tag;
arrived_op->u.send.req_id = m->u.msg_info.req_id; arrived_op->u.send.req_id = m->req_id;
//rc_stack_push(lp, arrived_op, free, s->st); //rc_stack_push(lp, arrived_op, free, s->st);
int found_matching_recv = mpi_queue_remove_matching_op(s, lp, m, s->pending_recvs_queue, arrived_op); int found_matching_recv = mpi_queue_remove_matching_op(s, lp, m, s->pending_recvs_queue, arrived_op);
...@@ -1058,13 +1037,14 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp ...@@ -1058,13 +1037,14 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
printf("\n %lf update arrival queue req id %d %d", tw_now(lp), arrived_op->u.send.req_id, arrived_op->u.send.source_rank); printf("\n %lf update arrival queue req id %d %d", tw_now(lp), arrived_op->u.send.req_id, arrived_op->u.send.source_rank);
if(found_matching_recv < 0) if(found_matching_recv < 0)
{ {
m->u.rc.found_match = -1; m->found_match = -1;
mpi_pending_queue_insert_op(s->arrival_queue, arrived_op);