Commit af03699c authored by Noah Wolfe's avatar Noah Wolfe Committed by Noah Wolfe
Browse files

Updates to mpi-replay sampling statistics collection

parent f6337845
......@@ -144,8 +144,18 @@ struct mpi_workload_sample
int nw_id;
int app_id;
unsigned long num_sends_sample;
unsigned long num_bytes_sample;
unsigned long num_recvs_sample;
unsigned long num_cols_sample;
unsigned long num_delays_sample;
unsigned long num_wait_all_sample;
unsigned long num_waits_sample;
unsigned long num_bytes_sample;
unsigned long min_bytes_sample;
unsigned long max_bytes_sample;
double send_time_sample;
double wait_time_sample;
double comm_time_sample;
double comp_time_sample;
double sample_end_time;
};
/* stores pointers of pending MPI operations to be matched with their respective sends/receives. */
......@@ -269,6 +279,7 @@ struct nw_state
int max_arr_size;
struct mpi_workload_sample * mpi_wkld_samples;
char output_buf[512];
char output_buf2[16384];
char col_stats[64];
};
......@@ -306,6 +317,8 @@ struct nw_message
double saved_wait_time;
double saved_delay;
int64_t saved_num_bytes;
int64_t saved_min_bytes;
int64_t saved_max_bytes;
} rc;
};
......@@ -323,7 +336,7 @@ static void codes_exec_mpi_recv_rc(
nw_state* s, tw_bf * bf, nw_message* m, tw_lp* lp);
/* execute the computational delay */
static void codes_exec_comp_delay(
nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op, tw_bf * bf);
/* gets the next MPI operation from the network-workloads API. */
static void get_next_mpi_operation(
nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
......@@ -362,6 +375,43 @@ static void update_message_time_rc(
/* conversion from seconds to eanaoseconds */
static tw_stime s_to_ns(tw_stime ns);
/* Helper functions for Sampling */
void increment_sampling_check_rc(nw_state* s, tw_bf* bf);
void increment_sampling_check(nw_state* s, tw_lp* lp, tw_bf* bf);
// Sampling Function: Always to be called after undoing the increment
void increment_sampling_check_rc(nw_state* s, tw_bf* bf)
{
if(bf->c13)
{
s->sampling_indx--;
s->cur_interval_end -= sampling_interval;
}
}
// Sampling Function: Always to be called before incrementing the metric
void increment_sampling_check(nw_state* s, tw_lp* lp, tw_bf* bf)
{
if(tw_now(lp) >= s->cur_interval_end)
{
bf->c13 = 1;
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].nw_id = s->nw_id;
s->mpi_wkld_samples[indx].app_id = s->app_id;
s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
s->sampling_indx++;
s->cur_interval_end += sampling_interval;
}
if(s->sampling_indx >= MAX_STATS)
{
struct mpi_workload_sample * tmp = (struct mpi_workload_sample*)calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
free(s->mpi_wkld_samples);
s->mpi_wkld_samples = tmp;
s->max_arr_size += MAX_STATS;
}
}
/*static void update_message_size_rc(
struct nw_state * ns,
tw_lp * lp,
......@@ -930,16 +980,11 @@ static void codes_exec_mpi_wait_all_rc(
nw_message * m,
tw_lp* lp)
{
if(bf->c1)
if(enable_sampling)
{
int sampling_indx = s->sampling_indx;
s->mpi_wkld_samples[sampling_indx].num_waits_sample--;
if(bf->c2)
{
s->cur_interval_end -= sampling_interval;
s->sampling_indx--;
}
increment_sampling_check_rc(s, bf);
}
if(s->wait_op)
{
......@@ -967,25 +1012,7 @@ static void codes_exec_mpi_wait_all(
if(enable_sampling)
{
bf->c1 = 1;
if(tw_now(lp) >= s->cur_interval_end)
{
bf->c2 = 1;
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].nw_id = s->nw_id;
s->mpi_wkld_samples[indx].app_id = s->app_id;
s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
s->cur_interval_end += sampling_interval;
s->sampling_indx++;
}
if(s->sampling_indx >= MAX_STATS)
{
struct mpi_workload_sample * tmp = (struct mpi_workload_sample*)calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
free(s->mpi_wkld_samples);
s->mpi_wkld_samples = tmp;
s->max_arr_size += MAX_STATS;
}
increment_sampling_check(s, lp, bf);
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_waits_sample++;
}
......@@ -1188,7 +1215,7 @@ static void codes_issue_next_event(tw_lp* lp)
/* Simulate delays between MPI operations */
static void codes_exec_comp_delay(
nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op)
nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op, tw_bf * bf)
{
tw_event* e;
tw_stime ts;
......@@ -1196,6 +1223,18 @@ static void codes_exec_comp_delay(
m->rc.saved_delay = s->compute_time;
s->compute_time += s_to_ns(mpi_op->u.delay.seconds);
if(enable_sampling)
{
increment_sampling_check(s, lp, bf);
int indx = s->sampling_indx;
if(indx == 0){
s->mpi_wkld_samples[indx].comp_time_sample = s->compute_time;
}else{
s->mpi_wkld_samples[indx].comp_time_sample = s->mpi_wkld_samples[indx-1].comp_time_sample - s->compute_time;
}
}
ts = s_to_ns(mpi_op->u.delay.seconds);
ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
......@@ -1277,6 +1316,13 @@ static void codes_exec_mpi_recv(
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;
if(enable_sampling)
{
increment_sampling_check(s, lp, bf);
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_recvs_sample++;
}
mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
recv_op->req_init_time = tw_now(lp);
recv_op->op_type = mpi_op->op_type;
......@@ -1385,27 +1431,16 @@ static void codes_exec_mpi_send(nw_state* s,
}
if(enable_sampling)
{
if(tw_now(lp) >= s->cur_interval_end)
{
bf->c1 = 1;
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].nw_id = s->nw_id;
s->mpi_wkld_samples[indx].app_id = s->app_id;
s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
s->sampling_indx++;
s->cur_interval_end += sampling_interval;
}
if(s->sampling_indx >= MAX_STATS)
{
struct mpi_workload_sample * tmp = (struct mpi_workload_sample*)calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
free(s->mpi_wkld_samples);
s->mpi_wkld_samples = tmp;
s->max_arr_size += MAX_STATS;
}
increment_sampling_check(s, lp, bf);
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_sends_sample++;
s->mpi_wkld_samples[indx].num_bytes_sample += mpi_op->u.send.num_bytes;
m->rc.saved_max_bytes = s->mpi_wkld_samples[indx].max_bytes_sample;
m->rc.saved_min_bytes = s->mpi_wkld_samples[indx].min_bytes_sample;
if(s->mpi_wkld_samples[indx].max_bytes_sample < mpi_op->u.send.num_bytes)
s->mpi_wkld_samples[indx].max_bytes_sample = mpi_op->u.send.num_bytes;
if(s->mpi_wkld_samples[indx].min_bytes_sample == 0 || s->mpi_wkld_samples[indx].min_bytes_sample > mpi_op->u.send.num_bytes)
s->mpi_wkld_samples[indx].min_bytes_sample = mpi_op->u.send.num_bytes;
}
nw_message local_m;
nw_message remote_m;
......@@ -1512,6 +1547,8 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m,
struct pending_waits* wait_elem = (struct pending_waits*)rc_stack_pop(s->processed_wait_op);
s->wait_op = wait_elem;
s->wait_time = m->rc.saved_wait_time;
if(enable_sampling)
increment_sampling_check_rc(s, bf);
add_completed_reqs(s, lp, m->fwd.num_matched);
codes_issue_next_event_rc(lp);
}
......@@ -1553,6 +1590,19 @@ static void update_completed_queue(nw_state* s,
m->rc.saved_wait_time = s->wait_time;
s->wait_time += (tw_now(lp) - s->wait_op->start_time);
if(enable_sampling)
{
increment_sampling_check(s, lp, bf);
int indx = s->sampling_indx;
if(indx == 0){
s->mpi_wkld_samples[indx].wait_time_sample = s->wait_time;
s->mpi_wkld_samples[indx].comm_time_sample = s->elapsed_time - s->compute_time;
}else{
s->mpi_wkld_samples[indx].wait_time_sample = s->wait_time - s->mpi_wkld_samples[indx-1].wait_time_sample;
s->mpi_wkld_samples[indx].comm_time_sample = s->mpi_wkld_samples[indx-1].comm_time_sample - (s->elapsed_time - s->compute_time);
}
}
struct pending_waits* wait_elem = s->wait_op;
rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
s->wait_op = NULL;
......@@ -1719,6 +1769,15 @@ static void update_message_time(
m->rc.saved_send_time = s->send_time;
s->send_time += m->fwd.msg_send_time;
if(enable_sampling)
{
increment_sampling_check(s, lp, bf);
int indx = s->sampling_indx;
if(indx == 0)
s->mpi_wkld_samples[indx].send_time_sample = s->send_time;
else
s->mpi_wkld_samples[indx].send_time_sample = s->send_time - s->mpi_wkld_samples[indx-1].send_time_sample;
}
}
static void update_message_time_rc(
......@@ -1730,6 +1789,8 @@ static void update_message_time_rc(
(void)bf;
(void)lp;
s->send_time = m->rc.saved_send_time;
if(enable_sampling)
increment_sampling_check_rc(s, bf);
}
/* initializes the network node LP, loads the trace file in the structs, calls the first MPI operation to be executed */
......@@ -1999,12 +2060,33 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_ISEND:
{
codes_exec_mpi_send_rc(s, bf, m, lp);
if(enable_sampling)
{
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_sends_sample--;
s->mpi_wkld_samples[indx].num_bytes_sample -= m->rc.saved_num_bytes;
s->mpi_wkld_samples[indx].max_bytes_sample = m->rc.saved_max_bytes;
s->mpi_wkld_samples[indx].min_bytes_sample = m->rc.saved_min_bytes;
increment_sampling_check_rc(s, bf);
}
model_net_event_rc2(lp, &m->event_rc);
if(m->op_type == CODES_WK_ISEND)
codes_issue_next_event_rc(lp);
s->num_sends--;
s->num_bytes_sent += m->rc.saved_num_bytes;
num_bytes_sent -= m->rc.saved_num_bytes;
}
break;
case CODES_WK_IRECV:
case CODES_WK_RECV:
{
if(enable_sampling)
{
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_recvs_sample--;
increment_sampling_check_rc(s, bf);
}
codes_exec_mpi_recv_rc(s, bf, m, lp);
s->num_recvs--;
}
......@@ -2013,6 +2095,12 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_DELAY:
{
if(enable_sampling)
{
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_delays_sample--;
increment_sampling_check_rc(s, bf);
}
s->num_delays--;
if(disable_delay)
codes_issue_next_event_rc(lp);
......@@ -2020,6 +2108,10 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
tw_rand_reverse_unif(lp->rng);
s->compute_time = m->rc.saved_delay;
if(enable_sampling)
{
increment_sampling_check_rc(s, bf);
}
}
}
break;
......@@ -2046,6 +2138,12 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_REDUCE:
case CODES_WK_COL:
{
if(enable_sampling)
{
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_cols_sample--;
increment_sampling_check_rc(s, bf);
}
s->num_cols--;
codes_issue_next_event_rc(lp);
}
......@@ -2061,12 +2159,24 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_WAIT:
{
if(enable_sampling)
{
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_waits_sample--;
increment_sampling_check_rc(s, bf);
}
s->num_wait--;
codes_exec_mpi_wait_rc(s, bf, lp, m);
}
break;
case CODES_WK_WAITALL:
{
if(enable_sampling)
{
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_wait_all_sample--;
increment_sampling_check_rc(s, bf);
}
s->num_waitall--;
codes_exec_mpi_wait_all_rc(s, bf, m, lp);
}
......@@ -2132,11 +2242,17 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_DELAY:
{
//printf("\n MPI DELAY ");
if(enable_sampling)
{
increment_sampling_check(s, lp, bf);
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_delays_sample++;
}
s->num_delays++;
if(disable_delay)
codes_issue_next_event(lp);
else
codes_exec_comp_delay(s, m, lp, &mpi_op);
codes_exec_comp_delay(s, m, lp, &mpi_op, bf);
}
break;
......@@ -2152,6 +2268,12 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_WAITALL:
{
//printf("\n MPI WAITALL ");
if(enable_sampling)
{
increment_sampling_check(s, lp, bf);
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_wait_all_sample++;
}
s->num_waitall++;
codes_exec_mpi_wait_all(s, bf, m, lp, &mpi_op);
//codes_issue_next_event(lp);
......@@ -2160,6 +2282,12 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_WAIT:
{
//printf("\n MPI WAIT ");
if(enable_sampling)
{
increment_sampling_check(s, lp, bf);
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_waits_sample++;
}
s->num_wait++;
//TODO: Uncomment:
codes_exec_mpi_wait(s, bf, m, lp, &mpi_op);
......@@ -2193,7 +2321,13 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_ALLTOALLV:
case CODES_WK_COL:
{
s->num_cols++;
if(enable_sampling)
{
increment_sampling_check(s, lp, bf);
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_cols_sample++;
}
s->num_cols++;
codes_issue_next_event(lp);
}
break;
......@@ -2208,8 +2342,12 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
total_syn_data += s->syn_data;
int written = 0;
if(!s->nw_id)
int written2 = 0;
if(!s->nw_id){
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time> <Job ID>");
written2 = sprintf(s->output_buf2, "# Format <LP ID> <Terminal ID> <Interval> <Num Sends> <Num Recvs> <Num Collectives> <Num delays> <Num Wait_Alls> <Num Waits> <Send Time> <Wait Time> <Comm Time> <Compute Time> <Total Bytes> <Min Bytes> <Max Bytes>\n");
written2 += sprintf(s->output_buf2 + written2, "sampling-interval %lf sampling-end-time %lf\n", sampling_interval, sampling_end_time);
}
/*if(s->wait_op)
{
......@@ -2280,6 +2418,16 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
{
fseek(workload_agg_log, sample_bytes_written, SEEK_SET);
fwrite(s->mpi_wkld_samples, sizeof(struct mpi_workload_sample), s->sampling_indx + 1, workload_agg_log);
//printf("sampling_indx:%d sampling_interval:%f sampling_end_time:%f\n",s->sampling_indx, sampling_interval, sampling_end_time);
for(int i=0; i<sampling_end_time/sampling_interval; i++){
written2 += sprintf(s->output_buf2 + written2, "%llu %llu %d %lu %lu %lu %lu %lu %lu %lf %lf %lf %lf %lu %lu %lu\n",
LLU(lp->gid), LLU(s->nw_id), i, s->mpi_wkld_samples[i].num_sends_sample, s->mpi_wkld_samples[i].num_recvs_sample,
s->mpi_wkld_samples[i].num_cols_sample, s->mpi_wkld_samples[i].num_delays_sample, s->mpi_wkld_samples[i].num_wait_all_sample,
s->mpi_wkld_samples[i].num_waits_sample, s->mpi_wkld_samples[i].send_time_sample, s->mpi_wkld_samples[i].wait_time_sample,
s->mpi_wkld_samples[i].comm_time_sample, s->mpi_wkld_samples[i].comp_time_sample,
s->mpi_wkld_samples[i].num_bytes_sample, s->mpi_wkld_samples[i].min_bytes_sample, s->mpi_wkld_samples[i].max_bytes_sample);
}
lp_io_write(lp->gid, "mpi-sampling-stats", written2, s->output_buf2);
}
sample_bytes_written += (s->sampling_indx * sizeof(struct mpi_workload_sample));
if(s->wait_time > max_wait_time)
......@@ -2377,7 +2525,7 @@ const tw_optdef app_opt [] =
TWOPT_UINT("preserve_wait_ordering", preserve_wait_ordering, "only enable when getting unmatched send/recv errors in optimistic mode (turning on slows down simulation)"),
TWOPT_UINT("debug_cols", debug_cols, "completion time of collective operations (currently MPI_AllReduce)"),
TWOPT_UINT("enable_mpi_debug", enable_debug, "enable debugging of MPI sim layer (works with sync=1 only)"),
TWOPT_UINT("sampling_interval", sampling_interval, "sampling interval for MPI operations"),
TWOPT_STIME("sampling_interval", sampling_interval, "sampling interval for MPI operations"),
TWOPT_UINT("enable_sampling", enable_sampling, "enable sampling (only works in sequential mode)"),
TWOPT_STIME("mean_interval", mean_interval, "mean interval for generating background traffic"),
TWOPT_STIME("sampling_end_time", sampling_end_time, "sampling_end_time"),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment