Commit 9bbcd39b authored by Caitlin Ross's avatar Caitlin Ross
Browse files

adding sampling data for MPI replay layer

parent 7f32b5f0
......@@ -212,6 +212,22 @@ struct msg_size_info
struct qhash_head hash_link;
struct qlist_head ql;
};
struct ross_model_sample
{
tw_lpid nw_id;
int app_id;
int local_rank;
unsigned long num_sends;
unsigned long num_recvs;
unsigned long long num_bytes_sent;
unsigned long long num_bytes_recvd;
double send_time;
double recv_time;
double wait_time;
double compute_time;
};
typedef struct mpi_msgs_queue mpi_msgs_queue;
typedef struct completed_requests completed_requests;
typedef struct pending_waits pending_waits;
......@@ -300,6 +316,7 @@ struct nw_state
struct mpi_workload_sample * mpi_wkld_samples;
char output_buf[512];
char col_stats[64];
struct ross_model_sample ross_sample;
};
/* data for handling reverse computation.
......@@ -336,9 +353,13 @@ struct nw_message
{
int saved_perm;
double saved_send_time;
double saved_send_time_sample;
double saved_recv_time;
double saved_recv_time_sample;
double saved_wait_time;
double saved_wait_time_sample;
double saved_delay;
double saved_delay_sample;
int64_t saved_num_bytes;
int saved_syn_length;
unsigned long saved_prev_switch;
......@@ -642,9 +663,11 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
s->gen_data -= payload_sz;
num_syn_bytes_sent -= payload_sz;
s->num_bytes_sent -= payload_sz;
s->ross_sample.num_bytes_sent -= payload_sz;
}
tw_rand_reverse_unif(lp->rng);
s->num_sends--;
s->ross_sample.num_sends--;
if(bf->c5)
s->is_finished = 0;
......@@ -786,10 +809,12 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
s->gen_data += payload_sz;
s->num_bytes_sent += payload_sz;
s->ross_sample.num_bytes_sent += payload_sz;
num_syn_bytes_sent += payload_sz;
}
}
s->num_sends++;
s->ross_sample.num_sends++;
/* New event after MEAN_INTERVAL */
tw_stime ts = mean_interval + tw_rand_exponential(lp->rng, noise);
......@@ -815,11 +840,14 @@ void arrive_syn_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
(void)lp;
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
s->num_recvs--;
s->ross_sample.num_recvs--;
int data = m->fwd.num_bytes;
s->syn_data -= data;
num_syn_bytes_recvd -= data;
s->num_bytes_recvd -= data;
s->ross_sample.num_bytes_recvd -= data;
s->send_time = m->rc.saved_send_time;
s->ross_sample.send_time = m->rc.saved_send_time_sample;
}
void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
......@@ -844,14 +872,18 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
}*/
}
m->rc.saved_send_time = s->send_time;
m->rc.saved_send_time_sample = s->ross_sample.send_time;
if((tw_now(lp) - m->fwd.sim_start_time) > s->max_time)
s->max_time = tw_now(lp) - m->fwd.sim_start_time;
s->send_time += (tw_now(lp) - m->fwd.sim_start_time);
s->ross_sample.send_time += (tw_now(lp) - m->fwd.sim_start_time);
s->num_recvs++;
s->ross_sample.num_recvs++;
int data = m->fwd.num_bytes;
s->syn_data += data;
s->num_bytes_recvd += data;
s->ross_sample.num_bytes_recvd += data;
num_syn_bytes_recvd += data;
}
/* Debugging functions, may generate unused function warning */
......@@ -1255,7 +1287,9 @@ static int rm_matching_rcv(nw_state * ns,
{
bf->c12 = 1;
m->rc.saved_recv_time = ns->recv_time;
m->rc.saved_recv_time_sample = ns->ross_sample.recv_time;
ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
ns->ross_sample.recv_time += (tw_now(lp) - m->fwd.sim_start_time);
}
if(qi->op_type == CODES_WK_IRECV && !is_rend)
{
......@@ -1321,7 +1355,9 @@ static int rm_matching_send(nw_state * ns,
}
m->rc.saved_recv_time = ns->recv_time;
m->rc.saved_recv_time_sample = ns->ross_sample.recv_time;
ns->recv_time += (tw_now(lp) - qitem->req_init_time);
ns->ross_sample.recv_time += (tw_now(lp) - qitem->req_init_time);
/*if(ns->nw_id == (tw_lpid)TRACK_LP && qitem->op_type == CODES_WK_IRECV)
{
......@@ -1381,7 +1417,9 @@ static void codes_exec_comp_delay(
nw_message* msg;
m->rc.saved_delay = s->compute_time;
m->rc.saved_delay_sample = s->ross_sample.compute_time;
s->compute_time += mpi_op->u.delay.nsecs;
s->ross_sample.compute_time += mpi_op->u.delay.nsecs;
ts = mpi_op->u.delay.nsecs;
if(ts <= g_tw_lookahead)
{
......@@ -1407,6 +1445,7 @@ static void codes_exec_mpi_recv_rc(
tw_lp* lp)
{
ns->recv_time = m->rc.saved_recv_time;
ns->ross_sample.recv_time = m->rc.saved_recv_time_sample;
if(bf->c11)
codes_issue_next_event_rc(lp);
......@@ -1416,6 +1455,7 @@ static void codes_exec_mpi_recv_rc(
if(m->fwd.found_match >= 0)
{
ns->recv_time = m->rc.saved_recv_time;
ns->ross_sample.recv_time = m->rc.saved_recv_time_sample;
//int queue_count = qlist_count(&ns->arrival_queue);
mpi_msgs_queue * qi = (mpi_msgs_queue*)rc_stack_pop(ns->processed_ops);
......@@ -1465,7 +1505,8 @@ static void codes_exec_mpi_recv(
If no matching isend is found, the receive operation is queued in the pending queue of
receive operations. */
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_recv_time_sample = s->ross_sample.recv_time;
m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;
mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
......@@ -1529,7 +1570,10 @@ static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_
}
}
if(bf->c15 || bf->c16)
{
s->num_sends--;
s->ross_sample.num_sends--;
}
if (bf->c15)
model_net_event_rc2(lp, &m->event_rc);
......@@ -1544,6 +1588,7 @@ static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_
if(bf->c3)
{
s->num_bytes_sent -= m->rc.saved_num_bytes;
s->ross_sample.num_bytes_sent -= m->rc.saved_num_bytes;
num_bytes_sent -= m->rc.saved_num_bytes;
}
}
......@@ -1639,6 +1684,7 @@ static void codes_exec_mpi_send(nw_state* s,
bf->c15 = 1;
is_eager = 1;
s->num_sends++;
s->ross_sample.num_sends++;
tw_stime copy_overhead = copy_per_byte_eager * mpi_op->u.send.num_bytes;
local_m.fwd.sim_start_time = tw_now(lp);
......@@ -1654,6 +1700,7 @@ static void codes_exec_mpi_send(nw_state* s,
* only remote message sent. */
bf->c16 = 1;
s->num_sends++;
s->ross_sample.num_sends++;
remote_m.fwd.sim_start_time = tw_now(lp);
remote_m.fwd.dest_rank = mpi_op->u.send.dest_rank;
remote_m.fwd.src_rank = mpi_op->u.send.source_rank;
......@@ -1697,6 +1744,7 @@ static void codes_exec_mpi_send(nw_state* s,
{
bf->c3 = 1;
s->num_bytes_sent += mpi_op->u.send.num_bytes;
s->ross_sample.num_bytes_sent += mpi_op->u.send.num_bytes;
num_bytes_sent += mpi_op->u.send.num_bytes;
}
/* isend executed, now get next MPI operation from the queue */
......@@ -1733,6 +1781,7 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m,
struct pending_waits* wait_elem = (struct pending_waits*)rc_stack_pop(s->processed_wait_op);
s->wait_op = wait_elem;
s->wait_time = m->rc.saved_wait_time;
s->ross_sample.wait_time = m->rc.saved_wait_time_sample;
add_completed_reqs(s, lp, m->fwd.num_matched);
codes_issue_next_event_rc(lp);
}
......@@ -1772,7 +1821,9 @@ static void update_completed_queue(nw_state* s,
m->fwd.num_matched = clear_completed_reqs(s, lp, s->wait_op->req_ids, s->wait_op->count);
m->rc.saved_wait_time = s->wait_time;
m->rc.saved_wait_time_sample = s->ross_sample.wait_time;
s->wait_time += (tw_now(lp) - s->wait_op->start_time);
s->ross_sample.wait_time += (tw_now(lp) - s->wait_op->start_time);
struct pending_waits* wait_elem = s->wait_op;
rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
......@@ -1844,6 +1895,7 @@ static void update_arrival_queue_rc(nw_state* s,
nw_message * m, tw_lp * lp)
{
s->num_bytes_recvd -= m->fwd.num_bytes;
s->ross_sample.num_bytes_recvd -= m->fwd.num_bytes;
num_bytes_recvd -= m->fwd.num_bytes;
if(bf->c1)
......@@ -1876,7 +1928,10 @@ static void update_arrival_queue_rc(nw_state* s,
}
}
if(bf->c12)
s->recv_time = m->rc.saved_recv_time;
{
s->recv_time = m->rc.saved_recv_time;
s->ross_sample.recv_time = m->rc.saved_recv_time_sample;
}
//if(bf->c10)
// send_ack_back_rc(s, bf, m, lp);
......@@ -1903,8 +1958,10 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
//if(s->local_rank != m->fwd.dest_rank)
// printf("\n Dest rank %d local rank %d ", m->fwd.dest_rank, s->local_rank);
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_recv_time_sample = s->ross_sample.recv_time;
s->num_bytes_recvd += m->fwd.num_bytes;
s->ross_sample.num_bytes_recvd += m->fwd.num_bytes;
num_bytes_recvd += m->fwd.num_bytes;
// send a callback to the sender to increment times
......@@ -1965,7 +2022,9 @@ static void update_message_time(
(void)lp;
m->rc.saved_send_time = s->send_time;
m->rc.saved_send_time_sample = s->ross_sample.send_time;
s->send_time += m->fwd.msg_send_time;
s->ross_sample.send_time += m->fwd.msg_send_time;
}
static void update_message_time_rc(
......@@ -1977,6 +2036,7 @@ static void update_message_time_rc(
(void)bf;
(void)lp;
s->send_time = m->rc.saved_send_time;
s->ross_sample.send_time = m->rc.saved_send_time_sample;
}
/* initializes the network node LP, loads the trace file in the structs, calls the first MPI operation to be executed */
......@@ -2201,7 +2261,9 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
}
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_recv_time_sample = s->ross_sample.recv_time;
s->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
s->ross_sample.recv_time += (tw_now(lp) - m->fwd.sim_start_time);
}
break;
......@@ -2303,6 +2365,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
codes_exec_mpi_recv_rc(s, bf, m, lp);
s->num_recvs--;
s->ross_sample.num_recvs--;
}
break;
......@@ -2317,6 +2380,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
if (bf->c28)
tw_rand_reverse_unif(lp->rng);
s->compute_time = m->rc.saved_delay;
s->ross_sample.compute_time = m->rc.saved_delay_sample;
}
}
break;
......@@ -2426,6 +2490,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_IRECV:
{
s->num_recvs++;
s->ross_sample.num_recvs++;
//printf("\n MPI RECV ");
codes_exec_mpi_recv(s, bf, m, lp, mpi_op);
}
......@@ -2662,6 +2727,7 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
update_completed_queue_rc(s, bf, m, lp);
s->recv_time = m->rc.saved_recv_time;
s->ross_sample.recv_time = m->rc.saved_recv_time_sample;
}
break;
......@@ -2767,14 +2833,28 @@ void nw_lp_model_stat_collect(nw_state *s, tw_lp *lp, char *buffer)
return;
}
void ross_nw_lp_sample_fn(nw_state * s, tw_bf * bf, tw_lp * lp, struct ross_model_sample *sample)
{
memcpy(sample, &s->ross_sample, sizeof(s->ross_sample));
sample->nw_id = s->nw_id;
sample->app_id = s->app_id;
sample->local_rank = s->local_rank;
memset(&s->ross_sample, 0, sizeof(s->ross_sample));
}
void ross_nw_lp_sample_rc_fn(nw_state * s, tw_bf * bf, tw_lp * lp, struct ross_model_sample *sample)
{
memcpy(&s->ross_sample, sample, sizeof(*sample));
}
st_model_types nw_lp_model_types[] = {
{(ev_trace_f) nw_lp_event_collect,
sizeof(int),
(model_stat_f) nw_lp_model_stat_collect,
0,
NULL,
NULL,
0},
(sample_event_f) ross_nw_lp_sample_fn,
(sample_revent_f) ross_nw_lp_sample_rc_fn,
sizeof(struct ross_model_sample)},
{NULL, 0, NULL, 0, NULL, NULL, 0}
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment