Commit 12eaad5e authored by mubarak's avatar mubarak

Updating MPI wait/wait_all code. Adding config files used in application runs.

parent 1e822752
......@@ -2,7 +2,7 @@ LPGROUPS
{
MODELNET_GRP
{
repetitions="18";
repetitions="2048";
nw-lp="1";
modelnet_torus="1";
}
......@@ -10,15 +10,15 @@ LPGROUPS
PARAMS
{
packet_size="512";
message_size="296";
message_size="336";
modelnet_order=( "torus" );
# scheduler options
modelnet_scheduler="fcfs";
net_startup_ns="1.5";
net_bw_mbps="20000";
n_dims="3";
dim_length="3,3,2";
link_bandwidth="2.0";
n_dims="5";
dim_length="8,4,4,4,4";
link_bandwidth="10.0";
buffer_size="1310720";
num_vc="1";
chunk_size="64";
......
......@@ -2,7 +2,7 @@ LPGROUPS
{
MODELNET_GRP
{
repetitions="8";
repetitions="27";
nw-lp="1";
modelnet_simplenet="1";
}
......@@ -10,7 +10,7 @@ LPGROUPS
PARAMS
{
packet_size="512";
message_size="296";
message_size="784";
modelnet_order=( "simplenet" );
# scheduler options
modelnet_scheduler="fcfs";
......
......@@ -4,6 +4,7 @@
*
*/
#include <ross.h>
#include <inttypes.h>
#include "codes/codes-nw-workload.h"
#include "codes/codes.h"
......@@ -11,7 +12,7 @@
#include "codes/codes_mapping.h"
#include "codes/model-net.h"
#define TRACE 0
#define TRACE -1
#define DEBUG 0
char workload_type[128];
......@@ -22,13 +23,15 @@ static int num_net_traces = 0;
typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
typedef int16_t dumpi_req_id;
static int net_id = 0;
static float noise = 5.0;
static int num_net_lps, num_nw_lps;
long long num_bytes_sent=0;
long long num_bytes_recvd=0;
long long max_time = 0;
double max_time = 0, max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;
/* global variables for codes mapping */
static char lp_group_name[MAX_NAME_LENGTH], lp_type_name[MAX_NAME_LENGTH], annotation[MAX_NAME_LENGTH];
......@@ -41,12 +44,28 @@ enum MPI_NW_EVENTS
MPI_SEND_POSTED,
};
/* Singly linked list node that stores a pointer to a pending MPI operation
   waiting to be matched with its respective send/receive. */
struct mpi_msgs_queue
{
/* the queued MPI operation */
mpi_event_list* mpi_op;
/* next node in the list (NULL-terminated, see the traversal loops over ->next) */
struct mpi_msgs_queue* next;
};
/* Singly linked list node that stores the request ID of a completed MPI
   operation (Isend or Irecv). Nodes are pushed at the head by
   mpi_completed_queue_insert_op() and unlinked by remove_req_id(). */
struct completed_requests
{
/* request ID of the completed operation */
dumpi_req_id req_id;
/* next node in the list, NULL at the end */
struct completed_requests* next;
};
/* For wait operations (wait, waitall, waitsome): stores the pending wait
   operation and the number of its requests that have completed so far. */
struct pending_waits
{
/* the wait operation that is blocked; its op_type selects wait vs. waitall/waitsome handling */
mpi_event_list* mpi_op;
/* number of requests of this wait that have completed; compared against
   u.waits.count for waitall and against 0 for waitsome */
int num_completed;
/* simulation time at which the wait was posted; subtracted from tw_now()
   to accumulate the LP's wait_time when the wait completes */
tw_stime start_time;
};
/* maintains the head and tail of the queue, as well as the number of elements currently in queue */
struct mpi_queue_ptrs
{
......@@ -67,18 +86,39 @@ struct nw_state
unsigned long num_recvs;
unsigned long num_cols;
unsigned long num_delays;
unsigned long num_wait;
unsigned long num_waitall;
unsigned long num_waitsome;
/* time spent by the LP in executing the app trace*/
unsigned long long elapsed_time;
double elapsed_time;
/* time spent in compute operations */
unsigned long long compute_time;
double compute_time;
/* search time */
double search_overhead;
/* time spent in message send/isend */
double send_time;
/* time spent in message receive */
double recv_time;
/* time spent in wait operation */
double wait_time;
/* FIFO for isend messages arrived on destination */
struct mpi_queue_ptrs* arrival_queue;
/* list of completed isend operations */
struct mpi_queue_ptrs* completed_isend_queue;
/* FIFO for irecv messages posted but not yet matched with send operations */
struct mpi_queue_ptrs* pending_recvs_queue;
/* list of pending waits */
struct pending_waits* pending_waits;
/* List of completed send/receive requests */
struct completed_requests* completed_reqs;
};
/* network event being sent. msg_type is the type of message being sent, found_match is the index of the list maintained for reverse computation, op is the MPI event to be executed/reversed */
......@@ -86,6 +126,7 @@ struct nw_message
{
int msg_type;
int found_match;
//dumpi_req_id matched_recv;
struct mpi_event_list op;
};
......@@ -104,23 +145,37 @@ static void update_arrival_queue(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * l
/* reverse of the above function */
static void update_arrival_queue_rc(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* insert MPI operation in the queue*/
static void mpi_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op);
/* insert MPI operation in the waiting queue*/
static void mpi_pending_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op);
/* remove completed request IDs from the queue for reuse */
static void remove_req_id(struct completed_requests** requests, int16_t req_id);
/* remove MPI operation from the queue */
static int mpi_queue_remove_matching_op(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op);
/* remove MPI operation from the waiting queue.
is_blocking is an output parameter which tells if the matched operation was blocking receive or not
dumpi_req_id is an output parameter which tells the request ID of the matched receive operation*/
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op, int* is_blocking, dumpi_req_id* req_id);
/* remove the tail of the MPI operation */
/* remove the tail of the MPI operation from waiting queue */
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op);
/* insert completed MPI requests in the queue. */
static int mpi_completed_queue_insert_op(struct completed_requests** mpi_completed_queue, dumpi_req_id req_id);
/* conversion from seconds to nanoseconds */
static tw_stime s_to_ns(tw_stime ns);
/* executes MPI wait operation */
static void codes_exec_mpi_wait(nw_state* s, nw_message* m, tw_lp* lp);
/* executes MPI waitsome operation */
static void codes_exec_mpi_waitsome(nw_state* s, nw_message* m, tw_lp* lp);
/* executes MPI isend and send operations */
static void codes_exec_mpi_send(nw_state* s, nw_message* m, tw_lp* lp);
/* execute MPI irecv operation */
static void codes_exec_mpi_irecv(nw_state* s, nw_message* m, tw_lp* lp);
static void codes_exec_mpi_recv(nw_state* s, nw_message* m, tw_lp* lp);
/* execute the computational delay */
static void codes_exec_comp_delay(nw_state* s, nw_message* m, tw_lp* lp);
......@@ -131,6 +186,9 @@ static void codes_exec_mpi_col(nw_state* s, nw_message* m, tw_lp* lp);
/* issue next event */
static void codes_issue_next_event(tw_lp* lp);
/* notifies the wait operations for completion */
static int notify_waits(nw_state* s, tw_lp* lp, dumpi_req_id req_id);
/* initializes the queue and allocates memory */
static struct mpi_queue_ptrs* queue_init()
{
......@@ -225,8 +283,236 @@ static void mpi_queue_update(struct mpi_queue_ptrs* mpi_queue, mpi_event_list* m
return;
}
/* Debug helper: when this LP is the traced one (lp->gid == TRACE), prints the
   request IDs currently held in the LP's completed-requests list, head first.
   Does nothing for any other LP. */
static void printCompletedQueue(nw_state* s, tw_lp* lp)
{
    struct completed_requests* node;

    if(lp->gid != TRACE)
        return;

    printf("\n contents of completed operations queue ");
    for(node = s->completed_reqs; node != NULL; node = node->next)
        printf(" %d ",node->req_id);
}
/* Notify the pending wait operation (if any) that request `completed_req` has completed.
 *
 * s->pending_waits holds at most one outstanding wait operation:
 *   - CODES_NW_WAIT:     satisfied when its request ID equals completed_req.
 *   - CODES_NW_WAITALL:  satisfied once num_completed reaches the number of listed requests.
 *   - CODES_NW_WAITSOME: satisfied as soon as at least one listed request has completed.
 *
 * When the wait is satisfied, the elapsed time since it was posted is added to
 * s->wait_time, the consumed request IDs are removed from s->completed_reqs,
 * the pending-wait record is released and the next operation is issued.
 *
 * Parameters:
 *   s             - state of the network workload LP
 *   lp            - the LP on which the request completed
 *   completed_req - request ID that has just completed
 *
 * Returns 0 in all cases. */
static int notify_waits(nw_state* s, tw_lp* lp, dumpi_req_id completed_req)
{
    int i;
    struct pending_waits* wait_elem = s->pending_waits;

    if(TRACE == lp->gid)
        printf("\n notifying wait operation completed req %d ", (int16_t)completed_req);

    /* nobody is waiting: nothing to notify */
    if(!wait_elem)
        return 0;

    int op_type = wait_elem->mpi_op->op_type;

    if(op_type == CODES_NW_WAIT)
    {
        /* single wait: proceed only if this is the request being waited on */
        if(wait_elem->mpi_op->u.wait.req_id == completed_req)
        {
            s->wait_time += (tw_now(lp) - wait_elem->start_time);
            remove_req_id(&s->completed_reqs, completed_req);
            s->pending_waits = NULL;
            /* the record was malloc'ed when the wait was posted and is no
               longer reachable once s->pending_waits is cleared */
            free(wait_elem);
            codes_issue_next_event(lp);
        }
        return 0;
    }

    if(op_type == CODES_NW_WAITALL || op_type == CODES_NW_WAITSOME)
    {
        int required_count = wait_elem->mpi_op->u.waits.count;

        for(i = 0; i < required_count; i++)
        {
            if(wait_elem->mpi_op->u.waits.req_ids[i] == completed_req)
                wait_elem->num_completed++;
        }

        if(TRACE == lp->gid)
            printf("\n completed wait count %d ", wait_elem->num_completed);

        if((op_type == CODES_NW_WAITALL && wait_elem->num_completed == required_count)
                || (op_type == CODES_NW_WAITSOME && wait_elem->num_completed > 0))
        {
            if(TRACE == lp->gid)
                printf("\n waitall/some matched! ");

            s->wait_time += (tw_now(lp) - wait_elem->start_time);
            s->pending_waits = NULL;

            /* Remove exactly those listed requests that are present in the
               completed list. For waitsome the completed requests are not
               necessarily the first num_completed entries of req_ids[], so
               every listed ID is looked up instead of indexing by count. */
            for(i = 0; i < required_count; i++)
            {
                dumpi_req_id id = wait_elem->mpi_op->u.waits.req_ids[i];
                struct completed_requests* node = s->completed_reqs;

                while(node && node->req_id != id)
                    node = node->next;
                if(node)
                    remove_req_id(&s->completed_reqs, id);
            }

            printCompletedQueue(s, lp);
            free(wait_elem);
            codes_issue_next_event(lp); /* wait completed */
        }
    }
    return 0;
}
/* Execute an MPI wait operation.
 *
 * Looks up the waited-on request ID (m->op.u.wait.req_id) in the LP's list of
 * completed requests. If it is already there, the ID is removed from the list,
 * the time spent is added to s->wait_time and the next operation is issued.
 * Otherwise the time spent searching is added to s->search_overhead and the
 * wait is recorded in s->pending_waits until notify_waits() sees the request
 * complete.
 *
 * Timestamps are kept as tw_stime (the type of pending_waits.start_time and
 * the return type used with tw_now() elsewhere in this file) so that they are
 * not truncated to an integer. */
static void codes_exec_mpi_wait(nw_state* s, nw_message* m, tw_lp* lp)
{
    dumpi_req_id req_id = m->op.u.wait.req_id;
    tw_stime search_start_time = tw_now(lp);
    struct completed_requests* current;

    /* check in the completed requests list if the request ID has already been completed */
    for(current = s->completed_reqs; current; current = current->next)
    {
        if(current->req_id == req_id)
        {
            remove_req_id(&s->completed_reqs, req_id);
            s->wait_time += tw_now(lp) - search_start_time;
            codes_issue_next_event(lp);
            return;
        }
    }
    s->search_overhead += tw_now(lp) - search_start_time;

    /* not completed yet: park the wait operation until the request completes */
    struct pending_waits* wait_op = malloc(sizeof *wait_op);
    assert(wait_op);
    /* NOTE(review): mpi_op points into the event message `m`; confirm that the
       message outlives the pending wait. */
    wait_op->mpi_op = &(m->op);
    wait_op->num_completed = 0;
    wait_op->start_time = search_start_time;
    s->pending_waits = wait_op;
}
/* Execute an MPI waitall or waitsome operation.
 *
 * Counts how many of the listed request IDs (m->op.u.waits.req_ids[0..count-1])
 * are already present in the LP's completed-requests list.
 *   - CODES_NW_WAITALL proceeds when all `count` requests have completed.
 *   - CODES_NW_WAITSOME proceeds when at least one has completed.
 * On success the completed request IDs are removed from the list, the time
 * spent is added to s->wait_time and the next operation is issued. Otherwise
 * the wait is recorded in s->pending_waits (with the number of requests
 * already completed) until notify_waits() completes it.
 *
 * The time spent searching is always added to s->search_overhead. */
static void codes_exec_mpi_wait_all_some(nw_state* s, nw_message* m, tw_lp* lp)
{
    int count = m->op.u.waits.count;
    int i, num_completed = 0;
    struct completed_requests* current;
    /* keep timestamps as tw_stime so they are not truncated to an integer */
    tw_stime start_time = tw_now(lp);

    if(lp->gid == TRACE)
    {
        printf(" \n MPI waitall posted %d count", m->op.u.waits.count);
        for(i = 0; i < count; i++)
            printf(" %d ", (int)m->op.u.waits.req_ids[i]);
        printCompletedQueue(s, lp);
    }

    /* check number of completed requests in the completion queue */
    for(current = s->completed_reqs; current; current = current->next)
    {
        for(i = 0; i < count; i++)
        {
            dumpi_req_id id = m->op.u.waits.req_ids[i];
            if(id == current->req_id)
                num_completed++;
        }
    }

    if(TRACE== lp->gid)
        printf("\n Num completed %d count %d ", num_completed, count);

    s->search_overhead += (tw_now(lp) - start_time);

    if((m->op.op_type == CODES_NW_WAITALL && count == num_completed) ||
            (m->op.op_type == CODES_NW_WAITSOME && num_completed > 0))
    {
        /* Remove exactly the listed requests that are in the completed list.
           For waitsome these are not necessarily the first num_completed
           entries of req_ids[], so each listed ID is looked up. No scratch
           array is needed, which also avoids a zero-length VLA when count is 0. */
        for(i = 0; i < count; i++)
        {
            dumpi_req_id id = m->op.u.waits.req_ids[i];

            current = s->completed_reqs;
            while(current && current->req_id != id)
                current = current->next;
            if(current)
                remove_req_id(&s->completed_reqs, id);
        }
        s->wait_time += tw_now(lp) - start_time;
        codes_issue_next_event(lp);
        return;
    }

    /* If not, add the wait operation in the pending 'waits' list. */
    struct pending_waits* wait_op = malloc(sizeof *wait_op);
    assert(wait_op);
    /* NOTE(review): mpi_op points into the event message `m`; confirm that the
       message outlives the pending wait. */
    wait_op->mpi_op = &(m->op);
    wait_op->num_completed = num_completed;
    wait_op->start_time = start_time;
    s->pending_waits = wait_op;
}
/* Request IDs are reused, so a completed request ID is deleted from the list
   once it has been matched.
   Unlinks and frees the first node whose req_id equals `req_id`. If the list
   is empty a diagnostic is printed; if the list is non-empty but holds no such
   ID, the list is left unchanged. */
static void remove_req_id(struct completed_requests** mpi_completed_queue, dumpi_req_id req_id)
{
    /* `link` always points at the pointer that references the node under inspection */
    struct completed_requests** link = mpi_completed_queue;

    if(*link == NULL)
    {
        printf("\n REQ ID DOES NOT EXIST");
        return;
    }

    for(; *link != NULL; link = &(*link)->next)
    {
        struct completed_requests* victim = *link;

        if(victim->req_id == req_id)
        {
            *link = victim->next;
            free(victim);
            return;
        }
    }
}
/* Records a completed request: allocates a node for `req_id` and pushes it at
   the head of the completed-requests list. Always returns 0; allocation
   failure trips the assert. */
static int mpi_completed_queue_insert_op(struct completed_requests** mpi_completed_queue, dumpi_req_id req_id)
{
    struct completed_requests* node = malloc(sizeof(struct completed_requests));
    assert(node);

    node->req_id = req_id;
    /* head insertion; for an empty list the old head is NULL, so the new node
       simply becomes the only element */
    node->next = *mpi_completed_queue;
    *mpi_completed_queue = node;
    return 0;
}
/* Removes the most recently inserted entry (the head) from the
   completed-requests list and frees it.
   Returns 0 on success, or -1 after printing a diagnostic if the list is empty. */
static int mpi_completed_queue_remove_op(struct completed_requests** mpi_completed_queue)
{
    struct completed_requests* head = *mpi_completed_queue;

    if(head != NULL)
    {
        *mpi_completed_queue = head->next;
        free(head);
        return 0;
    }

    printf("\n ERROR! NO ELEMENT IN THE QUEUE ");
    return -1;
}
/* insert MPI send or receive operation in the queues starting from tail. Unmatched sends go to arrival queue and unmatched receives go to pending receives queues. */
static void mpi_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op)
static void mpi_pending_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op)
{
/* insert mpi operation */
struct mpi_msgs_queue* elem = malloc(sizeof(struct mpi_msgs_queue));
......@@ -248,29 +534,20 @@ static void mpi_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, mpi_event_list
}
/* match the send/recv operations */
static int match_receive(tw_lpid lpid, mpi_event_list* op1, mpi_event_list* op2)
static int match_receive(nw_state* s, tw_lp* lp, tw_lpid lpid, mpi_event_list* op1, mpi_event_list* op2)
{
/* Match the MPI send with the receive */
if(op1->op_type == CODES_NW_ISEND || op1->op_type == CODES_NW_SEND)
{
if((op2->u.recv.num_bytes >= op1->u.send.num_bytes) &&
((op2->u.recv.tag == op1->u.send.tag) || op2->u.recv.tag == -1) &&
((op2->u.recv.source_rank == op1->u.send.source_rank) || op2->u.recv.source_rank == -1))
{
return 1;
}
}
else
if(op1->op_type == CODES_NW_IRECV || op1->op_type == CODES_NW_RECV)
{
if((op1->u.recv.num_bytes >= op2->u.send.num_bytes) &&
assert(op1->op_type == CODES_NW_IRECV || op1->op_type == CODES_NW_RECV);
assert(op2->op_type == CODES_NW_SEND || op2->op_type == CODES_NW_ISEND);
if((op1->u.recv.num_bytes >= op2->u.send.num_bytes) &&
((op1->u.recv.tag == op2->u.send.tag) || op1->u.recv.tag == -1) &&
((op1->u.recv.source_rank == op2->u.send.source_rank) || op1->u.recv.source_rank == -1))
{
mpi_completed_queue_insert_op(&s->completed_reqs, op1->u.recv.req_id);
s->recv_time += tw_now(lp) - op2->sim_start_time;
return 1;
}
}
return 0;
return -1;
}
/* used for reverse computation. removes the tail of the queue */
......@@ -309,7 +586,7 @@ static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue,
/* search for a matching mpi operation and remove it from the list.
* Record the index in the list from where the element got deleted.
* Index is used for inserting the element once again in the queue for reverse computation. */
static int mpi_queue_remove_matching_op(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op)
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op, int* is_blocking, dumpi_req_id* req_id)
{
if(mpi_queue->queue_head == NULL)
return -1;
......@@ -319,8 +596,23 @@ static int mpi_queue_remove_matching_op(tw_lpid lpid, struct mpi_queue_ptrs* mpi
int indx = 0;
/* if head of the list has the required mpi op to be deleted */
if(match_receive(lpid, tmp->mpi_op, mpi_op))
int rcv_val = 0;
if(mpi_op->op_type == CODES_NW_SEND || mpi_op->op_type == CODES_NW_ISEND)
{
rcv_val = match_receive(s, lp, lpid, tmp->mpi_op, mpi_op);
*req_id = tmp->mpi_op->u.recv.req_id;
}
else if(mpi_op->op_type == CODES_NW_RECV || mpi_op->op_type == CODES_NW_IRECV)
{
rcv_val = match_receive(s, lp, lpid, mpi_op, tmp->mpi_op);
*req_id = mpi_op->u.recv.req_id;
}
if(rcv_val >= 0)
{
if(tmp->mpi_op->op_type == CODES_NW_RECV)
*is_blocking = 1;
if(mpi_queue->queue_head == mpi_queue->queue_tail)
{
mpi_queue->queue_tail = NULL;
......@@ -332,6 +624,8 @@ static int mpi_queue_remove_matching_op(tw_lpid lpid, struct mpi_queue_ptrs* mpi
mpi_queue->queue_head = tmp->next;
free(tmp);
}
mpi_queue->num_elems--;
return indx;
}
......@@ -343,7 +637,17 @@ static int mpi_queue_remove_matching_op(tw_lpid lpid, struct mpi_queue_ptrs* mpi
{
indx++;
elem = tmp->next;
if(match_receive(lpid, elem->mpi_op, mpi_op))
if(mpi_op->op_type == CODES_NW_SEND || mpi_op->op_type == CODES_NW_ISEND)
{
rcv_val = match_receive(s, lp, lpid, elem->mpi_op, mpi_op);
*req_id = elem->mpi_op->u.recv.req_id;
}
else if(mpi_op->op_type == CODES_NW_RECV || mpi_op->op_type == CODES_NW_IRECV)
{
rcv_val = match_receive(s, lp, lpid, mpi_op, elem->mpi_op);
}
if(rcv_val >= 0)
{
if(elem == mpi_queue->queue_tail)
mpi_queue->queue_tail = tmp;
......@@ -351,9 +655,12 @@ static int mpi_queue_remove_matching_op(tw_lpid lpid, struct mpi_queue_ptrs* mpi
free(elem);
mpi_queue->num_elems--;
if(tmp->mpi_op->op_type == CODES_NW_RECV)
*is_blocking = 1;
return indx;
}
tmp = tmp->next;
}
return -1;
......@@ -394,19 +701,22 @@ static void codes_exec_comp_delay(nw_state* s, nw_message* m, tw_lp* lp)
}
/* reverse computation operation for MPI irecv */
static void codes_exec_mpi_irecv_rc(nw_state* s, nw_message* m, tw_lp* lp)
static void codes_exec_mpi_recv_rc(nw_state* s, nw_message* m, tw_lp* lp)
{
num_bytes_recvd -= m->op.u.recv.num_bytes;
if(m->found_match >= 0)
{
//int count = numQueue(s->arrival_queue);
mpi_queue_update(s->arrival_queue, &m->op, m->found_match);
mpi_completed_queue_remove_op(&s->completed_reqs);
/*if(lp->gid == TRACE)
printf("\n Reverse- after adding: arrival queue num_elems %d ", s->arrival_queue->num_elems);*/
}
else if(m->found_match < 0)
{
mpi_queue_remove_tail(lp->gid, s->pending_recvs_queue, &m->op);
if(m->op.op_type == CODES_NW_IRECV)
tw_rand_reverse_unif(lp->rng);
/*if(lp->gid == TRACE)
printf("\n Reverse- after removing: pending receive queue num_elems %d ", s->pending_recvs_queue->num_elems);*/
}
......@@ -415,37 +725,56 @@ static void codes_exec_mpi_irecv_rc(nw_state* s, nw_message* m, tw_lp* lp)
}
/* Execute MPI Irecv operation (non-blocking receive) */
static void codes_exec_mpi_irecv(nw_state* s, nw_message* m, tw_lp* lp)
static void codes_exec_mpi_recv(nw_state* s, nw_message* m, tw_lp* lp)
{
/* Once an irecv is posted, list of completed sends is checked to find a matching isend.
If no matching isend is found, the receive operation is queued in the pending queue of
receive operations. */
struct mpi_event_list* mpi_op = &(m->op);
assert(mpi_op->op_type == CODES_NW_IRECV);
struct mpi_event_list* mpi_op = &(m->op);
mpi_op->sim_start_time = tw_now(lp);
unsigned long long start_searching, end_searching;
num_bytes_recvd += mpi_op->u.recv.num_bytes;
int count_before = numQueue(s->arrival_queue);
int found_matching_sends = mpi_queue_remove_matching_op(lp->gid, s->arrival_queue, mpi_op);
//int count_before = numQueue(s->arrival_queue);
if(lp->gid == TRACE)
printf("\n codes exec mpi recv req id %d", (int)mpi_op->u.recv.req_id);
start_searching = tw_now(lp);
dumpi_req_id req_id;
int found_matching_sends = mpi_queue_remove_matching_op(s, lp, lp->gid, s->arrival_queue, mpi_op, 0, &req_id);
/* save the req id inserted in the completed queue for reverse computation. */
//m->matched_recv = req_id;
end_searching = tw_now(lp);
s->search_overhead += (end_searching - start_searching);
if(found_matching_sends < 0)
{
m->found_match = -1;
mpi_queue_insert_op(s->pending_recvs_queue, mpi_op);
/*if(lp->gid == TRACE)
printf("\n After adding: pending receives queue num_elems %d ", s->pending_recvs_queue->num_elems);*/
mpi_pending_queue_insert_op(s->pending_recvs_queue, mpi_op);
/* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */
if(lp->gid == TRACE)
printf("\n queued");
if(mpi_op->op_type == CODES_NW_IRECV)
{
codes_issue_next_event(lp);
return;
}
else
printf("\n CODES MPI RECV OPERATION!!! ");
}
else
else
{
/*if(lp->gid == TRACE)
printf("\n After removing: arrival queue num_elems %d ", s->arrival_queue->num_elems);*/
int count_after = numQueue(s->arrival_queue);
assert(count_before == (count_after+1));
m->found_match = found_matching_sends;
printf("\n Matched after removing: arrival queue num_elems %d ", s->arrival_queue->num_elems);*/
/* update completed requests list */
//int count_after = numQueue(s->arrival_queue);
//assert(count_before == (count_after+1));
//m->found_match = found_matching_sends;
codes_issue_next_event(lp);
}
/* issue next MPI operation */
codes_issue_next_event(lp);
}
/* executes MPI send and isend operations */
......@@ -480,15 +809,19 @@ static void codes_exec_mpi_send(nw_state* s, nw_message* m, tw_lp* lp)
nw_message* local_m = malloc(sizeof(nw_message));
nw_message* remote_m = malloc(sizeof(nw_message));
assert(local_m && remote_m);
mpi_op->sim_start_time = tw_now(lp);
local_m->op = *mpi_op;
local_m->msg_type = MPI_SEND_POSTED;
remote_m->op = *mpi_op;
remote_m->msg_type = MPI_SEND_ARRIVED;
model_net_event(net_id, "test", dest_rank, mpi_op->u.send.num_bytes, 0.0,
sizeof(nw_message), (const void*)remote_m, sizeof(nw_message), (const void*)local_m, lp);
if(TRACE == lp->gid)
printf("\n send req id %d dest %d ", (int)mpi_op->u.send.req_id, (int)dest_rank);
/* isend executed, now get next MPI operation from the queue */
if(mpi_op->op_type == CODES_NW_ISEND)
codes_issue_next_event(lp);
......@@ -512,7 +845,14 @@ static void update_send_completion_queue_rc(nw_state* s, tw_bf * bf, nw_message
//mpi_queue_remove_matching_op(&s->completed_isend_queue_head, &s->completed_isend_queue_tail, &m->op, SEND);
if(m->op.op_type == CODES_NW_SEND)
{
tw_rand_reverse_unif(lp->rng);
}
if(m->op.op_type == CODES_NW_ISEND)
{
mpi_completed_queue_remove_op(&s->completed_reqs);
}
}
/* completed isends are added in the list */
......@@ -520,9 +860,13 @@ static void update_send_completion_queue(nw_state* s, tw_bf * bf, nw_message * m
{
//if(m->op.op_type == CODES_NW_SEND)
// printf("\n LP %ld Local isend operation completed ", lp->gid);
//mpi_queue_insert_op(&s->completed_isend_queue_head, &s->completed_isend_queue_tail, &m->op);
if(m->op.op_type == CODES_NW_ISEND)
{
mpi_completed_queue_insert_op(&s->completed_reqs, m->op.u.send.req_id);
notify_waits(s, lp, m->op.u.send.req_id);
}
/* blocking send operation */
if(m->op.op_type == CODES_NW_SEND)
codes_issue_next_event(lp);
......@@ -552,23 +896,39 @@ static void update_arrival_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_
/* once an isend operation arrives, the pending receives queue is checked to find out if there is an irecv that has already been posted. If no matching irecv has been posted, the unmatched send goes to the arrival queue. */
static void