Commit bf9dbce4 authored by Misbah Mubarak's avatar Misbah Mubarak

updates to MPI Sim Layer and dragonfly model

parent de244764
......@@ -25,5 +25,5 @@ PARAMS
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="528";
routing="minimal";
routing="adaptive";
}
......@@ -14,8 +14,9 @@
#include "codes/rc-stack.h"
#include "codes/quicklist.h"
#define TRACK -1
#define TRACE -1
#define MAX_WAIT_REQS 200
#define MAX_WAIT_REQS 512
char workload_type[128];
char workload_file[8192];
......@@ -37,6 +38,10 @@ static int net_id = 0;
static float noise = 5.0;
static int num_net_lps, num_nw_lps;
#define CS_LP_DBG 0
#define lprintf(_fmt, ...) \
do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
long long num_bytes_sent=0;
long long num_bytes_recvd=0;
......@@ -85,9 +90,10 @@ struct completed_requests
struct pending_waits
{
int op_type;
int req_ids[MAX_WAIT_REQS];
int16_t req_ids[MAX_WAIT_REQS];
int num_completed;
tw_stime start_time;
int count;
tw_stime start_time;
struct qlist_head ql;
};
......@@ -104,6 +110,7 @@ struct nw_state
struct rc_stack * processed_ops;
struct rc_stack * matched_qitems;
struct rc_stack * matched_reqs;
/* count of sends, receives, collectives and delays */
unsigned long num_sends;
......@@ -143,47 +150,36 @@ struct nw_state
struct nw_message
{
int msg_type;
struct
{
/* forward event handler */
struct
{
int op_type;
tw_lpid src_rank;
tw_lpid dest_rank;
int num_bytes;
int data_type;
double sim_start_time;
// for callbacks - time message was received
double msg_send_time;
int16_t req_id;
int tag;
} msg_info;
/* required for reverse computation*/
struct
{
int found_match;
short matched_op;
dumpi_req_id saved_matched_req;
double saved_send_time;
double saved_recv_time;
double saved_wait_time;
} rc;
} u;
int op_type;
tw_lpid src_rank;
tw_lpid dest_rank;
int num_bytes;
int data_type;
double sim_start_time;
// for callbacks - time message was received
double msg_send_time;
int16_t req_id;
int tag;
int found_match;
short wait_completed;
dumpi_req_id saved_matched_req;
double saved_send_time;
double saved_recv_time;
double saved_wait_time;
struct mpi_msgs_queue * saved_item;
};
/* executes MPI isend and send operations */
static void codes_exec_mpi_send(
nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op);
nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
/* execute MPI irecv operation */
static void codes_exec_mpi_recv(
nw_state* s, tw_lp* lp, nw_message * m, struct codes_workload_op * mpi_op);
nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, struct codes_workload_op * mpi_op);
/* reverse of mpi recv function. */
static void codes_exec_mpi_recv_rc(
nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op);
nw_state* s, tw_bf * bf, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op);
/* execute the computational delay */
static void codes_exec_comp_delay(
nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op);
......@@ -202,6 +198,14 @@ static void codes_issue_next_event_rc(tw_lp* lp);
///////////////////// HELPER FUNCTIONS FOR MPI MESSAGE QUEUE HANDLING ///////////////
/* upon arrival of local completion message, inserts operation in completed send queue */
/* upon arrival of an isend operation, updates the arrival queue of the network */
static void update_completed_queue(
nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp, dumpi_req_id req_id);
/* reverse of the above function */
static void update_completed_queue_rc(
nw_state*s,
tw_bf * bf,
nw_message * m,
tw_lp * lp);
static void update_arrival_queue(
nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse of the above function */
......@@ -217,261 +221,261 @@ static void update_message_time_rc(
/* conversion from seconds to eanaoseconds */
static tw_stime s_to_ns(tw_stime ns);
static void print_waiting_reqs(int16_t * reqs, int count)
{
printf("\n Waiting reqs: ");
int i;
for(i = 0; i < count; i++ )
printf(" %d ", reqs[i]);
}
static void print_completed_queue(struct qlist_head * head)
{
printf("\n Completed queue: ");
struct qlist_head * ent = NULL;
struct completed_requests* current = NULL;
qlist_for_each(ent, head)
{
current = qlist_entry(ent, completed_requests, ql);
printf(" %ld ", current->req_id);
}
}
static void clear_completed_reqs(nw_state * s,
tw_lp * lp,
int16_t * reqs, int count)
{
int i;
for( i = 0; i < count; i++)
{
printf("\n req %d ", reqs[i]);
print_completed_queue(&s->completed_reqs);
struct qlist_head * ent = NULL;
qlist_for_each(ent, &s->completed_reqs)
{
struct completed_requests* current =
qlist_entry(ent, completed_requests, ql);
if(current->req_id == reqs[i])
{
qlist_del(&current->ql);
rc_stack_push(lp, current, free, s->matched_reqs);
}
}
}
}
static void add_completed_reqs(nw_state * s,
int16_t * reqs, int count)
{
int i;
for( i = 0; i < count; i++)
{
struct completed_requests * req = rc_stack_pop(s->matched_reqs);
req->req_id = reqs[i];
qlist_add(&req->ql, &s->completed_reqs);
}
}
/* helper function - maps an MPI rank to an LP id */
static tw_lpid rank_to_lpid(int rank)
{
return codes_mapping_get_lpid_from_relative(rank, NULL, "nw-lp", NULL, 0);
}
/* reverse handler of notify_waits function. */
/*static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_req_id completed_req)
static void notify_posted_wait_rc(
nw_state* s, tw_bf * bf,
nw_message * m,
tw_lp * lp)
{
int i;
*//*if(bf->c1)
{*/
/* if pending wait is still present and is of type MPI_WAIT then do nothing*/
/* s->wait_time = s->saved_wait_time;
mpi_completed_queue_insert_op(&s->completed_reqs, completed_req);
s->pending_waits = wait_elem;
s->saved_pending_wait = NULL;
}
*/
/* if(lp->gid == TRACE)
printf("\n %lf reverse -- notify waits req id %d ", tw_now(lp), completed_req);
printCompletedQueue(s, lp);
if(m->u.rc.matched_op == 1)
s->pending_waits->num_completed--;
*//* if a wait-elem exists, it means the request ID has been matched*/
/* if(m->u.rc.matched_op == 2)
if(m->wait_completed > 0)
{
if(lp->gid == TRACE)
{
printf("\n %lf matched req id %d ", tw_now(lp), completed_req);
printCompletedQueue(s, lp);
}
struct pending_waits* wait_elem = m->u.rc.saved_pending_wait;
s->wait_time = m->u.rc.saved_wait_time;
int count = wait_elem->mpi_op->u.waits.count;
for( i = 0; i < count; i++ )
mpi_completed_queue_insert_op(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);
wait_elem->num_completed--;
s->pending_waits = wait_elem;
tw_rand_reverse_unif(lp->rng);
}
}*/
/* notify the completed send/receive request to the wait operation. */
/*static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_req_id completed_req)
s->wait_op->num_completed--;
}
}
static int notify_posted_wait(nw_state* s,
tw_bf * bf, nw_message * m, tw_lp * lp,
dumpi_req_id completed_req)
{
int i;
*//* traverse the pending waits list and look what type of wait operations are
there. If its just a single wait and the request ID has just been completed,
then the network node LP can go on with fetching the next operation from the log.
If its waitall then wait for all pending requests to complete and then proceed. */
/*struct pending_waits* wait_elem = s->pending_waits;
m->u.rc.matched_op = 0;
if(lp->gid == TRACE)
printf("\n %lf notify waits req id %d ", tw_now(lp), completed_req);
if(!wait_elem)
return 0;
struct pending_waits* wait_elem = s->wait_op;
int wait_completed = 0;
int op_type = wait_elem->mpi_op->op_type;
if(!wait_elem)
{
m->wait_completed = 0;
return 0;
}
int op_type = wait_elem->op_type;
if(op_type == CODES_WK_WAIT)
{
if(wait_elem->mpi_op->u.wait.req_id == completed_req)
{
m->u.rc.saved_wait_time = s->wait_time;
s->wait_time += (tw_now(lp) - wait_elem->start_time);
remove_req_id(&s->completed_reqs, completed_req);
m->u.rc.saved_pending_wait = wait_elem;
s->pending_waits = NULL;
codes_issue_next_event(lp);
return 0;
}
}
else if(op_type == CODES_WK_WAITALL)
{
int required_count = wait_elem->mpi_op->u.waits.count;
for(i = 0; i < required_count; i++)
{
if(wait_elem->mpi_op->u.waits.req_ids[i] == completed_req)
{
if(lp->gid == TRACE)
printCompletedQueue(s, lp);
m->u.rc.matched_op = 1;
wait_elem->num_completed++;
}
}
if(wait_elem->num_completed == required_count)
{
if(lp->gid == TRACE)
if(op_type == CODES_WK_WAIT &&
(wait_elem->req_ids[0] == completed_req))
{
wait_completed = 1;
}
else if(op_type == CODES_WK_WAITALL
|| op_type == CODES_WK_WAITANY
|| op_type == CODES_WK_WAITSOME)
{
int i;
for(i = 0; i < wait_elem->count; i++)
{
if(wait_elem->req_ids[i] == completed_req)
{
printf("\n %lf req %d completed %d", tw_now(lp), completed_req, wait_elem->num_completed);
printCompletedQueue(s, lp);
wait_elem->num_completed++;
assert(wait_elem->num_completed <= wait_elem->count);
if(wait_elem->num_completed == wait_elem->count)
wait_completed = 1;
m->wait_completed = 1;
}
m->u.rc.matched_op = 2;
m->u.rc.saved_wait_time = s->wait_time;
s->wait_time += (tw_now(lp) - wait_elem->start_time);
m->u.rc.saved_pending_wait = wait_elem;
s->pending_waits = NULL;
for(i = 0; i < required_count; i++)
remove_req_id(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);
codes_issue_next_event(lp); //wait completed
}
}
}
return 0;
return wait_completed;
}
*/
/* reverse handler of MPI wait operation */
/*static void codes_exec_mpi_wait_rc(nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op)
static void codes_exec_mpi_wait_rc(nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op)
{
if(s->pending_waits)
if(s->wait_op)
{
s->pending_waits = NULL;
return;
struct pending_waits * wait_op = s->wait_op;
free(wait_op);
s->wait_op = NULL;
}
else
{
codes_issue_next_event_rc(lp);
mpi_completed_queue_insert_op(&s->completed_reqs, mpi_op->u.wait.req_id);
rc_stack_pop(s->st);
completed_requests * qi = rc_stack_pop(s->matched_reqs);
qlist_add(&qi->ql, &s->completed_reqs);
}
return;
}
*/
/* execute MPI wait operation */
/*static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, nw_message * m, struct codes_workload_op * mpi_op)
static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op)
{
*/ /* check in the completed receives queue if the request ID has already been completed.*/
/* assert(!s->pending_waits);
/* check in the completed receives queue if the request ID has already been completed.*/
assert(!s->wait_op);
dumpi_req_id req_id = mpi_op->u.wait.req_id;
struct completed_requests* current = NULL;
struct completed_requests* current = s->completed_reqs;
while(current) {
if(current->req_id == req_id) {
remove_req_id(&s->completed_reqs, req_id);
m->u.rc.saved_wait_time = s->wait_time;
struct qlist_head * ent = NULL;
qlist_for_each(ent, &s->completed_reqs)
{
current = qlist_entry(ent, completed_requests, ql);
if(current->req_id == req_id)
{
qlist_del(&current->ql);
rc_stack_push(lp, current, free, s->matched_reqs);
codes_issue_next_event(lp);
return;
}
current = current->next;
}
*/ /* If not, add the wait operation in the pending 'waits' list. */
/*struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
wait_op->mpi_op = mpi_op;
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
wait_op->op_type = mpi_op->op_type;
wait_op->req_ids[0] = req_id;
wait_op->count = 1;
wait_op->num_completed = 0;
wait_op->start_time = tw_now(lp);
s->pending_waits = wait_op;
s->wait_op = wait_op;
// rc_stack_push(lp, wait_op, free, s->st);
return;
}
static void codes_exec_mpi_wait_all_rc(nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op)
static void codes_exec_mpi_wait_all_rc(
nw_state* s,
tw_bf * bf,
nw_message * m,
tw_lp* lp,
struct codes_workload_op * mpi_op)
{
if(lp->gid == TRACE)
{
printf("\n %lf codes exec mpi waitall reverse %d ", tw_now(lp), m->u.rc.found_match);
printCompletedQueue(s, lp);
}
if(m->u.rc.found_match)
{
int i;
int count = mpi_op->u.waits.count;
dumpi_req_id req_id[count];
for( i = 0; i < count; i++)
{
req_id[i] = mpi_op->u.waits.req_ids[i];
mpi_completed_queue_insert_op(&s->completed_reqs, req_id[i]);
}
codes_issue_next_event_rc(lp);
}
else
{
struct pending_waits* wait_op = s->pending_waits;
rc_stack_pop(s->st);
s->pending_waits = NULL;
assert(!s->pending_waits);
if(lp->gid == TRACE)
printf("\n %lf Nullifying codes waitall ", tw_now(lp));
}
if(s->wait_op)
{
struct pending_waits * wait_op = s->wait_op;
free(wait_op);
s->wait_op = NULL;
}
else
{
add_completed_reqs(s, mpi_op->u.waits.req_ids, mpi_op->u.waits.count);
codes_issue_next_event_rc(lp);
}
return;
}
static void codes_exec_mpi_wait_all(
nw_state* s, tw_lp* lp, nw_message * m, struct codes_workload_op * mpi_op)
nw_state* s,
tw_bf * bf,
nw_message * m,
tw_lp* lp,
struct codes_workload_op * mpi_op)
{
//assert(!s->pending_waits);
int count = mpi_op->u.waits.count;
*//* If the count is not less than max wait reqs then stop */
/*assert(count < MAX_WAIT_REQS);
int i, num_completed = 0;
dumpi_req_id req_id[count];
struct completed_requests* current = s->completed_reqs;
*//* check number of completed irecvs in the completion queue */
/*if(lp->gid == TRACE)
{
printf(" \n (%lf) MPI waitall posted %d count", tw_now(lp), mpi_op->u.waits.count);
for(i = 0; i < count; i++)
printf(" %d ", (int)mpi_op->u.waits.req_ids[i]);
printCompletedQueue(s, lp);
}
while(current)
{
for(i = 0; i < count; i++)
{
req_id[i] = mpi_op->u.waits.req_ids[i];
if(req_id[i] == current->req_id)
num_completed++;
}
current = current->next;
}
/* If the count is not less than max wait reqs then stop */
assert(count < MAX_WAIT_REQS);
if(TRACE== lp->gid)
printf("\n %lf Num completed %d count %d ", tw_now(lp), num_completed, count);
int i = 0, num_matched = 0;
m->u.rc.found_match = 0;
if(count == num_completed)
if(s->nw_id == TRACK)
{
m->u.rc.found_match = 1;
for( i = 0; i < count; i++)
remove_req_id(&s->completed_reqs, req_id[i]);
print_waiting_reqs(mpi_op->u.waits.req_ids, count);
print_completed_queue(&s->completed_reqs);
}
/* check number of completed irecvs in the completion queue */
for(i = 0; i < count; i++)
{
dumpi_req_id req_id = mpi_op->u.waits.req_ids[i];
struct qlist_head * ent = NULL;
struct completed_requests* current = NULL;
qlist_for_each(ent, &s->completed_reqs)
{
current = qlist_entry(ent, completed_requests, ql);
if(current->req_id == req_id)
num_matched++;
}
}
codes_issue_next_event(lp);
if(s->nw_id == TRACK)
printf("\n num matched %ld count %ld ", num_matched, count);
m->found_match = num_matched;
if(num_matched == count)
{
/* No need to post a MPI Wait all then, issue next event */
/* Remove all completed requests from the list */
s->wait_op = NULL;
clear_completed_reqs(s, lp, mpi_op->u.waits.req_ids, count);
codes_issue_next_event(lp);
}
else
{*/
/* If not, add the wait operation in the pending 'waits' list. */
/*struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
wait_op->mpi_op = mpi_op;
wait_op->num_completed = num_completed;
{
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
wait_op->count = count;
wait_op->op_type = mpi_op->op_type;
assert(count < MAX_WAIT_REQS);
for(i = 0; i < count; i++)
wait_op->req_ids[i] = mpi_op->u.waits.req_ids[i];
wait_op->num_completed = num_matched;
wait_op->start_time = tw_now(lp);
rc_stack_push(lp, wait_op, free, s->st);
s->pending_waits = wait_op;
s->wait_op = wait_op;
}
}*/
return;
}
/* search for a matching mpi operation and remove it from the list.
* Record the index in the list from where the element got deleted.
* Index is used for inserting the element once again in the queue for reverse computation. */
static int rm_matching_rcv(nw_state * ns, tw_lp * lp, mpi_msgs_queue * qitem)
static int rm_matching_rcv(nw_state * ns,
tw_bf * bf,
nw_message * m,
tw_lp * lp,
mpi_msgs_queue * qitem)
{
int matched = 0;
int index = 0;
struct qlist_head *ent = NULL;
mpi_msgs_queue * qi = NULL;
qlist_for_each(ent, &ns->pending_recvs_queue){
qi = qlist_entry(ent, mpi_msgs_queue, ql);
if((qi->num_bytes >= qitem->num_bytes)
......@@ -486,7 +490,14 @@ static int rm_matching_rcv(nw_state * ns, tw_lp * lp, mpi_msgs_queue * qitem)
if(matched)
{
m->saved_item = qi;
m->saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - qi->req_init_time);
if(qi->op_type == CODES_WK_IRECV)
{
update_completed_queue(ns, bf, m, lp, qi->req_id);
}
qlist_del(&qi->ql);
rc_stack_push(lp, qi, free, ns->matched_qitems);
return index;
......@@ -494,9 +505,11 @@ static int rm_matching_rcv(nw_state * ns, tw_lp * lp, mpi_msgs_queue * qitem)
return -1;
}
static int rm_matching_send(nw_state * ns, tw_lp * lp, mpi_msgs_queue * qitem)
static int rm_matching_send(nw_state * ns,
tw_bf * bf,
nw_message * m,
tw_lp * lp, mpi_msgs_queue * qitem)
{
int matched = 0;
struct qlist_head *ent = NULL;
mpi_msgs_queue * qi = NULL;
......@@ -516,6 +529,13 @@ static int rm_matching_send(nw_state * ns, tw_lp * lp, mpi_msgs_queue * qitem)
if(matched)
{
m->saved_recv_time = ns->recv_time;
m->saved_item = qi;
ns->recv_time += (tw_now(lp) - qitem->req_init_time);
if(qitem->op_type == CODES_WK_IRECV)
update_completed_queue(ns, bf, m, lp, qitem->req_id);
qlist_del(&qi->ql);
rc_stack_push(lp, qi, free, ns->matched_qitems);
return index;
......@@ -570,33 +590,37 @@ static void codes_exec_comp_delay(
}
/* reverse computation operation for MPI irecv */
static void codes_exec_mpi_recv_rc(nw_state* ns, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op)
static void codes_exec_mpi_recv_rc(
nw_state* ns,
tw_bf * bf,
nw_message* m,
tw_lp* lp,
struct codes_workload_op * mpi_op)
{
num_bytes_recvd -= mpi_op->u.recv.num_bytes;
ns->recv_time = m->u.rc.saved_recv_time;
if(m->u.rc.found_match >= 0)
ns->recv_time = m->saved_recv_time;
if(m->found_match >= 0)
{
rc_stack_pop(ns->matched_qitems);
ns->recv_time = m->u.rc.saved_recv_time;
ns->recv_time = m->saved_recv_time;
int queue_count = qlist_count(&ns->arrival_queue);
mpi_msgs_queue * qi = rc_stack_pop(ns->matched_qitems);
if(!m->u.rc.found_match)
if(!m->found_match)
{
qlist_add(&qi->ql, &ns->arrival_queue);
}
else if(m->u.rc.found_match == queue_count)
else if(m->found_match >= queue_count)
{
qlist_add_tail(&qi->ql, &ns->arrival_queue);
}
else if(m->u.rc.found_match > 0 && m->u.rc.found_match < queue_count)
else if(m->found_match > 0 && m->found_match < queue_count)
{
int index = 1;
struct qlist_head * ent = NULL;
qlist_for_each(ent, &ns->arrival_queue)
{
if(index == m->u.rc.found_match)
if(index == m->found_match)
{
qlist_add(&qi->ql, ent);
break;
......@@ -604,9 +628,12 @@ static void codes_exec_mpi_recv_rc(nw_state* ns, nw_message* m, tw_lp* lp, struc
index++;
}
}
if(qi->op_type == CODES_WK_IRECV)
update_completed_queue_rc(ns, bf, m, lp);
codes_issue_next_event_rc(lp);
}
else if(m->u.rc.found_match < 0)
else if(m->found_match < 0)
{
struct qlist_head * ent = qlist_pop_back(&ns->pending_recvs_queue);
mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql);
......@@ -618,13 +645,18 @@ static void codes_exec_mpi_recv_rc(nw_state* ns, nw_message* m, tw_lp* lp, struc
}
/* Execute MPI Irecv operation (non-blocking receive) */
static void codes_exec_mpi_recv(nw_state* s, tw_lp* lp, nw_message * m, struct codes_workload_op * mpi_op)
static void codes_exec_mpi_recv(
nw_state* s,
tw_bf * bf,
nw_message