Commit b93f407c authored by Misbah Mubarak
Browse files

Fixing some memory issues with the MPI Sim layer

parent af6be11d
......@@ -24,6 +24,6 @@ PARAMS
local_bandwidth="5.25";
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="512";
message_size="528";
routing="minimal";
}
......@@ -2,7 +2,7 @@ LPGROUPS
{
MODELNET_GRP
{
repetitions="27";
repetitions="8";
modelnet_simplenet="1";
nw-lp="1";
}
......
......@@ -46,28 +46,28 @@ enum MPI_NW_EVENTS
{
MPI_OP_GET_NEXT=1,
MPI_SEND_ARRIVED,
MPI_SEND_ARRIVED_CB, // for tracking message times on sender
MPI_SEND_ARRIVED_CB, // for tracking message times on sender
MPI_SEND_POSTED,
};
/* stores pointers of pending MPI operations to be matched with their respective sends/receives. */
struct mpi_msgs_queue
{
/* NOTE(review): each member below is listed twice, once as "type* name" and once as
 * "type * name". This looks like the before/after lines of a whitespace-only change;
 * presumably only one spelling of each belongs in the compiled file -- confirm. */
/* workload operation carried by this queue node */
struct codes_workload_op* mpi_op;
/* following node in the singly linked queue; NULL for the node at the tail */
struct mpi_msgs_queue* next;
struct codes_workload_op * mpi_op;
struct mpi_msgs_queue * next;
};
/* stores request IDs of completed MPI operations (Isends or Irecvs) */
struct completed_requests
{
/* request ID of an operation recorded as completed */
dumpi_req_id req_id;
/* NOTE(review): the link member is listed twice with different spacing around '*';
 * presumably the before/after lines of a whitespace-only change -- confirm which one is kept. */
/* following list entry; list walkers stop when this is NULL */
struct completed_requests* next;
struct completed_requests * next;
};
/* for wait operations, store the pending operation and number of completed waits so far. */
struct pending_waits
{
/* NOTE(review): mpi_op is listed twice with different spacing around '*';
 * presumably the before/after lines of a whitespace-only change -- confirm which one is kept. */
/* the wait / waitall workload operation that is still outstanding */
struct codes_workload_op* mpi_op;
struct codes_workload_op * mpi_op;
/* how many of the operation's request IDs have been seen as completed so far;
 * for waitall it is compared against u.waits.count to decide when the wait is done */
int num_completed;
/* simulation time (tw_now) at which the wait was posted; the elapsed time since then
 * is added to the LP's wait_time once the wait completes */
tw_stime start_time;
};
......@@ -99,30 +99,22 @@ struct nw_state
/* time spent by the LP in executing the app trace*/
double start_time;
double elapsed_time;
/* time spent in compute operations */
double compute_time;
/* time spent in message send/isend */
double send_time;
/* time spent in message receive */
double recv_time;
/* time spent in wait operation */
double wait_time;
/* FIFO for isend messages arrived on destination */
struct mpi_queue_ptrs* arrival_queue;
struct mpi_queue_ptrs * arrival_queue;
/* FIFO for irecv messages posted but not yet matched with send operations */
struct mpi_queue_ptrs* pending_recvs_queue;
struct mpi_queue_ptrs * pending_recvs_queue;
/* list of pending waits (and saved pending wait for reverse computation) */
struct pending_waits* pending_waits;
struct pending_waits * pending_waits;
/* List of completed send/receive requests */
struct completed_requests* completed_reqs;
struct completed_requests * completed_reqs;
};
/* data for handling reverse computation.
......@@ -155,87 +147,90 @@ struct nw_message
/* required for reverse computation*/
struct
{
int found_match;
short matched_op;
dumpi_req_id saved_matched_req;
struct codes_workload_op* ptr_match_op;
struct pending_waits* saved_pending_wait;
double saved_send_time;
double saved_recv_time;
double saved_wait_time;
int found_match;
short matched_op;
dumpi_req_id saved_matched_req;
struct codes_workload_op* ptr_match_op;
struct pending_waits* saved_pending_wait;
double saved_send_time;
double saved_recv_time;
double saved_wait_time;
} rc;
} u;
};
/* executes MPI wait operation */
static void codes_exec_mpi_wait(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp);
static void codes_exec_mpi_wait(
nw_state* s, tw_lp* lp, nw_message * m, struct codes_workload_op * mpi_op);
/* reverse of mpi wait function. */
static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp);
static void codes_exec_mpi_wait_rc(
nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp);
/* executes MPI isend and send operations */
static void codes_exec_mpi_send(nw_state* s, nw_message* m, tw_lp* lp);
static void codes_exec_mpi_send(
nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op);
/* execute MPI irecv operation */
static void codes_exec_mpi_recv(nw_state* s, nw_message* m, tw_lp* lp);
static void codes_exec_mpi_recv(
nw_state* s, tw_lp* lp, nw_message * m, struct codes_workload_op * mpi_op);
/* reverse of mpi recv function. */
static void codes_exec_mpi_recv_rc(nw_state* s, nw_message* m, tw_lp* lp);
static void codes_exec_mpi_recv_rc(
nw_state* s, nw_message* m, tw_lp* lp);
/* execute the computational delay */
static void codes_exec_comp_delay(nw_state* s, nw_message* m, tw_lp* lp);
static void codes_exec_comp_delay(
nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op);
/* execute collective operation, currently only skips these operations. */
static void codes_exec_mpi_col(nw_state* s, nw_message* m, tw_lp* lp);
static void codes_exec_mpi_col(
nw_state* s, tw_lp* lp);
/* gets the next MPI operation from the network-workloads API. */
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
static void get_next_mpi_operation(
nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* reverse handler of get next mpi operation. */
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
static void get_next_mpi_operation_rc(
nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* Makes a call to get_next_mpi_operation. */
static void codes_issue_next_event(tw_lp* lp);
///////////////////// HELPER FUNCTIONS FOR MPI MESSAGE QUEUE HANDLING ///////////////
/* upon arrival of local completion message, inserts operation in completed send queue */
static void update_send_completion_queue(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
static void update_send_completion_queue(
nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse of the above function */
static void update_send_completion_queue_rc(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
static void update_send_completion_queue_rc(
nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* upon arrival of an isend operation, updates the arrival queue of the network */
static void update_arrival_queue(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
static void update_arrival_queue(
nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse of the above function */
static void update_arrival_queue_rc(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
static void update_arrival_queue_rc(
nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* callback to a message sender for computing message time */
static void update_message_time(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
static void update_message_time_rc(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
static void update_message_time(
nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse for computing message time */
static void update_message_time_rc(
nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* insert MPI operation in the waiting queue*/
static void mpi_pending_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op);
static void mpi_pending_queue_insert_op(
struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op);
/* remove completed request IDs from the queue for reuse. Reverse of above function. */
static void remove_req_id(struct completed_requests** requests, int16_t req_id);
static void remove_req_id(
struct completed_requests** requests, int16_t req_id);
/* remove MPI operation from the waiting queue.*/
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue_ptrs* mpi_queue, nw_message * m);
static int mpi_queue_remove_matching_op(
nw_state* s, tw_lp* lp, nw_message * m, struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op * mpi_op);
/* remove the tail of the MPI operation from waiting queue */
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue);
static int mpi_queue_remove_tail(
tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue);
/* insert completed MPI requests in the queue. */
static void mpi_completed_queue_insert_op(struct completed_requests** mpi_completed_queue, dumpi_req_id req_id);
static void mpi_completed_queue_insert_op(
struct completed_requests** mpi_completed_queue, dumpi_req_id req_id);
/* notifies the wait operations (if any) about the completed receives and sends requests. */
static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_req_id req_id);
static int notify_waits(
nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_req_id req_id);
/* reverse of notify waits function. */
static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_req_id completed_req);
static void notify_waits_rc(
nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_req_id completed_req);
/* conversion from seconds to nanoseconds */
static tw_stime s_to_ns(tw_stime ns);
......@@ -260,10 +255,7 @@ static struct mpi_queue_ptrs* queue_init()
/* helper function: counts number of elements in the queue */
static int numQueue(struct mpi_queue_ptrs* mpi_queue)
{
struct mpi_msgs_queue* tmp = malloc(sizeof(struct mpi_msgs_queue));
assert(tmp);
tmp = mpi_queue->queue_head;
struct mpi_msgs_queue* tmp = mpi_queue->queue_head;
int count = 0;
while(tmp)
......@@ -272,7 +264,6 @@ static int numQueue(struct mpi_queue_ptrs* mpi_queue)
tmp = tmp->next;
}
return count;
free(tmp);
}
/* prints elements in a send/recv queue */
......@@ -370,27 +361,29 @@ static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, du
*/
if(lp->gid == TRACE)
printf("\n %lf reverse -- notify waits req id %d ", tw_now(lp), completed_req);
printCompletedQueue(s, lp);
if(m->u.rc.matched_op == 1)
s->pending_waits->num_completed--;
/* if a wait-elem exists, it means the request ID has been matched*/
if(m->u.rc.matched_op == 2)
{
if(lp->gid == TRACE)
{
printf("\n %lf matched req id %d ", tw_now(lp), completed_req);
printCompletedQueue(s, lp);
}
if(lp->gid == TRACE)
{
printf("\n %lf matched req id %d ", tw_now(lp), completed_req);
printCompletedQueue(s, lp);
}
struct pending_waits* wait_elem = m->u.rc.saved_pending_wait;
s->wait_time = m->u.rc.saved_wait_time;
int count = wait_elem->mpi_op->u.waits.count;
s->wait_time = m->u.rc.saved_wait_time;
int count = wait_elem->mpi_op->u.waits.count;
for( i = 0; i < count; i++ )
mpi_completed_queue_insert_op(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);
for( i = 0; i < count; i++ )
mpi_completed_queue_insert_op(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);
wait_elem->num_completed--;
s->pending_waits = wait_elem;
tw_rand_reverse_unif(lp->rng);
wait_elem->num_completed--;
s->pending_waits = wait_elem;
tw_rand_reverse_unif(lp->rng);
}
}
......@@ -428,8 +421,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
return 0;
}
}
else
if(op_type == CODES_WK_WAITALL)
else if(op_type == CODES_WK_WAITALL)
{
int required_count = wait_elem->mpi_op->u.waits.count;
for(i = 0; i < required_count; i++)
......@@ -443,23 +435,25 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
}
}
if(wait_elem->num_completed == required_count)
{
if(lp->gid == TRACE)
{
printf("\n %lf req %d completed %d", tw_now(lp), completed_req, wait_elem->num_completed);
printCompletedQueue(s, lp);
}
m->u.rc.matched_op = 2;
m->u.rc.saved_wait_time = s->wait_time;
s->wait_time += (tw_now(lp) - wait_elem->start_time);
m->u.rc.saved_pending_wait = wait_elem;
s->pending_waits = NULL;
for(i = 0; i < required_count; i++)
remove_req_id(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);
codes_issue_next_event(lp); //wait completed
}
}
if(wait_elem->num_completed == required_count)
{
if(lp->gid == TRACE)
{
printf("\n %lf req %d completed %d", tw_now(lp), completed_req, wait_elem->num_completed);
printCompletedQueue(s, lp);
}
m->u.rc.matched_op = 2;
m->u.rc.saved_wait_time = s->wait_time;
s->wait_time += (tw_now(lp) - wait_elem->start_time);
m->u.rc.saved_pending_wait = wait_elem;
s->pending_waits = NULL;
for(i = 0; i < required_count; i++)
remove_req_id(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);
codes_issue_next_event(lp); //wait completed
}
}
return 0;
}
......@@ -469,21 +463,21 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp*
if(s->pending_waits)
{
s->pending_waits = NULL;
return;
return;
}
else
{
mpi_completed_queue_insert_op(&s->completed_reqs, m->op->u.wait.req_id);
tw_rand_reverse_unif(lp->rng);
mpi_completed_queue_insert_op(&s->completed_reqs, m->op->u.wait.req_id);
tw_rand_reverse_unif(lp->rng);
}
}
/* execute MPI wait operation */
static void codes_exec_mpi_wait(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, nw_message * m, struct codes_workload_op * mpi_op)
{
/* check in the completed receives queue if the request ID has already been completed.*/
assert(!s->pending_waits);
dumpi_req_id req_id = m->op->u.wait.req_id;
dumpi_req_id req_id = mpi_op->u.wait.req_id;
struct completed_requests* current = s->completed_reqs;
while(current) {
......@@ -498,7 +492,7 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
wait_op->mpi_op = m->op;
wait_op->mpi_op = mpi_op;
wait_op->num_completed = 0;
wait_op->start_time = tw_now(lp);
s->pending_waits = wait_op;
......@@ -506,38 +500,39 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp
/* Reverse-computation handler for an MPI waitall operation.
 *
 * s  - LP state whose completed_reqs list and pending_waits pointer are restored
 * bf - bit field (not used in this function)
 * m  - message being rolled back; m->u.rc.found_match selects the branch and
 *      m->op supplies the waitall request IDs
 * lp - logical process (used for tracing and for reversing its RNG)
 *
 * NOTE(review): every statement group in this listing appears twice -- once unindented
 * in the original layout and once in the re-indented layout. This looks like the
 * before/after lines of a formatting change; presumably only one copy of each group
 * belongs in the compiled function -- confirm against the real source file.
 */
static void codes_exec_mpi_wait_all_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
{
/* optional trace output for the LP selected by TRACE */
if(lp->gid == TRACE)
{
printf("\n %lf codes exec mpi waitall reverse %d ", tw_now(lp), m->u.rc.found_match);
printCompletedQueue(s, lp);
}
if(lp->gid == TRACE)
{
printf("\n %lf codes exec mpi waitall reverse %d ", tw_now(lp), m->u.rc.found_match);
printCompletedQueue(s, lp);
}
/* found_match set: put every request ID of the waitall back into the completed-requests
 * list and step the RNG back once. Presumably this mirrors the forward handler removing
 * those IDs and drawing one random number -- that branch is not visible here, confirm. */
if(m->u.rc.found_match)
{
int i;
int count = m->op->u.waits.count;
dumpi_req_id req_id[count];
for( i = 0; i < count; i++)
{
req_id[i] = m->op->u.waits.req_ids[i];
mpi_completed_queue_insert_op(&s->completed_reqs, req_id[i]);
}
tw_rand_reverse_unif(lp->rng);
}
int i;
int count = m->op->u.waits.count;
dumpi_req_id req_id[count];
for( i = 0; i < count; i++)
{
req_id[i] = m->op->u.waits.req_ids[i];
mpi_completed_queue_insert_op(&s->completed_reqs, req_id[i]);
}
tw_rand_reverse_unif(lp->rng);
}
/* no match recorded: release the pending-wait record currently attached to the LP
 * and clear the pointer */
else
{
struct pending_waits* wait_op = s->pending_waits;
free(wait_op);
s->pending_waits = NULL;
/* always holds here, since the pointer was set to NULL on the previous line */
assert(!s->pending_waits);
if(lp->gid == TRACE)
printf("\n %lf Nullifying codes waitall ", tw_now(lp));
}
struct pending_waits* wait_op = s->pending_waits;
free(wait_op);
s->pending_waits = NULL;
assert(!s->pending_waits);
if(lp->gid == TRACE)
printf("\n %lf Nullifying codes waitall ", tw_now(lp));
}
}
static void codes_exec_mpi_wait_all(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
static void codes_exec_mpi_wait_all(
nw_state* s, tw_lp* lp, nw_message * m, struct codes_workload_op * mpi_op)
{
//assert(!s->pending_waits);
int count = m->op->u.waits.count;
int count = mpi_op->u.waits.count;
int i, num_completed = 0;
dumpi_req_id req_id[count];
struct completed_requests* current = s->completed_reqs;
......@@ -545,18 +540,18 @@ static void codes_exec_mpi_wait_all(nw_state* s, tw_bf* bf, nw_message* m, tw_lp
/* check number of completed irecvs in the completion queue */
if(lp->gid == TRACE)
{
printf(" \n (%lf) MPI waitall posted %d count", tw_now(lp), m->op->u.waits.count);
printf(" \n (%lf) MPI waitall posted %d count", tw_now(lp), mpi_op->u.waits.count);
for(i = 0; i < count; i++)
printf(" %d ", (int)m->op->u.waits.req_ids[i]);
printf(" %d ", (int)mpi_op->u.waits.req_ids[i]);
printCompletedQueue(s, lp);
}
while(current)
{
for(i = 0; i < count; i++)
{
req_id[i] = m->op->u.waits.req_ids[i];
req_id[i] = mpi_op->u.waits.req_ids[i];
if(req_id[i] == current->req_id)
num_completed++;
num_completed++;
}
current = current->next;
}
......@@ -577,7 +572,7 @@ static void codes_exec_mpi_wait_all(nw_state* s, tw_bf* bf, nw_message* m, tw_lp
{
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
wait_op->mpi_op = m->op;
wait_op->mpi_op = mpi_op;
wait_op->num_completed = num_completed;
wait_op->start_time = tw_now(lp);
s->pending_waits = wait_op;
......@@ -585,14 +580,15 @@ static void codes_exec_mpi_wait_all(nw_state* s, tw_bf* bf, nw_message* m, tw_lp
}
/* request ID is being reused so delete it from the list once the matching is done */
static void remove_req_id(struct completed_requests** mpi_completed_queue, dumpi_req_id req_id)
static void remove_req_id(
struct completed_requests** mpi_completed_queue, dumpi_req_id req_id)
{
struct completed_requests* current = *mpi_completed_queue;
if(!current)
tw_error(TW_LOC, "\n REQ ID DOES NOT EXIST");
if(current->req_id == req_id)
if(current->req_id == req_id)
{
*mpi_completed_queue = current->next;
free(current);
......@@ -605,17 +601,18 @@ static void remove_req_id(struct completed_requests** mpi_completed_queue, dumpi
elem = current->next;
if(elem->req_id == req_id)
{
current->next = elem->next;
free(elem);
return;
}
current->next = elem->next;
free(elem);
return;
}
current = current->next;
}
return;
}
/* inserts mpi operation in the completed requests queue */
static void mpi_completed_queue_insert_op(struct completed_requests** mpi_completed_queue, dumpi_req_id req_id)
static void mpi_completed_queue_insert_op(
struct completed_requests** mpi_completed_queue, dumpi_req_id req_id)
{
struct completed_requests* reqs = malloc(sizeof(struct completed_requests));
assert(reqs);
......@@ -624,9 +621,9 @@ static void mpi_completed_queue_insert_op(struct completed_requests** mpi_comple
if(!(*mpi_completed_queue))
{
reqs->next = NULL;
*mpi_completed_queue = reqs;
return;
reqs->next = NULL;
*mpi_completed_queue = reqs;
return;
}
reqs->next = *mpi_completed_queue;
*mpi_completed_queue = reqs;
......@@ -634,14 +631,15 @@ static void mpi_completed_queue_insert_op(struct completed_requests** mpi_comple
}
/* insert MPI send or receive operation in the queues starting from tail. Unmatched sends go to arrival queue and unmatched receives go to pending receives queues. */
static void mpi_pending_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op)
static void mpi_pending_queue_insert_op(
struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op)
{
/* insert mpi operation */
struct mpi_msgs_queue* elem = malloc(sizeof(struct mpi_msgs_queue));
assert(elem);
elem->mpi_op = mpi_op;
elem->next = NULL;
elem->next = NULL;
if(!mpi_queue->queue_head)
mpi_queue->queue_head = elem;
......@@ -649,14 +647,15 @@ static void mpi_pending_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, struct
if(mpi_queue->queue_tail)
mpi_queue->queue_tail->next = elem;
mpi_queue->queue_tail = elem;
mpi_queue->queue_tail = elem;
mpi_queue->num_elems++;
return;
}
/* match the send/recv operations */
static int match_receive(nw_state* s, tw_lp* lp, tw_lpid lpid, struct codes_workload_op* op1, struct codes_workload_op* op2)
static int match_receive(
nw_state* s, tw_lp* lp, tw_lpid lpid, struct codes_workload_op* op1, struct codes_workload_op* op2)
{
assert(op1->op_type == CODES_WK_IRECV || op1->op_type == CODES_WK_RECV);
assert(op2->op_type == CODES_WK_SEND || op2->op_type == CODES_WK_ISEND);
......@@ -710,10 +709,8 @@ static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue)
/* search for a matching mpi operation and remove it from the list.
* Record the index in the list from where the element got deleted.
* Index is used for inserting the element once again in the queue for reverse computation. */
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue_ptrs* mpi_queue, nw_message * m)
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, nw_message * m, struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op * mpi_op)
{
struct codes_workload_op * mpi_op = m->op;
if(mpi_queue->queue_head == NULL)
return -1;
......@@ -736,7 +733,7 @@ static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue
if(rcv_val >= 0)
{
/* TODO: fix RC */
/*memcpy(&m->u.rc.ptr_match_op, &tmp->mpi_op, sizeof(struct codes_workload_op));*/
m->u.rc.ptr_match_op = tmp->mpi_op;
if(mpi_queue->queue_head == mpi_queue->queue_tail)
{
mpi_queue->queue_tail = NULL;
......@@ -762,20 +759,19 @@ static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue
if(mpi_op->op_type == CODES_WK_SEND || mpi_op->op_type == CODES_WK_ISEND)
{
rcv_val = match_receive(s, lp, lp->gid, elem->mpi_op, mpi_op);
rcv_val = match_receive(s, lp, lp->gid, elem->mpi_op, mpi_op);