Commit f2d4cad6 authored by Misbah Mubarak's avatar Misbah Mubarak

Fixing ordering issue with waits (should resolve the unmatched sends/receives...

Fixing ordering issue with waits (should resolve the unmatched sends/receives error with traces like minife)
parent 83bc8e21
......@@ -156,7 +156,7 @@ struct codes_workload_op
*/
/* what type of operation this is */
enum codes_workload_op_type op_type;
int op_type;
/* currently only used by network workloads */
double start_time;
double end_time;
......@@ -198,7 +198,7 @@ struct codes_workload_op
int16_t data_type; /* MPI data type to be matched with the recv */
int count; /* number of elements to be received */
int tag; /* tag of the message */
int32_t req_id;
int req_id;
} send;
struct {
/* TODO: not sure why source rank is here */
......@@ -208,7 +208,7 @@ struct codes_workload_op
int16_t data_type; /* MPI data type to be matched with the send */
int count; /* number of elements to be sent */
int tag; /* tag of the message */
int32_t req_id;
int req_id;
} recv;
/* TODO: non-stub for other collectives */
struct {
......@@ -216,14 +216,14 @@ struct codes_workload_op
} collective;
struct {
int count;
int32_t* req_ids;
int* req_ids;
} waits;
struct {
int32_t req_id;
int req_id;
} wait;
struct
{
int32_t req_id;
int req_id;
}
free;
}u;
......
......@@ -35,12 +35,15 @@ static int msg_size_hash_compare(
/* NOTE: Message tracking works in sequential mode only! */
static int debug_cols = 0;
int enable_msg_tracking = 0;
int is_synthetic = 0;
/* Turning on this option slows down optimistic mode substantially. Only turn
* on if you get issues with wait-all completion with traces. */
static int preserve_wait_ordering = 0;
static int enable_msg_tracking = 0;
static int is_synthetic = 0;
tw_lpid TRACK_LP = -1;
static double total_syn_data = 0;
int unmatched = 0;
static int unmatched = 0;
char workload_type[128];
char workload_file[8192];
char offset_file[8192];
......@@ -83,7 +86,7 @@ static char cortex_gen[512] = "\0";
typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
typedef int32_t dumpi_req_id;
typedef int dumpi_req_id;
static int net_id = 0;
static float noise = 2.0;
......@@ -161,15 +164,16 @@ struct mpi_msgs_queue
/* stores request IDs of completed MPI operations (Isends or Irecvs) */
struct completed_requests
{
dumpi_req_id req_id;
int req_id;
struct qlist_head ql;
int index;
};
/* for wait operations, store the pending operation and number of completed waits so far. */
struct pending_waits
{
int op_type;
int32_t req_ids[MAX_WAIT_REQS];
int req_ids[MAX_WAIT_REQS];
int num_completed;
int count;
tw_stime start_time;
......@@ -204,6 +208,7 @@ struct nw_state
struct rc_stack * processed_ops;
struct rc_stack * processed_wait_op;
struct rc_stack * matched_reqs;
// struct rc_stack * indices;
/* count of sends, receives, collectives and delays */
unsigned long num_sends;
......@@ -288,7 +293,7 @@ struct nw_message
double sim_start_time;
// for callbacks - time message was received
double msg_send_time;
int32_t req_id;
int req_id;
int tag;
int app_id;
int found_match;
......@@ -396,7 +401,7 @@ static void update_message_size(
/* update hash table */
if(!hash_link)
{
struct msg_size_info * msg_info = malloc(sizeof(struct msg_size_info));
struct msg_size_info * msg_info = (struct msg_size_info*)malloc(sizeof(struct msg_size_info));
msg_info->msg_size = qitem->num_bytes;
msg_info->num_msgs = 1;
msg_info->agg_latency = tw_now(lp) - msg_init_time;
......@@ -467,7 +472,7 @@ static void notify_background_traffic(
tw_event * e;
struct nw_message * m_new;
e = tw_event_new(global_dest_id, ts, lp);
m_new = tw_event_data(e);
m_new = (struct nw_message*)tw_event_data(e);
m_new->msg_type = CLI_BCKGND_FIN;
tw_event_send(e);
}
......@@ -526,7 +531,7 @@ static void notify_neighbor(
tw_event * e;
struct nw_message * m_new;
e = tw_event_new(global_dest_id, ts, lp);
m_new = tw_event_data(e);
m_new = (struct nw_message*)tw_event_data(e);
m_new->msg_type = CLI_NBR_FINISH;
tw_event_send(e);
}
......@@ -640,7 +645,7 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
tw_event * e;
nw_message * m_new;
e = tw_event_new(lp->gid, ts, lp);
m_new = tw_event_data(e);
m_new = (struct nw_message*)tw_event_data(e);
m_new->msg_type = CLI_BCKGND_GEN;
tw_event_send(e);
}
......@@ -666,7 +671,7 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
num_syn_bytes_recvd += data;
}
/* Debugging functions, may generate unused function warning */
static void print_waiting_reqs(int32_t * reqs, int count)
static void print_waiting_reqs(uint32_t * reqs, int count)
{
lprintf("\n Waiting reqs: %d count", count);
int i;
......@@ -688,20 +693,21 @@ static void print_msgs_queue(struct qlist_head * head, int is_send)
printf(" \n Source %d Dest %d bytes %llu tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag);
}
}
static void print_completed_queue(struct qlist_head * head)
static void print_completed_queue(tw_lp * lp, struct qlist_head * head)
{
printf("\n Completed queue: ");
// printf("\n Completed queue: ");
struct qlist_head * ent = NULL;
struct completed_requests* current = NULL;
tw_output(lp, "\n");
qlist_for_each(ent, head)
{
current = qlist_entry(ent, completed_requests, ql);
printf(" %d ", current->req_id);
tw_output(lp, " %llu ", current->req_id);
}
}
static int clear_completed_reqs(nw_state * s,
tw_lp * lp,
int32_t * reqs, int count)
int * reqs, int count)
{
(void)s;
(void)lp;
......@@ -714,25 +720,31 @@ static int clear_completed_reqs(nw_state * s,
struct completed_requests * current = NULL;
struct completed_requests * prev = NULL;
int index = 0;
qlist_for_each(ent, &s->completed_reqs)
{
current = qlist_entry(ent, completed_requests, ql);
if(prev)
if(prev)
{
rc_stack_push(lp, prev, free, s->matched_reqs);
prev = NULL;
}
current = qlist_entry(ent, completed_requests, ql);
current->index = index;
if(current->req_id == reqs[i])
{
++matched;
qlist_del(&current->ql);
prev = current;
}
else
prev = NULL;
++index;
}
if(prev)
rc_stack_push(lp, prev, free, s->matched_reqs);
{
rc_stack_push(lp, prev, free, s->matched_reqs);
prev = NULL;
}
}
return matched;
}
......@@ -741,12 +753,35 @@ static void add_completed_reqs(nw_state * s,
int count)
{
(void)lp;
int i;
for( i = 0; i < count; i++)
for(int i = 0; i < count; i++)
{
struct completed_requests * req = rc_stack_pop(s->matched_reqs);
qlist_add(&req->ql, &s->completed_reqs);
}
struct completed_requests * req = (struct completed_requests*)rc_stack_pop(s->matched_reqs);
// turn on only if wait-all unmatched error arises in optimistic mode.
if(preserve_wait_ordering)
{
if(req->index == 0)
{
qlist_add(&req->ql, &s->completed_reqs);
}
else
{
int index = 1;
struct qlist_head * ent = NULL;
qlist_for_each(ent, &s->completed_reqs)
{
if(index == req->index)
{
qlist_add(&req->ql, ent);
}
}//end qlist
}// end else*/
}
else
{
qlist_add(&req->ql, &s->completed_reqs);
}
}//end for
}
/* helper function - maps an MPI rank to an LP id */
......@@ -757,7 +792,7 @@ static tw_lpid rank_to_lpid(int rank)
static int notify_posted_wait(nw_state* s,
tw_bf * bf, nw_message * m, tw_lp * lp,
dumpi_req_id completed_req)
int completed_req)
{
(void)bf;
......@@ -774,6 +809,7 @@ static int notify_posted_wait(nw_state* s,
if(op_type == CODES_WK_WAIT &&
(wait_elem->req_ids[0] == completed_req))
{
m->fwd.wait_completed = 1;
wait_completed = 1;
}
else if(op_type == CODES_WK_WAITALL
......@@ -809,33 +845,48 @@ static int notify_posted_wait(nw_state* s,
}
/* reverse handler of MPI wait operation */
static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp)
static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp, nw_message * m)
{
if(bf->c2)
{
struct pending_waits * wait_op = s->wait_op;
free(wait_op);
s->wait_op = NULL;
}
if(bf->c1)
{
completed_requests * qi = (completed_requests*)rc_stack_pop(s->processed_ops);
if(m->fwd.found_match == 0)
{
qlist_add(&qi->ql, &s->completed_reqs);
}
else
{
int index = 1;
struct qlist_head * ent = NULL;
qlist_for_each(ent, &s->completed_reqs)
{
if(index == m->fwd.found_match)
{
qlist_add(&qi->ql, ent);
break;
}
index++;
}
}
codes_issue_next_event_rc(lp);
completed_requests * qi = rc_stack_pop(s->processed_ops);
qlist_add(&qi->ql, &s->completed_reqs);
return;
}
return;
struct pending_waits * wait_op = s->wait_op;
free(wait_op);
s->wait_op = NULL;
}
/* execute MPI wait operation */
static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, tw_lp* lp, struct codes_workload_op * mpi_op)
static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op)
{
/* check in the completed receives queue if the request ID has already been completed.*/
assert(!s->wait_op);
dumpi_req_id req_id = mpi_op->u.wait.req_id;
int req_id = mpi_op->u.wait.req_id;
struct completed_requests* current = NULL;
struct qlist_head * ent = NULL;
int index = 0;
qlist_for_each(ent, &s->completed_reqs)
{
current = qlist_entry(ent, completed_requests, ql);
......@@ -845,13 +896,24 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, tw_lp* lp, struct codes
qlist_del(&current->ql);
rc_stack_push(lp, current, free, s->processed_ops);
codes_issue_next_event(lp);
m->fwd.found_match = index;
/*if(s->nw_id == (tw_lpid)TRACK_LP)
{
tw_output(lp, "\n wait matched at post %d ", req_id);
print_completed_queue(lp, &s->completed_reqs);
}*/
return;
}
++index;
}
bf->c2 = 1;
/*if(s->nw_id == (tw_lpid)TRACK_LP)
{
tw_output(lp, "\n wait posted %llu ", req_id);
print_completed_queue(lp, &s->completed_reqs);
}*/
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
wait_op->op_type = mpi_op->op_type;
wait_op->req_ids[0] = req_id;
wait_op->count = 1;
......@@ -918,7 +980,7 @@ static void codes_exec_mpi_wait_all(
}
if(s->sampling_indx >= MAX_STATS)
{
struct mpi_workload_sample * tmp = calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
struct mpi_workload_sample * tmp = (struct mpi_workload_sample*)calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
free(s->mpi_wkld_samples);
s->mpi_wkld_samples = tmp;
......@@ -934,16 +996,16 @@ static void codes_exec_mpi_wait_all(
int i = 0, num_matched = 0;
m->fwd.num_matched = 0;
if(lp->gid == TRACK_LP)
/*if(lp->gid == TRACK_LP)
{
printf("\n MPI Wait all posted ");
print_waiting_reqs(mpi_op->u.waits.req_ids, count);
print_completed_queue(&s->completed_reqs);
}
print_completed_queue(lp, &s->completed_reqs);
}*/
/* check number of completed irecvs in the completion queue */
for(i = 0; i < count; i++)
{
dumpi_req_id req_id = mpi_op->u.waits.req_ids[i];
int req_id = mpi_op->u.waits.req_ids[i];
struct qlist_head * ent = NULL;
struct completed_requests* current = NULL;
qlist_for_each(ent, &s->completed_reqs)
......@@ -968,7 +1030,7 @@ static void codes_exec_mpi_wait_all(
else
{
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
wait_op->count = count;
wait_op->op_type = mpi_op->op_type;
assert(count < MAX_WAIT_REQS);
......@@ -1028,7 +1090,13 @@ static int rm_matching_rcv(nw_state * ns,
ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
if(qi->op_type == CODES_WK_IRECV)
{
/*if(ns->nw_id == (tw_lpid)TRACK_LP)
{
printf("\n Completed irecv req id %d ", qi->req_id);
}*/
update_completed_queue(ns, bf, m, lp, qi->req_id);
}
else if(qi->op_type == CODES_WK_RECV)
codes_issue_next_event(lp);
......@@ -1081,7 +1149,12 @@ static int rm_matching_send(nw_state * ns,
m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - qitem->req_init_time);
// printf("\n Completed req id %d ", qitem->req_id);
/*if(ns->nw_id == (tw_lpid)TRACK_LP && qitem->op_type == CODES_WK_IRECV)
{
tw_output(lp, "\n Completed recv req id %d ", qitem->req_id);
print_completed_queue(lp, &ns->completed_reqs);
}*/
if(qitem->op_type == CODES_WK_IRECV)
update_completed_queue(ns, bf, m, lp, qitem->req_id);
......@@ -1107,7 +1180,7 @@ static void codes_issue_next_event(tw_lp* lp)
ts = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
assert(ts > 0);
e = tw_event_new( lp->gid, ts, lp );
msg = tw_event_data(e);
msg = (nw_message*)tw_event_data(e);
msg->msg_type = MPI_OP_GET_NEXT;
tw_event_send(e);
......@@ -1129,7 +1202,7 @@ static void codes_exec_comp_delay(
assert(ts > 0);
e = tw_event_new( lp->gid, ts , lp );
msg = tw_event_data(e);
msg = (nw_message*)tw_event_data(e);
msg->msg_type = MPI_OP_GET_NEXT;
tw_event_send(e);
......@@ -1150,19 +1223,15 @@ static void codes_exec_mpi_recv_rc(
if(m->fwd.found_match >= 0)
{
ns->recv_time = m->rc.saved_recv_time;
int queue_count = qlist_count(&ns->arrival_queue);
//int queue_count = qlist_count(&ns->arrival_queue);
mpi_msgs_queue * qi = rc_stack_pop(ns->processed_ops);
mpi_msgs_queue * qi = (mpi_msgs_queue*)rc_stack_pop(ns->processed_ops);
if(!m->fwd.found_match)
if(m->fwd.found_match == 0)
{
qlist_add(&qi->ql, &ns->arrival_queue);
}
else if(m->fwd.found_match >= queue_count)
{
qlist_add_tail(&qi->ql, &ns->arrival_queue);
}
else if(m->fwd.found_match > 0 && m->fwd.found_match < queue_count)
else
{
int index = 1;
struct qlist_head * ent = NULL;
......@@ -1184,7 +1253,7 @@ static void codes_exec_mpi_recv_rc(
}
else if(m->fwd.found_match < 0)
{
struct qlist_head * ent = qlist_pop_back(&ns->pending_recvs_queue);
struct qlist_head * ent = qlist_pop(&ns->pending_recvs_queue);
mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql);
free(qi);
......@@ -1219,9 +1288,9 @@ static void codes_exec_mpi_recv(
//printf("\n Req id %d bytes %d source %d tag %d ", recv_op->req_id, recv_op->num_bytes, recv_op->source_rank, recv_op->tag);
if(s->nw_id == (tw_lpid)TRACK_LP)
printf("\n Receive op posted num bytes %llu source %d ", recv_op->num_bytes,
recv_op->source_rank);
// if(s->nw_id == (tw_lpid)TRACK_LP)
// printf("\n Receive op posted num bytes %llu source %d ", recv_op->num_bytes,
// recv_op->source_rank);
int found_matching_sends = rm_matching_send(s, bf, m, lp, recv_op);
......@@ -1229,7 +1298,7 @@ static void codes_exec_mpi_recv(
if(found_matching_sends < 0)
{
m->fwd.found_match = -1;
qlist_add_tail(&recv_op->ql, &s->pending_recvs_queue);
qlist_add(&recv_op->ql, &s->pending_recvs_queue);
/* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */
if(mpi_op->op_type == CODES_WK_IRECV)
......@@ -1268,11 +1337,13 @@ static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_
s->cur_interval_end -= sampling_interval;
}
}
if(bf->c15 || bf->c16)
s->num_sends--;
model_net_event_rc2(lp, &m->event_rc);
if(bf->c4)
codes_issue_next_event_rc(lp);
s->num_sends--;
if(bf->c3)
{
......@@ -1326,7 +1397,7 @@ static void codes_exec_mpi_send(nw_state* s,
}
if(s->sampling_indx >= MAX_STATS)
{
struct mpi_workload_sample * tmp = calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
struct mpi_workload_sample * tmp = (struct mpi_workload_sample*)calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
free(s->mpi_wkld_samples);
s->mpi_wkld_samples = tmp;
......@@ -1352,7 +1423,9 @@ static void codes_exec_mpi_send(nw_state* s,
if(mpi_op->u.send.num_bytes < EAGER_THRESHOLD)
{
/* directly issue a model-net send */
bf->c15 = 1;
s->num_sends++;
tw_stime copy_overhead = copy_per_byte_eager * mpi_op->u.send.num_bytes;
local_m.fwd.sim_start_time = tw_now(lp);
......@@ -1366,6 +1439,8 @@ static void codes_exec_mpi_send(nw_state* s,
{
/* Initiate the handshake. Issue a control message to the destination first. No local message,
* only remote message sent. */
bf->c16 = 1;
s->num_sends++;
remote_m.fwd.sim_start_time = tw_now(lp);
remote_m.fwd.dest_rank = mpi_op->u.send.dest_rank;
remote_m.fwd.src_rank = mpi_op->u.send.source_rank;
......@@ -1427,19 +1502,14 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m,
if(bf->c0)
{
struct qlist_head * ent = qlist_pop_back(&s->completed_reqs);
struct qlist_head * ent = qlist_pop(&s->completed_reqs);
completed_requests * req = qlist_entry(ent, completed_requests, ql);
/*if(lp->gid == TRACK)
{
printf("\n After popping %ld ", req->req_id);
print_completed_queue(&s->completed_reqs);
}*/
free(req);
}
else if(bf->c1)
{
struct pending_waits* wait_elem = rc_stack_pop(s->processed_wait_op);
struct pending_waits* wait_elem = (struct pending_waits*)rc_stack_pop(s->processed_wait_op);
s->wait_op = wait_elem;
s->wait_time = m->rc.saved_wait_time;
add_completed_reqs(s, lp, m->fwd.num_matched);
......@@ -1465,20 +1535,21 @@ static void update_completed_queue(nw_state* s,
if(!waiting)
{
bf->c0 = 1;
completed_requests * req = malloc(sizeof(completed_requests));
completed_requests * req = (completed_requests*)malloc(sizeof(completed_requests));
req->req_id = req_id;
qlist_add_tail(&req->ql, &s->completed_reqs);
qlist_add(&req->ql, &s->completed_reqs);
if(lp->gid == TRACK_LP)
/*if(s->nw_id == (tw_lpid)TRACK_LP)
{
printf("\n Forward mode adding %d ", req_id);
print_completed_queue(&s->completed_reqs);
}
tw_output(lp, "\n Forward mode adding %d ", req_id);
print_completed_queue(lp, &s->completed_reqs);
}*/
}
else
{
bf->c1 = 1;
m->fwd.num_matched = clear_completed_reqs(s, lp, s->wait_op->req_ids, s->wait_op->count);
m->rc.saved_wait_time = s->wait_time;
s->wait_time += (tw_now(lp) - s->wait_op->start_time);
......@@ -1541,18 +1612,14 @@ static void update_arrival_queue_rc(nw_state* s,
if(m->fwd.found_match >= 0)
{
mpi_msgs_queue * qi = rc_stack_pop(s->processed_ops);
int queue_count = qlist_count(&s->pending_recvs_queue);
mpi_msgs_queue * qi = (mpi_msgs_queue*)rc_stack_pop(s->processed_ops);
// int queue_count = qlist_count(&s->pending_recvs_queue);
if(!m->fwd.found_match)
if(m->fwd.found_match == 0)
{
qlist_add(&qi->ql, &s->pending_recvs_queue);
}
else if(m->fwd.found_match >= queue_count)
{
qlist_add_tail(&qi->ql, &s->pending_recvs_queue);
}
else if(m->fwd.found_match > 0 && m->fwd.found_match < queue_count)
else
{
int index = 1;
struct qlist_head * ent = NULL;
......@@ -1573,7 +1640,7 @@ static void update_arrival_queue_rc(nw_state* s,
}
else if(m->fwd.found_match < 0)
{
struct qlist_head * ent = qlist_pop_back(&s->arrival_queue);
struct qlist_head * ent = qlist_pop(&s->arrival_queue);
mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql);
free(qi);
}
......@@ -1609,7 +1676,7 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
tw_event *e_callback =
tw_event_new(rank_to_lpid(global_src_id),
ts, lp);
nw_message *m_callback = tw_event_data(e_callback);
nw_message *m_callback = (nw_message*)tw_event_data(e_callback);
m_callback->msg_type = MPI_SEND_ARRIVED_CB;
m_callback->fwd.msg_send_time = tw_now(lp) - m->fwd.sim_start_time;
tw_event_send(e_callback);
......@@ -1624,16 +1691,16 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
arrived_op->num_bytes = m->fwd.num_bytes;
arrived_op->dest_rank = m->fwd.dest_rank;
if(s->nw_id == (tw_lpid)TRACK_LP)
printf("\n Send op arrived source rank %d num bytes %llu ", arrived_op->source_rank,
arrived_op->num_bytes);
// if(s->nw_id == (tw_lpid)TRACK_LP)
// printf("\n Send op arrived source rank %d num bytes %llu", arrived_op->source_rank,
// arrived_op->num_bytes);
int found_matching_recv = rm_matching_rcv(s, bf, m, lp, arrived_op);
if(found_matching_recv < 0)
{
m->fwd.found_match = -1;
qlist_add_tail(&arrived_op->ql, &s->arrival_queue);
qlist_add(&arrived_op->ql, &s->arrival_queue);
}
else
{
......@@ -1674,7 +1741,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
memset(s, 0, sizeof(*s));
s->nw_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);
s->mpi_wkld_samples = calloc(MAX_STATS, sizeof(struct mpi_workload_sample));
s->mpi_wkld_samples = (struct mpi_workload_sample*)calloc(MAX_STATS, sizeof(struct mpi_workload_sample));
s->sampling_indx = 0;
s->is_finished = 0;
s->cur_interval_end = 0;
......@@ -1744,10 +1811,12 @@ void nw_test_init(nw_state* s, tw_lp* lp)
rc_stack_create(&s->processed_ops);
rc_stack_create(&s->processed_wait_op);
rc_stack_create(&s->matched_reqs);
// rc_stack_create(&s->indices);
assert(s->processed_ops != NULL);
assert(s->processed_wait_op != NULL);
assert(s->matched_reqs != NULL);
// assert(s->indices != NULL);
/* clock starts ticking when the first event is processed */
s->start_time = tw_now(lp);
......@@ -1765,7 +1834,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
nw_message * m_new;
tw_stime ts = tw_rand_exponential(lp->rng, mean_interval/1000);
e = tw_event_new(lp->gid, ts, lp);
m_new = tw_event_data(e);
m_new = (nw_message*)tw_event_data(e);
m_new->msg_type = CLI_BCKGND_GEN;
tw_event_send(e);
is_synthetic = 1;
......@@ -1809,6 +1878,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
//*(int *)bf = (int)0;
rc_stack_gc(lp, s->matched_reqs);
// rc_stack_gc(lp, s->indices);
rc_stack_gc(lp, s->processed_ops);
rc_stack_gc(lp, s->processed_wait_op);
......@@ -1840,7 +1910,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
tw_event *e_callback =
tw_event_new(rank_to_lpid(global_src_id),
codes_local_latency(lp), lp);
nw_message *m_callback = tw_event_data(e_callback);
nw_message *m_callback = (nw_message*)tw_event_data(e_callback);
m_callback->msg_type = MPI_SEND_ARRIVED_CB;
m_callback->fwd.msg_send_time = tw_now(lp) - m->fwd.sim_start_time;
tw_event_send(e_callback);
......@@ -1876,6 +1946,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
else
if(m->op_type == CODES_WK_ISEND)
{
//tw_output(lp, "\n isend req id %llu ", m->fwd.req_id);
update_completed_queue(s, bf, m, lp, m->fwd.req_id);
}
else
......@@ -1916,6 +1987,9 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
if(bf->c9)
return;
if(bf->c19)
return;
notify_neighbor_rc(s, lp, bf, m);
return;
}
......@@ -1955,7 +2029,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->num_all_reduce--;
s->col_time = m->rc.saved_send_time;
s->all_reduce_time -= s->col_time;
s->all_reduce_time = m->rc.saved_delay;
}
else
{
......@@ -1988,7 +2062,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_WAIT:
{
s->num_wait--;
codes_exec_mpi_wait_rc(s, bf, lp);