Commit a2e9d893 authored by Noah Wolfe's avatar Noah Wolfe
Browse files

Adding neuro synthetic workloads configured with input file

- Connections and average spikes per tick are read in from the chip
connections csv file and each background synthetic nw-lp configures its
parameters accordingly
parent 9a0ed993
......@@ -97,6 +97,15 @@ static int num_nw_lps = 0, num_mpi_lps = 0;
static int num_syn_clients;
// For Reading in neuromorphic chip connection/spike data
int *starting_connection;
int *avg_spikes_per_tick;
int *chip_connections;
static int chip_connections_flag;
char chip_file[8192];
int num_chips_finished_this_tick;
int *times_finished;
FILE * workload_log = NULL;
FILE * msg_size_log = NULL;
FILE * workload_agg_log = NULL;
......@@ -111,7 +120,7 @@ long long num_syn_bytes_sent = 0;
long long num_syn_bytes_recvd = 0;
static unsigned long msgs_per_tick = 0;
static double tick_interval = 0;
static double tick_interval = 1000000;
double max_time = 0, max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;
......@@ -278,6 +287,11 @@ struct nw_state
unsigned long num_bytes_recvd;
unsigned long msgs_per_tick_sent;
int num_chip_conns;
int last_conn;
int * chip_conns;
int * spikes_per_conn;
int * num_sends_per_conn;
/* For sampling data */
int sampling_indx;
......@@ -325,6 +339,8 @@ struct nw_message
int64_t saved_min_bytes;
int64_t saved_max_bytes;
int64_t saved_msgs_per_tick_sent;
int saved_last_conn;
int saved_conn_indx;
} rc;
};
......@@ -644,15 +660,41 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
if(bf->c0)
return;
s->msgs_per_tick_sent--;
if(bf->c17){
s->msgs_per_tick_sent = m->rc.saved_msgs_per_tick_sent;
if(chip_connections_flag == 0){
tw_rand_reverse_unif(lp->rng);
if(bf->c17){
s->msgs_per_tick_sent = m->rc.saved_msgs_per_tick_sent;
num_chips_finished_this_tick--;
//printf("Rolling back finished chip:%d chips finished before: %d, LP:%llu, msgs_per_tick_sent before:%llu, msgs_per_tick:%llu, tick_interval:%llu, tw_now:%llu\n", s->local_rank, num_chips_finished_this_tick, LLU(lp->gid),LLU(s->msgs_per_tick_sent), LLU(msgs_per_tick), LLU(tick_interval), LLU(tw_now(lp)));
}
}
else if(chip_connections_flag == 1){
if(s->num_chip_conns == 0)
return;
if(bf->c20){
s->last_conn = m->rc.saved_last_conn;
}
if(bf->c18){
s->last_conn = m->rc.saved_last_conn;
for(int i=0; i<s->num_chip_conns; i++){
s->num_sends_per_conn[i] = s->spikes_per_conn[i];
}
num_chips_finished_this_tick--;
times_finished[s->local_rank]--;
s->msgs_per_tick_sent--;
//printf("Rolling back finished chip:%d chips finished before: %d, LP:%llu, msgs_per_tick_sent before:%llu, msgs_per_tick:%llu, tick_interval:%llu, tw_now:%llu\n", s->local_rank, num_chips_finished_this_tick, LLU(lp->gid),LLU(s->msgs_per_tick_sent), LLU(msgs_per_tick), LLU(tick_interval), LLU(tw_now(lp)));
}
s->num_sends_per_conn[m->rc.saved_conn_indx]--;
}
s->msgs_per_tick_sent--;
if(bf->c21)
s->compute_time = m->rc.saved_delay;
model_net_event_rc2(lp, &m->event_rc);
s->num_bytes_sent -= PAYLOAD_SZ;
num_bytes_sent -= PAYLOAD_SZ;
s->num_sends--;
s->compute_time = m->rc.saved_delay;
if(enable_sampling)
{
......@@ -662,6 +704,7 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
s->mpi_wkld_samples[indx].num_bytes_sent_sample -= PAYLOAD_SZ;
increment_sampling_check_rc(s, bf);
}
tw_rand_reverse_unif(lp->rng);
}
/* generate synthetic traffic */
......@@ -680,13 +723,69 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
jid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx);
int num_clients = codes_jobmap_get_num_ranks(jid.job, jobmap_ctx);
int dest_svr = tw_rand_integer(lp->rng, 0, num_clients - 1);
if(dest_svr == s->local_rank)
{
dest_svr = (s->local_rank + 1) % num_clients;
int dest_svr;
int tick_msgs_finished = 0;
// Determine the destination local process ID to send a message to
if(chip_connections_flag == 0){
dest_svr = tw_rand_integer(lp->rng, 0, num_clients - 1);
if(dest_svr == s->local_rank)
{
dest_svr = (s->local_rank + 1) % num_clients;
}
//Check if all spikes have been sent for this tick
if(s->msgs_per_tick_sent >= msgs_per_tick){
bf->c17 = 1;
tick_msgs_finished = 1;
m->rc.saved_msgs_per_tick_sent = s->msgs_per_tick_sent;
s->msgs_per_tick_sent = 0;
num_chips_finished_this_tick++;
}
}
else if(chip_connections_flag == 1){
if(s->local_rank == 0){
int five = 5;
five++;
}
if(s->num_chip_conns == 0)
return;
dest_svr = -1;
int conn_indx;
//Find destination chip
for( int i=0; i<s->num_chip_conns; i++ ){
conn_indx = (s->last_conn + i) % s->num_chip_conns;
if(s->num_sends_per_conn[conn_indx] < s->spikes_per_conn[conn_indx]){
bf->c20 = 1;
dest_svr = s->chip_conns[conn_indx];
m->rc.saved_last_conn = s->last_conn;
s->last_conn = (conn_indx + 1) % s->num_chip_conns;
m->rc.saved_conn_indx = conn_indx;
break;
}
}
//Check if all spikes have been sent for this tick and setup/send for next tick
if( dest_svr == -1 ){
bf->c18 = 1;
tick_msgs_finished = 1; // Indicates all messages for the current tick window have been sent
m->rc.saved_last_conn = s->last_conn;
conn_indx = 0;
s->last_conn = 1;
for(int i=0;i<s->num_chip_conns;i++){
s->num_sends_per_conn[i] = 0;
}
dest_svr = s->chip_conns[conn_indx];
m->rc.saved_conn_indx = conn_indx;
num_chips_finished_this_tick++;
times_finished[s->local_rank]++;
if(times_finished[s->local_rank] > 1){
printf("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nchip %d finished too many times\n",s->local_rank);
exit(0);
}
}
s->num_sends_per_conn[conn_indx]++;
}
assert(dest_svr < num_clients);
s->msgs_per_tick_sent++;
jid.rank = dest_svr;
int intm_dest_id = codes_jobmap_to_global_id(jid, jobmap_ctx);
......@@ -723,17 +822,16 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
/* Check if num messages sent has reached total for the current tick window */
/* If so, issue next message to start at the beggining of the next tick */
//printf("LP:%llu, msgs_per_tick_sent:%llu, msgs_per_tick:%llu, tick_delay:%d, tick_interval:%llu, tw_now:%llu\n", LLU(lp->gid),LLU(s->msgs_per_tick_sent), LLU(msgs_per_tick), tick_delay, LLU(tick_interval), LLU(tw_now(lp)));
if(s->msgs_per_tick_sent >= msgs_per_tick){
if(tick_msgs_finished){
tick_delay = tick_interval - (LLU(tw_now(lp)) % LLU(tick_interval));
m->rc.saved_delay = s->compute_time;
s->compute_time += tick_delay;
m->rc.saved_msgs_per_tick_sent = s->msgs_per_tick_sent;
s->msgs_per_tick_sent = 0;
bf->c17 = 1;
if(LLU(lp->gid) == 0)
printf("All messages transfered before tick. LP:%llu, msgs_per_tick_sent:%llu, msgs_per_tick:%llu, tick_delay:%d, tick_interval:%llu, tw_now:%llu\n", LLU(lp->gid),LLU(s->msgs_per_tick_sent), LLU(msgs_per_tick), tick_delay, LLU(tick_interval), LLU(tw_now(lp)));
bf->c21 = 1;
//if(LLU(lp->gid) == 0)
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
//printf("process %d. All messages transfered before tick. chip_flag:%d, chip:%d, times_finished:%d chips finished: %d, LP:%llu, msgs_per_tick_sent:%llu, msgs_per_tick:%llu, tick_delay:%d, tick_interval:%llu, tw_now:%llu\n", world_rank, chip_connections_flag, s->local_rank, times_finished[s->local_rank], num_chips_finished_this_tick, LLU(lp->gid),LLU(s->msgs_per_tick_sent), LLU(msgs_per_tick), tick_delay, LLU(tick_interval), LLU(tw_now(lp)));
}
s->msgs_per_tick_sent++;
/* New event after MEAN_INTERVAL */
tw_stime ts = 0.1 + tick_delay + mean_interval + tw_rand_exponential(lp->rng, noise);
......@@ -1934,15 +2032,55 @@ void nw_test_init(nw_state* s, tw_lp* lp)
if(strcmp(file_name_of_job[lid.job], "synthetic") == 0)
{
tw_event * e;
nw_message * m_new;
tw_stime ts = tw_rand_exponential(lp->rng, mean_interval/1000);
e = tw_event_new(lp->gid, ts, lp);
m_new = (nw_message*)tw_event_data(e);
m_new->msg_type = CLI_BCKGND_GEN;
tw_event_send(e);
is_synthetic = 1;
if(chip_connections_flag == 0){
num_chips_finished_this_tick = 0;
tw_event * e;
nw_message * m_new;
tw_stime ts = tw_rand_exponential(lp->rng, mean_interval/1000);
e = tw_event_new(lp->gid, ts, lp);
m_new = (nw_message*)tw_event_data(e);
m_new->msg_type = CLI_BCKGND_GEN;
tw_event_send(e);
is_synthetic = 1;
}
else{
// Set neuromorphic params
num_chips_finished_this_tick = 0;
s->last_conn = 0;
s->num_chip_conns = 0;
if( starting_connection[s->local_rank] >= 0 ){
int start = starting_connection[s->local_rank];
int i = 1;
int end = starting_connection[s->local_rank + i];
s->num_chip_conns = end-start;
s->chip_conns = (int*)malloc(s->num_chip_conns*sizeof(int));
s->spikes_per_conn = (int*)malloc(s->num_chip_conns*sizeof(int));
s->num_sends_per_conn = (int*)malloc(s->num_chip_conns*sizeof(int));
while( end == -1 ){
if(s->local_rank + i == num_traces_of_job[lid.job]){
s->num_chip_conns = 0;
break;
}
end = starting_connection[s->local_rank + i];
i++;
}
int indx = 0;
for( i=start; i<end; i++){
s->chip_conns[indx] = chip_connections[i];
s->spikes_per_conn[indx] = avg_spikes_per_tick[i];
s->num_sends_per_conn[indx] = 0;
indx++;
}
tw_event * e;
nw_message * m_new;
tw_stime ts = tw_rand_exponential(lp->rng, mean_interval/1000);
e = tw_event_new(lp->gid, ts, lp);
m_new = (nw_message*)tw_event_data(e);
m_new->msg_type = CLI_BCKGND_GEN;
tw_event_send(e);
is_synthetic = 1;
}
}
}
else
{
......@@ -2581,8 +2719,10 @@ const tw_optdef app_opt [] =
TWOPT_CHAR("lp-io-dir", lp_io_dir, "Where to place io output (unspecified -> no output"),
TWOPT_UINT("lp-io-use-suffix", lp_io_use_suffix, "Whether to append uniq suffix to lp-io directory (default 0)"),
TWOPT_CHAR("offset_file", offset_file, "offset file name"),
TWOPT_UINT("msgs_per_tick", msgs_per_tick, "Number of messages for each node to transfer between each simulated tick (Default 0)"),
TWOPT_STIME("tick_interval", tick_interval, "Length of a tick in nanoseconds (Default 0)"),
TWOPT_STIME("tick_interval", tick_interval, "Length of a tick in nanoseconds (Default 1000000)"),
TWOPT_UINT("msgs_per_tick", msgs_per_tick, "Number of messages for each node to transfer between each simulated tick (Default 0, indicating value read from chip_file)"),
TWOPT_UINT("chip_connections-flag", chip_connections_flag, "0: chip connections are random all-to-all. 1: chip connections are read from chip_file (Default 0)"),
TWOPT_CHAR("chip_file", chip_file, "File containing the neuromorphic chip connections and avg spike per tick data"),
TWOPT_STIME("payload_size", PAYLOAD_SZ, "size, in bytes, of the messages transfered in the background traffic (Default 8B)"),
#ifdef ENABLE_CORTEX_PYTHON
TWOPT_CHAR("cortex-file", cortex_file, "Python file (without .py) containing the CoRtEx translation class"),
......@@ -2757,6 +2897,40 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
jobmap_ctx = codes_jobmap_configure(CODES_JOBMAP_LIST, &jobmap_p);
}
}
// Read in the neuromorphic chip connection/spike data
starting_connection = (int*)calloc(num_syn_clients+1,sizeof(int));
avg_spikes_per_tick = (int*)malloc(num_syn_clients*num_syn_clients*sizeof(int));
chip_connections = (int*)malloc(num_syn_clients*num_syn_clients*sizeof(int));
times_finished = (int*)calloc(num_syn_clients,sizeof(int));
int src_chip, dst_chip, avg_spike, conn_indx = 0;
for( int i=0; i<num_syn_clients; i++ ){
starting_connection[i] = -1; //Negative one indicates no connections for the given chip
}
if(chip_connections_flag){
FILE *chip_conn_file = fopen(chip_file, "r");
if(!chip_conn_file)
tw_error(TW_LOC, "\n Could not open file %s ", chip_file);
while(!feof(chip_conn_file)){
char ref = fscanf(chip_conn_file, "%d,%d,%d",&src_chip, &dst_chip, &avg_spike);
if( starting_connection[src_chip] == -1 )
starting_connection[src_chip] = conn_indx;
chip_connections[conn_indx] = dst_chip;
avg_spikes_per_tick[conn_indx] = avg_spike;
conn_indx++;
assert(src_chip < num_syn_clients);
if(dst_chip >= num_syn_clients)
printf("placeholder");
assert(dst_chip < num_syn_clients);
assert(conn_indx < num_syn_clients*num_syn_clients);
}
starting_connection[src_chip+1] = conn_indx;
fclose(chip_conn_file);
}
MPI_Comm_rank(MPI_COMM_CODES, &rank);
MPI_Comm_size(MPI_COMM_CODES, &nprocs);
......@@ -2841,6 +3015,20 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
double total_avg_recv_time, total_max_recv_time;
double g_total_syn_data;
int chips_finished;
int *total_times_finished = (int*)malloc(num_net_traces*sizeof(int));
MPI_Allreduce(&num_chips_finished_this_tick,&chips_finished,1,MPI_INT,MPI_SUM,MPI_COMM_WORLD);
MPI_Allreduce(times_finished,total_times_finished,num_net_traces,MPI_INT,MPI_SUM,MPI_COMM_WORLD);
printf("chips finished: %d\n",chips_finished);
if(!g_tw_mynode){
for(int i=0; i<num_net_traces; i++){
if(total_times_finished[i] == 0)
printf("chip %d didn't finish\n", i);
if(total_times_finished[i] > 1)
printf("chip %d finished too many times\n", i);
}
}
MPI_Reduce(&num_bytes_sent, &total_bytes_sent, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_CODES);
MPI_Reduce(&num_bytes_recvd, &total_bytes_recvd, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_CODES);
MPI_Reduce(&max_comm_time, &max_comm_run_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_CODES);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment