Commit 0240e969 authored by Misbah Mubarak
Browse files

Adding updates to dragonfly model and MPI Sim layer

parent a322f093
......@@ -11,6 +11,7 @@
#include "codes/configuration.h"
#include "codes/codes_mapping.h"
#include "codes/model-net.h"
#include "codes/rc-stack.h"
#define TRACE -1
......@@ -93,6 +94,7 @@ struct nw_state
tw_lpid nw_id;
short wrkld_end;
struct rc_stack * st;
/* count of sends, receives, collectives and delays */
unsigned long num_sends;
unsigned long num_recvs;
......@@ -390,7 +392,6 @@ static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, du
wait_elem->num_completed--;
s->pending_waits = wait_elem;
tw_rand_reverse_unif(lp->rng);
}
}
......@@ -422,7 +423,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
remove_req_id(&s->completed_reqs, completed_req);
m->u.rc.saved_pending_wait = wait_elem;
s->pending_waits = NULL;
s->pending_waits = NULL;
codes_issue_next_event(lp);
return 0;
}
......@@ -457,8 +458,10 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
for(i = 0; i < required_count; i++)
remove_req_id(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);
//rc_stack_push(&lp, wait_elem->mpi_op, free, s->st);
codes_issue_next_event(lp); //wait completed
}
}
}
return 0;
}
......@@ -829,6 +832,8 @@ static void codes_exec_comp_delay(
msg->msg_type = MPI_OP_GET_NEXT;
tw_event_send(e);
rc_stack_push(&lp, mpi_op, free, s->st);
}
/* reverse computation operation for MPI irecv */
......@@ -888,7 +893,8 @@ static void codes_exec_mpi_recv(nw_state* s, tw_lp* lp, nw_message * m, struct c
else
{
m->u.rc.found_match = found_matching_sends;
codes_issue_next_event(lp);
rc_stack_push(&lp, mpi_op, free, s->st);
codes_issue_next_event(lp);
}
}
......@@ -940,6 +946,8 @@ static void codes_exec_mpi_send(nw_state* s, tw_lp* lp, struct codes_workload_op
/* isend executed, now get next MPI operation from the queue */
if(mpi_op->op_type == CODES_WK_ISEND)
codes_issue_next_event(lp);
rc_stack_push(&lp, mpi_op, free, s->st);
}
/* MPI collective operations */
......@@ -1059,6 +1067,7 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
else
{
m->u.rc.found_match = found_matching_recv;
rc_stack_push(&lp, arrived_op, free, s->st);
notify_waits(s, bf, lp, m, m->u.rc.saved_matched_req);
}
}
......@@ -1112,6 +1121,11 @@ void nw_test_init(nw_state* s, tw_lp* lp)
//printf("\n network LP not generating events %d ", (int)s->nw_id);
return;
}
/* Initialize the RC stack */
rc_stack_create(&s->st);
assert(s->st != NULL);
wrkld_id = codes_workload_load("dumpi-trace-workload", params, 0, (int)s->nw_id);
s->arrival_queue = queue_init();
......@@ -1127,7 +1141,9 @@ void nw_test_init(nw_state* s, tw_lp* lp)
void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
*(int *)bf = (int)0;
switch(m->msg_type)
rc_stack_gc(lp, s->st);
switch(m->msg_type)
{
case MPI_SEND_POSTED:
update_send_completion_queue(s, bf, m, lp);
......@@ -1226,6 +1242,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
struct codes_workload_op * mpi_op = malloc(sizeof(struct codes_workload_op));
codes_workload_get_next(wrkld_id, 0, (int)s->nw_id, mpi_op);
m->op = mpi_op;
if(mpi_op->op_type == CODES_WK_END)
......@@ -1329,7 +1346,8 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
//printf("\n LP %ld Time spent in communication %llu ", lp->gid, total_time - s->compute_time);
free(s->arrival_queue);
free(s->pending_recvs_queue);
}
rc_stack_destroy(s->st);
}
}
void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
......
......@@ -31,7 +31,7 @@
#define COLLECTIVE_COMPUTATION_DELAY 5700
#define DRAGONFLY_FAN_OUT_DELAY 20.0
#define WINDOW_LENGTH 0
#define DFLY_HASH_TABLE_SIZE 10000
#define DFLY_HASH_TABLE_SIZE 65536
// debugging parameters
#define TRACK -1
......@@ -175,6 +175,8 @@ struct terminal_state
short is_root;
short is_leaf;
struct rc_stack * st;
/* to maintain a count of child nodes that have fanned in at the parent during the collective
fan-in phase*/
int num_fan_nodes;
......@@ -664,6 +666,7 @@ terminal_init( terminal_state * s,
s->total_msg_time = 0.0;
s->total_msg_size = 0;
rc_stack_create(&s->st);
s->num_vcs = 1;
s->vc_occupancy = (int*)malloc(s->num_vcs * sizeof(int));
......@@ -1215,12 +1218,21 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
term_ecount--;
term_rev_ecount++;
tw_rand_reverse_unif(lp->rng);
completed_packets--;
if(msg->path_type == MINIMAL)
minimal_count--;
if(msg->path_type == NON_MINIMAL)
nonmin_count--;
uint64_t total_chunks = msg->total_size / s->params->chunk_size;
if(msg->total_size % s->params->chunk_size)
total_chunks++;
if(!total_chunks)
total_chunks = 1;
struct dfly_hash_key key;
key.message_id = msg->message_id;
key.sender_id = msg->sender_lp;
......@@ -1238,28 +1250,19 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
s->finished_packets--;
dragonfly_total_time -= (tw_now(lp) - msg->travel_start_time);
total_hops -= msg->my_N_hop;
}
if(bf->c3)
if(bf->c3)
dragonfly_max_latency = msg->saved_available_time;
if(bf->c8)
{
assert(hash_link);
struct dfly_qhash_entry * tmp = NULL;
tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
assert(tmp);
tmp->remote_event_size = 0;
free(tmp->remote_event_data);
}
}
if(bf->c7)
if(msg->saved_completed_chunks >= total_chunks)
{
s->finished_msgs--;
s->total_msg_time -= (tw_now(lp) - msg->msg_start_time);
s->total_msg_size -= msg->total_size;
assert(!hash_link);
//assert(!hash_link);
void *m_data_src = model_net_method_get_edata(DRAGONFLY, msg);
struct dfly_qhash_entry * d_entry = malloc(sizeof(struct dfly_qhash_entry));
d_entry->num_chunks = msg->saved_completed_chunks;
......@@ -1274,38 +1277,26 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
d_entry->remote_event_size = msg->saved_remote_esize;
}
qhash_add(s->rank_tbl, &key, &(d_entry->hash_link));
s->rank_tbl_pop++;
int net_id = model_net_get_id(LP_METHOD_NM);
if(bf->c4)
model_net_event_rc2(lp, &msg->event_rc);
}
if(bf->c5)
{
/* re-initialize the element */
hash_link = qhash_search(s->rank_tbl, &key);
assert(hash_link);
qhash_del(hash_link);
s->rank_tbl_pop--;
}
if(bf->c6)
{
hash_link = NULL;
hash_link = qhash_search(s->rank_tbl, &key);
assert(hash_link);
struct dfly_qhash_entry * tmp2 = NULL;
tmp2 = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
assert(tmp2);
tmp2->num_chunks--;
}
msg->my_N_hop--;
tw_rand_reverse_unif(lp->rng);
return;
hash_link = NULL;
hash_link = qhash_search(s->rank_tbl, &key);
assert(hash_link);
struct dfly_qhash_entry * tmp2 = NULL;
tmp2 = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
assert(tmp2);
tmp2->num_chunks--;
return;
}
void send_remote_event(terminal_state * s, terminal_message * msg, tw_lp * lp, tw_bf * bf, char * event_data, int remote_event_size)
{
......@@ -1332,6 +1323,7 @@ void send_remote_event(terminal_state * s, terminal_message * msg, tw_lp * lp, t
memcpy(m_remote, event_data, remote_event_size);
tw_event_send(e);
}
return;
}
/* packet arrives at the destination terminal */
void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
......@@ -1339,6 +1331,25 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
term_ecount++;
// NIC aggregation - should this be a separate function?
// Trigger an event on receiving server
tw_stime ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng);
if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG)
printf("\n terminal sending credit at chan %d ", msg->saved_vc);
// no method_event here - message going to router
tw_event * buf_e;
terminal_message * buf_msg;
buf_e = tw_event_new(msg->intm_lp_id, ts, lp);
buf_msg = tw_event_data(buf_e);
buf_msg->magic = router_magic_num;
buf_msg->vc_index = msg->vc_index;
buf_msg->output_chan = msg->output_chan;
buf_msg->type = R_BUFFER;
tw_event_send(buf_e);
bf->c1 = 0;
bf->c2 = 0;
bf->c4 = 0;
......@@ -1350,6 +1361,7 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
/* We do not allow self messages through dragonfly */
assert(lp->gid != msg->src_terminal_id);
int is_last_msg = 0;
int num_chunks = msg->packet_size / s->params->chunk_size;
uint64_t total_chunks = msg->total_size / s->params->chunk_size;
......@@ -1401,7 +1413,7 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
struct dfly_hash_key key;
key.message_id = msg->message_id;
key.sender_id = msg->sender_lp;
hash_link = qhash_search(s->rank_tbl, &key);
struct dfly_qhash_entry * tmp = NULL;
......@@ -1409,28 +1421,58 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
if(!hash_link)
{
bf->c5 = 1;
struct dfly_qhash_entry * d_entry = malloc(sizeof(struct dfly_qhash_entry));
d_entry->num_chunks = 1;
d_entry->key = key;
d_entry->remote_event_data = NULL;
qhash_add(s->rank_tbl, &key, &(d_entry->hash_link));
struct dfly_qhash_entry d_entry;
d_entry.num_chunks = 0;
d_entry.key = key;
d_entry.remote_event_data = NULL;
qhash_add(s->rank_tbl, &key, &(d_entry.hash_link));
s->rank_tbl_pop++;
}
else {
/* if one exists already then update it*/
bf->c6 = 1;
tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
assert(tmp);
tmp->num_chunks++;
}
/* All chunks arrived, issue remote event and delete entry from the hash */
hash_link = NULL;
hash_link = qhash_search(s->rank_tbl, &key);
tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
assert(tmp);
tmp->num_chunks++;
if(tmp->num_chunks >= total_chunks)
is_last_msg = 1;
/* if it's the last chunk of the packet then handle the remote event data */
if(msg->chunk_id == num_chunks - 1)
{
bf->c1 = 1;
mn_stats* stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
stat->recv_count++;
stat->recv_bytes += msg->packet_size;
stat->recv_time += tw_now(lp) - msg->travel_start_time;
N_finished_packets++;
s->finished_packets++;
dragonfly_total_time += tw_now( lp ) - msg->travel_start_time;
total_hops += msg->my_N_hop;
if (dragonfly_max_latency < tw_now( lp ) - msg->travel_start_time) {
bf->c3 = 1;
msg->saved_available_time = dragonfly_max_latency;
dragonfly_max_latency = tw_now( lp ) - msg->travel_start_time;
}
}
if(msg->remote_event_size_bytes > 0 && !is_last_msg)
{
/* Retrieve the remote event entry */
if(!tmp->remote_event_data)
{
tmp->remote_event_data = (void*)malloc(msg->remote_event_size_bytes);
assert(tmp->remote_event_data);
tmp->remote_event_size = msg->remote_event_size_bytes;
memcpy(tmp->remote_event_data, m_data_src, msg->remote_event_size_bytes);
}
}
/* If all chunks of a message have arrived then send a remote event to the
* callee*/
if(is_last_msg)
{
bf->c7 = 1;
s->finished_msgs++;
s->total_msg_time += (tw_now(lp) - msg->msg_start_time);
s->total_msg_size += msg->total_size;
......@@ -1440,11 +1482,11 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
char * remote_data = malloc(msg->remote_event_size_bytes);
memcpy(remote_data, m_data_src, msg->remote_event_size_bytes);
send_remote_event(s, msg, lp, bf, remote_data, msg->remote_event_size_bytes);
rc_stack_push(&lp, remote_data, free, s->st);
}
else
{
void *m_data = model_net_method_get_edata(DRAGONFLY, msg);
assert(tmp->remote_event_size > 0);
send_remote_event(s, msg, lp, bf, tmp->remote_event_data, tmp->remote_event_size);
msg->saved_remote_esize = tmp->remote_event_size;
/* append remote event data to this message */
......@@ -1452,69 +1494,12 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
}
msg->saved_completed_chunks = tmp->num_chunks;
/* Remove the hash entry */
qhash_del(hash_link);
rc_stack_push(&lp, tmp->remote_event_data, free, s->st);
rc_stack_push(&lp, tmp, free, s->st);
s->rank_tbl_pop--;
}
tw_stime ts;
msg->my_N_hop++;
if(msg->chunk_id == num_chunks-1)
{
bf->c1 = 1;
mn_stats* stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
stat->recv_count++;
stat->recv_bytes += msg->packet_size;
stat->recv_time += tw_now(lp) - msg->travel_start_time;
N_finished_packets++;
s->finished_packets++;
dragonfly_total_time += tw_now( lp ) - msg->travel_start_time;
total_hops += msg->my_N_hop;
if (dragonfly_max_latency < tw_now( lp ) - msg->travel_start_time) {
bf->c3 = 1;
msg->saved_available_time = dragonfly_max_latency;
dragonfly_max_latency = tw_now( lp ) - msg->travel_start_time;
}
if(!bf->c7 && msg->remote_event_size_bytes > 0)
{
/* Retrieve the remote event entry */
bf->c8 = 1;
hash_link = NULL;
hash_link = qhash_search(s->rank_tbl, &key);
struct dfly_qhash_entry * tmp = NULL;
tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
assert(tmp);
tmp->remote_event_data = (void*)malloc(msg->remote_event_size_bytes);
assert(tmp->remote_event_data);
tmp->remote_event_size = msg->remote_event_size_bytes;
memcpy(tmp->remote_event_data, m_data_src, msg->remote_event_size_bytes);
}
}
// NIC aggregation - should this be a separate function?
// Trigger an event on receiving server
ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng);
if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG)
printf("\n terminal sending credit at chan %d ", msg->saved_vc);
// no method_event here - message going to router
tw_event * buf_e;
terminal_message * buf_msg;
buf_e = tw_event_new(msg->intm_lp_id, ts, lp);
buf_msg = tw_event_data(buf_e);
buf_msg->magic = router_magic_num;
buf_msg->vc_index = msg->vc_index;
buf_msg->output_chan = msg->output_chan;
buf_msg->type = R_BUFFER;
tw_event_send(buf_e);
return;
}
......@@ -1835,6 +1820,8 @@ terminal_event( terminal_state * s,
{
*(int *)bf = (int)0;
assert(msg->magic == terminal_magic_num);
rc_stack_gc(lp, s->st);
switch(msg->type)
{
case T_GENERATE:
......@@ -1886,6 +1873,14 @@ dragonfly_terminal_final( terminal_state * s,
if(s->terminal_msgs[0] != NULL)
printf("[%lu] leftover terminal messages \n", lp->gid);
qhash_finalize(s->rank_tbl);
rc_stack_destroy(s->st);
free(s->vc_occupancy);
free(s->terminal_msgs);
free(s->terminal_msgs_tail);
free(s->children);
}
void dragonfly_router_final(router_state * s,
......@@ -1980,18 +1975,20 @@ get_next_stop(router_state * s,
dest_group_id = dest_router_id / s->params->num_routers;
/* Generate intermediate destination for non-minimal routing (selecting a random group) */
if(msg->last_hop == TERMINAL && path == NON_MINIMAL)
{
msg->intm_group_id = intm_id;
}
/* If the packet has arrived at the destination router */
if(dest_router_id == local_router_id && msg->intm_group_id == -1)
/* If the packet has arrived at the destination router */
if(dest_router_id == local_router_id)
{
dest_lp = msg->dest_terminal_id;
return dest_lp;
}
/* Generate intermediate destination for non-minimal routing (selecting a random group) */
if(msg->last_hop == TERMINAL && path == NON_MINIMAL)
{
if((dest_router_id / s->params->num_routers) != s->group_id)
{
msg->intm_group_id = intm_id;
}
}
/******************** DECIDE THE DESTINATION GROUP ***********************/
/* It means that the packet has arrived at the intermediate group for non-minimal routing. Reset the group now. */
if(path == NON_MINIMAL && msg->intm_group_id == s->group_id)
......@@ -2003,6 +2000,10 @@ get_next_stop(router_state * s,
{
dest_group_id = msg->intm_group_id;
}
else /* direct the packet to the destination group */
{
dest_group_id = dest_router_id / s->params->num_routers;
}
/********************** DECIDE THE ROUTER IN THE DESTINATION GROUP ***************/
/* It means the packet has arrived at the destination group. Now divert it to the destination router. */
......@@ -2400,8 +2401,8 @@ router_packet_send( router_state * s,
return;
}
if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG)
printf("\n packet sending origin %ld to router %ld at output port %d ", cur_entry->msg.src_terminal_id, cur_entry->msg.next_stop, output_port);
//if(msg->packet_ID == TRACK && msg->message_id == TRACK_MSG)
// printf("\n packet sending origin %ld to router %ld at output port %d ", cur_entry->msg.src_terminal_id, cur_entry->msg.next_stop, output_port);
int to_terminal = 1, global = 0;
double delay = s->params->cn_delay;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment