Commit 585499b5 authored by Misbah Mubarak's avatar Misbah Mubarak

Adding fixes to optimistic simulation so far

parent ccb75ed2
...@@ -69,6 +69,7 @@ struct recorder_params ...@@ -69,6 +69,7 @@ struct recorder_params
struct dumpi_trace_params { struct dumpi_trace_params {
char file_name[MAX_NAME_LENGTH_WKLD]; char file_name[MAX_NAME_LENGTH_WKLD];
int num_net_traces; int num_net_traces;
int nprocs;
#ifdef ENABLE_CORTEX_PYTHON #ifdef ENABLE_CORTEX_PYTHON
char cortex_script[MAX_NAME_LENGTH_WKLD]; char cortex_script[MAX_NAME_LENGTH_WKLD];
char cortex_class[MAX_NAME_LENGTH_WKLD]; char cortex_class[MAX_NAME_LENGTH_WKLD];
...@@ -162,6 +163,7 @@ struct codes_workload_op ...@@ -162,6 +163,7 @@ struct codes_workload_op
double end_time; double end_time;
double sim_start_time; double sim_start_time;
int64_t sequence_id;
/* parameters for each operation type */ /* parameters for each operation type */
union union
{ {
......
...@@ -41,7 +41,7 @@ static int preserve_wait_ordering = 0; ...@@ -41,7 +41,7 @@ static int preserve_wait_ordering = 0;
static int enable_msg_tracking = 0; static int enable_msg_tracking = 0;
static int is_synthetic = 0; static int is_synthetic = 0;
tw_lpid TRACK_LP = -1; tw_lpid TRACK_LP = -1;
int nprocs = 0;
static double total_syn_data = 0; static double total_syn_data = 0;
static int unmatched = 0; static int unmatched = 0;
char workload_type[128]; char workload_type[128];
...@@ -156,6 +156,7 @@ struct mpi_msgs_queue ...@@ -156,6 +156,7 @@ struct mpi_msgs_queue
int source_rank; int source_rank;
int dest_rank; int dest_rank;
int64_t num_bytes; int64_t num_bytes;
int64_t seq_id;
tw_stime req_init_time; tw_stime req_init_time;
dumpi_req_id req_id; dumpi_req_id req_id;
struct qlist_head ql; struct qlist_head ql;
...@@ -758,31 +759,8 @@ static void add_completed_reqs(nw_state * s, ...@@ -758,31 +759,8 @@ static void add_completed_reqs(nw_state * s,
for(int i = 0; i < count; i++) for(int i = 0; i < count; i++)
{ {
struct completed_requests * req = (struct completed_requests*)rc_stack_pop(s->matched_reqs); struct completed_requests * req = (struct completed_requests*)rc_stack_pop(s->matched_reqs);
// turn on only if wait-all unmatched error arises in optimistic mode. // turn on only if wait-all unmatched error arises in optimistic mode.
if(preserve_wait_ordering) qlist_add(&req->ql, &s->completed_reqs);
{
if(req->index == 0)
{
qlist_add(&req->ql, &s->completed_reqs);
}
else
{
int index = 1;
struct qlist_head * ent = NULL;
qlist_for_each(ent, &s->completed_reqs)
{
if(index == req->index)
{
qlist_add(&req->ql, ent);
}
}//end qlist
}// end else*/
}
else
{
qlist_add(&req->ql, &s->completed_reqs);
}
}//end for }//end for
} }
...@@ -1094,6 +1072,7 @@ static int rm_matching_rcv(nw_state * ns, ...@@ -1094,6 +1072,7 @@ static int rm_matching_rcv(nw_state * ns,
} }
else else
{ {
bf->c12 = 1;
m->rc.saved_recv_time = ns->recv_time; m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time); ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
} }
...@@ -1242,8 +1221,8 @@ static void codes_exec_mpi_recv_rc( ...@@ -1242,8 +1221,8 @@ static void codes_exec_mpi_recv_rc(
{ {
ns->recv_time = m->rc.saved_recv_time; ns->recv_time = m->rc.saved_recv_time;
if(bf->c10) if(bf->c11)
send_ack_back_rc(ns, bf, m, lp); codes_issue_next_event_rc(lp);
if(m->fwd.found_match >= 0) if(m->fwd.found_match >= 0)
{ {
...@@ -1252,6 +1231,8 @@ static void codes_exec_mpi_recv_rc( ...@@ -1252,6 +1231,8 @@ static void codes_exec_mpi_recv_rc(
mpi_msgs_queue * qi = (mpi_msgs_queue*)rc_stack_pop(ns->processed_ops); mpi_msgs_queue * qi = (mpi_msgs_queue*)rc_stack_pop(ns->processed_ops);
if(bf->c10)
send_ack_back_rc(ns, bf, m, lp);
if(m->fwd.found_match == 0) if(m->fwd.found_match == 0)
{ {
qlist_add(&qi->ql, &ns->arrival_queue); qlist_add(&qi->ql, &ns->arrival_queue);
...@@ -1279,12 +1260,9 @@ static void codes_exec_mpi_recv_rc( ...@@ -1279,12 +1260,9 @@ static void codes_exec_mpi_recv_rc(
} }
else if(m->fwd.found_match < 0) else if(m->fwd.found_match < 0)
{ {
struct qlist_head * ent = qlist_pop(&ns->pending_recvs_queue); struct qlist_head * ent = qlist_pop_back(&ns->pending_recvs_queue);
mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql); mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql);
free(qi); free(qi);
if(m->op_type == CODES_WK_IRECV)
codes_issue_next_event_rc(lp);
} }
} }
...@@ -1330,7 +1308,7 @@ static void codes_exec_mpi_recv( ...@@ -1330,7 +1308,7 @@ static void codes_exec_mpi_recv(
if(found_matching_sends < 0) if(found_matching_sends < 0)
{ {
m->fwd.found_match = -1; m->fwd.found_match = -1;
qlist_add(&recv_op->ql, &s->pending_recvs_queue); qlist_add_tail(&recv_op->ql, &s->pending_recvs_queue);
} }
else else
...@@ -1404,12 +1382,6 @@ static void codes_exec_mpi_send(nw_state* s, ...@@ -1404,12 +1382,6 @@ static void codes_exec_mpi_send(nw_state* s,
/* model-net event */ /* model-net event */
tw_lpid dest_rank = codes_mapping_get_lpid_from_relative(global_dest_rank, NULL, "nw-lp", NULL, 0); tw_lpid dest_rank = codes_mapping_get_lpid_from_relative(global_dest_rank, NULL, "nw-lp", NULL, 0);
if(is_rend == 1 || (!is_rend && mpi_op->u.send.num_bytes < EAGER_THRESHOLD))
{
bf->c3 = 1;
num_bytes_sent += mpi_op->u.send.num_bytes;
s->num_bytes_sent += mpi_op->u.send.num_bytes;
}
if(enable_sampling) if(enable_sampling)
{ {
if(tw_now(lp) >= s->cur_interval_end) if(tw_now(lp) >= s->cur_interval_end)
...@@ -1507,6 +1479,12 @@ static void codes_exec_mpi_send(nw_state* s, ...@@ -1507,6 +1479,12 @@ static void codes_exec_mpi_send(nw_state* s,
else else
fprintf(workload_log, "\n (%lf) APP ID %d MPI SEND SOURCE %llu DEST %d TAG %d BYTES %"PRId64, fprintf(workload_log, "\n (%lf) APP ID %d MPI SEND SOURCE %llu DEST %d TAG %d BYTES %"PRId64,
tw_now(lp), s->app_id, LLU(s->nw_id), global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes); tw_now(lp), s->app_id, LLU(s->nw_id), global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes);
}
if(is_rend || is_eager)
{
bf->c3 = 1;
s->num_bytes_sent += mpi_op->u.send.num_bytes;
num_bytes_sent += mpi_op->u.send.num_bytes;
} }
/* isend executed, now get next MPI operation from the queue */ /* isend executed, now get next MPI operation from the queue */
if(mpi_op->op_type == CODES_WK_ISEND && (!is_rend || is_eager)) if(mpi_op->op_type == CODES_WK_ISEND && (!is_rend || is_eager))
...@@ -1632,7 +1610,6 @@ static void update_arrival_queue_rc(nw_state* s, ...@@ -1632,7 +1610,6 @@ static void update_arrival_queue_rc(nw_state* s,
tw_bf * bf, tw_bf * bf,
nw_message * m, tw_lp * lp) nw_message * m, tw_lp * lp)
{ {
s->recv_time = m->rc.saved_recv_time;
s->num_bytes_recvd -= m->fwd.num_bytes; s->num_bytes_recvd -= m->fwd.num_bytes;
num_bytes_recvd -= m->fwd.num_bytes; num_bytes_recvd -= m->fwd.num_bytes;
...@@ -1665,14 +1642,19 @@ static void update_arrival_queue_rc(nw_state* s, ...@@ -1665,14 +1642,19 @@ static void update_arrival_queue_rc(nw_state* s,
index++; index++;
} }
} }
if(bf->c12)
s->recv_time = m->rc.saved_recv_time;
if(bf->c10)
send_ack_back_rc(s, bf, m, lp);
if(bf->c9) if(bf->c9)
update_completed_queue_rc(s, bf, m, lp); update_completed_queue_rc(s, bf, m, lp);
else if(bf->c8) if(bf->c8)
codes_issue_next_event_rc(lp); codes_issue_next_event_rc(lp);
} }
else if(m->fwd.found_match < 0) else if(m->fwd.found_match < 0)
{ {
struct qlist_head * ent = qlist_pop(&s->arrival_queue); struct qlist_head * ent = qlist_pop_back(&s->arrival_queue);
mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql); mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql);
free(qi); free(qi);
} }
...@@ -1732,7 +1714,7 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp ...@@ -1732,7 +1714,7 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
if(found_matching_recv < 0) if(found_matching_recv < 0)
{ {
m->fwd.found_match = -1; m->fwd.found_match = -1;
qlist_add(&arrived_op->ql, &s->arrival_queue); qlist_add_tail(&arrived_op->ql, &s->arrival_queue);
} }
else else
{ {
...@@ -1814,6 +1796,7 @@ void nw_test_init(nw_state* s, tw_lp* lp) ...@@ -1814,6 +1796,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
if (strcmp(workload_type, "dumpi") == 0){ if (strcmp(workload_type, "dumpi") == 0){
strcpy(params_d.file_name, file_name_of_job[lid.job]); strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job]; params_d.num_net_traces = num_traces_of_job[lid.job];
params_d.nprocs = nprocs;
params = (char*)&params_d; params = (char*)&params_d;
s->app_id = lid.job; s->app_id = lid.job;
s->local_rank = lid.rank; s->local_rank = lid.rank;
...@@ -2382,7 +2365,7 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l ...@@ -2382,7 +2365,7 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
{ {
if(bf->c29) if(bf->c29)
codes_issue_next_event_rc(lp); codes_issue_next_event_rc(lp);
else if(bf->c28) if(bf->c28)
update_completed_queue_rc(s, bf, m, lp); update_completed_queue_rc(s, bf, m, lp);
} }
break; break;
...@@ -2539,7 +2522,7 @@ static int msg_size_hash_compare( ...@@ -2539,7 +2522,7 @@ static int msg_size_hash_compare(
} }
int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv ) int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
{ {
int rank, nprocs; int rank;
int num_nets; int num_nets;
int* net_ids; int* net_ids;
......
...@@ -1174,7 +1174,7 @@ static void packet_generate(terminal_state * s, tw_bf * bf, terminal_custom_mess ...@@ -1174,7 +1174,7 @@ static void packet_generate(terminal_state * s, tw_bf * bf, terminal_custom_mess
nic_ts = g_tw_lookahead + (num_chunks * cn_delay) + tw_rand_unif(lp->rng); nic_ts = g_tw_lookahead + (num_chunks * cn_delay) + tw_rand_unif(lp->rng);
msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter; msg->packet_ID = s->packet_counter;
msg->my_N_hop = 0; msg->my_N_hop = 0;
msg->my_l_hop = 0; msg->my_l_hop = 0;
msg->my_g_hop = 0; msg->my_g_hop = 0;
...@@ -1257,12 +1257,11 @@ static void packet_generate(terminal_state * s, tw_bf * bf, terminal_custom_mess ...@@ -1257,12 +1257,11 @@ static void packet_generate(terminal_state * s, tw_bf * bf, terminal_custom_mess
static void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_custom_message * msg, static void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_custom_message * msg,
tw_lp * lp) tw_lp * lp)
{ {
if(bf->c10)
s->last_buf_full[0] = msg->saved_busy_time;
if(bf->c1) { if(bf->c1) {
s->in_send_loop = 1; s->in_send_loop = 1;
if(bf->c10)
s->last_buf_full[0] = msg->saved_busy_time;
return; return;
} }
...@@ -1582,7 +1581,7 @@ static void packet_arrive(terminal_state * s, tw_bf * bf, terminal_custom_messag ...@@ -1582,7 +1581,7 @@ static void packet_arrive(terminal_state * s, tw_bf * bf, terminal_custom_messag
assert(lp->gid == msg->dest_terminal_id); assert(lp->gid == msg->dest_terminal_id);
if(msg->packet_ID == LLU(TRACK_PKT)) if(msg->packet_ID == LLU(TRACK_PKT))
printf("\n Packet %llu arrived at lp %llu hops %d ", msg->packet_ID, LLU(lp->gid), msg->my_N_hop); printf("\n Packet %d arrived at lp %llu hops %d ", msg->sender_lp, LLU(lp->gid), msg->my_N_hop);
tw_stime ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng); tw_stime ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng);
...@@ -2197,14 +2196,14 @@ static vector<int> get_intra_router(router_state * s, int src_router_id, int des ...@@ -2197,14 +2196,14 @@ static vector<int> get_intra_router(router_state * s, int src_router_id, int des
/* If no direct connection exists then find an intermediate connection */ /* If no direct connection exists then find an intermediate connection */
if(curMap.find(dest_rel_id) == curMap.end()) if(curMap.find(dest_rel_id) == curMap.end())
{ {
/*int src_col = src_rel_id % s->params->num_router_cols; int src_col = src_rel_id % s->params->num_router_cols;
int src_row = src_rel_id / s->params->num_router_cols; int src_row = src_rel_id / s->params->num_router_cols;
int dest_col = dest_rel_id % s->params->num_router_cols; int dest_col = dest_rel_id % s->params->num_router_cols;
int dest_row = dest_rel_id / s->params->num_router_cols; int dest_row = dest_rel_id / s->params->num_router_cols;
//row first, column second //row first, column second
int choice1 = src_row * s->params->num_router_cols + dest_col; /*int choice1 = src_row * s->params->num_router_cols + dest_col;
int choice2 = dest_row * s->params->num_router_cols + src_col; int choice2 = dest_row * s->params->num_router_cols + src_col;
intersection.push_back(offset + choice1); intersection.push_back(offset + choice1);
intersection.push_back(offset + choice2);*/ intersection.push_back(offset + choice2);*/
...@@ -2312,8 +2311,8 @@ get_next_stop(router_state * s, ...@@ -2312,8 +2311,8 @@ get_next_stop(router_state * s,
//printf("\n Dest router id %d %d !!! ", dest_router_id, msg->intm_rtr_id); //printf("\n Dest router id %d %d !!! ", dest_router_id, msg->intm_rtr_id);
if(s->router_id == msg->saved_src_dest) if(s->router_id == msg->saved_src_dest)
{ {
//dest_lp = connectionList[dest_group_id][my_grp_id][msg->saved_src_chan]; dest_lp = connectionList[dest_group_id][my_grp_id][msg->saved_src_chan];
dest_lp = interGroupLinks[s->router_id][dest_group_id][0].dest; //dest_lp = interGroupLinks[s->router_id][dest_group_id][0].dest;
} }
else else
{ {
...@@ -2791,6 +2790,8 @@ router_packet_receive( router_state * s, ...@@ -2791,6 +2790,8 @@ router_packet_receive( router_state * s,
cur_chunk->msg.path_type = routing; /*defaults to the routing algorithm if we cur_chunk->msg.path_type = routing; /*defaults to the routing algorithm if we
don't have adaptive or progressive adaptive routing here*/ don't have adaptive or progressive adaptive routing here*/
// printf("\n Packet %llu source %d arrived at router %d ", msg->packet_ID, msg->src_terminal_id, s->router_id);
/* Set the default route as minimal for prog-adaptive */ /* Set the default route as minimal for prog-adaptive */
if(routing == PROG_ADAPTIVE && cur_chunk->msg.last_hop == TERMINAL) if(routing == PROG_ADAPTIVE && cur_chunk->msg.last_hop == TERMINAL)
cur_chunk->msg.path_type = MINIMAL; cur_chunk->msg.path_type = MINIMAL;
......
...@@ -39,7 +39,6 @@ ...@@ -39,7 +39,6 @@
#define MAX_LENGTH_FILE 512 #define MAX_LENGTH_FILE 512
#define MAX_OPERATIONS 32768 #define MAX_OPERATIONS 32768
#define DUMPI_IGNORE_DELAY 100 #define DUMPI_IGNORE_DELAY 100
#define RANK_HASH_TABLE_SIZE 400
/* This variable is defined in src/network-workloads/model-net-mpi-replay.c */ /* This variable is defined in src/network-workloads/model-net-mpi-replay.c */
extern struct codes_jobmap_ctx *jobmap_ctx; extern struct codes_jobmap_ctx *jobmap_ctx;
...@@ -153,7 +152,7 @@ static void* dumpi_init_op_data() ...@@ -153,7 +152,7 @@ static void* dumpi_init_op_data()
assert(tmp); assert(tmp);
tmp->op_array = malloc(MAX_OPERATIONS * sizeof(struct codes_workload_op)); tmp->op_array = malloc(MAX_OPERATIONS * sizeof(struct codes_workload_op));
assert(tmp->op_array); assert(tmp->op_array);
tmp->op_arr_ndx = 0; tmp->op_arr_ndx = 0;
tmp->op_arr_cnt = MAX_OPERATIONS; tmp->op_arr_cnt = MAX_OPERATIONS;
return (void *)tmp; return (void *)tmp;
...@@ -197,7 +196,7 @@ static void dumpi_roll_back_prev_op(void * mpi_op_array) ...@@ -197,7 +196,7 @@ static void dumpi_roll_back_prev_op(void * mpi_op_array)
{ {
dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array; dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
array->op_arr_ndx--; array->op_arr_ndx--;
assert(array->op_arr_ndx >= 0); //assert(array->op_arr_ndx >= 0);
} }
/* removes the next operation from the array */ /* removes the next operation from the array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op, static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
...@@ -207,13 +206,16 @@ static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *m ...@@ -207,13 +206,16 @@ static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *m
dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array; dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
//printf("\n op array index %d array count %d ", array->op_arr_ndx, array->op_arr_cnt); //printf("\n op array index %d array count %d ", array->op_arr_ndx, array->op_arr_cnt);
if (array->op_arr_ndx == array->op_arr_cnt) if (array->op_arr_ndx >= array->op_arr_cnt)
{ {
mpi_op->op_type = CODES_WK_END; mpi_op->op_type = CODES_WK_END;
mpi_op->sequence_id = array->op_arr_ndx;
array->op_arr_ndx++;
} }
else else
{ {
struct codes_workload_op *tmp = &(array->op_array[array->op_arr_ndx]); struct codes_workload_op *tmp = &(array->op_array[array->op_arr_ndx]);
tmp->sequence_id = array->op_arr_ndx;
*mpi_op = *tmp; *mpi_op = *tmp;
array->op_arr_ndx++; array->op_arr_ndx++;
} }
...@@ -781,9 +783,10 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank) ...@@ -781,9 +783,10 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
if(rank >= dumpi_params->num_net_traces) if(rank >= dumpi_params->num_net_traces)
return -1; return -1;
int hash_size = (dumpi_params->num_net_traces / dumpi_params->nprocs) + 1;
if(!rank_tbl) if(!rank_tbl)
{ {
rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SIZE); rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, hash_size);
if(!rank_tbl) if(!rank_tbl)
return -1; return -1;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment