Commit ef443c39 authored by Noah Wolfe

CODES-NeMo message aggregation finished

- Aggregates both types of NeMo synthetic workloads (from file or function)
- Variable naming changes
parent 8a89e794
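
In brief, the payload rule the diff below introduces works as follows (a minimal sketch, not part of the commit; the numbers are illustrative and it mirrors the new get_payload_sz() logic):

    /* With random all-to-all connections (chip_connections_flag == 0), a
     * tick's spikes are split evenly across the num_clients - 1 peers and
     * sent as one aggregated message per destination. */
    int spike_size = 8;                  /* bytes per spike (default)     */
    unsigned long spikes_per_tick = 256; /* illustrative value            */
    int num_clients = 33;                /* illustrative: 32 peers + self */
    int payload = spike_size * (spikes_per_tick / (num_clients - 1));
    /* payload == 8 * (256 / 32) == 64 bytes per aggregated message.
     * With file-based connections (chip_connections_flag == 1) the payload
     * is spike_size * the per-connection spike count instead. */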
@@ -101,10 +101,13 @@ static int num_syn_clients;
int *starting_connection;
int *avg_spikes_per_tick;
int *chip_connections;
static int chip_connections_flag;
static int chip_connections_flag = 0;
char chip_file[8192];
int num_chips_finished_this_tick;
int *times_finished;
static int spike_aggregation_flag = 0;
int aggregation_overhead = 0;
int spike_size = 8;
FILE * workload_log = NULL;
FILE * msg_size_log = NULL;
@@ -119,7 +122,7 @@ long long num_bytes_recvd=0;
long long num_syn_bytes_sent = 0;
long long num_syn_bytes_recvd = 0;
static unsigned long msgs_per_tick = 0;
static unsigned long spikes_per_tick = 0;
static double tick_interval = 1000000;
double max_time = 0, max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
@@ -286,7 +289,7 @@ struct nw_state
unsigned long num_bytes_sent;
unsigned long num_bytes_recvd;
unsigned long msgs_per_tick_sent;
unsigned long spikes_per_tick_sent;
int num_chip_conns;
int last_conn;
int * chip_conns;
@@ -338,7 +341,7 @@ struct nw_message
int64_t saved_num_bytes;
int64_t saved_min_bytes;
int64_t saved_max_bytes;
int64_t saved_msgs_per_tick_sent;
int64_t saved_spikes_per_tick_sent;
int saved_last_conn;
int saved_conn_indx;
} rc;
@@ -655,6 +658,20 @@ void finish_nbr_wkld(
notify_neighbor(ns, lp, b, msg);
}
/* Compute the size of the next message: a single spike by default, or an
 * aggregated batch of spikes when spike_aggregation_flag is set */
int get_payload_sz(nw_state * s, int conn_indx, int num_clients)
{
    PAYLOAD_SZ = spike_size;
    if(spike_aggregation_flag == 1){
        if(chip_connections_flag == 0){
            /* Random all-to-all: split the tick's spikes evenly across the
             * num_clients - 1 peers (integer division drops any remainder) */
            PAYLOAD_SZ = spike_size * (spikes_per_tick / (num_clients - 1));
        }else{
            /* File-based connections: batch this connection's spike count */
            PAYLOAD_SZ = spike_size * (s->num_sends_per_conn[conn_indx]);
        }
    }
    return PAYLOAD_SZ;
}
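/* Note: the reverse handler (gen_synthetic_tr_rc) recomputes the payload via
 * get_payload_sz() with the saved connection index, so the byte counters
 * decremented on rollback correspond to what the forward event added. */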
static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
if(bf->c0)
@@ -663,9 +680,9 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
if(chip_connections_flag == 0){
tw_rand_reverse_unif(lp->rng);
if(bf->c17){
s->msgs_per_tick_sent = m->rc.saved_msgs_per_tick_sent;
s->spikes_per_tick_sent = m->rc.saved_spikes_per_tick_sent;
num_chips_finished_this_tick--;
//printf("Rolling back finished chip:%d chips finished before: %d, LP:%llu, msgs_per_tick_sent before:%llu, msgs_per_tick:%llu, tick_interval:%llu, tw_now:%llu\n", s->local_rank, num_chips_finished_this_tick, LLU(lp->gid),LLU(s->msgs_per_tick_sent), LLU(msgs_per_tick), LLU(tick_interval), LLU(tw_now(lp)));
//printf("Rolling back finished chip:%d chips finished before: %d, LP:%llu, spikes_per_tick_sent before:%llu, spikes_per_tick:%llu, tick_interval:%llu, tw_now:%llu\n", s->local_rank, num_chips_finished_this_tick, LLU(lp->gid),LLU(s->spikes_per_tick_sent), LLU(spikes_per_tick), LLU(tick_interval), LLU(tw_now(lp)));
}
}
else if(chip_connections_flag == 1){
@@ -681,17 +698,22 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
}
num_chips_finished_this_tick--;
times_finished[s->local_rank]--;
s->msgs_per_tick_sent--;
//printf("Rolling back finished chip:%d chips finished before: %d, LP:%llu, msgs_per_tick_sent before:%llu, msgs_per_tick:%llu, tick_interval:%llu, tw_now:%llu\n", s->local_rank, num_chips_finished_this_tick, LLU(lp->gid),LLU(s->msgs_per_tick_sent), LLU(msgs_per_tick), LLU(tick_interval), LLU(tw_now(lp)));
s->spikes_per_tick_sent--;
//printf("Rolling back finished chip:%d chips finished before: %d, LP:%llu, spikes_per_tick_sent before:%llu, spikes_per_tick:%llu, tick_interval:%llu, tw_now:%llu\n", s->local_rank, num_chips_finished_this_tick, LLU(lp->gid),LLU(s->spikes_per_tick_sent), LLU(spikes_per_tick), LLU(tick_interval), LLU(tw_now(lp)));
}
s->num_sends_per_conn[m->rc.saved_conn_indx]--;
}
s->msgs_per_tick_sent--;
s->spikes_per_tick_sent--;
if(bf->c21)
s->compute_time = m->rc.saved_delay;
model_net_event_rc2(lp, &m->event_rc);
struct codes_jobmap_id jid;
jid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx);
int num_clients = codes_jobmap_get_num_ranks(jid.job, jobmap_ctx);
PAYLOAD_SZ = get_payload_sz(s, m->rc.saved_conn_indx, num_clients);
s->num_bytes_sent -= PAYLOAD_SZ;
num_bytes_sent -= PAYLOAD_SZ;
s->num_sends--;
@@ -725,6 +747,7 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
int num_clients = codes_jobmap_get_num_ranks(jid.job, jobmap_ctx);
int dest_svr;
int tick_msgs_finished = 0;
int conn_indx;
// Determine the destination local process ID to send a message to
if(chip_connections_flag == 0){
dest_svr = tw_rand_integer(lp->rng, 0, num_clients - 1);
@@ -733,23 +756,18 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
dest_svr = (s->local_rank + 1) % num_clients;
}
//Check if all spikes have been sent for this tick
if(s->msgs_per_tick_sent >= msgs_per_tick){
if(s->spikes_per_tick_sent >= spikes_per_tick){
bf->c17 = 1;
tick_msgs_finished = 1;
m->rc.saved_msgs_per_tick_sent = s->msgs_per_tick_sent;
s->msgs_per_tick_sent = 0;
m->rc.saved_spikes_per_tick_sent = s->spikes_per_tick_sent;
s->spikes_per_tick_sent = 0;
num_chips_finished_this_tick++;
}
}
else if(chip_connections_flag == 1){
if(s->local_rank == 0){
int five = 5;
five++;
}
if(s->num_chip_conns == 0)
return;
dest_svr = -1;
int conn_indx;
//Find destination chip
for( int i=0; i<s->num_chip_conns; i++ ){
conn_indx = (s->last_conn + i) % s->num_chip_conns;
@@ -781,16 +799,22 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
exit(0);
}*/
}
s->num_sends_per_conn[conn_indx]++;
    if(spike_aggregation_flag == 0){
        s->num_sends_per_conn[conn_indx]++;
    }else{
        /* Aggregating: one message carries all of this connection's spikes for the tick */
        s->num_sends_per_conn[conn_indx] = s->spikes_per_conn[conn_indx];
    }
}
assert(dest_svr < num_clients);
s->msgs_per_tick_sent++;
s->spikes_per_tick_sent++;
jid.rank = dest_svr;
int intm_dest_id = codes_jobmap_to_global_id(jid, jobmap_ctx);
global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);
PAYLOAD_SZ = get_payload_sz(s, conn_indx, num_clients);
nw_message remote_m;
remote_m.fwd.sim_start_time = tw_now(lp);
remote_m.fwd.dest_rank = dest_svr;
@@ -821,7 +845,7 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
int tick_delay = 0;
/* Check if num messages sent has reached total for the current tick window */
/* If so, issue next message to start at the beginning of the next tick */
//printf("LP:%llu, msgs_per_tick_sent:%llu, msgs_per_tick:%llu, tick_delay:%d, tick_interval:%llu, tw_now:%llu\n", LLU(lp->gid),LLU(s->msgs_per_tick_sent), LLU(msgs_per_tick), tick_delay, LLU(tick_interval), LLU(tw_now(lp)));
//printf("LP:%llu, spikes_per_tick_sent:%llu, spikes_per_tick:%llu, tick_delay:%d, tick_interval:%llu, tw_now:%llu\n", LLU(lp->gid),LLU(s->spikes_per_tick_sent), LLU(spikes_per_tick), tick_delay, LLU(tick_interval), LLU(tw_now(lp)));
if(tick_msgs_finished){
tick_delay = tick_interval - (LLU(tw_now(lp)) % LLU(tick_interval));
m->rc.saved_delay = s->compute_time;
@@ -830,11 +854,24 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
//if(LLU(lp->gid) == 0)
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
//printf("process %d. All messages transfered before tick. chip_flag:%d, chip:%d, times_finished:%d chips finished: %d, LP:%llu, msgs_per_tick_sent:%llu, msgs_per_tick:%llu, tick_delay:%d, tick_interval:%llu, tw_now:%llu\n", world_rank, chip_connections_flag, s->local_rank, times_finished[s->local_rank], num_chips_finished_this_tick, LLU(lp->gid),LLU(s->msgs_per_tick_sent), LLU(msgs_per_tick), tick_delay, LLU(tick_interval), LLU(tw_now(lp)));
//printf("process %d. All messages transfered before tick. chip_flag:%d, chip:%d, times_finished:%d chips finished: %d, LP:%llu, spikes_per_tick_sent:%llu, spikes_per_tick:%llu, tick_delay:%d, tick_interval:%llu, tw_now:%llu\n", world_rank, chip_connections_flag, s->local_rank, times_finished[s->local_rank], num_chips_finished_this_tick, LLU(lp->gid),LLU(s->spikes_per_tick_sent), LLU(spikes_per_tick), tick_delay, LLU(tick_interval), LLU(tw_now(lp)));
}
    /* When aggregating, charge the time to generate the tick's spikes plus a
     * per-spike packing overhead for the message being assembled */
    int aggregation_delay = 0;
    if(spike_aggregation_flag == 1){
        if(chip_connections_flag == 0){
            /* PAYLOAD_SZ / 8 approximates the number of spikes in the
             * aggregate (assumes the default 8-byte spike) */
            aggregation_delay = spikes_per_tick * mean_interval + (PAYLOAD_SZ / 8) * aggregation_overhead;
        }else{
            int total_chip_spikes = 0;
            for(int i=0; i<s->num_chip_conns; i++){
                total_chip_spikes += s->spikes_per_conn[i];
            }
            aggregation_delay = total_chip_spikes * mean_interval + s->spikes_per_conn[conn_indx] * aggregation_overhead;
        }
    }
/* New event after MEAN_INTERVAL */
tw_stime ts = 0.1 + tick_delay + mean_interval + tw_rand_exponential(lp->rng, noise);
tw_stime ts = 0.1 + aggregation_delay + tick_delay + mean_interval + tw_rand_exponential(lp->rng, noise);
tw_event * e;
nw_message * m_new;
e = tw_event_new(lp->gid, ts, lp);
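/* The next generation event is thus pushed out by the aggregation cost, any
 * remaining time in the current tick window, and the mean send interval plus
 * exponential noise. */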
@@ -850,6 +887,7 @@ void arrive_syn_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
(void)lp;
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
int data = m->fwd.num_bytes;
PAYLOAD_SZ = data;
s->num_bytes_recvd -= data;
s->num_recvs--;
s->send_time = m->rc.saved_send_time;
@@ -869,6 +907,7 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
int data = m->fwd.num_bytes;
PAYLOAD_SZ = data;
s->num_bytes_recvd += data;
s->num_recvs++;
m->rc.saved_send_time = s->send_time;
@@ -2025,7 +2064,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->compute_time = 0;
s->elapsed_time = 0;
s->msgs_per_tick_sent = 0;
s->spikes_per_tick_sent = 0;
s->app_id = lid.job;
s->local_rank = lid.rank;
@@ -2720,10 +2759,12 @@ const tw_optdef app_opt [] =
TWOPT_UINT("lp-io-use-suffix", lp_io_use_suffix, "Whether to append uniq suffix to lp-io directory (default 0)"),
TWOPT_CHAR("offset_file", offset_file, "offset file name"),
TWOPT_STIME("tick_interval", tick_interval, "Length of a tick in nanoseconds (Default 1000000)"),
TWOPT_UINT("msgs_per_tick", msgs_per_tick, "Number of messages for each node to transfer between each simulated tick (Default 0, indicating value read from chip_file)"),
TWOPT_UINT("spikes_per_tick", spikes_per_tick, "Number of messages for each node to transfer between each simulated tick (Default 0, indicating value read from chip_file)"),
TWOPT_UINT("chip_connections-flag", chip_connections_flag, "0: chip connections are random all-to-all. 1: chip connections are read from chip_file (Default 0)"),
TWOPT_CHAR("chip_file", chip_file, "File containing the neuromorphic chip connections and avg spike per tick data"),
TWOPT_STIME("payload_size", PAYLOAD_SZ, "size, in bytes, of the messages transfered in the background traffic (Default 8B)"),
TWOPT_UINT("spike_size", spike_size, "size, in bytes, of the spikes transfered in the background traffic (Default 8B)"),
TWOPT_UINT("spike_aggregation_flag", spike_aggregation_flag, "0: Spikes are sent as individual messages. 1: Spikes within the same tick destined for the same chip are sent in one message using the aggregation-overhead (Default 0)"),
TWOPT_UINT("aggregation_overhead", aggregation_overhead, "Nanosecond delay per spike used when aggregating spikes into one message per chip connection (Default 0)"),
#ifdef ENABLE_CORTEX_PYTHON
TWOPT_CHAR("cortex-file", cortex_file, "Python file (without .py) containing the CoRtEx translation class"),
TWOPT_CHAR("cortex-class", cortex_class, "Python class implementing the CoRtEx translator"),