Commit 1edef080 authored by Misbah Mubarak's avatar Misbah Mubarak

Optimizing the size of the MPI-Sim layer event

parent 45c0bb58
......@@ -129,10 +129,28 @@ struct nw_state
* network event being sent. op is the MPI operation issued by the network workloads API. rv_data holds the data for reverse computation (TODO: Fill this data structure only when the simulation runs in optimistic mode). */
struct nw_message
{
int msg_type;
struct codes_workload_op op;
int msg_type;
/* for reverse computation */
struct codes_workload_op * op;
/* for reverse computation*/
struct
{
/* forward event handler */
struct
{
int op_type;
tw_lpid src_rank;
tw_lpid dest_rank;
int num_bytes;
int data_type;
double sim_start_time;
int16_t req_id;
int tag;
} msg_info;
/* required for reverse computation*/
struct
{
int found_match;
short matched_op;
dumpi_req_id saved_matched_req;
......@@ -142,6 +160,8 @@ struct nw_message
double saved_send_time;
double saved_recv_time;
double saved_wait_time;
} rc;
} u;
};
/* executes MPI wait operation */
......@@ -194,10 +214,10 @@ static void mpi_pending_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, struct
static void remove_req_id(struct completed_requests** requests, int16_t req_id);
/* remove MPI operation from the waiting queue.*/
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue_ptrs* mpi_queue, nw_message* msg);
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue_ptrs* mpi_queue, nw_message * m);
/* remove the tail of the MPI operation from waiting queue */
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op);
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue);
/* insert completed MPI requests in the queue. */
static void mpi_completed_queue_insert_op(struct completed_requests** mpi_completed_queue, dumpi_req_id req_id);
......@@ -338,18 +358,18 @@ static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, du
if(lp->gid == TRACE)
printf("\n %lf reverse -- notify waits req id %d ", tw_now(lp), completed_req);
printCompletedQueue(s, lp);
if(m->matched_op == 1)
if(m->u.rc.matched_op == 1)
s->pending_waits->num_completed--;
/* if a wait-elem exists, it means the request ID has been matched*/
if(m->matched_op == 2)
if(m->u.rc.matched_op == 2)
{
if(lp->gid == TRACE)
{
printf("\n %lf matched req id %d ", tw_now(lp), completed_req);
printCompletedQueue(s, lp);
}
struct pending_waits* wait_elem = m->saved_pending_wait;
s->wait_time = m->saved_wait_time;
struct pending_waits* wait_elem = m->u.rc.saved_pending_wait;
s->wait_time = m->u.rc.saved_wait_time;
int count = wait_elem->mpi_op->u.waits.count;
for( i = 0; i < count; i++ )
......@@ -371,7 +391,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
then the network node LP can go on with fetching the next operation from the log.
If its waitall then wait for all pending requests to complete and then proceed. */
struct pending_waits* wait_elem = s->pending_waits;
m->matched_op = 0;
m->u.rc.matched_op = 0;
if(lp->gid == TRACE)
printf("\n %lf notify waits req id %d ", tw_now(lp), completed_req);
......@@ -385,11 +405,11 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
{
if(wait_elem->mpi_op->u.wait.req_id == completed_req)
{
m->saved_wait_time = s->wait_time;
m->u.rc.saved_wait_time = s->wait_time;
s->wait_time += (tw_now(lp) - wait_elem->start_time);
remove_req_id(&s->completed_reqs, completed_req);
remove_req_id(&s->completed_reqs, completed_req);
m->saved_pending_wait = wait_elem;
m->u.rc.saved_pending_wait = wait_elem;
s->pending_waits = NULL;
codes_issue_next_event(lp);
return 0;
......@@ -405,7 +425,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
{
if(lp->gid == TRACE)
printCompletedQueue(s, lp);
m->matched_op = 1;
m->u.rc.matched_op = 1;
wait_elem->num_completed++;
}
}
......@@ -417,10 +437,10 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
printf("\n %lf req %d completed %d", tw_now(lp), completed_req, wait_elem->num_completed);
printCompletedQueue(s, lp);
}
m->matched_op = 2;
m->saved_wait_time = s->wait_time;
m->u.rc.matched_op = 2;
m->u.rc.saved_wait_time = s->wait_time;
s->wait_time += (tw_now(lp) - wait_elem->start_time);
m->saved_pending_wait = wait_elem;
m->u.rc.saved_pending_wait = wait_elem;
s->pending_waits = NULL;
for(i = 0; i < required_count; i++)
remove_req_id(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);
......@@ -440,8 +460,8 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp*
}
else
{
s->wait_time = m->saved_wait_time;
mpi_completed_queue_insert_op(&s->completed_reqs, m->op.u.wait.req_id);
s->wait_time = m->u.rc.saved_wait_time;
mpi_completed_queue_insert_op(&s->completed_reqs, m->op->u.wait.req_id);
tw_rand_reverse_unif(lp->rng);
}
}
......@@ -452,51 +472,51 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp
/* check in the completed receives queue if the request ID has already been completed.*/
assert(!s->pending_waits);
dumpi_req_id req_id = m->op.u.wait.req_id;
dumpi_req_id req_id = m->op->u.wait.req_id;
unsigned long search_start_time, search_end_time;
struct completed_requests* current = s->completed_reqs;
search_start_time = tw_now(lp);
while(current)
{
{
if(current->req_id == req_id)
{
remove_req_id(&s->completed_reqs, req_id);
m->saved_wait_time = s->wait_time;
m->u.rc.saved_wait_time = s->wait_time;
s->wait_time += tw_now(lp) - search_start_time;
codes_issue_next_event(lp);
return;
}
current = current->next;
}
search_end_time = tw_now(lp);
s->search_overhead += (search_end_time - search_start_time);
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
wait_op->mpi_op = &(m->op);
wait_op->num_completed = 0;
wait_op->start_time = search_start_time;
s->pending_waits = wait_op;
}
search_end_time = tw_now(lp);
s->search_overhead += (search_end_time - search_start_time);
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
wait_op->mpi_op = m->op;
wait_op->num_completed = 0;
wait_op->start_time = search_start_time;
s->pending_waits = wait_op;
}
static void codes_exec_mpi_wait_all_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
{
if(lp->gid == TRACE)
{
printf("\n %lf codes exec mpi waitall reverse %d ", tw_now(lp), m->found_match);
printf("\n %lf codes exec mpi waitall reverse %d ", tw_now(lp), m->u.rc.found_match);
printCompletedQueue(s, lp);
}
if(m->found_match)
if(m->u.rc.found_match)
{
int i;
s->wait_time = m->saved_wait_time;
int count = m->op.u.waits.count;
s->wait_time = m->u.rc.saved_wait_time;
int count = m->op->u.waits.count;
dumpi_req_id req_id[count];
for( i = 0; i < count; i++)
{
req_id[i] = m->op.u.waits.req_ids[i];
req_id[i] = m->op->u.waits.req_ids[i];
mpi_completed_queue_insert_op(&s->completed_reqs, req_id[i]);
}
tw_rand_reverse_unif(lp->rng);
......@@ -514,7 +534,7 @@ static void codes_exec_mpi_wait_all_rc(nw_state* s, tw_bf* bf, nw_message* m, tw
static void codes_exec_mpi_wait_all(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
{
//assert(!s->pending_waits);
int count = m->op.u.waits.count;
int count = m->op->u.waits.count;
int i, num_completed = 0;
dumpi_req_id req_id[count];
struct completed_requests* current = s->completed_reqs;
......@@ -525,16 +545,16 @@ static void codes_exec_mpi_wait_all(nw_state* s, tw_bf* bf, nw_message* m, tw_lp
if(lp->gid == TRACE)
{
printf(" \n (%lf) MPI waitall posted %d count", tw_now(lp), m->op.u.waits.count);
printf(" \n (%lf) MPI waitall posted %d count", tw_now(lp), m->op->u.waits.count);
for(i = 0; i < count; i++)
printf(" %d ", (int)m->op.u.waits.req_ids[i]);
printf(" %d ", (int)m->op->u.waits.req_ids[i]);
printCompletedQueue(s, lp);
}
while(current)
{
for(i = 0; i < count; i++)
{
req_id[i] = m->op.u.waits.req_ids[i];
req_id[i] = m->op->u.waits.req_ids[i];
if(req_id[i] == current->req_id)
num_completed++;
}
......@@ -548,14 +568,14 @@ static void codes_exec_mpi_wait_all(nw_state* s, tw_bf* bf, nw_message* m, tw_lp
s->search_overhead += (search_end_time - start_time);
m->found_match = 0;
m->u.rc.found_match = 0;
if(count == num_completed)
{
m->found_match = 1;
m->u.rc.found_match = 1;
for( i = 0; i < count; i++)
remove_req_id(&s->completed_reqs, req_id[i]);
m->saved_wait_time = s->wait_time;
m->u.rc.saved_wait_time = s->wait_time;
s->wait_time += tw_now(lp) - start_time;
codes_issue_next_event(lp);
}
......@@ -563,7 +583,7 @@ static void codes_exec_mpi_wait_all(nw_state* s, tw_bf* bf, nw_message* m, tw_lp
{
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
wait_op->mpi_op = &(m->op);
wait_op->mpi_op = m->op;
wait_op->num_completed = num_completed;
wait_op->start_time = start_time;
s->pending_waits = wait_op;
......@@ -576,11 +596,9 @@ static void remove_req_id(struct completed_requests** mpi_completed_queue, dumpi
struct completed_requests* current = *mpi_completed_queue;
if(!current)
{
printf("\n REQ ID DOES NOT EXIST");
return;
}
if(current->req_id == req_id)
tw_error(TW_LOC, "\n REQ ID DOES NOT EXIST");
if(current->req_id == req_id)
{
*mpi_completed_queue = current->next;
free(current);
......@@ -646,22 +664,24 @@ static void mpi_pending_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, struct
/* match the send/recv operations */
static int match_receive(nw_state* s, tw_lp* lp, tw_lpid lpid, struct codes_workload_op* op1, struct codes_workload_op* op2)
{
assert(op1->op_type == CODES_WK_IRECV || op1->op_type == CODES_WK_RECV);
assert(op2->op_type == CODES_WK_SEND || op2->op_type == CODES_WK_ISEND);
if((op1->u.recv.num_bytes >= op2->u.send.num_bytes) &&
((op1->u.recv.tag == op2->u.send.tag) || op1->u.recv.tag == -1) &&
((op1->u.recv.source_rank == op2->u.send.source_rank) || op1->u.recv.source_rank == -1))
{
s->recv_time += tw_now(lp) - op2->sim_start_time;
mpi_completed_queue_insert_op(&s->completed_reqs, op1->u.recv.req_id);
return 1;
}
return -1;
assert(op1->op_type == CODES_WK_IRECV || op1->op_type == CODES_WK_RECV);
assert(op2->op_type == CODES_WK_SEND || op2->op_type == CODES_WK_ISEND);
if((op1->u.recv.num_bytes >= op2->u.send.num_bytes) &&
((op1->u.recv.tag == op2->u.send.tag) || op1->u.recv.tag == -1) &&
((op1->u.recv.source_rank == op2->u.send.source_rank) || op1->u.recv.source_rank == -1))
{
if(lp->gid == TRACE)
printf("\n op1 rank %d bytes %d ", op1->u.recv.source_rank, op1->u.recv.num_bytes);
s->recv_time += tw_now(lp) - op2->sim_start_time;
mpi_completed_queue_insert_op(&s->completed_reqs, op1->u.recv.req_id);
return 1;
}
return -1;
}
/* used for reverse computation. removes the tail of the queue */
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op)
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue)
{
assert(mpi_queue->queue_tail);
if(mpi_queue->queue_tail == NULL)
......@@ -696,13 +716,13 @@ static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue,
/* search for a matching mpi operation and remove it from the list.
* Record the index in the list from where the element got deleted.
* Index is used for inserting the element once again in the queue for reverse computation. */
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue_ptrs* mpi_queue, nw_message* m)
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue_ptrs* mpi_queue, nw_message * m)
{
struct codes_workload_op * mpi_op = m->op;
if(mpi_queue->queue_head == NULL)
return -1;
struct codes_workload_op* mpi_op = &(m->op);
/* remove mpi operation */
struct mpi_msgs_queue* tmp = mpi_queue->queue_head;
int indx = 0;
......@@ -712,19 +732,16 @@ static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue
if(mpi_op->op_type == CODES_WK_SEND || mpi_op->op_type == CODES_WK_ISEND)
{
rcv_val = match_receive(s, lp, lp->gid, tmp->mpi_op, mpi_op);
m->saved_matched_req = tmp->mpi_op->u.recv.req_id;
m->u.rc.saved_matched_req = tmp->mpi_op->u.recv.req_id;
}
else if(mpi_op->op_type == CODES_WK_RECV || mpi_op->op_type == CODES_WK_IRECV)
{
rcv_val = match_receive(s, lp, lp->gid, mpi_op, tmp->mpi_op);
m->saved_matched_req = mpi_op->u.recv.req_id;
m->u.rc.saved_matched_req = mpi_op->u.recv.req_id;
}
if(rcv_val >= 0)
{
//if(tmp->mpi_op->op_type == CODES_WK_RECV)
// *is_blocking = 1;
memcpy(&m->ptr_match_op, &tmp->mpi_op, sizeof(struct codes_workload_op));
memcpy(&m->u.rc.ptr_match_op, &tmp->mpi_op, sizeof(struct codes_workload_op));
if(mpi_queue->queue_head == mpi_queue->queue_tail)
{
mpi_queue->queue_tail = NULL;
......@@ -751,16 +768,16 @@ static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue
if(mpi_op->op_type == CODES_WK_SEND || mpi_op->op_type == CODES_WK_ISEND)
{
rcv_val = match_receive(s, lp, lp->gid, elem->mpi_op, mpi_op);
m->saved_matched_req = elem->mpi_op->u.recv.req_id;
m->u.rc.saved_matched_req = elem->mpi_op->u.recv.req_id;
}
else if(mpi_op->op_type == CODES_WK_RECV || mpi_op->op_type == CODES_WK_IRECV)
{
rcv_val = match_receive(s, lp, lp->gid, mpi_op, elem->mpi_op);
m->saved_matched_req = mpi_op->u.recv.req_id;
m->u.rc.saved_matched_req = mpi_op->u.recv.req_id;
}
if(rcv_val >= 0)
{
memcpy(&m->ptr_match_op, &elem->mpi_op, sizeof(struct codes_workload_op));
memcpy(&m->u.rc.ptr_match_op, &elem->mpi_op, sizeof(struct codes_workload_op));
if(elem == mpi_queue->queue_tail)
mpi_queue->queue_tail = tmp;
......@@ -769,9 +786,6 @@ static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue
free(elem);
mpi_queue->num_elems--;
/*if(tmp->mpi_op->op_type == CODES_WK_RECV)
*is_blocking = 1;*/
return indx;
}
tmp = tmp->next;
......@@ -797,7 +811,7 @@ static void codes_issue_next_event(tw_lp* lp)
/* Simulate delays between MPI operations */
static void codes_exec_comp_delay(nw_state* s, nw_message* m, tw_lp* lp)
{
struct codes_workload_op* mpi_op = &(m->op);
struct codes_workload_op* mpi_op = m->op;
tw_event* e;
tw_stime ts;
nw_message* msg;
......@@ -816,27 +830,20 @@ static void codes_exec_comp_delay(nw_state* s, nw_message* m, tw_lp* lp)
/* reverse computation operation for MPI irecv */
static void codes_exec_mpi_recv_rc(nw_state* s, nw_message* m, tw_lp* lp)
{
if(lp->gid == TRACE)
printf("\n %lf reverse codes mpi recv req id %d ", tw_now(lp), (int)m->op.u.recv.req_id);
num_bytes_recvd -= m->op.u.recv.num_bytes;
s->recv_time = m->saved_recv_time;
if(m->found_match >= 0)
num_bytes_recvd -= m->op->u.recv.num_bytes;
s->recv_time = m->u.rc.saved_recv_time;
if(m->u.rc.found_match >= 0)
{
s->recv_time = m->saved_recv_time;
//int count = numQueue(s->arrival_queue);
mpi_queue_update(s->arrival_queue, m->ptr_match_op, m->found_match);
remove_req_id(&s->completed_reqs, m->op.u.recv.req_id);
s->recv_time = m->u.rc.saved_recv_time;
mpi_queue_update(s->arrival_queue, m->u.rc.ptr_match_op, m->u.rc.found_match);
remove_req_id(&s->completed_reqs, m->op->u.recv.req_id);
tw_rand_reverse_unif(lp->rng);
/*if(lp->gid == TRACE)
printf("\n Reverse- after adding: arrival queue num_elems %d ", s->arrival_queue->num_elems);*/
}
else if(m->found_match < 0)
else if(m->u.rc.found_match < 0)
{
mpi_queue_remove_tail(lp->gid, s->pending_recvs_queue, &m->op);
if(m->op.op_type == CODES_WK_IRECV)
mpi_queue_remove_tail(lp->gid, s->pending_recvs_queue);
if(m->op->op_type == CODES_WK_IRECV)
tw_rand_reverse_unif(lp->rng);
/*if(lp->gid == TRACE)
printf("\n Reverse- after removing: pending receive queue num_elems %d ", s->pending_recvs_queue->num_elems);*/
}
}
......@@ -847,12 +854,11 @@ static void codes_exec_mpi_recv(nw_state* s, nw_message* m, tw_lp* lp)
If no matching isend is found, the receive operation is queued in the pending queue of
receive operations. */
m->saved_recv_time = s->recv_time;
struct codes_workload_op* mpi_op = &(m->op);
m->u.rc.saved_recv_time = s->recv_time;
struct codes_workload_op* mpi_op = m->op;
mpi_op->sim_start_time = tw_now(lp);
unsigned long long start_searching, end_searching;
num_bytes_recvd += mpi_op->u.recv.num_bytes;
//int count_before = numQueue(s->arrival_queue);
if(lp->gid == TRACE)
printf("\n %lf codes exec mpi recv req id %d", tw_now(lp), (int)mpi_op->u.recv.req_id);
......@@ -868,7 +874,7 @@ static void codes_exec_mpi_recv(nw_state* s, nw_message* m, tw_lp* lp)
if(found_matching_sends < 0)
{
m->found_match = -1;
m->u.rc.found_match = -1;
mpi_pending_queue_insert_op(s->pending_recvs_queue, mpi_op);
/* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */
......@@ -887,15 +893,15 @@ static void codes_exec_mpi_recv(nw_state* s, nw_message* m, tw_lp* lp)
/* update completed requests list */
//int count_after = numQueue(s->arrival_queue);
//assert(count_before == (count_after+1));
m->found_match = found_matching_sends;
m->u.rc.found_match = found_matching_sends;
codes_issue_next_event(lp);
}
}
/* executes MPI send and isend operations */
static void codes_exec_mpi_send(nw_state* s, nw_message* m, tw_lp* lp)
static void codes_exec_mpi_send(nw_state* s, nw_message * m, tw_lp* lp)
{
struct codes_workload_op* mpi_op = &(m->op);
struct codes_workload_op * mpi_op = m->op;
/* model-net event */
tw_lpid dest_rank;
......@@ -923,20 +929,25 @@ static void codes_exec_mpi_send(nw_state* s, nw_message* m, tw_lp* lp)
nw_message* local_m = malloc(sizeof(nw_message));
nw_message* remote_m = malloc(sizeof(nw_message));
assert(local_m && remote_m);
mpi_op->sim_start_time = tw_now(lp);
local_m->op = *mpi_op;
local_m->msg_type = MPI_SEND_POSTED;
remote_m->op = *mpi_op;
local_m->u.msg_info.sim_start_time = tw_now(lp);
local_m->u.msg_info.dest_rank = mpi_op->u.send.dest_rank;
local_m->u.msg_info.src_rank = mpi_op->u.send.source_rank;
local_m->u.msg_info.op_type = mpi_op->op_type;
local_m->msg_type = MPI_SEND_POSTED;
local_m->u.msg_info.tag = mpi_op->u.send.tag;
local_m->u.msg_info.num_bytes = mpi_op->u.send.num_bytes;
local_m->u.msg_info.req_id = mpi_op->u.send.req_id;
memcpy(remote_m, local_m, sizeof(nw_message));
remote_m->msg_type = MPI_SEND_ARRIVED;
model_net_event(net_id, "test", dest_rank, mpi_op->u.send.num_bytes, 0.0,
sizeof(nw_message), (const void*)remote_m, sizeof(nw_message), (const void*)local_m, lp);
if(TRACE == lp->gid)
printf("\n %lf send req id %d dest %d ", tw_now(lp), (int)mpi_op->u.send.req_id, (int)dest_rank);
/*if(TRACE == lp->gid)
printf("\n !!! %lf send req id %d dest %d nw_message %d ", tw_now(lp), (int)mpi_op->u.send.req_id, (int)dest_rank, sizeof(nw_message));
*/
/* isend executed, now get next MPI operation from the queue */
if(mpi_op->op_type == CODES_WK_ISEND)
codes_issue_next_event(lp);
......@@ -958,16 +969,13 @@ static tw_stime s_to_ns(tw_stime ns)
static void update_send_completion_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
//mpi_queue_remove_matching_op(&s->completed_isend_queue_head, &s->completed_isend_queue_tail, &m->op, SEND);
if(TRACE == lp->gid)
printf("\n %lf reverse-- isend operation completed req id %d ", tw_now(lp), m->op.u.send.req_id);
if(m->op.op_type == CODES_WK_SEND)
if(m->u.msg_info.op_type == CODES_WK_SEND)
tw_rand_reverse_unif(lp->rng);
if(m->op.op_type == CODES_WK_ISEND)
if(m->u.msg_info.op_type == CODES_WK_ISEND)
{
notify_waits_rc(s, bf, lp, m, m->op.u.send.req_id);
remove_req_id(&s->completed_reqs, m->op.u.send.req_id);
notify_waits_rc(s, bf, lp, m, m->u.msg_info.req_id);
remove_req_id(&s->completed_reqs, m->u.msg_info.req_id);
}
}
......@@ -975,15 +983,15 @@ static void update_send_completion_queue_rc(nw_state* s, tw_bf * bf, nw_message
static void update_send_completion_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
if(TRACE == lp->gid)
printf("\n %lf isend operation completed req id %d ", tw_now(lp), m->op.u.send.req_id);
if(m->op.op_type == CODES_WK_ISEND)
printf("\n %lf isend operation completed req id %d ", tw_now(lp), m->u.msg_info.req_id);
if(m->u.msg_info.op_type == CODES_WK_ISEND)
{
mpi_completed_queue_insert_op(&s->completed_reqs, m->op.u.send.req_id);
notify_waits(s, bf, lp, m, m->op.u.send.req_id);
mpi_completed_queue_insert_op(&s->completed_reqs, m->u.msg_info.req_id);
notify_waits(s, bf, lp, m, m->u.msg_info.req_id);
}
/* blocking send operation */
if(m->op.op_type == CODES_WK_SEND)
if(m->u.msg_info.op_type == CODES_WK_SEND)
codes_issue_next_event(lp);
return;
......@@ -992,26 +1000,26 @@ static void update_send_completion_queue(nw_state* s, tw_bf * bf, nw_message * m
/* reverse handler for updating arrival queue function */
static void update_arrival_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
s->send_time = m->saved_send_time;
s->recv_time = m->saved_recv_time;
s->send_time = m->u.rc.saved_send_time;
s->recv_time = m->u.rc.saved_recv_time;
if(m->found_match >= 0)
if(m->u.rc.found_match >= 0)
{
// TODO: Modify for recvs
if(lp->gid == TRACE)
printf("\n %lf reverse-- update arrival queue req ID %d", tw_now(lp), (int) m->saved_matched_req);
dumpi_req_id req_id = m->saved_matched_req;
notify_waits_rc(s, bf, lp, m, m->saved_matched_req);
printf("\n %lf reverse-- update arrival queue req ID %d", tw_now(lp), (int) m->u.rc.saved_matched_req);
dumpi_req_id req_id = m->u.rc.saved_matched_req;
notify_waits_rc(s, bf, lp, m, m->u.rc.saved_matched_req);
//int count = numQueue(s->pending_recvs_queue);
mpi_queue_update(s->pending_recvs_queue, m->ptr_match_op, m->found_match);
remove_req_id(&s->completed_reqs, m->saved_matched_req);
mpi_queue_update(s->pending_recvs_queue, m->u.rc.ptr_match_op, m->u.rc.found_match);
remove_req_id(&s->completed_reqs, m->u.rc.saved_matched_req);
/*if(lp->gid == TRACE)
printf("\n Reverse: after adding pending recvs queue %d ", s->pending_recvs_queue->num_elems);*/
}
else if(m->found_match < 0)
else if(m->u.rc.found_match < 0)
{
mpi_queue_remove_tail(lp->gid, s->arrival_queue, &(m->op));
mpi_queue_remove_tail(lp->gid, s->arrival_queue);
/*if(lp->gid == TRACE)
printf("\n Reverse: after removing arrivals queue %d ", s->arrival_queue->num_elems);*/
}
......@@ -1024,40 +1032,40 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
int is_blocking = 0; /* checks if the recv operation was blocking or not */
unsigned long long start_searching, end_searching;
m->saved_send_time = s->send_time;
m->saved_recv_time = s->recv_time;
m->u.rc.saved_send_time = s->send_time;
m->u.rc.saved_recv_time = s->recv_time;
s->send_time += tw_now(lp) - m->op.sim_start_time;
s->send_time += tw_now(lp) - m->u.msg_info.sim_start_time;
dumpi_req_id req_id = -1;
start_searching = tw_now(lp);
/* Now reconstruct the mpi op */
struct codes_workload_op * arrived_op = (struct codes_workload_op *) malloc(sizeof(struct codes_workload_op));
arrived_op->op_type = m->u.msg_info.op_type;
arrived_op->u.send.source_rank = m->u.msg_info.src_rank;
arrived_op->u.send.dest_rank = m->u.msg_info.dest_rank;
arrived_op->u.send.num_bytes = m->u.msg_info.num_bytes;
arrived_op->u.send.tag = m->u.msg_info.tag;
arrived_op->u.send.req_id = m->u.msg_info.req_id;
m->op = arrived_op;
int found_matching_recv = mpi_queue_remove_matching_op(s, lp, s->pending_recvs_queue, m);
end_searching = tw_now(lp);
s->search_overhead += (end_searching - start_searching);
if(TRACE == lp->gid)
printf("\n %lf update arrival queue req id %d %d", tw_now(lp), arrived_op->u.send.req_id, m->op->u.send.source_rank);
if(found_matching_recv < 0)
{
m->found_match = -1;
mpi_pending_queue_insert_op(s->arrival_queue, &(m->op));
/*if(lp->gid == TRACE)
printf("\n After adding arrivals queue %d ", s->arrival_queue->num_elems);*/
m->u.rc.found_match = -1;
mpi_pending_queue_insert_op(s->arrival_queue, m->op);
}
else
{
if(TRACE == lp->gid)
printf("\n %lf update arrival queue req id %d ", tw_now(lp), m->saved_matched_req);
//int count_after = numQueue(s->pending_recvs_queue);
//assert(count_before == (count_after + 1));
m->found_match = found_matching_recv;
/* unblock the blocking receive */
//if(is_blocking)
// codes_issue_next_event(lp);
//else
//if(lp->gid == TRACE)
// tw_output(lp, "\n matched %d req id %d ", s->pending_recvs_queue->num_elems, req_id);
notify_waits(s, bf, lp, m, m->saved_matched_req);
m->u.rc.found_match = found_matching_recv;
notify_waits(s, bf, lp, m, m->u.rc.saved_matched_req);
}
}
......@@ -1147,22 +1155,22 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_workload_get_next_rc(wrkld_id, 0, (int)s->nw_id, &m->op);
if(m->op.op_type == CODES_WK_END)
codes_workload_get_next_rc(wrkld_id, 0, (int)s->nw_id, m->op);
if(m->op->op_type == CODES_WK_END)
return;
switch(m->op.op_type)
switch(m->op->op_type)
{
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
if(lp->gid == TRACE)
printf("\n %lf reverse send req %d ", tw_now(lp), (int)m->op.u.send.req_id);
model_net_event_rc(net_id, lp, m->op.u.send.num_bytes);
if(m->op.op_type == CODES_WK_ISEND)
printf("\n %lf reverse send req %d ", tw_now(lp), (int)m->op->u.send.req_id);
model_net_event_rc(net_id, lp, m->op->u.send.num_bytes);
if(m->op->op_type == CODES_WK_ISEND)
tw_rand_reverse_unif(lp->rng);
s->num_sends--;
num_bytes_sent -= m->op.u.send.num_bytes;
num_bytes_sent -= m->op->u.send.num_bytes;
}
break;
......@@ -1177,7 +1185,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{