Commit fe2d271e authored by Misbah Mubarak's avatar Misbah Mubarak

More updates to network models, recording router busy times in stats, using RC...

More updates to network models, recording router busy times in stats, using RC stack for reverse computation instead of copying back data to message
parents b5b0bfa9 44c1aa9a
......@@ -48,7 +48,7 @@ struct terminal_message
short new_vc;
short saved_vc;
/* last hop of the message, can be a terminal, local router or global router */
short last_hop;
int last_hop;
/* For routing */
int intm_group_id;
int chunk_id;
......@@ -61,7 +61,7 @@ struct terminal_message
int local_event_size_bytes;
// For buffer message
short vc_index;
int vc_index;
int sender_radix;
int output_chan;
model_net_event_return event_rc;
......@@ -69,15 +69,17 @@ struct terminal_message
uint64_t pull_size;
/* for reverse computation */
short path_type;
int path_type;
tw_stime saved_available_time;
tw_stime saved_avg_time;
tw_stime saved_start_time;
tw_stime saved_collective_init_time;
tw_stime saved_rcv_time;
tw_stime saved_busy_time;
tw_stime saved_total_time;
tw_stime saved_hist_start_time;
tw_stime msg_start_time;
short saved_completed_chunks;
int saved_hist_num;
int saved_occupancy;
......
......@@ -18,12 +18,12 @@ PARAMS
# modelnet_scheduler="round-robin";
num_vcs="1";
num_routers="4";
local_vc_size="32768";
global_vc_size="65536";
cn_vc_size="32768";
local_vc_size="8192";
global_vc_size="16384";
cn_vc_size="8192";
local_bandwidth="5.25";
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="536";
routing="adaptive";
message_size="552";
routing="minimal";
}
......@@ -2,7 +2,7 @@ LPGROUPS
{
MODELNET_GRP
{
repetitions="8";
repetitions="27";
modelnet_simplenet="1";
nw-lp="1";
}
......
......@@ -14,7 +14,8 @@
#include "codes/rc-stack.h"
#include "codes/quicklist.h"
#define TRACK -1
/* turning on track lp will generate a lot of output messages */
#define TRACK_LP -1
#define TRACE -1
#define MAX_WAIT_REQS 512
......@@ -179,6 +180,7 @@ struct nw_message
double saved_wait_time;
double saved_delay;
int saved_num_bytes;
struct codes_workload_op * saved_op;
} rc;
};
......@@ -270,8 +272,6 @@ static int clear_completed_reqs(nw_state * s,
}
}
}
if(lp->gid == TRACK)
printf("\n Pushed num %d ", matched);
return matched;
}
static void add_completed_reqs(nw_state * s,
......@@ -484,7 +484,7 @@ static int rm_matching_rcv(nw_state * ns,
qlist_for_each(ent, &ns->pending_recvs_queue){
qi = qlist_entry(ent, mpi_msgs_queue, ql);
if((qi->num_bytes >= qitem->num_bytes)
if((qi->num_bytes == qitem->num_bytes)
&& ((qi->tag == qitem->tag) || qi->tag == -1)
&& ((qi->source_rank == qitem->source_rank) || qi->source_rank == -1))
{
......@@ -504,9 +504,6 @@ static int rm_matching_rcv(nw_state * ns,
qlist_del(&qi->ql);
if(lp->gid == TRACK)
printf("\n matched recv req id %ld ", qi->req_id);
rc_stack_push(lp, qi, free, ns->processed_ops);
return index;
}
......@@ -525,7 +522,7 @@ static int rm_matching_send(nw_state * ns,
int index = 0;
qlist_for_each(ent, &ns->arrival_queue){
qi = qlist_entry(ent, mpi_msgs_queue, ql);
if((qi->num_bytes <= qitem->num_bytes)
if((qi->num_bytes == qitem->num_bytes)
&& (qi->tag == qitem->tag || qitem->tag == -1)
&& ((qi->source_rank == qitem->source_rank) || qitem->source_rank == -1))
{
......@@ -545,8 +542,6 @@ static int rm_matching_send(nw_state * ns,
qlist_del(&qi->ql);
if(lp->gid == TRACK)
printf("\n matched matching send recv req id %ld ", qitem->req_id);
return index;
}
return -1;
......@@ -580,13 +575,8 @@ static void codes_exec_comp_delay(
tw_stime ts;
nw_message* msg;
if (disable_delay) {
ts = 0.0; // no compute time sim
}
else {
s->compute_time += s_to_ns(mpi_op->u.delay.seconds);
ts = s_to_ns(mpi_op->u.delay.seconds);
}
s->compute_time += s_to_ns(mpi_op->u.delay.seconds);
ts = s_to_ns(mpi_op->u.delay.seconds);
ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
......@@ -605,7 +595,9 @@ static void codes_exec_mpi_recv_rc(
nw_message* m,
tw_lp* lp)
{
num_bytes_recvd -= m->rc.saved_num_bytes;
struct codes_workload_op * mpi_op = m->rc.saved_op;
num_bytes_recvd -= mpi_op->u.recv.num_bytes;
ns->recv_time = m->rc.saved_recv_time;
if(m->fwd.found_match >= 0)
{
......@@ -666,7 +658,6 @@ static void codes_exec_mpi_recv(
receive operations. */
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;
num_bytes_recvd += mpi_op->u.recv.num_bytes;
mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
......@@ -678,6 +669,10 @@ static void codes_exec_mpi_recv(
recv_op->tag = mpi_op->u.recv.tag;
recv_op->req_id = mpi_op->u.recv.req_id;
if(s->nw_id == TRACK_LP)
printf("\n Receive op posted num bytes %d source %d ", recv_op->num_bytes,
recv_op->source_rank);
int found_matching_sends = rm_matching_send(s, bf, m, lp, recv_op);
/* save the req id inserted in the completed queue for reverse computation. */
......@@ -765,8 +760,6 @@ static tw_stime s_to_ns(tw_stime ns)
static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
if(lp->gid == TRACK)
printf("\n Reverse computation!!!! ");
if(bf->c0)
{
struct qlist_head * ent = qlist_pop_back(&s->completed_reqs);
......@@ -903,6 +896,10 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
arrived_op->num_bytes = m->fwd.num_bytes;
arrived_op->tag = m->fwd.tag;
if(s->nw_id == TRACK_LP)
printf("\n Send op arrived source rank %ld num bytes %d ", arrived_op->source_rank,
arrived_op->num_bytes);
int found_matching_recv = rm_matching_rcv(s, bf, m, lp, arrived_op);
if(found_matching_recv < 0)
......@@ -985,8 +982,8 @@ void nw_test_init(nw_state* s, tw_lp* lp)
void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
*(int *)bf = (int)0;
//rc_stack_gc(lp, s->matched_reqs);
//rc_stack_gc(lp, s->processed_ops);
rc_stack_gc(lp, s->matched_reqs);
rc_stack_gc(lp, s->processed_ops);
switch(m->msg_type)
{
......@@ -1017,7 +1014,8 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_workload_get_next_rc2(wrkld_id, 0, (int)s->nw_id);
struct codes_workload_op * mpi_op = m->rc.saved_op;
codes_workload_get_next_rc(wrkld_id, 0, (int)s->nw_id, mpi_op);
if(m->op_type == CODES_WK_END)
{
......@@ -1028,13 +1026,12 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
int saved_num_bytes = m->rc.saved_num_bytes;
model_net_event_rc(net_id, lp, saved_num_bytes);
model_net_event_rc(net_id, lp, m->rc.saved_num_bytes);
if(m->op_type == CODES_WK_ISEND)
codes_issue_next_event_rc(lp);
s->num_sends--;
s->num_bytes_sent += saved_num_bytes;
num_bytes_sent -= saved_num_bytes;
s->num_bytes_sent += m->rc.saved_num_bytes;
num_bytes_sent -= m->rc.saved_num_bytes;
}
break;
......@@ -1051,10 +1048,6 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->num_delays--;
tw_rand_reverse_unif(lp->rng);
if (!disable_delay) {
s->compute_time -= s_to_ns(m->rc.saved_delay);
}
}
break;
case CODES_WK_BCAST:
......@@ -1098,23 +1091,24 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
struct codes_workload_op mpi_op;
codes_workload_get_next(wrkld_id, 0, (int)s->nw_id, &mpi_op);
struct codes_workload_op * mpi_op = malloc(sizeof(struct codes_workload_op));
codes_workload_get_next(wrkld_id, 0, (int)s->nw_id, mpi_op);
m->op_type = mpi_op.op_type;
m->op_type = mpi_op->op_type;
m->rc.saved_op = mpi_op;
if(mpi_op.op_type == CODES_WK_END)
if(mpi_op->op_type == CODES_WK_END)
{
s->elapsed_time = tw_now(lp) - s->start_time;
return;
}
switch(mpi_op.op_type)
switch(mpi_op->op_type)
{
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
s->num_sends++;
codes_exec_mpi_send(s, bf, m, lp, &mpi_op);
codes_exec_mpi_send(s, bf, m, lp, mpi_op);
}
break;
......@@ -1122,14 +1116,18 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_IRECV:
{
s->num_recvs++;
codes_exec_mpi_recv(s, bf, m, lp, &mpi_op);
codes_exec_mpi_recv(s, bf, m, lp, mpi_op);
}
break;
case CODES_WK_DELAY:
{
s->num_delays++;
codes_exec_comp_delay(s, lp, &mpi_op);
if(disable_delay)
codes_issue_next_event(lp);
else
codes_exec_comp_delay(s, lp, mpi_op);
}
break;
......@@ -1144,13 +1142,13 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_WAITALL:
{
s->num_waitall++;
codes_exec_mpi_wait_all(s, bf, m, lp, &mpi_op);
codes_exec_mpi_wait_all(s, bf, m, lp, mpi_op);
}
break;
case CODES_WK_WAIT:
{
s->num_wait++;
codes_exec_mpi_wait(s, lp, &mpi_op);
codes_exec_mpi_wait(s, lp, mpi_op);
}
break;
case CODES_WK_BCAST:
......@@ -1167,7 +1165,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
}
break;
default:
printf("\n Invalid op type %d ", mpi_op.op_type);
printf("\n Invalid op type %d ", mpi_op->op_type);
}
return;
}
......
......@@ -281,7 +281,7 @@ static void printQueue(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, char* msg
while(tmp)
{
if(tmp->mpi_op->op_type == CODES_WK_SEND || tmp->mpi_op->op_type == CODES_WK_ISEND)
printf("\n lpid %llu send operation count %d tag %d source %d",
printf("\n lpid %llu send operation num bytes %d tag %d source %d",
lpid, tmp->mpi_op->u.send.num_bytes,
tmp->mpi_op->u.send.tag, tmp->mpi_op->u.send.source_rank);
else if(tmp->mpi_op->op_type == CODES_WK_IRECV || tmp->mpi_op->op_type == CODES_WK_RECV)
......@@ -667,7 +667,7 @@ static int match_receive(
assert(op1->op_type == CODES_WK_IRECV || op1->op_type == CODES_WK_RECV);
assert(op2->op_type == CODES_WK_SEND || op2->op_type == CODES_WK_ISEND);
if((op1->u.recv.num_bytes >= op2->u.send.num_bytes) &&
if((op1->u.recv.num_bytes == op2->u.send.num_bytes) &&
((op1->u.recv.tag == op2->u.send.tag) || op1->u.recv.tag == -1) &&
((op1->u.recv.source_rank == op2->u.send.source_rank) || op1->u.recv.source_rank == -1))
{
......@@ -1215,7 +1215,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->num_waitall--;
codes_exec_mpi_wait_all_rc(s, m, lp, mpi_op);
}
}
break;
case CODES_WK_WAITSOME:
case CODES_WK_WAITANY:
......@@ -1305,7 +1305,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
s->num_waitall++;
codes_exec_mpi_wait_all(s, lp, m, mpi_op);
}
}
break;
default:
printf("\n Invalid op type %d ", mpi_op->op_type);
......@@ -1323,11 +1323,11 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
{
printf("\n LP %llu unmatched irecvs %d unmatched sends %d Total sends %ld receives %ld collectives %ld delays %ld wait alls %ld waits %ld send time %lf wait %lf",
lp->gid, s->pending_recvs_queue->num_elems, s->arrival_queue->num_elems, s->num_sends, s->num_recvs, s->num_cols, s->num_delays, s->num_waitall, s->num_wait, s->send_time, s->wait_time);
//if(lp->gid == TRACE)
//{
if(lp->gid == TRACE)
{
printQueue(lp->gid, s->pending_recvs_queue, "irecv ");
printQueue(lp->gid, s->arrival_queue, "isend");
//}
printQueue(lp->gid, s->arrival_queue, "isend");
}
written += sprintf(s->output_buf + written, "\n %lu %lu %ld %ld %ld %ld %lf %lf %lf", lp->gid, s->nw_id, s->num_sends, s->num_recvs, s->num_bytes_sent,
s->num_bytes_recvd, s->send_time, s->elapsed_time - s->compute_time, s->compute_time);
......
This diff is collapsed.
......@@ -23,6 +23,6 @@ PARAMS
local_bandwidth="5.25";
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="352";
message_size="360";
routing="nonminimal";
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment