diff --git a/src/models/network-workloads/model-net-mpi-wrklds.c b/src/models/network-workloads/model-net-mpi-wrklds.c index b03835954b88b92b905d4090d48cfe770370b12c..062c48cf11fde196b031f2f90ee5086559a1f2f4 100644 --- a/src/models/network-workloads/model-net-mpi-wrklds.c +++ b/src/models/network-workloads/model-net-mpi-wrklds.c @@ -11,6 +11,7 @@ #include "codes/configuration.h" #include "codes/codes_mapping.h" #include "codes/model-net.h" +#include "codes/rc-stack.h" #define TRACE -1 @@ -93,6 +94,7 @@ struct nw_state tw_lpid nw_id; short wrkld_end; + struct rc_stack * st; /* count of sends, receives, collectives and delays */ unsigned long num_sends; unsigned long num_recvs; @@ -390,7 +392,6 @@ static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, du wait_elem->num_completed--; s->pending_waits = wait_elem; tw_rand_reverse_unif(lp->rng); - } } @@ -422,7 +423,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_ remove_req_id(&s->completed_reqs, completed_req); m->u.rc.saved_pending_wait = wait_elem; - s->pending_waits = NULL; + s->pending_waits = NULL; codes_issue_next_event(lp); return 0; } @@ -457,8 +458,10 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_ for(i = 0; i < required_count; i++) remove_req_id(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]); + //rc_stack_push(&lp, wait_elem->mpi_op, free, s->st); + codes_issue_next_event(lp); //wait completed - } + } } return 0; } @@ -829,6 +832,8 @@ static void codes_exec_comp_delay( msg->msg_type = MPI_OP_GET_NEXT; tw_event_send(e); + + rc_stack_push(&lp, mpi_op, free, s->st); } /* reverse computation operation for MPI irecv */ @@ -888,7 +893,8 @@ static void codes_exec_mpi_recv(nw_state* s, tw_lp* lp, nw_message * m, struct c else { m->u.rc.found_match = found_matching_sends; - codes_issue_next_event(lp); + rc_stack_push(&lp, mpi_op, free, s->st); + codes_issue_next_event(lp); } } @@ -940,6 +946,8 @@ static void codes_exec_mpi_send(nw_state* s, tw_lp* lp, struct codes_workload_op /* isend executed, now get next MPI operation from the queue */ if(mpi_op->op_type == CODES_WK_ISEND) codes_issue_next_event(lp); + + rc_stack_push(&lp, mpi_op, free, s->st); } /* MPI collective operations */ @@ -1059,6 +1067,7 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp else { m->u.rc.found_match = found_matching_recv; + rc_stack_push(&lp, arrived_op, free, s->st); notify_waits(s, bf, lp, m, m->u.rc.saved_matched_req); } } @@ -1112,6 +1121,11 @@ void nw_test_init(nw_state* s, tw_lp* lp) //printf("\n network LP not generating events %d ", (int)s->nw_id); return; } + + /* Initialize the RC stack */ + rc_stack_create(&s->st); + assert(s->st != NULL); + wrkld_id = codes_workload_load("dumpi-trace-workload", params, 0, (int)s->nw_id); s->arrival_queue = queue_init(); @@ -1127,7 +1141,9 @@ void nw_test_init(nw_state* s, tw_lp* lp) void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) { *(int *)bf = (int)0; - switch(m->msg_type) + rc_stack_gc(lp, s->st); + + switch(m->msg_type) { case MPI_SEND_POSTED: update_send_completion_queue(s, bf, m, lp); @@ -1226,6 +1242,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l { struct codes_workload_op * mpi_op = malloc(sizeof(struct codes_workload_op)); codes_workload_get_next(wrkld_id, 0, (int)s->nw_id, mpi_op); + m->op = mpi_op; if(mpi_op->op_type == CODES_WK_END) @@ -1329,7 +1346,8 @@ void nw_test_finalize(nw_state* s, tw_lp* lp) //printf("\n LP %ld Time spent in communication %llu ", lp->gid, total_time - s->compute_time); free(s->arrival_queue); free(s->pending_recvs_queue); - } + rc_stack_destroy(s->st); + } } void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) diff --git a/src/models/networks/model-net/dragonfly.c b/src/models/networks/model-net/dragonfly.c index 04028f4e826980fb8eab0f7a170b098358363284..7488ab8647a9d05e246b20731a32208ea53fe1fc 100644 --- a/src/models/networks/model-net/dragonfly.c +++ b/src/models/networks/model-net/dragonfly.c @@ -31,7 +31,7 @@ #define COLLECTIVE_COMPUTATION_DELAY 5700 #define DRAGONFLY_FAN_OUT_DELAY 20.0 #define WINDOW_LENGTH 0 -#define DFLY_HASH_TABLE_SIZE 10000 +#define DFLY_HASH_TABLE_SIZE 65536 // debugging parameters #define TRACK -1 @@ -175,6 +175,8 @@ struct terminal_state short is_root; short is_leaf; + struct rc_stack * st; + /* to maintain a count of child nodes that have fanned in at the parent during the collective fan-in phase*/ int num_fan_nodes; @@ -664,6 +666,7 @@ terminal_init( terminal_state * s, s->total_msg_time = 0.0; s->total_msg_size = 0; + rc_stack_create(&s->st); s->num_vcs = 1; s->vc_occupancy = (int*)malloc(s->num_vcs * sizeof(int)); @@ -1215,12 +1218,21 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw term_ecount--; term_rev_ecount++; + tw_rand_reverse_unif(lp->rng); completed_packets--; if(msg->path_type == MINIMAL) minimal_count--; if(msg->path_type == NON_MINIMAL) nonmin_count--; + uint64_t total_chunks = msg->total_size / s->params->chunk_size; + + if(msg->total_size % s->params->chunk_size) + total_chunks++; + + if(!total_chunks) + total_chunks = 1; + struct dfly_hash_key key; key.message_id = msg->message_id; key.sender_id = msg->sender_lp; @@ -1238,28 +1250,19 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw s->finished_packets--; dragonfly_total_time -= (tw_now(lp) - msg->travel_start_time); total_hops -= msg->my_N_hop; + } - if(bf->c3) + if(bf->c3) dragonfly_max_latency = msg->saved_available_time; - - if(bf->c8) - { - assert(hash_link); - struct dfly_qhash_entry * tmp = NULL; - tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link); - assert(tmp); - tmp->remote_event_size = 0; - free(tmp->remote_event_data); - } - } + - if(bf->c7) + if(msg->saved_completed_chunks >= total_chunks) { s->finished_msgs--; s->total_msg_time -= (tw_now(lp) - msg->msg_start_time); s->total_msg_size -= msg->total_size; - assert(!hash_link); + //assert(!hash_link); void *m_data_src = model_net_method_get_edata(DRAGONFLY, msg); struct dfly_qhash_entry * d_entry = malloc(sizeof(struct dfly_qhash_entry)); d_entry->num_chunks = msg->saved_completed_chunks; @@ -1274,38 +1277,26 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw d_entry->remote_event_size = msg->saved_remote_esize; } qhash_add(s->rank_tbl, &key, &(d_entry->hash_link)); - + + s->rank_tbl_pop++; int net_id = model_net_get_id(LP_METHOD_NM); if(bf->c4) model_net_event_rc2(lp, &msg->event_rc); + } - - if(bf->c5) - { - /* re-initialize the element */ - hash_link = qhash_search(s->rank_tbl, &key); - assert(hash_link); - qhash_del(hash_link); - s->rank_tbl_pop--; - } - if(bf->c6) - { - hash_link = NULL; - hash_link = qhash_search(s->rank_tbl, &key); - assert(hash_link); - struct dfly_qhash_entry * tmp2 = NULL; - tmp2 = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link); - assert(tmp2); - tmp2->num_chunks--; - } - - msg->my_N_hop--; - tw_rand_reverse_unif(lp->rng); - - return; + hash_link = NULL; + hash_link = qhash_search(s->rank_tbl, &key); + + assert(hash_link); + struct dfly_qhash_entry * tmp2 = NULL; + tmp2 = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link); + assert(tmp2); + tmp2->num_chunks--; + + return; } void send_remote_event(terminal_state * s, terminal_message * msg, tw_lp * lp, tw_bf * bf, char * event_data, int remote_event_size) { @@ -1332,6 +1323,7 @@ void send_remote_event(terminal_state * s, terminal_message * msg, tw_lp * lp, t memcpy(m_remote, event_data, remote_event_size); tw_event_send(e); } + return; } /* packet arrives at the destination terminal */ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, @@ -1339,6 +1331,25 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, term_ecount++; + // NIC aggregation - should this be a separate function? + // Trigger an event on receiving server + + tw_stime ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng); + + if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG) + printf("\n terminal sending credit at chan %d ", msg->saved_vc); + + // no method_event here - message going to router + tw_event * buf_e; + terminal_message * buf_msg; + buf_e = tw_event_new(msg->intm_lp_id, ts, lp); + buf_msg = tw_event_data(buf_e); + buf_msg->magic = router_magic_num; + buf_msg->vc_index = msg->vc_index; + buf_msg->output_chan = msg->output_chan; + buf_msg->type = R_BUFFER; + tw_event_send(buf_e); + bf->c1 = 0; bf->c2 = 0; bf->c4 = 0; @@ -1350,6 +1361,7 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, /* WE do not allow self messages through dragonfly */ assert(lp->gid != msg->src_terminal_id); + int is_last_msg = 0; int num_chunks = msg->packet_size / s->params->chunk_size; uint64_t total_chunks = msg->total_size / s->params->chunk_size; @@ -1401,7 +1413,7 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, struct dfly_hash_key key; key.message_id = msg->message_id; key.sender_id = msg->sender_lp; - + hash_link = qhash_search(s->rank_tbl, &key); struct dfly_qhash_entry * tmp = NULL; @@ -1409,28 +1421,58 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, if(!hash_link) { bf->c5 = 1; - struct dfly_qhash_entry * d_entry = malloc(sizeof(struct dfly_qhash_entry)); - d_entry->num_chunks = 1; - d_entry->key = key; - d_entry->remote_event_data = NULL; - qhash_add(s->rank_tbl, &key, &(d_entry->hash_link)); + struct dfly_qhash_entry d_entry; + d_entry.num_chunks = 0; + d_entry.key = key; + d_entry.remote_event_data = NULL; + qhash_add(s->rank_tbl, &key, &(d_entry.hash_link)); s->rank_tbl_pop++; } - else { - /* if one exists already then update it*/ - bf->c6 = 1; - tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link); - assert(tmp); - tmp->num_chunks++; - } - /* All chunks arrived, issue remote event and delete entry from the hash */ - hash_link = NULL; + hash_link = qhash_search(s->rank_tbl, &key); tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link); + assert(tmp); + tmp->num_chunks++; + if(tmp->num_chunks >= total_chunks) + is_last_msg = 1; + + /* if its the last chunk of the packet then handle the remote event data */ + if(msg->chunk_id == num_chunks - 1) + { + bf->c1 = 1; + mn_stats* stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); + stat->recv_count++; + stat->recv_bytes += msg->packet_size; + stat->recv_time += tw_now(lp) - msg->travel_start_time; + + N_finished_packets++; + s->finished_packets++; + + dragonfly_total_time += tw_now( lp ) - msg->travel_start_time; + total_hops += msg->my_N_hop; + + if (dragonfly_max_latency < tw_now( lp ) - msg->travel_start_time) { + bf->c3 = 1; + msg->saved_available_time = dragonfly_max_latency; + dragonfly_max_latency = tw_now( lp ) - msg->travel_start_time; + } + } + if(msg->remote_event_size_bytes > 0 && !is_last_msg) + { + /* Retreive the remote event entry */ + if(!tmp->remote_event_data) + { + tmp->remote_event_data = (void*)malloc(msg->remote_event_size_bytes); + assert(tmp->remote_event_data); + tmp->remote_event_size = msg->remote_event_size_bytes; + memcpy(tmp->remote_event_data, m_data_src, msg->remote_event_size_bytes); + } + } + /* If all chunks of a message have arrived then send a remote event to the + * callee*/ + if(is_last_msg) { - bf->c7 = 1; - s->finished_msgs++; s->total_msg_time += (tw_now(lp) - msg->msg_start_time); s->total_msg_size += msg->total_size; @@ -1440,11 +1482,11 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, char * remote_data = malloc(msg->remote_event_size_bytes); memcpy(remote_data, m_data_src, msg->remote_event_size_bytes); send_remote_event(s, msg, lp, bf, remote_data, msg->remote_event_size_bytes); + rc_stack_push(&lp, remote_data, free, s->st); } else { void *m_data = model_net_method_get_edata(DRAGONFLY, msg); - assert(tmp->remote_event_size > 0); send_remote_event(s, msg, lp, bf, tmp->remote_event_data, tmp->remote_event_size); msg->saved_remote_esize = tmp->remote_event_size; /* append remote event data to this message */ @@ -1452,69 +1494,12 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, } msg->saved_completed_chunks = tmp->num_chunks; + /* Remove the hash entry */ qhash_del(hash_link); + rc_stack_push(&lp, tmp->remote_event_data, free, s->st); + rc_stack_push(&lp, tmp, free, s->st); s->rank_tbl_pop--; } - tw_stime ts; - - msg->my_N_hop++; - if(msg->chunk_id == num_chunks-1) - { - bf->c1 = 1; - mn_stats* stat = model_net_find_stats(msg->category, s->dragonfly_stats_array); - stat->recv_count++; - stat->recv_bytes += msg->packet_size; - stat->recv_time += tw_now(lp) - msg->travel_start_time; - - N_finished_packets++; - s->finished_packets++; - - dragonfly_total_time += tw_now( lp ) - msg->travel_start_time; - total_hops += msg->my_N_hop; - - if (dragonfly_max_latency < tw_now( lp ) - msg->travel_start_time) { - bf->c3 = 1; - msg->saved_available_time = dragonfly_max_latency; - dragonfly_max_latency = tw_now( lp ) - msg->travel_start_time; - } - - if(!bf->c7 && msg->remote_event_size_bytes > 0) - { - /* Retreive the remote event entry */ - bf->c8 = 1; - hash_link = NULL; - hash_link = qhash_search(s->rank_tbl, &key); - - struct dfly_qhash_entry * tmp = NULL; - tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link); - assert(tmp); - - tmp->remote_event_data = (void*)malloc(msg->remote_event_size_bytes); - assert(tmp->remote_event_data); - tmp->remote_event_size = msg->remote_event_size_bytes; - - memcpy(tmp->remote_event_data, m_data_src, msg->remote_event_size_bytes); - } - } - // NIC aggregation - should this be a separate function? - // Trigger an event on receiving server - - ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng); - - if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG) - printf("\n terminal sending credit at chan %d ", msg->saved_vc); - - // no method_event here - message going to router - tw_event * buf_e; - terminal_message * buf_msg; - buf_e = tw_event_new(msg->intm_lp_id, ts, lp); - buf_msg = tw_event_data(buf_e); - buf_msg->magic = router_magic_num; - buf_msg->vc_index = msg->vc_index; - buf_msg->output_chan = msg->output_chan; - buf_msg->type = R_BUFFER; - tw_event_send(buf_e); - return; } @@ -1835,6 +1820,8 @@ terminal_event( terminal_state * s, { *(int *)bf = (int)0; assert(msg->magic == terminal_magic_num); + + rc_stack_gc(lp, s->st); switch(msg->type) { case T_GENERATE: @@ -1886,6 +1873,14 @@ dragonfly_terminal_final( terminal_state * s, if(s->terminal_msgs[0] != NULL) printf("[%lu] leftover terminal messages \n", lp->gid); + + + qhash_finalize(s->rank_tbl); + rc_stack_destroy(s->st); + free(s->vc_occupancy); + free(s->terminal_msgs); + free(s->terminal_msgs_tail); + free(s->children); } void dragonfly_router_final(router_state * s, @@ -1980,18 +1975,20 @@ get_next_stop(router_state * s, dest_group_id = dest_router_id / s->params->num_routers; - /* Generate inter-mediate destination for non-minimal routing (selecting a random group) */ - if(msg->last_hop == TERMINAL && path == NON_MINIMAL) - { - msg->intm_group_id = intm_id; - } - - /* If the packet has arrived at the destination router */ - if(dest_router_id == local_router_id && msg->intm_group_id == -1) + /* If the packet has arrived at the destination router */ + if(dest_router_id == local_router_id) { dest_lp = msg->dest_terminal_id; return dest_lp; } + /* Generate inter-mediate destination for non-minimal routing (selecting a random group) */ + if(msg->last_hop == TERMINAL && path == NON_MINIMAL) + { + if((dest_router_id / s->params->num_routers) != s->group_id) + { + msg->intm_group_id = intm_id; + } + } /******************** DECIDE THE DESTINATION GROUP ***********************/ /* It means that the packet has arrived at the inter-mediate group for non-minimal routing. Reset the group now. */ if(path == NON_MINIMAL && msg->intm_group_id == s->group_id) @@ -2003,6 +2000,10 @@ get_next_stop(router_state * s, { dest_group_id = msg->intm_group_id; } + else /* direct the packet to the destination group */ + { + dest_group_id = dest_router_id / s->params->num_routers; + } /********************** DECIDE THE ROUTER IN THE DESTINATION GROUP ***************/ /* It means the packet has arrived at the destination group. Now divert it to the destination router. */ @@ -2400,8 +2401,8 @@ router_packet_send( router_state * s, return; } - if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG) - printf("\n packet sending origin %ld to router %ld at output port %d ", cur_entry->msg.src_terminal_id, cur_entry->msg.next_stop, output_port); + //if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG) + // printf("\n packet sending origin %ld to router %ld at output port %d ", cur_entry->msg.src_terminal_id, cur_entry->msg.next_stop, output_port); int to_terminal = 1, global = 0; double delay = s->params->cn_delay;