Commit f928306e authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

updates to workload replay to account for compute delay, updating mpi sim...

updates to workload replay to account for compute delay, updating mpi sim layer to generate less events, fixing compute delays
parent 7ec32c66
......@@ -91,7 +91,7 @@ typedef struct nw_message nw_message;
typedef unsigned int dumpi_req_id;
static int net_id = 0;
static float noise = 2.0;
static float noise = 0.1;
static int num_nw_lps = 0, num_mpi_lps = 0;
static int num_syn_clients;
......@@ -751,7 +751,7 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
(void)bf;
(void)lp;
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
// printf("\n Data arrived %lld rank %llu total data %ld ", m->fwd.num_bytes, s->nw_id, s->syn_data);
int data = m->fwd.num_bytes;
s->syn_data += data;
num_syn_bytes_recvd += data;
......@@ -931,6 +931,7 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp, nw_messag
index++;
}
}
//get_next_mpi_operation_rc(s, bf, lp, m);
codes_issue_next_event_rc(lp);
return;
}
......@@ -967,6 +968,7 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp*
tw_output(lp, "\n wait matched at post %d ", req_id);
print_completed_queue(lp, &s->completed_reqs);
}
//get_next_mpi_operation(s, bf, m, lp);
return;
}
++index;
......@@ -1016,6 +1018,7 @@ static void codes_exec_mpi_wait_all_rc(
{
add_completed_reqs(s, lp, m->fwd.num_matched);
codes_issue_next_event_rc(lp);
//get_next_mpi_operation_rc(s, bf, lp, m);
}
return;
}
......@@ -1091,6 +1094,7 @@ static void codes_exec_mpi_wait_all(
free(wait_op);
s->wait_op = NULL;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
else
{
......@@ -1172,6 +1176,7 @@ static int rm_matching_rcv(nw_state * ns,
{
bf->c8 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
qlist_del(&qi->ql);
......@@ -1241,6 +1246,7 @@ static int rm_matching_send(nw_state * ns,
{
bf->c6 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
......@@ -1282,10 +1288,13 @@ static void codes_exec_comp_delay(
nw_message* msg;
m->rc.saved_delay = s->compute_time;
s->compute_time += s_to_ns(mpi_op->u.delay.seconds);
ts = s_to_ns(mpi_op->u.delay.seconds);
s->compute_time += mpi_op->u.delay.nsecs;
ts = mpi_op->u.delay.nsecs;
if(ts <= 0)
ts = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
//ts = s_to_ns(mpi_op->u.delay.seconds);
ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
// ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
assert(ts > 0);
e = tw_event_new( lp->gid, ts , lp );
......@@ -1429,14 +1438,15 @@ static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_
model_net_event_rc2(lp, &m->event_rc);
if(bf->c4)
codes_issue_next_event_rc(lp);
if(bf->c3)
{
s->num_bytes_sent -= m->rc.saved_num_bytes;
num_bytes_sent -= m->rc.saved_num_bytes;
}
if(bf->c4)
//get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
/* executes MPI send and isend operations */
static void codes_exec_mpi_send(nw_state* s,
......@@ -1574,6 +1584,7 @@ static void codes_exec_mpi_send(nw_state* s,
{
bf->c4 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, m, lp);
}
}
......@@ -1590,6 +1601,8 @@ static tw_stime ns_to_s(tw_stime ns)
static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
if(m->fwd.wait_completed > 0)
s->wait_op->num_completed--;
if(bf->c0)
{
......@@ -1604,10 +1617,9 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m,
s->wait_op = wait_elem;
s->wait_time = m->rc.saved_wait_time;
add_completed_reqs(s, lp, m->fwd.num_matched);
//get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
if(m->fwd.wait_completed > 0)
s->wait_op->num_completed--;
}
static void update_completed_queue(nw_state* s,
......@@ -1648,6 +1660,7 @@ static void update_completed_queue(nw_state* s,
rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
s->wait_op = NULL;
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
}
}
......@@ -1952,7 +1965,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->app_id = lid.job;
s->local_rank = lid.rank;
if(strcmp(file_name_of_job[lid.job], "synthetic") == 0)
if(strncmp(file_name_of_job[lid.job], "synthetic", 9) == 0)
{
int synthetic_pattern;
sscanf(file_name_of_job[lid.job], "synthetic%d", &synthetic_pattern);
......@@ -1980,6 +1993,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
{
wrkld_id = codes_workload_load(type_name, params, s->app_id, s->local_rank);
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
}
if(enable_sampling && sampling_interval > 0)
......@@ -2090,6 +2104,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
if(m->op_type == CODES_WK_SEND && (is_eager == 1 || m->fwd.rend_send == 1))
{
bf->c29 = 1;
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
}
else
......@@ -2125,9 +2140,24 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
}
}
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
static void pull_next_workload_op_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_workload_get_next_rc(wrkld_id, s->app_id, s->local_rank, m->rc.mpi_op);
}
static struct codes_workload_op * pull_next_workload_op(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
struct codes_workload_op * mpi_op = (struct codes_workload_op*)calloc(1, sizeof(struct codes_workload_op));
// printf("\n App id %d local rank %d ", s->app_id, s->local_rank);
//struct codes_workload_op mpi_op;
codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, mpi_op);
m->rc.mpi_op = mpi_op;
m->op_type = mpi_op->op_type;
return mpi_op;
}
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
pull_next_workload_op_rc(s, bf, m, lp);
if(m->op_type == CODES_WK_END)
{
......@@ -2164,7 +2194,8 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->num_delays--;
if(disable_delay)
codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
//codes_issue_next_event_rc(lp);
else
{
tw_rand_reverse_unif(lp->rng);
......@@ -2184,7 +2215,8 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->col_time = 0;
}
codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
//codes_issue_next_event_rc(lp);
}
break;
case CODES_WK_BCAST:
......@@ -2196,7 +2228,8 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_COL:
{
s->num_cols--;
codes_issue_next_event_rc(lp);
//codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
}
break;
......@@ -2204,7 +2237,8 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_WAITANY:
{
s->num_waitsome--;
codes_issue_next_event_rc(lp);
//codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
}
break;
......@@ -2224,16 +2258,10 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
printf("\n Invalid op type %d ", m->op_type);
}
}
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
struct codes_workload_op * mpi_op = (struct codes_workload_op*)calloc(1, sizeof(struct codes_workload_op));
// printf("\n App id %d local rank %d ", s->app_id, s->local_rank);
//struct codes_workload_op mpi_op;
codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, mpi_op);
m->rc.mpi_op = mpi_op;
m->op_type = mpi_op->op_type;
struct codes_workload_op * mpi_op = pull_next_workload_op(s, bf, m, lp);
if(mpi_op->op_type == CODES_WK_END)
{
......@@ -2284,7 +2312,8 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
//printf("\n MPI DELAY ");
s->num_delays++;
if(disable_delay)
codes_issue_next_event(lp);
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
else
codes_exec_comp_delay(s, m, lp, mpi_op);
}
......@@ -2295,7 +2324,8 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
//printf("\n MPI WAITANY WAITSOME ");
s->num_waitsome++;
codes_issue_next_event(lp);
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
}
break;
......@@ -2332,7 +2362,8 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
s->col_time = tw_now(lp);
}
codes_issue_next_event(lp);
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
}
break;
......@@ -2345,7 +2376,8 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_COL:
{
s->num_cols++;
codes_issue_next_event(lp);
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
}
break;
default:
......@@ -2375,13 +2407,18 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
if(lid.job < 0)
return;
if(strncmp(file_name_of_job[lid.job], "synthetic", 9) == 0)
return;
}
else
{
if(s->nw_id >= (tw_lpid)num_net_traces)
return;
}
if(strcmp(workload_type, "online") == 0)
if(strcmp(workload_type, "online") == 0)
codes_workload_finalize("online_comm_workload", params, s->app_id, s->local_rank);
struct msg_size_info * tmp_msg = NULL;
......@@ -2481,6 +2518,7 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
{
if(bf->c29)
codes_issue_next_event_rc(lp);
//get_next_mpi_operation_rc(s, bf, m, lp);
if(bf->c28)
update_completed_queue_rc(s, bf, m, lp);
}
......@@ -2497,7 +2535,8 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
codes_local_latency_reverse(lp);
if(bf->c10)
codes_issue_next_event_rc(lp);
codes_issue_next_event_rc(lp);
//get_next_mpi_operation_rc(s, bf, m, lp);
if(bf->c8)
update_completed_queue_rc(s, bf, m, lp);
......@@ -2689,7 +2728,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
{
ref = fscanf(name_file, "%d %s", &num_traces_of_job[i], file_name_of_job[i]);
if(ref != EOF && strcmp(file_name_of_job[i], "synthetic") == 0)
if(ref != EOF && strncmp(file_name_of_job[i], "synthetic", 9) == 0)
{
num_syn_clients = num_traces_of_job[i];
num_net_traces += num_traces_of_job[i];
......
......@@ -159,7 +159,6 @@ int codes_workload_load(
for(i=0; method_array[i] != NULL; i++)
{
printf("\n loading for workload %s %s ", type, method_array[i]->method_name);
if(strcmp(method_array[i]->method_name, type) == 0)
{
/* load appropriate workload generator */
......
......@@ -32,6 +32,7 @@ static int rank_tbl_pop = 0;
static int total_rank_cnt = 0;
ABT_thread global_prod_thread = NULL;
ABT_xstream self_es;
double cpu_freq = 1.0;
struct shared_context {
int my_rank;
......@@ -278,13 +279,15 @@ void SWM_Irecv(SWM_PEER peer,
void SWM_Compute(long cycle_count)
{
if(!cpu_freq)
cpu_freq = 4.0e9;
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_DELAY;
/* TODO: Check how to convert cycle count into delay? */
wrkld_per_rank.u.delay.nsecs = cycle_count;
wrkld_per_rank.u.delay.nsecs = (cycle_count/cpu_freq);
wrkld_per_rank.u.delay.seconds = (cycle_count / cpu_freq) / (1000.0 * 1000.0 * 1000.0);
#ifdef DBG_COMM
printf("\n compute op delay: %ld ", cycle_count);
#endif
......@@ -713,6 +716,7 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
// boost::property_tree::json_parser::write_json("file.json", root);
boost::property_tree::json_parser::read_json(jsonFile, root);
uint32_t process_cnt = root.get<uint32_t>("jobs.size", 1);
cpu_freq = root.get<double>("jobs.cfg.cpu_freq");
}
catch(std::exception & e)
{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment