diff --git a/codes/net/dragonfly.h b/codes/net/dragonfly.h index d6c93aa9a6f15e3e7945dced334bba32d8947db6..742e8902d6bdddbcc3124367114ec9976cccceb3 100644 --- a/codes/net/dragonfly.h +++ b/codes/net/dragonfly.h @@ -48,7 +48,7 @@ struct terminal_message short new_vc; short saved_vc; /* last hop of the message, can be a terminal, local router or global router */ - short last_hop; + int last_hop; /* For routing */ int intm_group_id; int chunk_id; @@ -61,7 +61,7 @@ struct terminal_message int local_event_size_bytes; // For buffer message - short vc_index; + int vc_index; int sender_radix; int output_chan; model_net_event_return event_rc; @@ -69,15 +69,17 @@ struct terminal_message uint64_t pull_size; /* for reverse computation */ - short path_type; + int path_type; tw_stime saved_available_time; + tw_stime saved_avg_time; - tw_stime saved_start_time; - tw_stime saved_collective_init_time; + tw_stime saved_rcv_time; + + tw_stime saved_busy_time; + tw_stime saved_total_time; tw_stime saved_hist_start_time; tw_stime msg_start_time; - short saved_completed_chunks; int saved_hist_num; int saved_occupancy; diff --git a/src/models/network-workloads/conf/modelnet-mpi-test-dragonfly.conf b/src/models/network-workloads/conf/modelnet-mpi-test-dragonfly.conf index 92e5efd8b371e2c75234bcc02b52897d7c6f933d..27102c5c03498f0d7a2ce4036aef03a4bb4cab7e 100644 --- a/src/models/network-workloads/conf/modelnet-mpi-test-dragonfly.conf +++ b/src/models/network-workloads/conf/modelnet-mpi-test-dragonfly.conf @@ -18,12 +18,12 @@ PARAMS # modelnet_scheduler="round-robin"; num_vcs="1"; num_routers="4"; - local_vc_size="32768"; - global_vc_size="65536"; - cn_vc_size="32768"; + local_vc_size="8192"; + global_vc_size="16384"; + cn_vc_size="8192"; local_bandwidth="5.25"; global_bandwidth="4.7"; cn_bandwidth="5.25"; - message_size="536"; - routing="adaptive"; + message_size="552"; + routing="minimal"; } diff --git a/src/models/network-workloads/conf/modelnet-mpi-test.conf b/src/models/network-workloads/conf/modelnet-mpi-test.conf index 351b5bd5cb16ae32b57a5639aaad0527fc4668b0..35bda538d22208ef73196012ecd161087dfc1160 100644 --- a/src/models/network-workloads/conf/modelnet-mpi-test.conf +++ b/src/models/network-workloads/conf/modelnet-mpi-test.conf @@ -2,7 +2,7 @@ LPGROUPS { MODELNET_GRP { - repetitions="8"; + repetitions="27"; modelnet_simplenet="1"; nw-lp="1"; } diff --git a/src/models/network-workloads/model-net-mpi-replay.c b/src/models/network-workloads/model-net-mpi-replay.c index f6a9a1a54ff4aea6d7d1d9bc67775fa90d61f5f0..6fb3565cfdf40357f39291f261edbbdc995dca61 100644 --- a/src/models/network-workloads/model-net-mpi-replay.c +++ b/src/models/network-workloads/model-net-mpi-replay.c @@ -14,7 +14,8 @@ #include "codes/rc-stack.h" #include "codes/quicklist.h" -#define TRACK -1 +/* turning on track lp will generate a lot of output messages */ +#define TRACK_LP -1 #define TRACE -1 #define MAX_WAIT_REQS 512 @@ -179,6 +180,7 @@ struct nw_message double saved_wait_time; double saved_delay; int saved_num_bytes; + struct codes_workload_op * saved_op; } rc; }; @@ -270,8 +272,6 @@ static int clear_completed_reqs(nw_state * s, } } } - if(lp->gid == TRACK) - printf("\n Pushed num %d ", matched); return matched; } static void add_completed_reqs(nw_state * s, @@ -484,7 +484,7 @@ static int rm_matching_rcv(nw_state * ns, qlist_for_each(ent, &ns->pending_recvs_queue){ qi = qlist_entry(ent, mpi_msgs_queue, ql); - if((qi->num_bytes >= qitem->num_bytes) + if((qi->num_bytes == qitem->num_bytes) && ((qi->tag == qitem->tag) || qi->tag == -1) && ((qi->source_rank == qitem->source_rank) || qi->source_rank == -1)) { @@ -504,9 +504,6 @@ static int rm_matching_rcv(nw_state * ns, qlist_del(&qi->ql); - if(lp->gid == TRACK) - printf("\n matched recv req id %ld ", qi->req_id); - rc_stack_push(lp, qi, free, ns->processed_ops); return index; } @@ -525,7 +522,7 @@ static int rm_matching_send(nw_state * ns, int index = 0; qlist_for_each(ent, &ns->arrival_queue){ qi = qlist_entry(ent, mpi_msgs_queue, ql); - if((qi->num_bytes <= qitem->num_bytes) + if((qi->num_bytes == qitem->num_bytes) && (qi->tag == qitem->tag || qitem->tag == -1) && ((qi->source_rank == qitem->source_rank) || qitem->source_rank == -1)) { @@ -545,8 +542,6 @@ static int rm_matching_send(nw_state * ns, qlist_del(&qi->ql); - if(lp->gid == TRACK) - printf("\n matched matching send recv req id %ld ", qitem->req_id); return index; } return -1; @@ -580,13 +575,8 @@ static void codes_exec_comp_delay( tw_stime ts; nw_message* msg; - if (disable_delay) { - ts = 0.0; // no compute time sim - } - else { - s->compute_time += s_to_ns(mpi_op->u.delay.seconds); - ts = s_to_ns(mpi_op->u.delay.seconds); - } + s->compute_time += s_to_ns(mpi_op->u.delay.seconds); + ts = s_to_ns(mpi_op->u.delay.seconds); ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise); @@ -605,7 +595,9 @@ static void codes_exec_mpi_recv_rc( nw_message* m, tw_lp* lp) { - num_bytes_recvd -= m->rc.saved_num_bytes; + struct codes_workload_op * mpi_op = m->rc.saved_op; + + num_bytes_recvd -= mpi_op->u.recv.num_bytes; ns->recv_time = m->rc.saved_recv_time; if(m->fwd.found_match >= 0) { @@ -666,7 +658,6 @@ static void codes_exec_mpi_recv( receive operations. */ m->rc.saved_recv_time = s->recv_time; - m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes; num_bytes_recvd += mpi_op->u.recv.num_bytes; mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue)); @@ -678,6 +669,10 @@ static void codes_exec_mpi_recv( recv_op->tag = mpi_op->u.recv.tag; recv_op->req_id = mpi_op->u.recv.req_id; + if(s->nw_id == TRACK_LP) + printf("\n Receive op posted num bytes %d source %d ", recv_op->num_bytes, + recv_op->source_rank); + int found_matching_sends = rm_matching_send(s, bf, m, lp, recv_op); /* save the req id inserted in the completed queue for reverse computation. */ @@ -765,8 +760,6 @@ static tw_stime s_to_ns(tw_stime ns) static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp) { - if(lp->gid == TRACK) - printf("\n Reverse computation!!!! "); if(bf->c0) { struct qlist_head * ent = qlist_pop_back(&s->completed_reqs); @@ -903,6 +896,10 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp arrived_op->num_bytes = m->fwd.num_bytes; arrived_op->tag = m->fwd.tag; + if(s->nw_id == TRACK_LP) + printf("\n Send op arrived source rank %ld num bytes %d ", arrived_op->source_rank, + arrived_op->num_bytes); + int found_matching_recv = rm_matching_rcv(s, bf, m, lp, arrived_op); if(found_matching_recv < 0) @@ -985,8 +982,8 @@ void nw_test_init(nw_state* s, tw_lp* lp) void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) { *(int *)bf = (int)0; - //rc_stack_gc(lp, s->matched_reqs); - //rc_stack_gc(lp, s->processed_ops); + rc_stack_gc(lp, s->matched_reqs); + rc_stack_gc(lp, s->processed_ops); switch(m->msg_type) { @@ -1017,7 +1014,8 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) { - codes_workload_get_next_rc2(wrkld_id, 0, (int)s->nw_id); + struct codes_workload_op * mpi_op = m->rc.saved_op; + codes_workload_get_next_rc(wrkld_id, 0, (int)s->nw_id, mpi_op); if(m->op_type == CODES_WK_END) { @@ -1028,13 +1026,12 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t case CODES_WK_SEND: case CODES_WK_ISEND: { - int saved_num_bytes = m->rc.saved_num_bytes; - model_net_event_rc(net_id, lp, saved_num_bytes); + model_net_event_rc(net_id, lp, m->rc.saved_num_bytes); if(m->op_type == CODES_WK_ISEND) codes_issue_next_event_rc(lp); s->num_sends--; - s->num_bytes_sent += saved_num_bytes; - num_bytes_sent -= saved_num_bytes; + s->num_bytes_sent += m->rc.saved_num_bytes; + num_bytes_sent -= m->rc.saved_num_bytes; } break; @@ -1051,10 +1048,6 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t { s->num_delays--; tw_rand_reverse_unif(lp->rng); - - if (!disable_delay) { - s->compute_time -= s_to_ns(m->rc.saved_delay); - } } break; case CODES_WK_BCAST: @@ -1098,23 +1091,24 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) { - struct codes_workload_op mpi_op; - codes_workload_get_next(wrkld_id, 0, (int)s->nw_id, &mpi_op); + struct codes_workload_op * mpi_op = malloc(sizeof(struct codes_workload_op)); + codes_workload_get_next(wrkld_id, 0, (int)s->nw_id, mpi_op); - m->op_type = mpi_op.op_type; + m->op_type = mpi_op->op_type; + m->rc.saved_op = mpi_op; - if(mpi_op.op_type == CODES_WK_END) + if(mpi_op->op_type == CODES_WK_END) { s->elapsed_time = tw_now(lp) - s->start_time; return; } - switch(mpi_op.op_type) + switch(mpi_op->op_type) { case CODES_WK_SEND: case CODES_WK_ISEND: { s->num_sends++; - codes_exec_mpi_send(s, bf, m, lp, &mpi_op); + codes_exec_mpi_send(s, bf, m, lp, mpi_op); } break; @@ -1122,14 +1116,18 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l case CODES_WK_IRECV: { s->num_recvs++; - codes_exec_mpi_recv(s, bf, m, lp, &mpi_op); + codes_exec_mpi_recv(s, bf, m, lp, mpi_op); } break; case CODES_WK_DELAY: { + s->num_delays++; - codes_exec_comp_delay(s, lp, &mpi_op); + if(disable_delay) + codes_issue_next_event(lp); + else + codes_exec_comp_delay(s, lp, mpi_op); } break; @@ -1144,13 +1142,13 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l case CODES_WK_WAITALL: { s->num_waitall++; - codes_exec_mpi_wait_all(s, bf, m, lp, &mpi_op); + codes_exec_mpi_wait_all(s, bf, m, lp, mpi_op); } break; case CODES_WK_WAIT: { s->num_wait++; - codes_exec_mpi_wait(s, lp, &mpi_op); + codes_exec_mpi_wait(s, lp, mpi_op); } break; case CODES_WK_BCAST: @@ -1167,7 +1165,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l } break; default: - printf("\n Invalid op type %d ", mpi_op.op_type); + printf("\n Invalid op type %d ", mpi_op->op_type); } return; } diff --git a/src/models/network-workloads/model-net-mpi-wrklds.c b/src/models/network-workloads/model-net-mpi-wrklds.c index b660063889bc8df8764d4e2be9d7b2bbb64f1b0e..02dd98ea569953ac62419c4d06c19c696eb90f86 100644 --- a/src/models/network-workloads/model-net-mpi-wrklds.c +++ b/src/models/network-workloads/model-net-mpi-wrklds.c @@ -281,7 +281,7 @@ static void printQueue(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, char* msg while(tmp) { if(tmp->mpi_op->op_type == CODES_WK_SEND || tmp->mpi_op->op_type == CODES_WK_ISEND) - printf("\n lpid %llu send operation count %d tag %d source %d", + printf("\n lpid %llu send operation num bytes %d tag %d source %d", lpid, tmp->mpi_op->u.send.num_bytes, tmp->mpi_op->u.send.tag, tmp->mpi_op->u.send.source_rank); else if(tmp->mpi_op->op_type == CODES_WK_IRECV || tmp->mpi_op->op_type == CODES_WK_RECV) @@ -667,7 +667,7 @@ static int match_receive( assert(op1->op_type == CODES_WK_IRECV || op1->op_type == CODES_WK_RECV); assert(op2->op_type == CODES_WK_SEND || op2->op_type == CODES_WK_ISEND); - if((op1->u.recv.num_bytes >= op2->u.send.num_bytes) && + if((op1->u.recv.num_bytes == op2->u.send.num_bytes) && ((op1->u.recv.tag == op2->u.send.tag) || op1->u.recv.tag == -1) && ((op1->u.recv.source_rank == op2->u.send.source_rank) || op1->u.recv.source_rank == -1)) { @@ -1215,7 +1215,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t { s->num_waitall--; codes_exec_mpi_wait_all_rc(s, m, lp, mpi_op); - } + } break; case CODES_WK_WAITSOME: case CODES_WK_WAITANY: @@ -1305,7 +1305,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l { s->num_waitall++; codes_exec_mpi_wait_all(s, lp, m, mpi_op); - } + } break; default: printf("\n Invalid op type %d ", mpi_op->op_type); @@ -1323,11 +1323,11 @@ void nw_test_finalize(nw_state* s, tw_lp* lp) { printf("\n LP %llu unmatched irecvs %d unmatched sends %d Total sends %ld receives %ld collectives %ld delays %ld wait alls %ld waits %ld send time %lf wait %lf", lp->gid, s->pending_recvs_queue->num_elems, s->arrival_queue->num_elems, s->num_sends, s->num_recvs, s->num_cols, s->num_delays, s->num_waitall, s->num_wait, s->send_time, s->wait_time); - //if(lp->gid == TRACE) - //{ + if(lp->gid == TRACE) + { printQueue(lp->gid, s->pending_recvs_queue, "irecv "); - printQueue(lp->gid, s->arrival_queue, "isend"); - //} + printQueue(lp->gid, s->arrival_queue, "isend"); + } written += sprintf(s->output_buf + written, "\n %lu %lu %ld %ld %ld %ld %lf %lf %lf", lp->gid, s->nw_id, s->num_sends, s->num_recvs, s->num_bytes_sent, s->num_bytes_recvd, s->send_time, s->elapsed_time - s->compute_time, s->compute_time); diff --git a/src/models/networks/model-net/dragonfly.c b/src/models/networks/model-net/dragonfly.c index 7640aaee32d3b2630299b267a83da32c96b077c4..fab394683115c1502e49ae55ce43858ceb03cdc6 100644 --- a/src/models/networks/model-net/dragonfly.c +++ b/src/models/networks/model-net/dragonfly.c @@ -32,10 +32,11 @@ #define COLLECTIVE_COMPUTATION_DELAY 5700 #define DRAGONFLY_FAN_OUT_DELAY 20.0 #define WINDOW_LENGTH 0 -#define DFLY_HASH_TABLE_SIZE 65536 +#define DFLY_HASH_TABLE_SIZE 262144 // debugging parameters -#define TRACK -1 +#define TRACK 2 +#define TRACK_PKT 45543 #define TRACK_MSG -1 #define PRINT_ROUTER_TABLE 1 #define DEBUG 0 @@ -45,6 +46,7 @@ #define LP_METHOD_NM (model_net_method_names[DRAGONFLY]) long term_ecount, router_ecount, term_rev_ecount, router_rev_ecount; +long packet_gen = 0, packet_fin = 0; static double maxd(double a, double b) { return a < b ? b : a; } @@ -68,6 +70,8 @@ int router_magic_num = 0; /* terminal magic number */ int terminal_magic_num = 0; +FILE * dragonfly_log = NULL; + typedef struct terminal_message_list terminal_message_list; struct terminal_message_list { terminal_message msg; @@ -112,6 +116,7 @@ struct dragonfly_param double local_delay; double global_delay; double credit_delay; + double router_delay; }; struct dfly_hash_key @@ -139,6 +144,9 @@ struct terminal_state { uint64_t packet_counter; + int packet_gen; + int packet_fin; + // Dragonfly specific parameters unsigned int router_id; unsigned int terminal_id; @@ -191,12 +199,15 @@ struct terminal_state tw_stime total_time; long total_msg_size; - long total_hops; + double total_hops; long finished_msgs; - long finished_chunks; + double finished_chunks; long finished_packets; - char output_buf[512]; + tw_stime last_buf_full; + tw_stime busy_time; + + char output_buf[1024]; }; /* terminal event type (1-4) */ @@ -250,20 +261,27 @@ struct router_state tw_stime* next_output_available_time; tw_stime* cur_hist_start_time; + tw_stime* last_buf_full; + tw_stime* busy_time; + terminal_message_list ***pending_msgs; terminal_message_list ***pending_msgs_tail; terminal_message_list ***queued_msgs; terminal_message_list ***queued_msgs_tail; int *in_send_loop; + int *queued_count; + struct rc_stack * st; int** vc_occupancy; - int* link_traffic; + int64_t* link_traffic; const char * anno; const dragonfly_param *params; int* prev_hist_num; int* cur_hist_num; + + char output_buf[1024]; }; static short routing = MINIMAL; @@ -296,11 +314,9 @@ static int dragonfly_rank_hash_compare( static int dragonfly_hash_func(void *k, int table_size) { struct dfly_hash_key *tmp = (struct dfly_hash_key *)k; - uint64_t key = (~tmp->message_id) + (tmp->message_id << 18); - key = key * 21; - key = ~key ^ (tmp->sender_id >> 4); - key = key * tmp->sender_id; - return (int)(key % table_size); + uint32_t pc = 0, pb = 0; + bj_hashlittle2(tmp, sizeof(*tmp), &pc, &pb); + return (int)(pc % (table_size - 1)); } /* convert GiB/s and bytes to ns */ @@ -324,6 +340,11 @@ static int dragonfly_get_msg_sz(void) return sizeof(terminal_message); } +static void free_nothing(void* ptr) +{ + + /* Do nothing */ +} static void free_tmp(void * ptr) { struct dfly_qhash_entry * dfly = ptr; @@ -362,7 +383,7 @@ static void create_prepend_to_terminal_message_list( terminal_message_list ** thisq, terminal_message_list ** thistail, int index, - terminal_message *msg) { + terminal_message * msg) { terminal_message_list* new_entry = (terminal_message_list*)malloc( sizeof(terminal_message_list)); init_terminal_message_list(new_entry, msg); @@ -397,6 +418,7 @@ static terminal_message_list* return_tail( terminal_message_list ** thistail, int index) { terminal_message_list *tail = thistail[index]; + assert(tail); if(tail->prev != NULL) { tail->prev->next = NULL; thistail[index] = tail->prev; @@ -415,7 +437,6 @@ static void copy_terminal_list_entry( terminal_message_list *cur_entry, msg->packet_ID = cur_msg->packet_ID; strcpy(msg->category, cur_msg->category); msg->final_dest_gid = cur_msg->final_dest_gid; - msg->msg_start_time = msg->msg_start_time; msg->sender_lp = cur_msg->sender_lp; msg->dest_terminal_id = cur_msg->dest_terminal_id; msg->src_terminal_id = cur_msg->src_terminal_id; @@ -507,6 +528,10 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){ fprintf(stderr, "Bandwidth of compute node channels not specified, setting to %lf\n", p->cn_bandwidth); } + p->router_delay = 50; + configuration_get_value_double(&config, "PARAMS", "router_delay", anno, + &p->router_delay); + char routing_str[MAX_NAME_LENGTH]; configuration_get_value(&config, "PARAMS", "routing", anno, routing_str, MAX_NAME_LENGTH); @@ -570,6 +595,7 @@ static void dragonfly_report_stats() long long total_finished_msgs, final_msg_sz; tw_stime avg_time, max_time; int total_minimal_packets, total_nonmin_packets; + long total_gen, total_fin; MPI_Reduce( &total_hops, &avg_hops, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); MPI_Reduce( &N_finished_packets, &total_finished_packets, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); @@ -578,6 +604,9 @@ static void dragonfly_report_stats() MPI_Reduce( &total_msg_sz, &final_msg_sz, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); MPI_Reduce( &dragonfly_total_time, &avg_time, 1,MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); MPI_Reduce( &dragonfly_max_latency, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD); + + MPI_Reduce( &packet_gen, &total_gen, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce( &packet_fin, &total_fin, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD); if(routing == ADAPTIVE || routing == PROG_ADAPTIVE) { MPI_Reduce(&minimal_count, &total_minimal_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD); @@ -589,9 +618,10 @@ static void dragonfly_report_stats() { printf(" Average number of hops traversed %f average chunk latency %lf us maximum chunk latency %lf us avg message size %lf bytes finished messages %ld \n", (float)avg_hops/total_finished_chunks, avg_time/(total_finished_chunks*1000), max_time/1000, (float)final_msg_sz/total_finished_msgs, total_finished_msgs); if(routing == ADAPTIVE || routing == PROG_ADAPTIVE) - printf("\n ADAPTIVE ROUTING STATS: %d percent chunks routed minimally %d percent chunks routed non-minimally completed packets %lld ", total_minimal_packets, total_nonmin_packets, total_finished_chunks); + printf("\n ADAPTIVE ROUTING STATS: %d chunks routed minimally %d chunks routed non-minimally completed packets %lld ", total_minimal_packets, total_nonmin_packets, total_finished_chunks); - } + printf("\n Total packets generated %ld finished %ld ", total_gen, total_fin); + } return; } @@ -658,6 +688,9 @@ void terminal_init( terminal_state * s, tw_lp * lp ) { + s->packet_gen = 0; + s->packet_fin = 0; + uint32_t h1 = 0, h2 = 0; bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2); terminal_magic_num = h1 + h2; @@ -693,6 +726,9 @@ terminal_init( terminal_state * s, s->total_time = 0.0; s->total_msg_size = 0; + s->last_buf_full = 0.0; + s->busy_time = 0.0; + rc_stack_create(&s->st); s->num_vcs = 1; s->vc_occupancy = (int*)malloc(s->num_vcs * sizeof(int)); @@ -752,7 +788,7 @@ void router_setup(router_state * r, tw_lp * lp) r->global_channel = (int*)malloc(p->num_global_channels * sizeof(int)); r->next_output_available_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime)); r->cur_hist_start_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime)); - r->link_traffic = (int*)malloc(p->radix * sizeof(int)); + r->link_traffic = (int64_t*)malloc(p->radix * sizeof(int64_t)); r->cur_hist_num = (int*)malloc(p->radix * sizeof(int)); r->prev_hist_num = (int*)malloc(p->radix * sizeof(int)); @@ -766,16 +802,22 @@ void router_setup(router_state * r, tw_lp * lp) (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**)); r->queued_msgs_tail = (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**)); - + r->queued_count = (int*)malloc(p->radix * sizeof(int)); + r->last_buf_full = (tw_stime*)malloc(p->radix * sizeof(tw_stime)); + r->busy_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime)); + + rc_stack_create(&r->st); for(int i=0; i < p->radix; i++) { // Set credit & router occupancy + r->last_buf_full[i] = 0.0; + r->busy_time[i] = 0.0; r->next_output_available_time[i]=0; r->cur_hist_start_time[i] = 0; - r->link_traffic[i]=0; + r->link_traffic[i]=0; r->cur_hist_num[i] = 0; r->prev_hist_num[i] = 0; - + r->queued_count[i] = 0; r->in_send_loop[i] = 0; r->vc_occupancy[i] = (int*)malloc(p->num_vcs * sizeof(int)); r->pending_msgs[i] = (terminal_message_list**)malloc(p->num_vcs * @@ -1000,6 +1042,8 @@ void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, void packet_generate_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) { + s->packet_gen--; + packet_gen--; tw_rand_reverse_unif(lp->rng); int num_chunks = msg->packet_size/s->params->chunk_size; @@ -1021,6 +1065,7 @@ void packet_generate_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, } if(bf->c11) { s->issueIdle = 0; + s->last_buf_full = msg->saved_busy_time; } struct mn_stats* stat; stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); @@ -1032,7 +1077,10 @@ void packet_generate_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, /* generates packet at the current dragonfly compute node */ void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) { - + + s->packet_gen++; + + packet_gen++; tw_stime ts, nic_ts; assert(lp->gid != msg->dest_terminal_id); @@ -1054,14 +1102,20 @@ void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg, msg->my_g_hop = 0; msg->intm_group_id = -1; - if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG) - printf("\n Packet generated at terminal %lu destination %d ", lp->gid, s->router_id); + + //if(msg->dest_terminal_id == TRACK) + if(msg->packet_ID == TRACK_PKT) + printf("\n Packet %ld generated at terminal %d dest %ld size %d num chunks %d ", + msg->packet_ID, s->terminal_id, msg->dest_terminal_id, + msg->packet_size, num_chunks); for(i = 0; i < num_chunks; i++) { terminal_message_list *cur_chunk = (terminal_message_list*)malloc( sizeof(terminal_message_list)); + msg->origin_router_id = s->router_id; init_terminal_message_list(cur_chunk, msg); + if(msg->remote_event_size_bytes + msg->local_event_size_bytes > 0) { cur_chunk->event_data = (char*)malloc( @@ -1079,6 +1133,7 @@ void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg, } cur_chunk->msg.chunk_id = i; + cur_chunk->msg.origin_router_id = s->router_id; append_to_terminal_message_list(s->terminal_msgs, s->terminal_msgs_tail, 0, cur_chunk); s->terminal_length += s->params->chunk_size; @@ -1089,6 +1144,8 @@ void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg, } else { bf->c11 = 1; s->issueIdle = 1; + msg->saved_busy_time = s->last_buf_full; + s->last_buf_full = tw_now(lp); } if(s->in_send_loop == 0) { @@ -1121,6 +1178,8 @@ void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, { if(bf->c1) { s->in_send_loop = 1; + s->last_buf_full = msg->saved_busy_time; + return; } @@ -1134,19 +1193,23 @@ void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, s->packet_counter--; s->vc_occupancy[0] -= s->params->chunk_size; - create_prepend_to_terminal_message_list(s->terminal_msgs, - s->terminal_msgs_tail, 0, msg); + terminal_message_list* cur_entry = rc_stack_pop(s->st); + +// create_prepend_to_terminal_message_list(s->terminal_msgs, +// s->terminal_msgs_tail, 0, cur_entry); + prepend_to_terminal_message_list(s->terminal_msgs, + s->terminal_msgs_tail, 0, cur_entry); if(bf->c3) { tw_rand_reverse_unif(lp->rng); } if(bf->c4) { s->in_send_loop = 1; } - if(bf->c5) + /*if(bf->c5) { codes_local_latency_reverse(lp); s->issueIdle = 1; - } + }*/ return; } /* sends the packet from the current dragonfly compute node to the attached router */ @@ -1164,12 +1227,12 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg, || cur_entry == NULL) { bf->c1 = 1; s->in_send_loop = 0; - //printf("[%d] Skipping send %d %d\n", lp->gid, cur_entry == NULL, - // (s->vc_occupancy[0] + s->params->chunk_size > s->params->cn_vc_size)); + + msg->saved_busy_time = s->last_buf_full; + s->last_buf_full = tw_now(lp); return; } -// printf("\n Packet %ld sent at time %lf ", cur_entry->msg.packet_ID, tw_now(lp)); msg->saved_available_time = s->terminal_available_time; ts = g_tw_lookahead + s->params->cn_delay + tw_rand_unif(lp->rng); s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp)); @@ -1189,7 +1252,6 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg, m->remote_event_size_bytes); } - m->origin_router_id = s->router_id; m->type = R_ARRIVE; m->src_terminal_id = lp->gid; m->vc_index = 0; @@ -1222,12 +1284,15 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg, s->packet_counter++; s->vc_occupancy[0] += s->params->chunk_size; cur_entry = return_head(s->terminal_msgs, s->terminal_msgs_tail, 0); - copy_terminal_list_entry(cur_entry, msg); - delete_terminal_message_list(cur_entry); + rc_stack_push(lp, cur_entry, free, s->st); + //copy_terminal_list_entry(cur_entry, msg); + + //delete_terminal_message_list(cur_entry); s->terminal_length -= s->params->chunk_size; cur_entry = s->terminal_msgs[0]; - + + /* if there is another packet inline then schedule another send event */ if(cur_entry != NULL && s->vc_occupancy[0] + s->params->chunk_size <= s->params->cn_vc_size) { bf->c3 = 1; @@ -1239,19 +1304,23 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg, m->magic = terminal_magic_num; tw_event_send(e); } else { + /* If not then the LP will wait for another credit or packet generation */ bf->c4 = 1; s->in_send_loop = 0; } - if(s->issueIdle) { + /*if(s->issueIdle) { bf->c5 = 1; s->issueIdle = 0; model_net_method_idle_event(codes_local_latency(lp), 0, lp); - } + }*/ return; } void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) { + s->packet_fin--; + packet_fin--; + tw_rand_reverse_unif(lp->rng); if(msg->path_type == MINIMAL) minimal_count--; @@ -1263,32 +1332,9 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw total_hops -= msg->my_N_hop; s->total_hops -= msg->my_N_hop; - dragonfly_total_time -= (tw_now(lp) - msg->travel_start_time); + dragonfly_total_time = msg->saved_total_time; s->total_time = msg->saved_avg_time; - /*if(msg->chunk_id == num_chunks - 1) - { - mn_stats* stat; - stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); - stat->recv_count--; - stat->recv_bytes -= msg->packet_size; - stat->recv_time -= tw_now(lp) - msg->travel_start_time; - - - N_finished_packets--; - - dragonfly_total_time -= (tw_now(lp) - msg->travel_start_time); - - if(bf->c3) - dragonfly_max_latency = msg->saved_available_time; - } - if (msg->chunk_id == num_chunks-1 && - msg->remote_event_size_bytes && - msg->is_pull) - { - int net_id = model_net_get_id(LP_METHOD_NM); - model_net_event_rc(net_id, lp, msg->pull_size); - }*/ struct qhash_head * hash_link = NULL; struct dfly_qhash_entry * tmp = NULL; @@ -1301,7 +1347,7 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw mn_stats* stat; stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); - stat->recv_time -= (tw_now(lp) - msg->travel_start_time); + stat->recv_time = msg->saved_rcv_time; if(bf->c1) { @@ -1315,19 +1361,18 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw if(bf->c7) { + assert(!hash_link); s->finished_msgs--; total_msg_sz -= msg->total_size; N_finished_msgs--; s->total_msg_size -= msg->total_size; -// struct dfly_qhash_entry * d_entry_pop = (struct dfly_qhash_entry*)rc_stack_pop(s->st); - struct dfly_qhash_entry * d_entry_pop = msg->saved_hash; + struct dfly_qhash_entry * d_entry_pop = msg->saved_hash; qhash_add(s->rank_tbl, &key, &(d_entry_pop->hash_link)); s->rank_tbl_pop++; - hash_link = qhash_search(s->rank_tbl, &key); - tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link); - + hash_link = &(d_entry_pop->hash_link); + tmp = d_entry_pop; if(bf->c4) model_net_event_rc2(lp, &msg->event_rc); @@ -1369,12 +1414,18 @@ void send_remote_event(terminal_state * s, terminal_message * msg, tw_lp * lp, t void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) { - // NIC aggregation - should this be a separate function? // Trigger an event on receiving server - tw_stime ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng); + assert(lp->gid == msg->dest_terminal_id); + s->packet_fin++; + packet_fin++; +// if(lp->gid == TRACK) + if(msg->packet_ID == TRACK_PKT) + printf("\n Packet %ld arrived at lp %ld hops %d", msg->packet_ID, lp->gid, msg->my_N_hop); + tw_stime ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng); + // no method_event here - message going to router tw_event * buf_e; terminal_message * buf_msg; @@ -1398,7 +1449,7 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, assert(lp->gid != msg->src_terminal_id); int num_chunks = msg->packet_size / s->params->chunk_size; - uint64_t total_chunks = msg->total_size / s->params->chunk_size; + int total_chunks = msg->total_size / s->params->chunk_size; if(msg->total_size % s->params->chunk_size) total_chunks++; @@ -1423,11 +1474,14 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, msg->saved_avg_time = s->total_time; s->total_time += (tw_now(lp) - msg->travel_start_time); + + msg->saved_total_time = dragonfly_total_time; dragonfly_total_time += tw_now( lp ) - msg->travel_start_time; total_hops += msg->my_N_hop; s->total_hops += msg->my_N_hop; mn_stats* stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); + msg->saved_rcv_time = stat->recv_time; stat->recv_time += (tw_now(lp) - msg->travel_start_time); #if DEBUG == 1 @@ -1445,47 +1499,6 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, } #endif - /*tw_event * e; - terminal_message * m; - if(msg->chunk_id == num_chunks-1) - { - bf->c2 = 1; - mn_stats* stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); - stat->recv_count++; - stat->recv_bytes += msg->packet_size; - stat->recv_time += tw_now(lp) - msg->travel_start_time; - N_finished_packets++; - total_hops -= msg->my_N_hop; - - dragonfly_total_time += tw_now( lp ) - msg->travel_start_time; - if (dragonfly_max_latency < tw_now( lp ) - msg->travel_start_time) - { - bf->c3 = 1; - msg->saved_available_time = dragonfly_max_latency; - dragonfly_max_latency=tw_now( lp ) - msg->travel_start_time; - } - if(msg->remote_event_size_bytes) - { - void * tmp_ptr = model_net_method_get_edata(DRAGONFLY, msg); - ts = g_tw_lookahead + 0.1 + (1/s->params->cn_bandwidth) * msg->remote_event_size_bytes; - if (msg->is_pull){ - struct codes_mctx mc_dst = - codes_mctx_set_global_direct(msg->sender_mn_lp); - struct codes_mctx mc_src = - codes_mctx_set_global_direct(lp->gid); - int net_id = model_net_get_id(LP_METHOD_NM); - model_net_event_mctx(net_id, &mc_src, &mc_dst, msg->category, - msg->sender_lp, msg->pull_size, ts, - msg->remote_event_size_bytes, tmp_ptr, 0, NULL, lp); - } - else { - e = tw_event_new(msg->final_dest_gid, ts, lp); - m = tw_event_data(e); - memcpy(m, tmp_ptr, msg->remote_event_size_bytes); - tw_event_send(e); - } - } - }*/ /* Now retreieve the number of chunks completed from the hash and update * them */ void *m_data_src = model_net_method_get_edata(DRAGONFLY, msg); @@ -1509,7 +1522,7 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, d_entry->remote_event_size = 0; qhash_add(s->rank_tbl, &key, &(d_entry->hash_link)); s->rank_tbl_pop++; - + hash_link = qhash_search(s->rank_tbl, &key); tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link); } @@ -1543,7 +1556,9 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, } /* If all chunks of a message have arrived then send a remote event to the * callee*/ - if(tmp->num_chunks >= total_chunks) + assert(tmp->num_chunks <= total_chunks); + + if(tmp->num_chunks == total_chunks) { bf->c7 = 1; @@ -1552,12 +1567,11 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, s->total_msg_size += msg->total_size; s->finished_msgs++; - //assert(tmp->remote_event_data && tmp->remote_event_size); + assert(tmp->remote_event_data && tmp->remote_event_size); send_remote_event(s, msg, lp, bf, tmp->remote_event_data, tmp->remote_event_size); /* Remove the hash entry */ qhash_del(hash_link); msg->saved_hash = tmp; - //rc_stack_push(lp, tmp, free_tmp, s->st); s->rank_tbl_pop--; } return; @@ -1638,7 +1652,7 @@ static void node_collective_init(terminal_state * s, terminal_message * msg_new; int num_lps; - msg->saved_collective_init_time = s->collective_init_time; + msg->saved_busy_time = s->collective_init_time; s->collective_init_time = tw_now(lp); s->origin_svr = msg->sender_svr; @@ -1838,6 +1852,11 @@ void terminal_buf_update_rc(terminal_state * s, { s->vc_occupancy[0] += s->params->chunk_size; codes_local_latency_reverse(lp); + if(bf->c3) + { + s->busy_time = msg->saved_total_time; + s->last_buf_full = msg->saved_busy_time; + } if(bf->c1) { s->in_send_loop = 0; } @@ -1851,8 +1870,24 @@ terminal_buf_update(terminal_state * s, terminal_message * msg, tw_lp * lp) { + bf->c1 = 0; + bf->c2 = 0; + bf->c3 = 0; + tw_stime ts = codes_local_latency(lp); s->vc_occupancy[0] -= s->params->chunk_size; + + /* Update the terminal buffer time */ + if(s->last_buf_full > 0) + { + bf->c3 = 1; + msg->saved_total_time = s->busy_time; + msg->saved_busy_time = s->last_buf_full; + + s->busy_time += (tw_now(lp) - s->last_buf_full); + s->last_buf_full = 0.0; + } + if(s->in_send_loop == 0 && s->terminal_msgs[0] != NULL) { terminal_message *m; bf->c1 = 1; @@ -1868,7 +1903,6 @@ terminal_buf_update(terminal_state * s, bf->c2 = 1; model_net_method_idle_event(ts, 0, lp); } - return; } @@ -1878,10 +1912,10 @@ terminal_event( terminal_state * s, terminal_message * msg, tw_lp * lp ) { - *(int *)bf = (int)0; + //*(int *)bf = (int)0; assert(msg->magic == terminal_magic_num); - //rc_stack_gc(lp, s->st); + rc_stack_gc(lp, s->st); switch(msg->type) { case T_GENERATE: @@ -1923,18 +1957,36 @@ dragonfly_terminal_final( terminal_state * s, tw_lp * lp ) { model_net_print_stats(lp->gid, s->dragonfly_stats_array); - + int written = 0; if(!s->terminal_id) - written = sprintf(s->output_buf, "# Format <# Msgs finished> <# Packets finished> <# Chunks finished> \n"); + written = sprintf(s->output_buf, "# Format <# Packets finished> \n"); - written += sprintf(s->output_buf + written, "%lu %u %ld %lf %ld %ld %ld %lf\n", lp->gid, s->terminal_id, s->total_msg_size, s->total_time, s->finished_msgs, s->finished_packets, s->finished_chunks, (double)s->total_hops/s->finished_chunks); - lp_io_write(lp->gid, "dragonfly-msg-stats", written, s->output_buf); + written += sprintf(s->output_buf + written, "%lu %u %ld %lf %ld %lf %lf\n", + lp->gid, s->terminal_id, s->total_msg_size, s->total_time, + s->finished_packets, (double)s->total_hops/s->finished_chunks, + s->busy_time); + lp_io_write(lp->gid, "dragonfly-msg-stats", written, s->output_buf); + + /*char log[64]; + sprintf(log, "dragonfly-msg-stats-%d", routing); + dragonfly_log = fopen(log, "w+"); + assert(dragonfly_log); + if(s->finished_chunks) + { + fprintf(dragonfly_log, "%u %ld %lf %lf %lf\n", s->terminal_id, s->total_msg_size, s->total_time, s->finished_chunks, s->total_hops/s->finished_chunks); + printf("%u %ld %lf %lf %lf\n", s->terminal_id, s->total_msg_size, s->total_time, s->finished_chunks, s->total_hops/s->finished_chunks); + } + fclose(dragonfly_log); + */ if(s->terminal_msgs[0] != NULL) printf("[%lu] leftover terminal messages \n", lp->gid); + //if(s->packet_gen != s->packet_fin) + // printf("\n generated %d finished %d ", s->packet_gen, s->packet_fin); + qhash_finalize(s->rank_tbl); rc_stack_destroy(s->st); free(s->vc_occupancy); @@ -1947,21 +1999,6 @@ void dragonfly_router_final(router_state * s, tw_lp * lp) { free(s->global_channel); - /*char *stats_file = getenv("TRACER_LINK_FILE"); - if(stats_file != NULL) { - FILE *fout = fopen(stats_file, "a"); - const dragonfly_param *p = s->params; - int result = flock(fileno(fout), LOCK_EX); - assert(result); - fprintf(fout, "%d %d ", s->router_id / p->num_routers, - s->router_id % p->num_routers); - for(int d = 0; d < p->num_routers + p->num_global_channels; d++) { - fprintf(fout, "%d ", s->link_traffic[d]); - } - fprintf(fout, "\n"); - result = flock(fileno(fout), LOCK_UN); - fclose(fout); - }*/ int i, j; for(i = 0; i < s->params->radix; i++) { for(j = 0; j < 3; j++) { @@ -1974,6 +2011,22 @@ void dragonfly_router_final(router_state * s, } } } + + rc_stack_destroy(s->st); + + const dragonfly_param *p = s->params; + int written = 0; + if(!s->router_id) + written = sprintf(s->output_buf, "# Format "); + + written += sprintf(s->output_buf + written, "\n %d %d ", + s->router_id / p->num_routers, + s->router_id % p->num_routers); + for(int d = 0; d < p->num_routers + p->num_global_channels; d++) + written += sprintf(s->output_buf + written, " %lf", s->busy_time[d]); + + sprintf(s->output_buf + written, "\n"); + lp_io_write(lp->gid, "dragonfly-router-stats", written, s->output_buf); } /* Get the number of hops for this particular path source and destination groups */ @@ -2023,15 +2076,14 @@ get_next_stop(router_state * s, tw_lp * lp, int path, int dest_router_id, - int intm_id) + int intm_id) { int dest_lp; - tw_lpid router_dest_id = -1; + tw_lpid router_dest_id; int dest_group_id; - codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL, - &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset); - int local_router_id = (mapping_offset + mapping_rep_id); + int local_router_id = s->router_id; + int origin_grp_id = msg->origin_router_id / s->params->num_routers; dest_group_id = dest_router_id / s->params->num_routers; @@ -2041,46 +2093,36 @@ get_next_stop(router_state * s, dest_lp = msg->dest_terminal_id; return dest_lp; } + if(s->group_id == dest_group_id) + { + codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", s->anno, 0, dest_router_id, + 0, &router_dest_id); + return router_dest_id; + } /* Generate inter-mediate destination for non-minimal routing (selecting a random group) */ if(msg->last_hop == TERMINAL && path == NON_MINIMAL) { - if(dest_group_id != s->group_id) msg->intm_group_id = intm_id; } /******************** DECIDE THE DESTINATION GROUP ***********************/ /* It means that the packet has arrived at the inter-mediate group for non-minimal routing. Reset the group now. */ - if(path == NON_MINIMAL && msg->intm_group_id == s->group_id) - { - msg->intm_group_id = -1;//no inter-mediate group - } /* Intermediate group ID is set. Divert the packet to the intermediate group. */ if(path == NON_MINIMAL && msg->intm_group_id >= 0 && - (dest_group_id != s->group_id)) + (s->group_id == origin_grp_id)) { dest_group_id = msg->intm_group_id; } /********************** DECIDE THE ROUTER IN THE DESTINATION GROUP ***************/ /* It means the packet has arrived at the destination group. Now divert it to the destination router. */ - if(s->group_id == dest_group_id) - { - if(msg->last_hop == TERMINAL && path == NON_MINIMAL) { - dest_lp = (s->group_id * s->params->num_routers) + intm_id % s->params->num_routers; - } else { - dest_lp = dest_router_id; - } - } - else - { /* Packet is at the source or intermediate group. Find a router that has a path to the destination group. */ - dest_lp=getRouterFromGroupID(dest_group_id, - s->router_id/s->params->num_routers, s->params->num_routers, - s->params->num_groups); - + dest_lp=getRouterFromGroupID(dest_group_id, + s->group_id, s->params->num_routers, + s->params->num_groups); + if(dest_lp == local_router_id) { #if USE_DIRECT_SCHEME - // printf("[%d] tg %d orig \n", lp->gid, target_grp, dest_group_id); int my_pos = s->group_id % s->params->num_routers; if(s->group_id == s->params->num_groups - 1) { my_pos = dest_group_id % s->params->num_routers; @@ -2094,7 +2136,6 @@ get_next_stop(router_state * s, } #endif } - } codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", s->anno, 0, dest_lp, 0, &router_dest_id); @@ -2148,7 +2189,6 @@ get_output_port( router_state * s, { output_port = local_router_id % s->params->num_routers; } -// printf("\n output port not found %d next stop %d local router id %d group id %d intm grp id %d %d", output_port, next_stop, local_router_id, s->group_id, intm_grp_id, local_router_id%num_routers); } return output_port; } @@ -2160,7 +2200,7 @@ static int do_adaptive_routing( router_state * s, terminal_message * msg, tw_lp * lp, int dest_router_id, - int intm_id) { + int intm_id) { int next_stop; int minimal_out_port = -1, nonmin_out_port = -1; // decide which routing to take @@ -2169,6 +2209,7 @@ static int do_adaptive_routing( router_state * s, minimal_out_port = get_output_port(s, bf, msg, lp, minimal_next_stop); int nonmin_next_stop = get_next_stop(s, bf, msg, lp, NON_MINIMAL, dest_router_id, intm_id); nonmin_out_port = get_output_port(s, bf, msg, lp, nonmin_next_stop); + int nomin_vc = 0; if(nonmin_out_port < s->params->num_routers) { nomin_vc = msg->my_l_hop; @@ -2183,21 +2224,27 @@ static int do_adaptive_routing( router_state * s, s->params->num_global_channels)) { min_vc = msg->my_g_hop; } - int min_port_count = s->vc_occupancy[minimal_out_port][min_vc]; + int min_port_count = s->vc_occupancy[minimal_out_port][0] + + s->vc_occupancy[minimal_out_port][1] + s->vc_occupancy[minimal_out_port][2] + + s->queued_count[minimal_out_port]; // Now get the expected number of hops to be traversed for both routes int num_min_hops = get_num_hops(s->router_id, dest_router_id, s->params->num_routers, 0, s->params->num_groups); - int intm_router_id = getRouterFromGroupID(intm_id, + int intm_router_id = getRouterFromGroupID(msg->intm_group_id, s->router_id / s->params->num_routers, s->params->num_routers, s->params->num_groups); - int num_nonmin_hops = get_num_hops(s->router_id, intm_router_id, + int num_nonmin_hops = 6; + + if(msg->intm_group_id >= 0) + { + num_nonmin_hops = get_num_hops(s->router_id, intm_router_id, s->params->num_routers, 1, s->params->num_groups) + get_num_hops(intm_router_id, dest_router_id, s->params->num_routers, 1, s->params->num_groups); - + } assert(num_nonmin_hops <= 6); /* average the local queues of the router */ @@ -2222,25 +2269,19 @@ static int do_adaptive_routing( router_state * s, int nonmin_hist_count = s->cur_hist_num[nonmin_out_chan] + (s->prev_hist_num[min_out_chan]/2); - int nonmin_port_count = s->vc_occupancy[nonmin_out_port][nomin_vc]; - if(num_min_hops * (min_port_count - min_hist_count) <= (num_nonmin_hops * ((q_avg + 1) - nonmin_hist_count))) { - //if(min_port_count <= nonmin_port_count) { + int nonmin_port_count = s->vc_occupancy[nonmin_out_port][0] + + s->vc_occupancy[nonmin_out_port][1] + s->vc_occupancy[nonmin_out_port][2] + + s->queued_count[nonmin_out_port]; + //if(num_min_hops * (min_port_count - min_hist_count) <= (num_nonmin_hops * ((q_avg + 1) - nonmin_hist_count))) { + if(min_port_count <= nonmin_port_count) { msg->path_type = MINIMAL; next_stop = minimal_next_stop; msg->intm_group_id = -1; - - if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG) - printf("\n (%lf) [Router %d] Packet %d routing minimally ", tw_now(lp), (int)lp->gid, (int)msg->packet_ID); } else { msg->path_type = NON_MINIMAL; next_stop = nonmin_next_stop; - msg->intm_group_id = intm_id; - - if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG) - printf("\n (%lf) [Router %d] Packet %d routing non-minimally ", tw_now(lp), (int)lp->gid, (int)msg->packet_ID); - } return next_stop; } @@ -2256,13 +2297,12 @@ void router_packet_receive_rc(router_state * s, int output_port = msg->saved_vc; int output_chan = msg->saved_channel; - if(bf->c1) - tw_rand_reverse_unif(lp->rng); + tw_rand_reverse_unif(lp->rng); if(bf->c2) { tw_rand_reverse_unif(lp->rng); - delete_terminal_message_list(return_tail(s->pending_msgs[output_port], - s->pending_msgs_tail[output_port], output_chan)); + terminal_message_list * tail = return_tail(s->pending_msgs[output_port], s->pending_msgs_tail[output_port], output_chan); + delete_terminal_message_list(tail); s->vc_occupancy[output_port][output_chan] -= s->params->chunk_size; if(bf->c3) { codes_local_latency_reverse(lp); @@ -2270,8 +2310,10 @@ void router_packet_receive_rc(router_state * s, } } if(bf->c4) { - delete_terminal_message_list(return_tail(s->queued_msgs[output_port], + s->last_buf_full[output_port] = msg->saved_busy_time; + delete_terminal_message_list(return_tail(s->queued_msgs[output_port], s->queued_msgs_tail[output_port], output_chan)); + s->queued_count[output_port] -= s->params->chunk_size; } } @@ -2288,6 +2330,7 @@ router_packet_receive( router_state * s, bf->c2 = 0; bf->c3 = 0; bf->c4 = 0; + bf->c5 = 0; tw_stime ts; @@ -2300,21 +2343,15 @@ router_packet_receive( router_state * s, s->anno, 0); int dest_router_id = (mapping_offset + (mapping_rep_id * num_lps)) / s->params->num_cn; - int intm_id = -1; int local_grp_id = s->router_id / s->params->num_routers; - - if(routing != MINIMAL) - { - bf->c1 = 1; - intm_id = tw_rand_integer(lp->rng, 0, s->params->num_groups - 1); - if(intm_id == local_grp_id) - intm_id = (local_grp_id + 2) % s->params->num_groups; - } + + int intm_id = tw_rand_integer(lp->rng, 0, s->params->num_groups - 1); + if(intm_id == s->group_id) + intm_id = (s->group_id + 2) % s->params->num_groups; /* progressive adaptive routing makes a check at every node/router at the * source group to sense congestion. Once it does and decides on taking * non-minimal path, it does not check any longer. */ - terminal_message_list * cur_chunk = (terminal_message_list *)malloc( - sizeof(terminal_message_list)); + terminal_message_list * cur_chunk = (terminal_message_list*)malloc(sizeof(terminal_message_list)); init_terminal_message_list(cur_chunk, msg); if(routing == PROG_ADAPTIVE @@ -2327,23 +2364,27 @@ router_packet_receive( router_state * s, if(routing == MINIMAL || routing == NON_MINIMAL) cur_chunk->msg.path_type = routing; /*defaults to the routing algorithm if we don't have adaptive routing here*/ - next_stop = get_next_stop(s, bf, &(cur_chunk->msg), lp, msg->path_type, dest_router_id, - intm_id); + assert(cur_chunk->msg.path_type == MINIMAL || cur_chunk->msg.path_type == NON_MINIMAL); + next_stop = get_next_stop(s, bf, &(cur_chunk->msg), lp, cur_chunk->msg.path_type, dest_router_id, intm_id); } - assert(cur_chunk->msg.path_type == MINIMAL || cur_chunk->msg.path_type == NON_MINIMAL); + + if(msg->remote_event_size_bytes > 0) { void *m_data_src = model_net_method_get_edata(DRAGONFLY, msg); cur_chunk->event_data = (char*)malloc(msg->remote_event_size_bytes); memcpy(cur_chunk->event_data, m_data_src, msg->remote_event_size_bytes); } - - output_port = get_output_port(s, bf, msg, lp, next_stop); + output_port = get_output_port(s, bf, &(cur_chunk->msg), lp, next_stop); + assert(output_port >= 0); output_chan = 0; int max_vc_size = s->params->cn_vc_size; cur_chunk->msg.vc_index = output_port; cur_chunk->msg.next_stop = next_stop; + if(msg->packet_ID == TRACK_PKT) + printf("\n Router packet %ld arrived lp id %ld final dest %ld output port %d ", msg->packet_ID, lp->gid, msg->dest_terminal_id, output_port); + if(output_port < s->params->num_routers) { output_chan = msg->my_l_hop; if(msg->my_g_hop == 1) output_chan = 1; @@ -2356,7 +2397,7 @@ router_packet_receive( router_state * s, max_vc_size = s->params->global_vc_size; cur_chunk->msg.my_g_hop++; } - + cur_chunk->msg.output_chan = output_chan; cur_chunk->msg.my_N_hop++; @@ -2389,6 +2430,9 @@ router_packet_receive( router_state * s, cur_chunk->msg.saved_channel = msg->output_chan; append_to_terminal_message_list( s->queued_msgs[output_port], s->queued_msgs_tail[output_port], output_chan, cur_chunk); + s->queued_count[output_port] += s->params->chunk_size; + msg->saved_busy_time = s->last_buf_full[output_port]; + s->last_buf_full[output_port] = tw_now(lp); } msg->saved_vc = output_port; @@ -2403,8 +2447,8 @@ void router_packet_send_rc(router_state * s, router_ecount--; router_rev_ecount++; - int output_port = msg->vc_index; - int output_chan = msg->output_chan; + int output_port = msg->saved_vc; + int output_chan = msg->saved_channel; if(bf->c1) { s->in_send_loop[output_port] = 1; return; @@ -2412,10 +2456,19 @@ void router_packet_send_rc(router_state * s, tw_rand_reverse_unif(lp->rng); + if(bf->c11) + s->link_traffic[output_port] -= msg->packet_size % s->params->chunk_size; + if(bf->c12) + s->link_traffic[output_port] -= s->params->chunk_size; s->next_output_available_time[output_port] = msg->saved_available_time; - s->link_traffic[output_port] -= s->params->chunk_size; - create_prepend_to_terminal_message_list(s->pending_msgs[output_port], - s->pending_msgs_tail[output_port], output_chan, msg); + + terminal_message_list * cur_entry = rc_stack_pop(s->st); + assert(cur_entry); +// create_prepend_to_terminal_message_list(s->pending_msgs[output_port], +// s->pending_msgs_tail[output_port], output_chan, msg); + + prepend_to_terminal_message_list(s->pending_msgs[output_port], + s->pending_msgs_tail[output_port], output_chan, cur_entry); if(routing == PROG_ADAPTIVE) { @@ -2435,7 +2488,6 @@ void router_packet_send_rc(router_state * s, if(bf->c4) { s->in_send_loop[output_port] = 1; } - } /* routes the current packet to the next stop */ void @@ -2476,7 +2528,7 @@ router_packet_send( router_state * s, if(output_port < s->params->num_routers) { to_terminal = 0; - delay = s->params->global_delay; + delay = s->params->local_delay; } else if(output_port < s->params->num_routers + s->params->num_global_channels) { to_terminal = 0; @@ -2484,13 +2536,24 @@ router_packet_send( router_state * s, delay = s->params->global_delay; } - ts = g_tw_lookahead + delay + tw_rand_unif(lp->rng); + int num_chunks = cur_entry->msg.packet_size / s->params->chunk_size; + if(msg->packet_size % s->params->chunk_size) + num_chunks++; + + double bytetime; + if((cur_entry->msg.packet_size % s->params->chunk_size) && (cur_entry->msg.chunk_id == num_chunks - 1)) { + bytetime = delay * (cur_entry->msg.packet_size % s->params->chunk_size); + } else { + bytetime = delay * s->params->chunk_size; + } + ts = g_tw_lookahead + tw_rand_unif( lp->rng) + bytetime + s->params->router_delay; msg->saved_available_time = s->next_output_available_time[output_port]; s->next_output_available_time[output_port] = maxd(s->next_output_available_time[output_port], tw_now(lp)); s->next_output_available_time[output_port] += ts; + ts = s->next_output_available_time[output_port] - tw_now(lp); // dest can be a router or a terminal, so we must check void * m_data; if (to_terminal) { @@ -2518,7 +2581,14 @@ router_packet_send( router_state * s, m->intm_lp_id = lp->gid; m->magic = router_magic_num; - s->link_traffic[output_port] += s->params->chunk_size; + if((cur_entry->msg.packet_size % s->params->chunk_size) && (cur_entry->msg.chunk_id == num_chunks - 1)) { + bf->c11 = 1; + s->link_traffic[output_port] += (cur_entry->msg.packet_size % + s->params->chunk_size); + } else { + bf->c12 = 1; + s->link_traffic[output_port] += s->params->chunk_size; + } if(routing == PROG_ADAPTIVE) { @@ -2546,10 +2616,14 @@ router_packet_send( router_state * s, cur_entry = return_head(s->pending_msgs[output_port], s->pending_msgs_tail[output_port], output_chan); - copy_terminal_list_entry(cur_entry, msg); - delete_terminal_message_list(cur_entry); + rc_stack_push(lp, cur_entry, free, s->st); + //copy_terminal_list_entry(cur_entry, msg); + //delete_terminal_message_list(cur_entry); + msg->saved_vc = output_port; + msg->saved_channel = output_chan; cur_entry = s->pending_msgs[output_port][2]; + if(cur_entry == NULL) cur_entry = s->pending_msgs[output_port][1]; if(cur_entry == NULL) cur_entry = s->pending_msgs[output_port][0]; if(cur_entry != NULL) { @@ -2577,6 +2651,11 @@ void router_buf_update_rc(router_state * s, int indx = msg->vc_index; int output_chan = msg->output_chan; s->vc_occupancy[indx][output_chan] += s->params->chunk_size; + if(bf->c3) + { + s->busy_time[indx] = msg->saved_rcv_time; + s->last_buf_full[indx] = msg->saved_busy_time; + } if(bf->c1) { terminal_message_list* head = return_tail(s->pending_msgs[indx], s->pending_msgs_tail[indx], output_chan); @@ -2584,6 +2663,7 @@ void router_buf_update_rc(router_state * s, prepend_to_terminal_message_list(s->queued_msgs[indx], s->queued_msgs_tail[indx], output_chan, head); s->vc_occupancy[indx][output_chan] -= s->params->chunk_size; + s->queued_count[indx] -= s->params->chunk_size; } if(bf->c2) { codes_local_latency_reverse(lp); @@ -2596,13 +2676,15 @@ void router_buf_update(router_state * s, tw_bf * bf, terminal_message * msg, tw_ int indx = msg->vc_index; int output_chan = msg->output_chan; s->vc_occupancy[indx][output_chan] -= s->params->chunk_size; - /*if(TRACK == msg->packet_ID) + + if(s->last_buf_full[indx]) { - int i; - printf("\n channel %d occupancy ", output_chan); - for(i = 0; i < s->params->radix; i++) - printf(" %d ", s->vc_occupancy[i][output_chan]); - }*/ + bf->c3 = 1; + msg->saved_rcv_time = s->busy_time[indx]; + msg->saved_busy_time = s->last_buf_full[indx]; + s->busy_time[indx] += (tw_now(lp) - s->last_buf_full[indx]); + s->last_buf_full[indx] = 0.0; + } if(s->queued_msgs[indx][output_chan] != NULL) { bf->c1 = 1; terminal_message_list *head = return_head(s->queued_msgs[indx], @@ -2611,6 +2693,7 @@ void router_buf_update(router_state * s, tw_bf * bf, terminal_message * msg, tw_ append_to_terminal_message_list(s->pending_msgs[indx], s->pending_msgs_tail[indx], output_chan, head); s->vc_occupancy[indx][output_chan] += s->params->chunk_size; + s->queued_count[indx] += s->params->chunk_size; } if(s->in_send_loop[indx] == 0 && s->pending_msgs[indx][output_chan] != NULL) { bf->c2 = 1; @@ -2630,6 +2713,7 @@ void router_buf_update(router_state * s, tw_bf * bf, terminal_message * msg, tw_ void router_event(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp) { + rc_stack_gc(lp, s->st); assert(msg->magic == router_magic_num); switch(msg->type) { @@ -2678,7 +2762,7 @@ void terminal_rc_event_handler(terminal_state * s, tw_bf * bf, case D_COLLECTIVE_INIT: { - s->collective_init_time = msg->saved_collective_init_time; + s->collective_init_time = msg->saved_busy_time; } break; case D_COLLECTIVE_FAN_IN: { diff --git a/tests/conf/modelnet-test-dragonfly.conf b/tests/conf/modelnet-test-dragonfly.conf index 4bfc5b2e6ccbc8c2e614d5fc029f876c5fd1e5c7..b0876f78d3e2f4bf214991aab29c7cf8dc8dc19c 100644 --- a/tests/conf/modelnet-test-dragonfly.conf +++ b/tests/conf/modelnet-test-dragonfly.conf @@ -23,6 +23,6 @@ PARAMS local_bandwidth="5.25"; global_bandwidth="4.7"; cn_bandwidth="5.25"; - message_size="352"; + message_size="360"; routing="nonminimal"; }