/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

// Local router ID: 0 ... total_routers-1
// Router LP ID
// Terminal LP ID

#include <ross.h>

#include "codes/codes_mapping.h"
#include "codes/codes.h"
#include "codes/model-net.h"
#include "codes/model-net-method.h"
#include "codes/model-net-lp.h"
#include "codes/net/dragonfly.h"

#define CHUNK_SIZE 32.0
#define CREDIT_SIZE 8
#define MEAN_PROCESS 1.0

/* collective specific parameters */
#define TREE_DEGREE 4
#define LEVEL_DELAY 1000
#define DRAGONFLY_COLLECTIVE_DEBUG 0
#define NUM_COLLECTIVES  1
#define COLLECTIVE_COMPUTATION_DELAY 5700
#define DRAGONFLY_FAN_OUT_DELAY 20.0

// debugging parameters
#define TRACK 235221
#define PRINT_ROUTER_TABLE 1
#define DEBUG 1

#define LP_CONFIG_NM (model_net_lp_config_names[DRAGONFLY])
#define LP_METHOD_NM (model_net_method_names[DRAGONFLY])

// arrival rate
static double MEAN_INTERVAL=200.0;
/* radix of a dragonfly router = number of global channels + number of
 * compute node channels + number of local router channels */
static int radix=0;

/* configurable parameters, coming from the codes config file*/
/* number of virtual channels, number of routers comes from the
 * config file, number of compute nodes, global channels and group
 * is calculated from these configurable parameters */
static int num_vcs, num_routers, num_cn, num_global_channels, num_groups;

/* adaptive threshold is to bias the adaptive routing */
static int total_routers, adaptive_threshold = 10;

/* minimal and non-minimal packet counts for adaptive routing*/
int minimal_count=0, nonmin_count=0;

/* configurable parameters, global channel, local channel and 
 * compute node bandwidth */
static double global_bandwidth, local_bandwidth, cn_bandwidth;

/*configurable parameters, global virtual channel size, local
 * virtual channel size and compute node channel size */
static int global_vc_size, local_vc_size, cn_vc_size;

/* global variables for codes mapping */
static char lp_group_name[MAX_NAME_LENGTH], lp_type_name[MAX_NAME_LENGTH];
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;

/* handles terminal and router events like packet generate/send/receive/buffer */
typedef enum event_t event_t;

typedef struct terminal_state terminal_state;
typedef struct router_state router_state;

/* dragonfly compute node data structure */
struct terminal_state
{
   unsigned long long packet_counter;

   // Dragonfly specific parameters
   unsigned int router_id;
   unsigned int terminal_id;

   // Each terminal will have an input and output channel with the router
   int* vc_occupancy; // NUM_VC
   int* output_vc_state;
   tw_stime terminal_available_time;
   tw_stime next_credit_available_time;
   // per-category statistics for traffic handled by this terminal
   struct mn_stats dragonfly_stats_array[CATEGORY_MAX];
  /* collective init time */
  tw_stime collective_init_time;

  /* node ID in the tree */ 
   tw_lpid node_id;

   /* messages sent & received in collectives may get interchanged several times so we have to save the 
     origin server information in the node's state */
   tw_lpid origin_svr; 
  
  /* parent node ID of the current node */
   tw_lpid parent_node_id;
   /* array of children to be allocated in terminal_init*/
   tw_lpid* children;

   /* children of a node can be less than or equal to the tree degree */
   int num_children;

   short is_root;
   short is_leaf;

   /* to maintain a count of child nodes that have fanned in at the parent during the collective
      fan-in phase*/
   int num_fan_nodes;
};

/* terminal, router and collective event types */
enum event_t
{
  T_GENERATE=1,
  T_ARRIVE,
  T_SEND,
  T_BUFFER,
  R_SEND,
  R_ARRIVE,
  R_BUFFER,
  D_COLLECTIVE_INIT,
  D_COLLECTIVE_FAN_IN,
  D_COLLECTIVE_FAN_OUT
};
/* status of a virtual channel can be idle, active, allocated or wait for credit */
enum vc_status
{
   VC_IDLE,
   VC_ACTIVE,
   VC_ALLOC,
   VC_CREDIT
};

/* whether the last hop of a packet was global, local or a terminal */
enum last_hop
{
   GLOBAL,
   LOCAL,
   TERMINAL
};

/* three routing algorithms are available; adaptive routing is not fully
 * accurate in the current version because the formulas for estimating the
 * load on global channels are approximate */
enum ROUTING_ALGO
{
    MINIMAL = 0,
    NON_MINIMAL,
    ADAPTIVE
};

struct router_state
{
   unsigned int router_id;
   unsigned int group_id;
  
   int* global_channel; 
   tw_stime* next_output_available_time;
   tw_stime* next_credit_available_time;
   int* vc_occupancy;
   int* output_vc_state;
};

static short routing = MINIMAL;

static double head_delay;
static uint64_t num_chunks;

static tw_stime         dragonfly_total_time = 0;
static tw_stime         dragonfly_max_latency = 0;
static tw_stime         max_collective = 0;


static long long       total_hops = 0;
static long long       N_finished_packets = 0;

/* returns the dragonfly router lp type for lp registration */
static const tw_lptype* dragonfly_get_router_lp_type(void);

/* returns the dragonfly message size */
static int dragonfly_get_msg_sz(void)
{
	   return sizeof(terminal_message);
}


/* setup the dragonfly model, initialize global parameters */
static void dragonfly_setup(const void* net_params)
{
   dragonfly_param* d_param = (dragonfly_param*)net_params;

   num_vcs = d_param->num_vcs;
   num_routers = d_param->num_routers;
   num_cn = num_routers/2;
   num_global_channels = num_routers/2;
   num_groups = num_routers * num_cn + 1; 

   global_bandwidth = d_param->global_bandwidth;
   local_bandwidth = d_param->local_bandwidth;
   cn_bandwidth = d_param->cn_bandwidth;

   global_vc_size = d_param->global_vc_size;
   local_vc_size = d_param->local_vc_size;
   cn_vc_size = d_param->cn_vc_size;
   routing = d_param->routing;

   radix = num_vcs * (num_cn + num_global_channels + num_routers);
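   /* the router radix counts one entry per virtual channel for every port:
    * num_routers local ports, num_global_channels global ports and num_cn
    * terminal ports; the per-port occupancy, credit and timing arrays in
    * router_setup() are sized by this value */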
   total_routers = num_groups * num_routers;
   lp_type_register("dragonfly_router", dragonfly_get_router_lp_type());
   return;
}

/* report dragonfly statistics like average and maximum packet latency, average number of hops traversed */
static void dragonfly_report_stats()
{
/* TODO: Add dragonfly packet average, maximum latency and average number of hops traversed */
   long long avg_hops, total_finished_packets;
   tw_stime avg_time, max_time;
   int total_minimal_packets, total_nonmin_packets;

   MPI_Reduce( &total_hops, &avg_hops, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &N_finished_packets, &total_finished_packets, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &dragonfly_total_time, &avg_time, 1,MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &dragonfly_max_latency, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
   if(routing == ADAPTIVE)
    {
	MPI_Reduce(&minimal_count, &total_minimal_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
 	MPI_Reduce(&nonmin_count, &total_nonmin_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
    }

   /* print statistics */
   if(!g_tw_mynode)
   {
      printf(" Average number of hops traversed %f average message latency %lf us maximum message latency %lf us \n", (float)avg_hops/total_finished_packets, avg_time/(total_finished_packets*1000), max_time/1000);
     if(routing == ADAPTIVE)
              printf("\n ADAPTIVE ROUTING STATS: %d packets routed minimally %d packets routed non-minimally ", total_minimal_packets, total_nonmin_packets);
 
  }
   return;
}

void dragonfly_collective_init(terminal_state * s,
           		   tw_lp * lp)
{
    codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
    int num_lps = codes_mapping_get_lp_count(lp_group_name, LP_CONFIG_NM);
    int num_reps = codes_mapping_get_group_reps(lp_group_name);
    s->node_id = (mapping_rep_id * num_lps) + mapping_offset;

    int i;
   /* handle collective operations by forming a tree of all the LPs */
   /* special condition for root of the tree */
   if( s->node_id == 0)
    {
        s->parent_node_id = -1;
        s->is_root = 1;
   }
   else
   {
       s->parent_node_id = (s->node_id - ((s->node_id - 1) % TREE_DEGREE)) / TREE_DEGREE;
       s->is_root = 0;
   }
   s->children = (tw_lpid*)malloc(TREE_DEGREE * sizeof(tw_lpid));

   /* mark the node as a leaf by default; cleared below if it has children */
   s->is_leaf = 1;
   s->num_children = 0;

   /* calculate the children of the current node; if it is a leaf, no children
      are assigned and is_leaf stays set */

   for( i = 0; i < TREE_DEGREE; i++ )
    {
        tw_lpid next_child = (TREE_DEGREE * s->node_id) + i + 1;
        if(next_child < (num_lps * num_reps))
        {
            s->num_children++;
            s->is_leaf = 0;
            s->children[i] = next_child;
        }
        else
           s->children[i] = -1;
    }

#if DRAGONFLY_COLLECTIVE_DEBUG == 1
   printf("\n LP %ld parent node id ", s->node_id);

   for( i = 0; i < TREE_DEGREE; i++ )
        printf(" child node ID %ld ", s->children[i]);
   printf("\n");

   if(s->is_leaf)
        printf("\n LP %ld is leaf ", s->node_id);
#endif
}

/* dragonfly packet event: generates a dragonfly packet at the compute node */
static tw_stime dragonfly_packet_event(char* category, tw_lpid final_dest_lp, uint64_t packet_size, int is_pull, uint64_t pull_size, tw_stime offset, int remote_event_size, const void* remote_event, int self_event_size, const void* self_event, tw_lpid src_lp, tw_lp *sender, int is_last_pckt)
{
    tw_event * e_new;
    tw_stime xfer_to_nic_time;
    terminal_message * msg;
    tw_lpid dest_nic_id;
    char* tmp_ptr;
//    printf("\n g_tw_lookahead default %f src lp %d sender %d ", g_tw_lookahead, src_lp, sender->gid);
#if 0
    char lp_type_name[MAX_NAME_LENGTH], lp_group_name[MAX_NAME_LENGTH];

    int mapping_grp_id, mapping_rep_id, mapping_type_id, mapping_offset;
    codes_mapping_get_lp_info(sender->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
    codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, mapping_rep_id, mapping_offset, &local_nic_id);
#endif

    codes_mapping_get_lp_info(final_dest_lp, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
    codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, mapping_rep_id, mapping_offset, &dest_nic_id);
  
    xfer_to_nic_time = codes_local_latency(sender); /* without this jitter ROSS reports "found last KP time > current event time" when LPs of one type are placed together */
    //printf("\n transfer in time %f %f ", xfer_to_nic_time+offset, tw_now(sender));
    //e_new = tw_event_new(sender->gid, xfer_to_nic_time+offset, sender);
    //msg = tw_event_data(e_new);
    e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
            sender, DRAGONFLY, (void**)&msg, (void**)&tmp_ptr);
    strcpy(msg->category, category);
    msg->final_dest_gid = final_dest_lp;
    msg->dest_terminal_id = dest_nic_id;
    msg->sender_lp=src_lp;
    msg->packet_size = packet_size;
    msg->remote_event_size_bytes = 0;
    msg->local_event_size_bytes = 0;
    msg->type = T_GENERATE;
    msg->is_pull = is_pull;
    msg->pull_size = pull_size;

    if(is_last_pckt) /* It's the last packet, so pass in the remote and local event information */
      {
	if(remote_event_size > 0)
	 {
		msg->remote_event_size_bytes = remote_event_size;
		memcpy(tmp_ptr, remote_event, remote_event_size);
		tmp_ptr += remote_event_size;
	}
	if(self_event_size > 0)
	{
		msg->local_event_size_bytes = self_event_size;
		memcpy(tmp_ptr, self_event, self_event_size);
		tmp_ptr += self_event_size;
	}
     }
	   //printf("\n dragonfly remote event %d local event %d last packet %d %lf ", msg->remote_event_size_bytes, msg->local_event_size_bytes, is_last_pckt, xfer_to_nic_time);
    tw_event_send(e_new);
    return xfer_to_nic_time;
}

/* dragonfly packet event reverse handler */
static void dragonfly_packet_event_rc(tw_lp *sender)
{
	  codes_local_latency_reverse(sender);
	    return;
}

/* given a group ID gid, find the router in the current group that is attached
 * to a router in the group gid */
tw_lpid getRouterFromGroupID(int gid, 
		    router_state * r)
{
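  /* Note: this mirrors the global-channel assignment in router_setup(). The
   * distance (in groups) between gid and this group selects the local router:
   * each router covers num_global_channels/2 consecutive peer groups, and
   * successive sweeps over the group's routers alternate direction (counting
   * from the first router, then from the last). */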
  int group_begin = r->group_id * num_routers;
  int group_end = (r->group_id * num_routers) + num_routers-1;
  int offset = (gid * num_routers - group_begin) / num_routers;
  
  if((gid * num_routers) < group_begin)
    offset = (group_begin - gid * num_routers) / num_routers; // take absolute value
  
  int half_channel = num_global_channels / 2;
  int index = (offset - 1)/(half_channel * num_routers);
  
  offset=(offset - 1) % (half_channel * num_routers);

  // If the destination router is in the same group
  tw_lpid router_id;

  if(index % 2 != 0)
    router_id = group_end - (offset / half_channel); // start from the end
  else
    router_id = group_begin + (offset / half_channel);

  return router_id;
}	

/*When a packet is sent from the current router and a buffer slot becomes available, a credit is sent back to schedule another packet event*/
void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
{
  tw_event * buf_e;
  tw_stime ts;
  terminal_message * buf_msg;

  int dest=0, credit_delay=0, type = R_BUFFER;
  int is_terminal = 0;

 // Notify sender terminal about available buffer space
  if(msg->last_hop == TERMINAL)
  {
   dest = msg->src_terminal_id;
   //determine the time in ns to transfer the credit
   credit_delay = (1 / cn_bandwidth) * CREDIT_SIZE;
   type = T_BUFFER;
   is_terminal = 1;
  }
   else if(msg->last_hop == GLOBAL)
   {
     dest = msg->intm_lp_id;
     credit_delay = (1 / global_bandwidth) * CREDIT_SIZE;
   }
    else if(msg->last_hop == LOCAL)
     {
        dest = msg->intm_lp_id;
     	credit_delay = (1/local_bandwidth) * CREDIT_SIZE;
     }
    else
      printf("\n Invalid message type");

   // Assume it takes 0.1 ns of serialization latency for processing the credits in the queue
    int output_port = msg->saved_vc / num_vcs;
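    /* occupancy/credit arrays are indexed as (output_port * num_vcs + vc), so
       dividing the saved VC index by num_vcs recovers the output port used */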
    msg->saved_available_time = s->next_credit_available_time[output_port];
    s->next_credit_available_time[output_port] = max(tw_now(lp), s->next_credit_available_time[output_port]);
    ts = credit_delay + 0.1 + tw_rand_exponential(lp->rng, (double)credit_delay/1000);
	
    s->next_credit_available_time[output_port]+=ts;
    if (is_terminal){
        buf_e = model_net_method_event_new(dest, 
                s->next_credit_available_time[output_port] - tw_now(lp), lp,
                DRAGONFLY, (void**)&buf_msg, NULL);
    }
    else{
        buf_e = tw_event_new(dest, s->next_credit_available_time[output_port] - tw_now(lp) , lp);
        buf_msg = tw_event_data(buf_e);
    }
    buf_msg->vc_index = msg->saved_vc;
    buf_msg->type=type;
    buf_msg->last_hop = msg->last_hop;
    buf_msg->packet_ID=msg->packet_ID;

    tw_event_send(buf_e);

    return;
}

/* generates packet at the current dragonfly compute node */
void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
{
  tw_stime ts;
  tw_event *e;
  terminal_message *m;
  int i, total_event_size;
  num_chunks = msg->packet_size / CHUNK_SIZE;
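  /* the packet is segmented into CHUNK_SIZE-byte chunks; each chunk below is
     injected as its own event and flow-controlled independently */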
  msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter + tw_rand_integer(lp->rng, 0, lp->gid + g_tw_nlp * s->packet_counter);
  msg->travel_start_time = tw_now(lp);
  msg->my_N_hop = 0;
  for(i = 0; i < num_chunks; i++)
  {
	  // Before generating a packet, check if the input queue is available
        ts = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, MEAN_INTERVAL/200);
	int chan = -1, j;
	for(j = 0; j < num_vcs; j++)
	 {
	     if(s->vc_occupancy[j] < cn_vc_size * num_chunks)
	      {
	       chan=j;
	       break;
	      }
         }

        // this is a terminal event, so use the method-event version
       //e = tw_event_new(lp->gid, i + ts, lp);
       //m = tw_event_data(e);
       //memcpy(m, msg, sizeof(terminal_message) + msg->remote_event_size_bytes + msg->local_event_size_bytes);
       void * m_data;
       e = model_net_method_event_new(lp->gid, i+ts, lp, DRAGONFLY,
               (void**)&m, &m_data);
       memcpy(m, msg, sizeof(terminal_message));
       void * m_data_src = model_net_method_get_edata(DRAGONFLY, msg);
       if (msg->remote_event_size_bytes){
            memcpy(m_data, m_data_src, msg->remote_event_size_bytes);
       }
       if (msg->local_event_size_bytes){ 
            memcpy((char*)m_data + msg->remote_event_size_bytes,
                    (char*)m_data_src + msg->remote_event_size_bytes,
                    msg->local_event_size_bytes);
       }
       m->intm_group_id = -1;
       m->saved_vc=0;
       m->chunk_id = i;
       
       if(msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1)
         printf("\n packet generated %lld at terminal %d chunk id %d ", msg->packet_ID, (int)lp->gid, i);
       
       m->output_chan = -1;
       if(chan != -1) // If the input queue is available
   	{
	    // Send the packet out
	     m->type = T_SEND;
 	     tw_event_send(e);
        }
      else
         {
	  printf("\n Exceeded queue size, exitting %d", s->vc_occupancy[0]);
	  MPI_Finalize();
	  exit(-1);
        } //else
  } // for
  total_event_size = model_net_get_msg_sz(DRAGONFLY) + 
      msg->remote_event_size_bytes + msg->local_event_size_bytes;
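  /* the event size statistic accounts for the model-net message plus any
     piggybacked remote and local event payloads */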
  mn_stats* stat;
  stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
  stat->send_count++;
  stat->send_bytes += msg->packet_size;
  stat->send_time += (1/cn_bandwidth) * msg->packet_size;
  if(stat->max_event_size < total_event_size)
	  stat->max_event_size = total_event_size;
  return;
}

/* sends the packet from the current dragonfly compute node to the attached router */
void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
{
  tw_stime ts;
  tw_event *e;
  terminal_message *m;
  tw_lpid router_id;
  /* Route the packet to its source router */ 
   int vc=msg->saved_vc;

   //  Each packet is broken into chunks and then sent over the channel
   msg->saved_available_time = s->terminal_available_time;
   head_delay = (1/cn_bandwidth) * CHUNK_SIZE;
   ts = head_delay + tw_rand_exponential(lp->rng, (double)head_delay/200);
   s->terminal_available_time = max(s->terminal_available_time, tw_now(lp));
   s->terminal_available_time += ts;
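   /* head_delay is the serialization time of one chunk on the terminal-to-router
      link; terminal_available_time serializes successive chunks on that link */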

   codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
   codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", s->router_id, 0, &router_id);
   // we are sending an event to the router, so no method_event here
   e = tw_event_new(router_id, s->terminal_available_time - tw_now(lp), lp);

   if(msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1)
     printf("\n terminal %d packet %lld chunk %d being sent to router %d router id %d ", (int)lp->gid, (long long)msg->packet_ID, msg->chunk_id, (int)router_id, s->router_id);
   m = tw_event_data(e);
   memcpy(m, msg, sizeof(terminal_message));
   if (msg->remote_event_size_bytes){
        memcpy(m+1, model_net_method_get_edata(DRAGONFLY, msg),
                msg->remote_event_size_bytes);
   }
   m->type = R_ARRIVE;
   m->src_terminal_id = lp->gid;
   m->saved_vc = vc;
   m->last_hop = TERMINAL;
   m->intm_group_id = -1;
   m->local_event_size_bytes = 0;
   tw_event_send(e);
//  Each chunk is 32B and the VC occupancy is in chunks to enable efficient flow control

   if(msg->chunk_id == num_chunks - 1) 
    {
      /* local completion message */
      if(msg->local_event_size_bytes > 0)
	 {
           tw_event* e_new;
	   terminal_message* m_new;
	   void* local_event = 
               (char*)model_net_method_get_edata(DRAGONFLY, msg) + 
               msg->remote_event_size_bytes;
	   ts = g_tw_lookahead + (1/cn_bandwidth) * msg->local_event_size_bytes;
	   e_new = tw_event_new(msg->sender_lp, ts, lp);
	   m_new = tw_event_data(e_new);
	   memcpy(m_new, local_event, msg->local_event_size_bytes);
	   tw_event_send(e_new);
	}
    }
   
   s->packet_counter++;
   s->vc_occupancy[vc]++;

   if(s->vc_occupancy[vc] >= (cn_vc_size * num_chunks))
      s->output_vc_state[vc] = VC_CREDIT;
   return;
}

/* packet arrives at the destination terminal */
void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
{
#if DEBUG
if( msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1)
    {
	printf( "(%lf) [Terminal %d] packet %lld has arrived  \n",
              tw_now(lp), (int)lp->gid, msg->packet_ID);

	printf("travel start time is %f\n",
                msg->travel_start_time);

	printf("My hop now is %d\n",msg->my_N_hop);
    }
#endif

  // Packet arrives and accumulate # queued
  // Find a queue with an empty buffer slot
   tw_event * e, * buf_e;
   terminal_message * m, * buf_msg;
   tw_stime ts;
   bf->c3 = 0;
   bf->c2 = 0;

   msg->my_N_hop++;
  if(msg->chunk_id == num_chunks-1)
  {
	 bf->c2 = 1;
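	 /* statistics and the remote/local completion events are only issued once
	    the last chunk of the packet has arrived */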
	 mn_stats* stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
	 stat->recv_count++;
	 stat->recv_bytes += msg->packet_size;
	 stat->recv_time += tw_now(lp) - msg->travel_start_time;

	 N_finished_packets++;
	 dragonfly_total_time += tw_now( lp ) - msg->travel_start_time;
	 total_hops += msg->my_N_hop;

	 if (dragonfly_max_latency < tw_now( lp ) - msg->travel_start_time) 
	 {
		bf->c3 = 1;
		msg->saved_available_time = dragonfly_max_latency;
		dragonfly_max_latency=tw_now( lp ) - msg->travel_start_time;
	 }
	// Trigger an event on receiving server
	if(msg->remote_event_size_bytes)
	{
            void * tmp_ptr = model_net_method_get_edata(DRAGONFLY, msg);
            ts = g_tw_lookahead + 0.1 + (1/cn_bandwidth) * msg->remote_event_size_bytes;
            if (msg->is_pull){
                int net_id = model_net_get_id(LP_METHOD_NM);
                model_net_event(net_id, msg->category, msg->sender_lp,
                        msg->pull_size, ts, msg->remote_event_size_bytes,
                        tmp_ptr, 0, NULL, lp);
            }
            else{
                e = tw_event_new(msg->final_dest_gid, ts, lp);
                m = tw_event_data(e);
                memcpy(m, tmp_ptr, msg->remote_event_size_bytes);
                tw_event_send(e); 
            }
	}
  }

  int credit_delay = (1 / cn_bandwidth) * CREDIT_SIZE;
  ts = credit_delay + 0.1 + tw_rand_exponential(lp->rng, (double)credit_delay/1000);
  
  msg->saved_credit_time = s->next_credit_available_time;
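  /* save the previous value in the message so the reverse handler can restore
     it if this event is rolled back */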
  s->next_credit_available_time = max(s->next_credit_available_time, tw_now(lp));
  s->next_credit_available_time += ts;

  tw_lpid router_dest_id;
  codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
  codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", s->router_id, 0, &router_dest_id);
  // no method_event here - message going to router
  buf_e = tw_event_new(router_dest_id, s->next_credit_available_time - tw_now(lp), lp);
  buf_msg = tw_event_data(buf_e);
  buf_msg->vc_index = msg->saved_vc;
  buf_msg->type=R_BUFFER;
  buf_msg->packet_ID=msg->packet_ID;
  buf_msg->last_hop = TERMINAL;
  tw_event_send(buf_e);

  return;
}

/* initialize a dragonfly compute node terminal */
void 
terminal_init( terminal_state * s, 
	       tw_lp * lp )
{
    int i;
    // Assign the global router ID
   codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
   int num_lps = codes_mapping_get_lp_count(lp_group_name, LP_CONFIG_NM);

   s->terminal_id = (mapping_rep_id * num_lps) + mapping_offset;  
   s->router_id=(int)s->terminal_id / num_routers;
   s->terminal_available_time = 0.0;
   s->packet_counter = 0;

   s->vc_occupancy = (int*)malloc(num_vcs * sizeof(int));
   s->output_vc_state = (int*)malloc(num_vcs * sizeof(int));

   for( i = 0; i < num_vcs; i++ )
    {
      s->vc_occupancy[i]=0;
      s->output_vc_state[i]=VC_IDLE;
    }
   dragonfly_collective_init(s, lp);
   return;
}

/* collective operation for the dragonfly network */
void dragonfly_collective(char* category, int message_size, int remote_event_size, const void* remote_event, tw_lp* sender)
{
    tw_event * e_new;
    tw_stime xfer_to_nic_time;
    terminal_message * msg;
    tw_lpid local_nic_id;
    char* tmp_ptr;

    codes_mapping_get_lp_info(sender->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
    codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, mapping_rep_id, mapping_offset, &local_nic_id);

    xfer_to_nic_time = codes_local_latency(sender);
    e_new = model_net_method_event_new(local_nic_id, xfer_to_nic_time,
            sender, DRAGONFLY, (void**)&msg, (void**)&tmp_ptr);

    msg->remote_event_size_bytes = message_size;
    strcpy(msg->category, category);
    msg->sender_svr=sender->gid;
    msg->type = D_COLLECTIVE_INIT;

    tmp_ptr = (char*)msg;
    tmp_ptr += dragonfly_get_msg_sz();
    if(remote_event_size > 0)
     {
            msg->remote_event_size_bytes = remote_event_size;
            memcpy(tmp_ptr, remote_event, remote_event_size);
            tmp_ptr += remote_event_size;
     }

    tw_event_send(e_new);
    return;
}

/* reverse for collective operation of the dragonfly network */
void dragonfly_collective_rc(int message_size, tw_lp* sender)
{
     codes_local_latency_reverse(sender);
     return;
}

static void send_remote_event(terminal_state * s,
                        tw_bf * bf,
                        terminal_message * msg,
                        tw_lp * lp)
{
    // Trigger an event on receiving server
    if(msg->remote_event_size_bytes)
     {
            tw_event* e;
            tw_stime ts;
            terminal_message * m;
            ts = (1/cn_bandwidth) * msg->remote_event_size_bytes;
            e = codes_event_new(s->origin_svr, ts, lp);
            m = tw_event_data(e);
            char* tmp_ptr = (char*)msg;
            tmp_ptr += dragonfly_get_msg_sz();
            memcpy(m, tmp_ptr, msg->remote_event_size_bytes);
            tw_event_send(e);
     }
}

static void node_collective_init(terminal_state * s,
                        tw_bf * bf,
                        terminal_message * msg,
                        tw_lp * lp)
{
        tw_event * e_new;
        tw_lpid parent_nic_id;
        tw_stime xfer_to_nic_time;
        terminal_message * msg_new;
        int num_lps;

        msg->saved_collective_init_time = s->collective_init_time;
        s->collective_init_time = tw_now(lp);
	s->origin_svr = msg->sender_svr;
	
        if(s->is_leaf)
        {
            //printf("\n LP %ld sending message to parent %ld ", s->node_id, s->parent_node_id);
            /* get the global LP ID of the parent node */
            codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
            num_lps = codes_mapping_get_lp_count(lp_group_name, LP_CONFIG_NM);
            codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, s->parent_node_id/num_lps , (s->parent_node_id % num_lps), &parent_nic_id);

           /* send a message to the parent that the LP has entered the collective operation */
            xfer_to_nic_time = g_tw_lookahead + LEVEL_DELAY;
            //e_new = codes_event_new(parent_nic_id, xfer_to_nic_time, lp);
	    void* m_data;
	    e_new = model_net_method_event_new(parent_nic_id, xfer_to_nic_time,
            	lp, DRAGONFLY, (void**)&msg_new, (void**)&m_data);
	    	
            memcpy(msg_new, msg, sizeof(terminal_message));
	    if (msg->remote_event_size_bytes){
        	memcpy(m_data, model_net_method_get_edata(DRAGONFLY, msg),
                	msg->remote_event_size_bytes);
      	    }
	    
            msg_new->type = D_COLLECTIVE_FAN_IN;
            msg_new->sender_node = s->node_id;

            tw_event_send(e_new);
        }
        return;
}

static void node_collective_fan_in(terminal_state * s,
                        tw_bf * bf,
                        terminal_message * msg,
                        tw_lp * lp)
{
        int i;
        s->num_fan_nodes++;

        codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
        int num_lps = codes_mapping_get_lp_count(lp_group_name, LP_CONFIG_NM);

        tw_event* e_new;
        terminal_message * msg_new;
        tw_stime xfer_to_nic_time;

        bf->c1 = 0;
        bf->c2 = 0;

        /* if the number of fanned in nodes have completed at the current node then signal the parent */
        if((s->num_fan_nodes == s->num_children) && !s->is_root)
        {
            bf->c1 = 1;
            msg->saved_fan_nodes = s->num_fan_nodes-1;
            s->num_fan_nodes = 0;
            tw_lpid parent_nic_id;
            xfer_to_nic_time = g_tw_lookahead + LEVEL_DELAY;

            /* get the global LP ID of the parent node */
            codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, s->parent_node_id/num_lps , (s->parent_node_id % num_lps), &parent_nic_id);

           /* send a message to the parent that the LP has entered the collective operation */
            //e_new = codes_event_new(parent_nic_id, xfer_to_nic_time, lp);
            //msg_new = tw_event_data(e_new);
	    void * m_data;
      	    e_new = model_net_method_event_new(parent_nic_id,
              xfer_to_nic_time,
              lp, DRAGONFLY, (void**)&msg_new, &m_data);
	    
            memcpy(msg_new, msg, sizeof(terminal_message));
            msg_new->type = D_COLLECTIVE_FAN_IN;
            msg_new->sender_node = s->node_id;

            if (msg->remote_event_size_bytes){
	        memcpy(m_data, model_net_method_get_edata(DRAGONFLY, msg),
        	        msg->remote_event_size_bytes);
      	   }
	    
            tw_event_send(e_new);
      }

      /* root node starts off with the fan-out phase */
      if(s->is_root && (s->num_fan_nodes == s->num_children))
      {
           bf->c2 = 1;
           msg->saved_fan_nodes = s->num_fan_nodes-1;
           s->num_fan_nodes = 0;
           send_remote_event(s, bf, msg, lp);

           for( i = 0; i < s->num_children; i++ )
           {
                tw_lpid child_nic_id;
                /* Do some computation and fan out immediate child nodes from the collective */
                xfer_to_nic_time = g_tw_lookahead + COLLECTIVE_COMPUTATION_DELAY + LEVEL_DELAY + tw_rand_exponential(lp->rng, (double)LEVEL_DELAY/50);

                /* get global LP ID of the child node */
                codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, s->children[i]/num_lps , (s->children[i] % num_lps), &child_nic_id);
                //e_new = codes_event_new(child_nic_id, xfer_to_nic_time, lp);

                //msg_new = tw_event_data(e_new);
                void * m_data;
	        e_new = model_net_method_event_new(child_nic_id,
                xfer_to_nic_time,
		lp, DRAGONFLY, (void**)&msg_new, &m_data);

		memcpy(msg_new, msg, sizeof(terminal_message));
	        if (msg->remote_event_size_bytes){
	                memcpy(m_data, model_net_method_get_edata(DRAGONFLY, msg),
        	               msg->remote_event_size_bytes);
      		}
		
                msg_new->type = D_COLLECTIVE_FAN_OUT;
                msg_new->sender_node = s->node_id;

                tw_event_send(e_new);
           }
      }
}

static void node_collective_fan_out(terminal_state * s,
                        tw_bf * bf,
                        terminal_message * msg,
                        tw_lp * lp)
{
        int i;
        int num_lps = codes_mapping_get_lp_count(lp_group_name, LP_CONFIG_NM);
        bf->c1 = 0;
        bf->c2 = 0;

        send_remote_event(s, bf, msg, lp);

        if(!s->is_leaf)
        {
            bf->c1 = 1;
            tw_event* e_new;
            terminal_message * msg_new;
            tw_stime xfer_to_nic_time;

           for( i = 0; i < s->num_children; i++ )
           {
                xfer_to_nic_time = g_tw_lookahead + DRAGONFLY_FAN_OUT_DELAY + tw_rand_exponential(lp->rng, (double)DRAGONFLY_FAN_OUT_DELAY/10);

                if(s->children[i] > 0)
                {
                        tw_lpid child_nic_id;

                        /* get global LP ID of the child node */
                        codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, s->children[i]/num_lps , (s->children[i] % num_lps), &child_nic_id);
                        //e_new = codes_event_new(child_nic_id, xfer_to_nic_time, lp);
                        //msg_new = tw_event_data(e_new);
                        //memcpy(msg_new, msg, sizeof(nodes_message) + msg->remote_event_size_bytes);
			void* m_data;
			e_new = model_net_method_event_new(child_nic_id,
							xfer_to_nic_time,
					                lp, DRAGONFLY, (void**)&msg_new, &m_data);
		        memcpy(msg_new, msg, sizeof(terminal_message));
		        if (msg->remote_event_size_bytes){
			        memcpy(m_data, model_net_method_get_edata(DRAGONFLY, msg),
			                msg->remote_event_size_bytes);
      			}


                        msg_new->type = D_COLLECTIVE_FAN_OUT;
                        msg_new->sender_node = s->node_id;
                        tw_event_send(e_new);
                }
           }
         }
	//printf("\n Fan out phase completed %ld ", lp->gid);
        if(max_collective < tw_now(lp) - s->collective_init_time )
          {
              bf->c2 = 1;
              max_collective = tw_now(lp) - s->collective_init_time;
          }
}
/* update the compute node-router channel buffer */
void 
terminal_buf_update(terminal_state * s, 
		    tw_bf * bf, 
		    terminal_message * msg, 
		    tw_lp * lp)
{
  // Update the buffer space associated with this router LP 
    int msg_indx = msg->vc_index;
    
    s->vc_occupancy[msg_indx]--;
    s->output_vc_state[msg_indx] = VC_IDLE;

    return;
}

void 
terminal_event( terminal_state * s, 
		tw_bf * bf, 
		terminal_message * msg, 
		tw_lp * lp )
{
  *(int *)bf = (int)0;
  switch(msg->type)
    {
    case T_GENERATE:
       packet_generate(s,bf,msg,lp);
    break;
    
    case T_ARRIVE:
        packet_arrive(s,bf,msg,lp);
    break;
    
    case T_SEND:
      packet_send(s,bf,msg,lp);
    break;
    
    case T_BUFFER:
       terminal_buf_update(s, bf, msg, lp);
     break;
    
    case D_COLLECTIVE_INIT:
      node_collective_init(s, bf, msg, lp);
    break;

    case D_COLLECTIVE_FAN_IN:
      node_collective_fan_in(s, bf, msg, lp);
    break;

    case D_COLLECTIVE_FAN_OUT:
      node_collective_fan_out(s, bf, msg, lp);
    break;
    
    default:
       printf("\n LP %d Terminal message type not supported %d ", (int)lp->gid, msg->type);
    }
}

void 
dragonfly_terminal_final( terminal_state * s, 
      tw_lp * lp )
{
	model_net_print_stats(lp->gid, s->dragonfly_stats_array);
}

void dragonfly_router_final(router_state * s,
		tw_lp * lp)
{
   free(s->global_channel);
}
/* get the next stop for the current packet
 * determines if it is a router within a group, a router in another group
 * or the destination terminal */
tw_lpid 
get_next_stop(router_state * s, 
		      tw_bf * bf, 
		      terminal_message * msg, 
		      tw_lp * lp, 
		      int path)
{
   int dest_lp;
   tw_lpid router_dest_id = -1;
   int i;
   int dest_group_id;

   codes_mapping_get_lp_info(msg->dest_terminal_id, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset); 
   int num_lps = codes_mapping_get_lp_count(lp_group_name, LP_CONFIG_NM);
   int dest_router_id = (mapping_offset + (mapping_rep_id * num_lps)) / num_routers;
   
   codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
   int local_router_id = (mapping_offset + mapping_rep_id);

   bf->c2 = 0;

  /* If the packet has arrived at the destination router */
   if(dest_router_id == local_router_id)
    {
        dest_lp = msg->dest_terminal_id;

        return dest_lp;
    }
   /* Generate an intermediate destination for non-minimal routing (select a random group) */
   if(msg->last_hop == TERMINAL && msg->path_type == NON_MINIMAL)
    {
      if(dest_router_id / num_routers != s->group_id)
         {
            bf->c2 = 1;
            int intm_grp_id = tw_rand_integer(lp->rng, 0, num_groups-1);
            //int intm_grp_id = (s->group_id + s->group_id/2) % num_groups;
	    msg->intm_group_id = intm_grp_id;
          }    
    }
  /* The packet has arrived at the intermediate group for non-minimal routing; reset the group now. */
   if(msg->intm_group_id == s->group_id)
   {  
           msg->intm_group_id = -1; /* no intermediate group */
   } 
  /* Intermediate group ID is set. Divert the packet to an intermediate group. */
  if(msg->intm_group_id >= 0)
   {
      dest_group_id = msg->intm_group_id;
   }
  else /* direct the packet to the destination group */
   {
     dest_group_id = dest_router_id / num_routers;
   }
  
  /* It means the packet has arrived at the destination group. Now divert it to the destination router. */
  if(s->group_id == dest_group_id)
   {
     dest_lp = dest_router_id;
   }
   else
   {
      /* Packet is at the source or intermediate group. Find a router that has a path to the destination group. */
      dest_lp=getRouterFromGroupID(dest_group_id,s);
  
      if(dest_lp == local_router_id)
      {
        for(i=0; i < num_global_channels; i++)
           {
            if(s->global_channel[i] / num_routers == dest_group_id)
                dest_lp=s->global_channel[i];
          }
      }
   }
  codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", dest_lp, 0, &router_dest_id);
  return router_dest_id;
}

/* gets the output port corresponding to the next stop of the message */
int 
get_output_port( router_state * s, 
		tw_bf * bf, 
		terminal_message * msg, 
		tw_lp * lp, 
		int next_stop )
{
  int output_port = -1, i, terminal_id;
  codes_mapping_get_lp_info(msg->dest_terminal_id, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
  int num_lps = codes_mapping_get_lp_count(lp_group_name,LP_CONFIG_NM);
  terminal_id = (mapping_rep_id * num_lps) + mapping_offset;

  if(next_stop == msg->dest_terminal_id)
   {
      output_port = num_routers + num_global_channels + ( terminal_id % num_cn);
      //if(output_port > 6)
	//      printf("\n incorrect output port %d terminal id %d ", output_port, terminal_id);
    }
    else
    {
     codes_mapping_get_lp_info(next_stop, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
     int local_router_id = mapping_rep_id + mapping_offset;
     int intm_grp_id = local_router_id / num_routers;

     if(intm_grp_id != s->group_id)
      {
        for(i=0; i < num_global_channels; i++)
         {
           if(s->global_channel[i] == local_router_id)
             output_port = num_routers + i;
          }
      }
      else
       {
        output_port = local_router_id % num_routers;
       }
//	      printf("\n output port not found %d next stop %d local router id %d group id %d intm grp id %d %d", output_port, next_stop, local_router_id, s->group_id, intm_grp_id, local_router_id%num_routers);
    }
    return output_port;
}

/* routes the current packet to the next stop */
void 
router_packet_send( router_state * s, 
		    tw_bf * bf, 
		     terminal_message * msg, tw_lp * lp)
{
//   *(int *)bf = (int)0;
   tw_stime ts;
   tw_event *e;
   terminal_message *m;

   int next_stop = -1, output_port = -1, output_chan = -1;
   float bandwidth = local_bandwidth;
   int path = routing;
   int minimal_out_port = -1, nonmin_out_port = -1;
   bf->c3 = 0;

   if(msg->last_hop == TERMINAL && routing == ADAPTIVE)
   {
  // decide which routing to take
    int minimal_next_stop=get_next_stop(s, bf, msg, lp, MINIMAL);
    minimal_out_port = get_output_port(s, bf, msg, lp, minimal_next_stop);
    int nonmin_next_stop = get_next_stop(s, bf, msg, lp, NON_MINIMAL);
    nonmin_out_port = get_output_port(s, bf, msg, lp, nonmin_next_stop);
    int nonmin_port_count = s->vc_occupancy[nonmin_out_port];
    int min_port_count = s->vc_occupancy[minimal_out_port];
    int nonmin_vc = s->vc_occupancy[nonmin_out_port * num_vcs + 2];
    int min_vc = s->vc_occupancy[minimal_out_port * num_vcs + 1];

    // Adaptive routing condition from the dragonfly paper Page 83
   // modified according to booksim adaptive routing condition
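   // i.e. route minimally unless the minimal path's queue exceeds twice the
   // non-minimal path's queue plus the adaptive_threshold bias (a UGAL-style choice)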
   if((min_vc <= (nonmin_vc * 2 + adaptive_threshold) && minimal_out_port == nonmin_out_port)
               || (min_port_count <= (nonmin_port_count * 2 + adaptive_threshold) && minimal_out_port != nonmin_out_port))
        {
	   msg->path_type = MINIMAL;
           next_stop = minimal_next_stop;
           output_port = minimal_out_port;
           minimal_count++;
           msg->intm_group_id = -1;

           if(msg->packet_ID == TRACK)
              printf("\n (%lf) [Router %d] Packet %d routing minimally ", tw_now(lp), (int)lp->gid, (int)msg->packet_ID);
        }
       else
         {
	   msg->path_type = NON_MINIMAL;
           next_stop = nonmin_next_stop;
           output_port = nonmin_out_port;
           nonmin_count++;
           if(msg->packet_ID == TRACK)
                printf("\n (%lf) [Router %d] Packet %d routing non-minimally ", tw_now(lp), (int)lp->gid, (int)msg->packet_ID);

         }
  }
  else
   {
	msg->path_type = routing; /*defaults to the routing algorithm if we don't have adaptive routing here*/
   	next_stop = get_next_stop(s, bf, msg, lp, path);
   	output_port = get_output_port(s, bf, msg, lp, next_stop); 
   }
   output_chan = output_port * num_vcs;
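   /* base VC index of the chosen output port; the commented-out block below
      would offset it to separate minimal and non-minimal traffic */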

    // Even numbered channels for minimal routing
   // Odd numbered channels for nonminimal routing
   // Separate the queue occupancy into minimal and non minimal virtual channels if the min & non min
   // paths start at the same output port
   /*if((routing == ADAPTIVE) && (minimal_out_port == nonmin_out_port))
   {
        if(path == MINIMAL)
          output_chan = output_chan + 1;
        else
          if(path == NON_MINIMAL)
            output_chan = output_chan + 2;
   }*/

   int global=0;
   int buf_size = local_vc_size;

   assert(output_port != -1);
   assert(output_chan != -1);
   // Allocate output Virtual Channel
  if(output_port >= num_routers && output_port < num_routers + num_global_channels)
  {
	 bandwidth = global_bandwidth;
	 global = 1;
	 buf_size = global_vc_size;
  }

  if(output_port >= num_routers + num_global_channels)
	buf_size = cn_vc_size;

   if(s->vc_occupancy[output_chan] >= buf_size)
    {
	    printf("\n %lf Router %ld buffers overflowed from incoming terminals channel %d occupancy %d radix %d next_stop %d ", tw_now(lp),(long int) lp->gid, output_chan, s->vc_occupancy[output_chan], radix, next_stop);
	    bf->c3 = 1;
	    return;
	    //MPI_Finalize();
	    //exit(-1);
    }

#if DEBUG
if( msg->packet_ID == TRACK && next_stop != msg->dest_terminal_id && msg->chunk_id == num_chunks-1)
  {
   printf("\n (%lf) [Router %d] Packet %lld being sent to intermediate group router %d Final destination terminal %d Output Channel Index %d Saved vc %d msg_intm_id %d \n", 
              tw_now(lp), (int)lp->gid, msg->packet_ID, next_stop, 
	      msg->dest_terminal_id, output_chan, msg->saved_vc, msg->intm_group_id);
  }
#endif
 // If source router doesn't have global channel and buffer space is available, then assign to appropriate intra-group virtual channel 
  msg->saved_available_time = s->next_output_available_time[output_port];
  ts = g_tw_lookahead + 0.1 + ((1/bandwidth) * CHUNK_SIZE) + tw_rand_exponential(lp->rng, (double)CHUNK_SIZE/200);

  s->next_output_available_time[output_port] = max(s->next_output_available_time[output_port], tw_now(lp));
  s->next_output_available_time[output_port] += ts;
  // dest can be a router or a terminal, so we must check
  void * m_data;
  if (next_stop == msg->dest_terminal_id){
      e = model_net_method_event_new(next_stop, 
              s->next_output_available_time[output_port] - tw_now(lp), lp,
              DRAGONFLY, (void**)&m, &m_data);
  }
  else{
      e = tw_event_new(next_stop, s->next_output_available_time[output_port] - tw_now(lp), lp);
      m = tw_event_data(e);
      m_data = m+1;
  }
  memcpy(m, msg, sizeof(terminal_message));
  if (msg->remote_event_size_bytes){
      memcpy(m_data, msg+1, msg->remote_event_size_bytes);
  }

  if(global)
    m->last_hop=GLOBAL;
  else
    m->last_hop = LOCAL;

  m->saved_vc = output_chan;
  msg->old_vc = output_chan;
  m->intm_lp_id = lp->gid;
  s->vc_occupancy[output_chan]++;

  /* Determine the event type. If the packet has arrived at the final destination
     router then it should arrive at the destination terminal next. */
  if(next_stop == msg->dest_terminal_id)
  {
    m->type = T_ARRIVE;

    if(s->vc_occupancy[output_chan] >= cn_vc_size * num_chunks)
      s->output_vc_state[output_chan] = VC_CREDIT;
  }
  else
  {
    /* The packet has to be sent to another router */
    m->type = R_ARRIVE;

   /* If this is a global channel then the buffer space is different */
   if( global )
   {
     if(s->vc_occupancy[output_chan] >= global_vc_size * num_chunks )
       s->output_vc_state[output_chan] = VC_CREDIT;
   }
  else
    {
     /* buffer space is less for local channels */
     if( s->vc_occupancy[output_chan] >= local_vc_size * num_chunks )
	s->output_vc_state[output_chan] = VC_CREDIT;
    }
  }
  tw_event_send(e);
  return;
}

/* Packet arrives at the router and a credit is sent back to the sending terminal/router */
void 
router_packet_receive( router_state * s, 
			tw_bf * bf, 
			terminal_message * msg, 
			tw_lp * lp )
{
    tw_event *e;
    terminal_message *m;
    tw_stime ts;

    msg->my_N_hop++;
    ts = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, (double)MEAN_INTERVAL/200);
    num_chunks = msg->packet_size/CHUNK_SIZE;

    if(msg->packet_ID == TRACK && msg->chunk_id == num_chunks-1)
       printf("\n packet %lld chunk %d received at router %d ", msg->packet_ID, msg->chunk_id, (int)lp->gid);
   
    // router self message - no need for method_event
    e = tw_event_new(lp->gid, ts, lp);
    m = tw_event_data(e);
    memcpy(m, msg, sizeof(terminal_message) + msg->remote_event_size_bytes);
    m->type = R_SEND;
    router_credit_send(s, bf, msg, lp);
    tw_event_send(e);  
    return;
}

/* sets up the router virtual channels, global channels, local channels, compute node channels */
void router_setup(router_state * r, tw_lp * lp)
{
   codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
   r->router_id=mapping_rep_id + mapping_offset;
   r->group_id=r->router_id/num_routers;

   int i;
   int router_offset=(r->router_id % num_routers) * (num_global_channels / 2) + 1;
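   /* global channels are assigned in pairs at increasing group strides:
    * even-indexed entries connect to lower-numbered groups and odd-indexed
    * entries to higher-numbered groups, starting at an offset determined by
    * this router's position within its group (wrapping modulo total_routers) */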

   r->global_channel = (int*)malloc(num_global_channels * sizeof(int));
   r->next_output_available_time = (tw_stime*)malloc(radix * sizeof(tw_stime));
   r->next_credit_available_time = (tw_stime*)malloc(radix * sizeof(tw_stime));
   r->vc_occupancy = (int*)malloc(radix * sizeof(int));
   r->output_vc_state = (int*)malloc(radix * sizeof(int));
  
   for(i=0; i < radix; i++)
    {
       // Set credit & router occupancy
	r->next_output_available_time[i]=0;
        r->next_credit_available_time[i]=0;
        r->vc_occupancy[i]=0;
        r->output_vc_state[i]= VC_IDLE;
    }

   // assign the global channels; the pairing below assumes an even number of global channels per router
   for(i=0; i < num_global_channels; i++)
    {
      if(i % 2 != 0)
          {
             r->global_channel[i]=(r->router_id + (router_offset * num_routers))%total_routers;
             router_offset++;
          }
          else
           {
             r->global_channel[i]=r->router_id - ((router_offset) * num_routers);
           }
        if(r->global_channel[i]<0)
         {
           r->global_channel[i]=total_routers+r->global_channel[i]; 
	 }
    }
   return;
}	

/* Update the buffer space associated with this router LP */
void router_buf_update(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
{
    int msg_indx = msg->vc_index;
    s->vc_occupancy[msg_indx]--;
    s->output_vc_state[msg_indx] = VC_IDLE;
    return;
}

void router_event(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
{
  *(int *)bf = (int)0;
  switch(msg->type)
   {
	   case R_SEND: // Router has sent a packet to an intra-group router (local channel)
 		 router_packet_send(s, bf, msg, lp);
           break;

	   case R_ARRIVE: // Router has received a packet from an intra-group router (local channel)
	        router_packet_receive(s, bf, msg, lp);
	   break;
	
	   case R_BUFFER:
	        router_buf_update(s, bf, msg, lp);
	   break;

	   default:
		  printf("\n (%lf) [Router %d] Router Message type not supported %d dest terminal id %d packet ID %d ", tw_now(lp), (int)lp->gid, msg->type, (int)msg->dest_terminal_id, (int)msg->packet_ID);
	   break;
   }	   
}

/* Reverse computation handler for a terminal event */
void terminal_rc_event_handler(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
{
   switch(msg->type)
   {
	   case T_GENERATE:
		 {
		 int i;
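		 /* undo one RNG call for the packet_ID draw plus one per chunk for the
		    inter-chunk exponential delays generated in packet_generate() */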
		 tw_rand_reverse_unif(lp->rng);

		 for(i = 0; i < num_chunks; i++)
                  tw_rand_reverse_unif(lp->rng);
		 mn_stats* stat;
		 stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
		 stat->send_count--;
		 stat->send_bytes -= msg->packet_size;
		 stat->send_time -= (1/cn_bandwidth) * msg->packet_size;
		 }
	   break;
	   
	   case T_SEND:
	         {
	           s->terminal_available_time = msg->saved_available_time;
		   tw_rand_reverse_unif(lp->rng);	
		   int vc = msg->saved_vc;
		   s->vc_occupancy[vc]--;
		   s->packet_counter--;
		   s->output_vc_state[vc] = VC_IDLE;
		 }
	   break;

	   case T_ARRIVE:
	   	 {
		   tw_rand_reverse_unif(lp->rng);
		   s->next_credit_available_time = msg->saved_credit_time;
		   if(bf->c2)
		   {
		    mn_stats* stat;
		    stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
		    stat->recv_count--;
		    stat->recv_bytes -= msg->packet_size;
		    stat->recv_time -= tw_now(lp) - msg->travel_start_time;
		    N_finished_packets--;
		    dragonfly_total_time -= (tw_now(lp) - msg->travel_start_time);
		    total_hops -= msg->my_N_hop;
		   if(bf->c3)
		         dragonfly_max_latency = msg->saved_available_time;
		   }
		    
		   msg->my_N_hop--;
		 }
           break;

	   case T_BUFFER:
	        {
		   int msg_indx = msg->vc_index;
		   s->vc_occupancy[msg_indx]++;
		   
		   if(s->vc_occupancy[msg_indx] == cn_vc_size * num_chunks)
			s->output_vc_state[msg_indx] = VC_CREDIT;
	     }  
	   break;
	
          case D_COLLECTIVE_INIT:
                {
                    s->collective_init_time = msg->saved_collective_init_time;
                }
          break;

          case D_COLLECTIVE_FAN_IN:
                {
                   int i;
                   s->num_fan_nodes--;
                   if(bf->c1)
                    {
                        s->num_fan_nodes = msg->saved_fan_nodes;
                    }
                   if(bf->c2)
                     {
                        s->num_fan_nodes = msg->saved_fan_nodes;
                        for( i = 0; i < s->num_children; i++ )
                            tw_rand_reverse_unif(lp->rng);
                     }
                }
        break;

        case D_COLLECTIVE_FAN_OUT:
                {
                 int i;
                 if(bf->c1)
                    {
                        for( i = 0; i < s->num_children; i++ )
                            tw_rand_reverse_unif(lp->rng);
                    }
                }	 
   }
}

/* Reverse computation handler for a router event */
void router_rc_event_handler(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
{
  switch(msg->type)
    {
            case R_SEND:
		    {
			if(msg->path_type == NON_MINIMAL && bf->c2)
			   tw_rand_reverse_unif(lp->rng);

			if(routing == ADAPTIVE && msg->path_type == MINIMAL)
                                minimal_count--;
                        if(routing == ADAPTIVE && msg->path_type == NON_MINIMAL)
                                nonmin_count--;

			if(bf->c3)
			   return;
			    
		        tw_rand_reverse_unif(lp->rng);
			int output_chan = msg->old_vc;
			int output_port = output_chan / num_vcs;

			s->next_output_available_time[output_port] = msg->saved_available_time;
			s->vc_occupancy[output_chan]--;
			s->output_vc_state[output_chan]=VC_IDLE;

		    }
	    break;

	    case R_ARRIVE:
	    	    {
			msg->my_N_hop--;
			tw_rand_reverse_unif(lp->rng);
			tw_rand_reverse_unif(lp->rng);
			int output_port = msg->saved_vc/num_vcs;
			s->next_credit_available_time[output_port] = msg->saved_available_time;
                        if (msg->chunk_id == num_chunks-1 && 
                                msg->remote_event_size_bytes && 
                                msg->is_pull){
                            int net_id = model_net_get_id(LP_METHOD_NM);
                            model_net_event_rc(net_id, lp, msg->pull_size);
                        }
		    }
	    break;

	    case R_BUFFER:
	    	   {
		      int msg_indx = msg->vc_index;
                      s->vc_occupancy[msg_indx]++;

                      int buf = local_vc_size;

		      if(msg->last_hop == GLOBAL)
			 buf = global_vc_size;
		       else if(msg->last_hop == TERMINAL)
			 buf = cn_vc_size;
	 
		      if(s->vc_occupancy[msg_indx] >= buf * num_chunks)
                          s->output_vc_state[msg_indx] = VC_CREDIT;

		   }
	    break;
	  
    }
}
/* dragonfly compute node and router LP types */
tw_lptype dragonfly_lps[] =
{
   // Terminal handling functions
   {
    (init_f)terminal_init,
    (event_f) terminal_event,
    (revent_f) terminal_rc_event_handler,
    (final_f) dragonfly_terminal_final,
    (map_f) codes_mapping,
    sizeof(terminal_state)
    },
   {
     (init_f) router_setup,
     (event_f) router_event,
     (revent_f) router_rc_event_handler,
     (final_f) dragonfly_router_final,
     (map_f) codes_mapping,
     sizeof(router_state),
   },
   {0},
};

/* returns the dragonfly lp type for lp registration */
static const tw_lptype* dragonfly_get_cn_lp_type(void)
{
	   return(&dragonfly_lps[0]);
}
static const tw_lptype* dragonfly_get_router_lp_type(void)
{
	           return(&dragonfly_lps[1]);
}          

static tw_lpid dragonfly_find_local_device(tw_lp *sender)
{
     char lp_type_name[MAX_NAME_LENGTH], lp_group_name[MAX_NAME_LENGTH];
     int mapping_grp_id, mapping_rep_id, mapping_type_id, mapping_offset;
     tw_lpid dest_id;

     codes_mapping_get_lp_info(sender->gid, lp_group_name, &mapping_grp_id, &mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
     codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, mapping_rep_id, mapping_offset, &dest_id);

    return(dest_id);
}

/* dragonfly model-net method: function pointers used by the model-net layer */
struct model_net_method dragonfly_method =
{
    .mn_setup = dragonfly_setup,
    .model_net_method_packet_event = dragonfly_packet_event,
    .model_net_method_packet_event_rc = dragonfly_packet_event_rc,
    .mn_get_lp_type = dragonfly_get_cn_lp_type,
    .mn_get_msg_sz = dragonfly_get_msg_sz,
    .mn_report_stats = dragonfly_report_stats,
    .model_net_method_find_local_device = dragonfly_find_local_device,
    .mn_collective_call = dragonfly_collective,
    .mn_collective_call_rc = dragonfly_collective_rc   
};


/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ts=8 sts=4 sw=4 expandtab
 */