Commit f28191b7 authored by Misbah Mubarak
Browse files

Merge branch 'dfp-instrumentation' into 'master'

DFP and MPI replay layer instrumentation updates

See merge request !70
parents 1f5c3974 81c58341
......@@ -41,7 +41,7 @@ PARAMS
# bandwidth in GiB/s for compute node-router channels
cn_bandwidth="16.0";
# ROSS message size
message_size="656";
message_size="736";
# number of compute nodes connected to router, dictated by dragonfly config
# file
num_cns_per_router="2";
......
......@@ -23,6 +23,6 @@ PARAMS
local_bandwidth="5.25";
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="656";
message_size="736";
routing="adaptive";
}
......@@ -31,6 +31,6 @@ PARAMS
cn_bandwidth="9.0";
router_delay="0";
link_delay="0";
message_size="656";
message_size="736";
routing="minimal";
}
......@@ -10,7 +10,7 @@ LPGROUPS
PARAMS
{
packet_size="512";
message_size="656";
message_size="736";
modelnet_order=( "torus" );
# scheduler options
modelnet_scheduler="fcfs";
......
......@@ -212,6 +212,25 @@ struct msg_size_info
struct qhash_head hash_link;
struct qlist_head ql;
};
/* Per-LP record handed to the ROSS sampling callbacks. The counters and
 * timers below are kept in nw_state.ross_sample alongside the LP's lifetime
 * totals, copied out by ross_nw_lp_sample_fn(), and then zeroed so the next
 * sample starts fresh. The identity fields and comm_time/avg_msg_time are
 * filled in at sample time from the LP state.
 * NOTE(review): the record is copied with memcpy and its sizeof is registered
 * in nw_lp_model_types, so field order/types presumably define the output
 * layout -- confirm before reordering. */
struct ross_model_sample
{
tw_lpid nw_id;  /* copied from nw_state.nw_id when the sample is taken */
int app_id;  /* copied from nw_state.app_id when the sample is taken */
int local_rank;  /* copied from nw_state.local_rank when the sample is taken */
unsigned long num_sends;  /* sends issued (MPI and synthetic) since the last sample */
unsigned long num_recvs;  /* receives posted / synthetic arrivals since the last sample */
unsigned long long num_bytes_sent;  /* payload bytes sent since the last sample */
unsigned long long num_bytes_recvd;  /* payload bytes received since the last sample */
double send_time;  /* accumulated message send/latency time since the last sample */
double recv_time;  /* accumulated receive completion time since the last sample */
double wait_time;  /* accumulated time spent in wait operations since the last sample */
double compute_time;  /* accumulated compute delay (added from delay.nsecs, so presumably ns) */
double comm_time;  /* set at sample time to nw_state elapsed_time - compute_time */
double max_time;  /* largest synthetic message latency recorded since the last sample */
double avg_msg_time;  /* set at sample time for synthetic jobs: send_time / num_recvs of the LP */
};
typedef struct mpi_msgs_queue mpi_msgs_queue;
typedef struct completed_requests completed_requests;
typedef struct pending_waits pending_waits;
......@@ -300,6 +319,7 @@ struct nw_state
struct mpi_workload_sample * mpi_wkld_samples;
char output_buf[512];
char col_stats[64];
struct ross_model_sample ross_sample;
};
/* data for handling reverse computation.
......@@ -336,12 +356,17 @@ struct nw_message
{
int saved_perm;
double saved_send_time;
double saved_send_time_sample;
double saved_recv_time;
double saved_recv_time_sample;
double saved_wait_time;
double saved_wait_time_sample;
double saved_delay;
double saved_delay_sample;
int64_t saved_num_bytes;
int saved_syn_length;
unsigned long saved_prev_switch;
double saved_prev_max_time;
} rc;
};
......@@ -642,9 +667,11 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
s->gen_data -= payload_sz;
num_syn_bytes_sent -= payload_sz;
s->num_bytes_sent -= payload_sz;
s->ross_sample.num_bytes_sent -= payload_sz;
}
tw_rand_reverse_unif(lp->rng);
s->num_sends--;
s->ross_sample.num_sends--;
if(bf->c5)
s->is_finished = 0;
......@@ -786,10 +813,12 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
s->gen_data += payload_sz;
s->num_bytes_sent += payload_sz;
s->ross_sample.num_bytes_sent += payload_sz;
num_syn_bytes_sent += payload_sz;
}
}
s->num_sends++;
s->ross_sample.num_sends++;
/* New event after MEAN_INTERVAL */
tw_stime ts = mean_interval + tw_rand_exponential(lp->rng, noise);
......@@ -815,11 +844,19 @@ void arrive_syn_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
(void)lp;
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
s->num_recvs--;
s->ross_sample.num_recvs--;
int data = m->fwd.num_bytes;
s->syn_data -= data;
num_syn_bytes_recvd -= data;
s->num_bytes_recvd -= data;
s->ross_sample.num_bytes_recvd -= data;
s->send_time = m->rc.saved_send_time;
s->ross_sample.send_time = m->rc.saved_send_time_sample;
if((tw_now(lp) - m->fwd.sim_start_time) > s->max_time)
{
s->max_time = m->rc.saved_prev_max_time;
s->ross_sample.max_time = m->rc.saved_prev_max_time;
}
}
void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
......@@ -844,14 +881,22 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
}*/
}
m->rc.saved_send_time = s->send_time;
m->rc.saved_send_time_sample = s->ross_sample.send_time;
if((tw_now(lp) - m->fwd.sim_start_time) > s->max_time)
{
m->rc.saved_prev_max_time = s->max_time;
s->max_time = tw_now(lp) - m->fwd.sim_start_time;
s->ross_sample.max_time = tw_now(lp) - m->fwd.sim_start_time;
}
s->send_time += (tw_now(lp) - m->fwd.sim_start_time);
s->ross_sample.send_time += (tw_now(lp) - m->fwd.sim_start_time);
s->num_recvs++;
s->ross_sample.num_recvs++;
int data = m->fwd.num_bytes;
s->syn_data += data;
s->num_bytes_recvd += data;
s->ross_sample.num_bytes_recvd += data;
num_syn_bytes_recvd += data;
}
/* Debugging functions, may generate unused function warning */
......@@ -1255,7 +1300,9 @@ static int rm_matching_rcv(nw_state * ns,
{
bf->c12 = 1;
m->rc.saved_recv_time = ns->recv_time;
m->rc.saved_recv_time_sample = ns->ross_sample.recv_time;
ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
ns->ross_sample.recv_time += (tw_now(lp) - m->fwd.sim_start_time);
}
if(qi->op_type == CODES_WK_IRECV && !is_rend)
{
......@@ -1321,7 +1368,9 @@ static int rm_matching_send(nw_state * ns,
}
m->rc.saved_recv_time = ns->recv_time;
m->rc.saved_recv_time_sample = ns->ross_sample.recv_time;
ns->recv_time += (tw_now(lp) - qitem->req_init_time);
ns->ross_sample.recv_time += (tw_now(lp) - qitem->req_init_time);
/*if(ns->nw_id == (tw_lpid)TRACK_LP && qitem->op_type == CODES_WK_IRECV)
{
......@@ -1381,7 +1430,9 @@ static void codes_exec_comp_delay(
nw_message* msg;
m->rc.saved_delay = s->compute_time;
m->rc.saved_delay_sample = s->ross_sample.compute_time;
s->compute_time += mpi_op->u.delay.nsecs;
s->ross_sample.compute_time += mpi_op->u.delay.nsecs;
ts = mpi_op->u.delay.nsecs;
if(ts <= g_tw_lookahead)
{
......@@ -1407,6 +1458,7 @@ static void codes_exec_mpi_recv_rc(
tw_lp* lp)
{
ns->recv_time = m->rc.saved_recv_time;
ns->ross_sample.recv_time = m->rc.saved_recv_time_sample;
if(bf->c11)
codes_issue_next_event_rc(lp);
......@@ -1416,6 +1468,7 @@ static void codes_exec_mpi_recv_rc(
if(m->fwd.found_match >= 0)
{
ns->recv_time = m->rc.saved_recv_time;
ns->ross_sample.recv_time = m->rc.saved_recv_time_sample;
//int queue_count = qlist_count(&ns->arrival_queue);
mpi_msgs_queue * qi = (mpi_msgs_queue*)rc_stack_pop(ns->processed_ops);
......@@ -1466,6 +1519,7 @@ static void codes_exec_mpi_recv(
receive operations. */
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_recv_time_sample = s->ross_sample.recv_time;
m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;
mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
......@@ -1529,7 +1583,10 @@ static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_
}
}
if(bf->c15 || bf->c16)
{
s->num_sends--;
s->ross_sample.num_sends--;
}
if (bf->c15)
model_net_event_rc2(lp, &m->event_rc);
......@@ -1544,6 +1601,7 @@ static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_
if(bf->c3)
{
s->num_bytes_sent -= m->rc.saved_num_bytes;
s->ross_sample.num_bytes_sent -= m->rc.saved_num_bytes;
num_bytes_sent -= m->rc.saved_num_bytes;
}
}
......@@ -1639,6 +1697,7 @@ static void codes_exec_mpi_send(nw_state* s,
bf->c15 = 1;
is_eager = 1;
s->num_sends++;
s->ross_sample.num_sends++;
tw_stime copy_overhead = copy_per_byte_eager * mpi_op->u.send.num_bytes;
local_m.fwd.sim_start_time = tw_now(lp);
......@@ -1654,6 +1713,7 @@ static void codes_exec_mpi_send(nw_state* s,
* only remote message sent. */
bf->c16 = 1;
s->num_sends++;
s->ross_sample.num_sends++;
remote_m.fwd.sim_start_time = tw_now(lp);
remote_m.fwd.dest_rank = mpi_op->u.send.dest_rank;
remote_m.fwd.src_rank = mpi_op->u.send.source_rank;
......@@ -1697,6 +1757,7 @@ static void codes_exec_mpi_send(nw_state* s,
{
bf->c3 = 1;
s->num_bytes_sent += mpi_op->u.send.num_bytes;
s->ross_sample.num_bytes_sent += mpi_op->u.send.num_bytes;
num_bytes_sent += mpi_op->u.send.num_bytes;
}
/* isend executed, now get next MPI operation from the queue */
......@@ -1733,6 +1794,7 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m,
struct pending_waits* wait_elem = (struct pending_waits*)rc_stack_pop(s->processed_wait_op);
s->wait_op = wait_elem;
s->wait_time = m->rc.saved_wait_time;
s->ross_sample.wait_time = m->rc.saved_wait_time_sample;
add_completed_reqs(s, lp, m->fwd.num_matched);
codes_issue_next_event_rc(lp);
}
......@@ -1772,7 +1834,9 @@ static void update_completed_queue(nw_state* s,
m->fwd.num_matched = clear_completed_reqs(s, lp, s->wait_op->req_ids, s->wait_op->count);
m->rc.saved_wait_time = s->wait_time;
m->rc.saved_wait_time_sample = s->ross_sample.wait_time;
s->wait_time += (tw_now(lp) - s->wait_op->start_time);
s->ross_sample.wait_time += (tw_now(lp) - s->wait_op->start_time);
struct pending_waits* wait_elem = s->wait_op;
rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
......@@ -1844,6 +1908,7 @@ static void update_arrival_queue_rc(nw_state* s,
nw_message * m, tw_lp * lp)
{
s->num_bytes_recvd -= m->fwd.num_bytes;
s->ross_sample.num_bytes_recvd -= m->fwd.num_bytes;
num_bytes_recvd -= m->fwd.num_bytes;
if(bf->c1)
......@@ -1876,7 +1941,10 @@ static void update_arrival_queue_rc(nw_state* s,
}
}
if(bf->c12)
{
s->recv_time = m->rc.saved_recv_time;
s->ross_sample.recv_time = m->rc.saved_recv_time_sample;
}
//if(bf->c10)
// send_ack_back_rc(s, bf, m, lp);
......@@ -1904,7 +1972,9 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
//if(s->local_rank != m->fwd.dest_rank)
// printf("\n Dest rank %d local rank %d ", m->fwd.dest_rank, s->local_rank);
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_recv_time_sample = s->ross_sample.recv_time;
s->num_bytes_recvd += m->fwd.num_bytes;
s->ross_sample.num_bytes_recvd += m->fwd.num_bytes;
num_bytes_recvd += m->fwd.num_bytes;
// send a callback to the sender to increment times
......@@ -1965,7 +2035,9 @@ static void update_message_time(
(void)lp;
m->rc.saved_send_time = s->send_time;
m->rc.saved_send_time_sample = s->ross_sample.send_time;
s->send_time += m->fwd.msg_send_time;
s->ross_sample.send_time += m->fwd.msg_send_time;
}
static void update_message_time_rc(
......@@ -1977,6 +2049,7 @@ static void update_message_time_rc(
(void)bf;
(void)lp;
s->send_time = m->rc.saved_send_time;
s->ross_sample.send_time = m->rc.saved_send_time_sample;
}
/* initializes the network node LP, loads the trace file in the structs, calls the first MPI operation to be executed */
......@@ -2201,7 +2274,9 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
}
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_recv_time_sample = s->ross_sample.recv_time;
s->recv_time += (tw_now(lp) - m->fwd.sim_start_time);
s->ross_sample.recv_time += (tw_now(lp) - m->fwd.sim_start_time);
}
break;
......@@ -2303,6 +2378,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
codes_exec_mpi_recv_rc(s, bf, m, lp);
s->num_recvs--;
s->ross_sample.num_recvs--;
}
break;
......@@ -2317,6 +2393,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
if (bf->c28)
tw_rand_reverse_unif(lp->rng);
s->compute_time = m->rc.saved_delay;
s->ross_sample.compute_time = m->rc.saved_delay_sample;
}
}
break;
......@@ -2426,6 +2503,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_IRECV:
{
s->num_recvs++;
s->ross_sample.num_recvs++;
//printf("\n MPI RECV ");
codes_exec_mpi_recv(s, bf, m, lp, mpi_op);
}
......@@ -2662,6 +2740,7 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
update_completed_queue_rc(s, bf, m, lp);
s->recv_time = m->rc.saved_recv_time;
s->ross_sample.recv_time = m->rc.saved_recv_time_sample;
}
break;
......@@ -2767,14 +2846,37 @@ void nw_lp_model_stat_collect(nw_state *s, tw_lp *lp, char *buffer)
return;
}
/* ROSS sampling callback for the network LP.
 *
 * Copies the interval counters accumulated in s->ross_sample into the
 * caller-provided record, stamps it with the LP's identity (nw_id, app_id,
 * local_rank), derives comm_time from the LP's elapsed and compute times,
 * and finally zeroes s->ross_sample so the next interval starts from zero.
 *
 * s      - network LP state; s->ross_sample is read and then cleared
 * bf     - unused
 * lp     - unused
 * sample - output record; fully overwritten
 *
 * For jobs whose workload file name starts with "synthetic" (only checked
 * when alloc_spec == 1) avg_msg_time is set to send_time / num_recvs of the
 * LP. That ratio is only computed once at least one message has been
 * received; before that the field keeps the value copied from
 * s->ross_sample, which avoids emitting NaN/Inf from a division by zero.
 */
void ross_nw_lp_sample_fn(nw_state * s, tw_bf * bf, tw_lp * lp, struct ross_model_sample *sample)
{
    (void)bf;
    (void)lp;

    memcpy(sample, &s->ross_sample, sizeof(s->ross_sample));
    sample->nw_id = s->nw_id;
    sample->app_id = s->app_id;
    sample->local_rank = s->local_rank;
    sample->comm_time = s->elapsed_time - s->compute_time;
    if (alloc_spec == 1)
    {
        struct codes_jobmap_id lid;
        lid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx);
        /* Guard the divisor: a synthetic rank may be sampled before any
         * message has arrived. */
        if(strncmp(file_name_of_job[lid.job], "synthetic", 9) == 0 && s->num_recvs > 0)
            sample->avg_msg_time = (s->send_time / s->num_recvs);
    }
    /* Reset the interval accumulator; the reverse handler restores it from
     * the saved sample record. */
    memset(&s->ross_sample, 0, sizeof(s->ross_sample));
}
/* Reverse handler for ross_nw_lp_sample_fn(): puts the record that was
 * handed to the forward sampling call back into s->ross_sample, undoing the
 * reset performed there. bf and lp are not used. */
void ross_nw_lp_sample_rc_fn(nw_state * s, tw_bf * bf, tw_lp * lp, struct ross_model_sample *sample)
{
    struct ross_model_sample *live = &s->ross_sample;

    (void)bf;
    (void)lp;

    memcpy(live, sample, sizeof *live);
}
st_model_types nw_lp_model_types[] = {
{(ev_trace_f) nw_lp_event_collect,
sizeof(int),
(model_stat_f) nw_lp_model_stat_collect,
0,
NULL,
NULL,
0},
(sample_event_f) ross_nw_lp_sample_fn,
(sample_revent_f) ross_nw_lp_sample_rc_fn,
sizeof(struct ross_model_sample)},
{NULL, 0, NULL, 0, NULL, NULL, 0}
};
......
......@@ -1643,6 +1643,7 @@ void issue_rtr_bw_monitor_event(router_state *s, tw_bf *bf, terminal_plus_messag
s->qos_data[i][j] = 0;
}
s->busy_time_sample[i] = 0;
s->ross_rsample.busy_time[i] = 0;
}
if(tw_now(lp) > max_qos_monitor)
......@@ -2297,6 +2298,7 @@ static void packet_send(terminal_state *s, tw_bf *bf, terminal_plus_message *msg
s->busy_time += (tw_now(lp) - s->last_buf_full);
s->busy_time_sample += (tw_now(lp) - s->last_buf_full);
s->ross_sample.busy_time_sample += (tw_now(lp) - s->last_buf_full);
s->last_buf_full = 0.0;
}
}
......@@ -2737,7 +2739,7 @@ void dragonfly_plus_rsample_fin(router_state *s, tw_lp *lp)
if (s->router_id == 0) {
/* write metadata file */
char meta_fname[64];
sprintf(meta_fname, "dragonfly-plus-router-sampling.meta");
sprintf(meta_fname, "dragonfly-router-sampling.meta");
FILE *fp = fopen(meta_fname, "w");
fprintf(fp,
......@@ -2750,7 +2752,7 @@ void dragonfly_plus_rsample_fin(router_state *s, tw_lp *lp)
}
char rt_fn[MAX_NAME_LENGTH];
if (strcmp(router_sample_file, "") == 0)
sprintf(rt_fn, "dragonfly-plus-router-sampling-%ld.bin", g_tw_mynode);
sprintf(rt_fn, "dragonfly-router-sampling-%ld.bin", g_tw_mynode);
else
sprintf(rt_fn, "%s-%ld.bin", router_sample_file, g_tw_mynode);
......@@ -4168,6 +4170,7 @@ static void router_packet_send_rc(router_state *s, tw_bf *bf, terminal_plus_mess
{
s->busy_time[output_port] = msg->saved_rcv_time;
s->busy_time_sample[output_port] = msg->saved_sample_time;
s->ross_rsample.busy_time[output_port] = msg->saved_sample_time;
s->last_buf_full[output_port] = msg->saved_busy_time;
}
......@@ -4256,6 +4259,7 @@ static void router_packet_send(router_state *s, tw_bf *bf, terminal_plus_message
msg->saved_sample_time = s->busy_time_sample[output_port];
s->busy_time[output_port] += (tw_now(lp) - s->last_buf_full[output_port]);
s->busy_time_sample[output_port] += (tw_now(lp) - s->last_buf_full[output_port]);
s->ross_rsample.busy_time[output_port] += (tw_now(lp) - s->last_buf_full[output_port]);
s->last_buf_full[output_port] = 0.0;
}
......@@ -4330,6 +4334,7 @@ static void router_packet_send(router_state *s, tw_bf *bf, terminal_plus_message
bf->c11 = 1;
s->link_traffic[output_port] += (cur_entry->msg.packet_size % s->params->chunk_size);
s->link_traffic_sample[output_port] += (cur_entry->msg.packet_size % s->params->chunk_size);
s->ross_rsample.link_traffic_sample[output_port] += (cur_entry->msg.packet_size % s->params->chunk_size);
msg_size = cur_entry->msg.packet_size % s->params->chunk_size;
}
else {
......@@ -4399,6 +4404,7 @@ static void router_buf_update_rc(router_state *s, tw_bf *bf, terminal_plus_messa
if (bf->c3) {
s->busy_time[indx] = msg->saved_rcv_time;
s->busy_time_sample[indx] = msg->saved_sample_time;
s->ross_rsample.busy_time[indx] = msg->saved_sample_time;
s->last_buf_full[indx] = msg->saved_busy_time;
}
if (bf->c1) {
......@@ -4430,6 +4436,7 @@ static void router_buf_update(router_state *s, tw_bf *bf, terminal_plus_message
msg->saved_sample_time = s->busy_time_sample[indx];
s->busy_time[indx] += (tw_now(lp) - s->last_buf_full[indx]);
s->busy_time_sample[indx] += (tw_now(lp) - s->last_buf_full[indx]);
s->ross_rsample.busy_time[indx] += (tw_now(lp) - s->last_buf_full[indx]);
s->last_buf_full[indx] = 0.0;
}
if (s->queued_msgs[indx][output_chan] != NULL) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment