dragonfly.c 74.5 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1
2
3
4
5
6
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

7
8
9
10
// Local router ID: 0 --- total_router-1
// Router LP ID 
// Terminal LP ID

11
12
13
#include <ross.h>

#include "codes/codes_mapping.h"
14
#include "codes/jenkins-hash.h"
15
16
17
#include "codes/codes.h"
#include "codes/model-net.h"
#include "codes/model-net-method.h"
18
19
#include "codes/model-net-lp.h"
#include "codes/net/dragonfly.h"
20
21
22
23

#define CREDIT_SIZE 8
#define MEAN_PROCESS 1.0

24
25
26
/* collective specific parameters */
#define TREE_DEGREE 4
#define LEVEL_DELAY 1000
27
#define DRAGONFLY_COLLECTIVE_DEBUG 0
28
29
30
#define NUM_COLLECTIVES  1
#define COLLECTIVE_COMPUTATION_DELAY 5700
#define DRAGONFLY_FAN_OUT_DELAY 20.0
31
#define WINDOW_LENGTH 0
32

33
// debugging parameters
34
#define TRACK -1
35
#define PRINT_ROUTER_TABLE 1
36
#define DEBUG 0
37

38
39
40
#define LP_CONFIG_NM (model_net_lp_config_names[DRAGONFLY])
#define LP_METHOD_NM (model_net_method_names[DRAGONFLY])

41
42
43
44
/* Set DRAGONFLY_DBG to 1 to enable dprintf() debug output. */
#define DRAGONFLY_DBG 0
/* printf-style debug logging that is gated on DRAGONFLY_DBG.
 * At least one argument after the format string is required
 * (the macro expands __VA_ARGS__ unconditionally). */
#define dprintf(_fmt, ...) \
    do {if (DRAGONFLY_DBG) printf(_fmt, __VA_ARGS__);} while (0)

45
long term_ecount, router_ecount, term_rev_ecount, router_rev_ecount;
46

47
48
/* Return b when a < b holds, otherwise return a. */
static double maxd(double a, double b)
{
    if (a < b)
        return b;
    return a;
}

49
50
// arrival rate
static double MEAN_INTERVAL=200.0;
51
52
// threshold for adaptive routing
static int adaptive_threshold = 10;
53
54

/* minimal and non-minimal packet counts for adaptive routing*/
55
unsigned int minimal_count=0, nonmin_count=0, completed_packets = 0;
56

57
58
59
60
61
62
typedef struct dragonfly_param dragonfly_param;
/* annotation-specific parameters (unannotated entry occurs at the 
 * last index) */
static uint64_t                  num_params = 0;
static dragonfly_param         * all_params = NULL;
static const config_anno_map_t * anno_map   = NULL;
63
64

/* global variables for codes mapping */
65
static char lp_group_name[MAX_NAME_LENGTH];
66
67
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;

68
69
70
71
72
73
74
75
/* router magic number */
int router_magic_num = 0;

/* terminal magic number */
int terminal_magic_num = 0;

/* number of routers in a mapping group */
static int num_routers_per_mgrp = 0;
76

77
78
79
80
81
82
/* maximum number of terminals and routers */
int max_term_occupancy, max_router_occupancy;

/* noise of 1ns */
double noise = 1.0;

83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/* Configuration of one dragonfly network instance. The first group of
 * fields is read from the "PARAMS" configuration section by
 * dragonfly_read_config(); the second group is derived from them there. */
struct dragonfly_param
{
    /* ---- configuration parameters ---- */

    /* number of routers in a group */
    int num_routers;
    /* bandwidth of the router-router channels within a group */
    double local_bandwidth;
    /* bandwidth of the inter-group router connections */
    double global_bandwidth;
    /* bandwidth of the compute node channels connected to routers */
    double cn_bandwidth;
    /* number of virtual channels */
    int num_vcs;
    /* buffer size of the router-router channels */
    int local_vc_size;
    /* buffer size of the global channels */
    int global_vc_size;
    /* buffer size of the compute node channels */
    int cn_vc_size;
    /* full-sized packets are broken into chunks of this size */
    int chunk_size;

    /* ---- derived parameters ---- */

    /* num_routers / 2 */
    int num_cn;
    /* num_routers * num_cn + 1 */
    int num_groups;
    /* num_vcs * (num_routers + num_global_channels + num_cn) */
    int radix;
    /* num_groups * num_routers */
    int total_routers;
    /* total_routers * num_cn */
    int total_terminals;
    /* num_routers / 2 */
    int num_global_channels;
};

105
106
107
108
109
110
111
112
113
/* Node of the doubly-linked queue of messages waiting at a router.
 * Nodes are created by add_pending_router_message() (pushed at the head)
 * and unlinked by remove_pending_list_head() / remove_pending_router_msgs(). */
struct pending_router_msgs
{
    /* neighbouring nodes; prev is NULL at the head, next is NULL at the tail */
    struct pending_router_msgs * next;
    struct pending_router_msgs * prev;
    /* heap copy of the remote event bytes that followed the message;
     * only assigned when msg.remote_event_size_bytes is non-zero */
    char * event_data;
    /* copy of the queued message header */
    terminal_message msg;
    /* output channel the message is waiting on; used as the lookup key
     * by remove_pending_router_msgs() */
    int output_chan;
    /* next stop recorded for the message when it was queued */
    int next_stop; 
};
114
115
116
117
118
119
120
121
122
123
124
125
/* handles terminal and router events like packet generate/send/receive/buffer */
typedef enum event_t event_t;

typedef struct terminal_state terminal_state;
typedef struct router_state router_state;

/* dragonfly compute node data structure */
struct terminal_state
{
   unsigned long long packet_counter;

   // Dragonfly specific parameters
126
127
   tw_lpid router_id;
   tw_lpid terminal_id;
128
129
130
131
132
133
134
135
136
137

   // Each terminal will have an input and output channel with the router
   int* vc_occupancy; // NUM_VC
   int* output_vc_state;
   tw_stime terminal_available_time;
   tw_stime next_credit_available_time;
// Terminal generate, sends and arrival T_SEND, T_ARRIVAL, T_GENERATE
// Router-Router Intra-group sends and receives RR_LSEND, RR_LARRIVE
// Router-Router Inter-group sends and receives RR_GSEND, RR_GARRIVE
   struct mn_stats dragonfly_stats_array[CATEGORY_MAX];
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
  /* collective init time */
  tw_stime collective_init_time;

  /* node ID in the tree */ 
   tw_lpid node_id;

   /* messages sent & received in collectives may get interchanged several times so we have to save the 
     origin server information in the node's state */
   tw_lpid origin_svr; 
  
  /* parent node ID of the current node */
   tw_lpid parent_node_id;
   /* array of children to be allocated in terminal_init*/
   tw_lpid* children;

   /* children of a node can be less than or equal to the tree degree */
   int num_children;

   short is_root;
   short is_leaf;

   /* to maintain a count of child nodes that have fanned in at the parent during the collective
      fan-in phase*/
   int num_fan_nodes;
162
   int max_term_vc_occupancy;
163
164
165

   const char * anno;
   const dragonfly_param *params;
166
};
167

168
169
170
171
172
173
/* event types handled by terminal and router LPs (values 1-8) */
enum event_t
{
  /* terminal events */
  T_GENERATE = 1,
  T_ARRIVE = 2,
  T_BUFFER = 3,
  /* router events */
  R_FORWARD = 4,
  R_BUFFER = 5,
  /* collective events */
  D_COLLECTIVE_INIT = 6,
  D_COLLECTIVE_FAN_IN = 7,
  D_COLLECTIVE_FAN_OUT = 8
};
/* state of a virtual channel: idle, active, allocated, or waiting for a credit */
enum vc_status
{
   VC_IDLE = 0,
   VC_ACTIVE = 1,
   VC_ALLOC = 2,
   VC_CREDIT = 3
};

/* kind of link a packet traversed on its last hop: global channel,
 * local channel, or the link from a terminal */
enum last_hop
{
   GLOBAL = 0,
   LOCAL = 1,
   TERMINAL = 2
};

/* Available routing algorithms. Adaptive routing is not accurate and
 * fully functional in the current version, as the formulas for detecting
 * load on global channels are not very accurate. */
enum ROUTING_ALGO
{
    MINIMAL = 0,
    NON_MINIMAL = 1,
    ADAPTIVE = 2,
    PROG_ADAPTIVE = 3
};

struct router_state
{
   unsigned int router_id;
   unsigned int group_id;
212
   
213
214
   tw_stime* next_output_available_time;
   tw_stime* next_credit_available_time;
215
216
   tw_stime* cur_hist_start_time;
   
217
218
   int* vc_occupancy;
   int* output_vc_state;
219
   int * global_channel;
220
   int max_router_vc_occupancy;
221
222
223

   const char * anno;
   const dragonfly_param *params;
224
225
226

   int* prev_hist_num;
   int* cur_hist_num;
227
228
229
230

   struct pending_router_msgs * head;
   struct pending_router_msgs * tail;
   int num_waiting;
231
232
233
234
235
236
};

static short routing = MINIMAL;

static tw_stime         dragonfly_total_time = 0;
static tw_stime         dragonfly_max_latency = 0;
237
static tw_stime         max_collective = 0;
238
239
240
241
242


static long long       total_hops = 0;
static long long       N_finished_packets = 0;

243
244
245
/* function definitions */
static void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp);

246
247
248
/* returns the dragonfly router lp type for lp registration */
static const tw_lptype* dragonfly_get_router_lp_type(void);

249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
/* Remove the head element of the pending router message queue and free it.
 *
 * head, tail: addresses of the queue's head and tail pointers; both are
 *             updated in place (tail becomes NULL when the last element
 *             is removed).
 * msg:        unused; kept so existing callers keep compiling.
 *
 * Calling this on an empty queue is a no-op. The payload copy made by
 * add_pending_router_message() is released together with the element. */
void remove_pending_list_head(struct pending_router_msgs** head,
                              struct pending_router_msgs** tail,
                              terminal_message * msg)
{
    struct pending_router_msgs * elem = *head;

    (void)msg;

    /* nothing queued: leave head and tail untouched */
    if(!elem)
        return;

    if(elem == *tail)
        *tail = NULL;

    *head = elem->next;

    if(*head)
        (*head)->prev = NULL;

    /* event_data is only allocated when the queued message carried a
     * remote event, so only release it in that case */
    if(elem->msg.remote_event_size_bytes)
        free(elem->event_data);

    free(elem);
}

/* Queue a message that cannot be sent yet at the head of the pending list.
 *
 * head, tail: addresses of the queue's head and tail pointers (updated).
 * msg:        message to copy; when msg->remote_event_size_bytes is
 *             non-zero, that many bytes located directly after the
 *             message header are copied into a separate heap buffer.
 * chan:       output channel the message is waiting on.
 * next_stop:  next stop recorded with the message.
 *
 * The queue owns the copies. Aborts if memory cannot be allocated. */
void add_pending_router_message(struct pending_router_msgs** head,
                                struct pending_router_msgs** tail,
                                terminal_message * msg,
                                int chan,
                                int next_stop)
{
    struct pending_router_msgs * elem = malloc(sizeof(*elem));
    if(!elem)
    {
        fprintf(stderr, "add_pending_router_message: out of memory\n");
        abort();
    }

    memcpy(&(elem->msg), msg, sizeof(elem->msg));
    elem->prev = NULL;
    elem->next_stop = next_stop;
    elem->output_chan = chan;
    /* never leave the payload pointer indeterminate */
    elem->event_data = NULL;

    if(msg->remote_event_size_bytes)
    {
        /* the remote event payload is stored right behind the header */
        void * m_data = msg+1;
        elem->event_data = malloc(msg->remote_event_size_bytes);
        if(!elem->event_data)
        {
            fprintf(stderr, "add_pending_router_message: out of memory\n");
            abort();
        }
        memcpy(elem->event_data, m_data, msg->remote_event_size_bytes);
    }

    /* push at the head; an empty queue also gets its tail set */
    elem->next = *head;
    if(*head)
        (*head)->prev = elem;
    else
        *tail = elem;

    *head = elem;
}

/* Unlink and return the first queued message (searching from the head)
 * whose output channel equals chan, or NULL when there is none.
 * The returned element is not freed; its own next/prev links are left
 * as they were. */
struct pending_router_msgs * remove_pending_router_msgs(struct pending_router_msgs** head, 
                                struct pending_router_msgs** tail,
                                int chan)
{
   struct pending_router_msgs * cur;

   for(cur = *head; cur != NULL; cur = cur->next)
   {
       if(cur->output_chan != chan)
           continue;

       const int at_head = (cur == *head);
       const int at_tail = (cur == *tail);

       if(at_head && at_tail)
       {
           /* the only element: the queue becomes empty */
           *head = NULL;
           *tail = NULL;
       }
       else if(at_head)
       {
           /* first element: its successor becomes the head */
           *head = cur->next;
           cur->next->prev = NULL;
       }
       else if(at_tail)
       {
           /* last element: its predecessor becomes the tail */
           *tail = cur->prev;
           cur->prev->next = NULL;
       }

       /* splice the neighbours together (covers the middle case) */
       if(cur->prev)
           cur->prev->next = cur->next;

       if(cur->next)
           cur->next->prev = cur->prev;

       return cur;
   }
   return NULL;
}

349
350
351
352
353
354
/* size in bytes of the message structure used by the dragonfly model */
static int dragonfly_get_msg_sz(void)
{
    const int msg_bytes = sizeof(terminal_message);
    return msg_bytes;
}

355
356
357
/* Read the dragonfly parameters of one annotation from the "PARAMS"
 * configuration section and fill in the derived parameters.
 *
 * anno:   annotation to look up (NULL for the unannotated case).
 * params: parameter set to fill in. Any value that is missing or not
 *         positive is replaced by a default, with a notice on stderr.
 *
 * Also sets the file-level routing algorithm from the "routing" key;
 * an absent or unrecognized value selects minimal routing. */
static void dragonfly_read_config(const char * anno, dragonfly_param *params){
    // shorthand
    dragonfly_param *p = params;

    configuration_get_value_int(&config, "PARAMS", "num_routers", anno,
            &p->num_routers);
    if(p->num_routers <= 0) {
        p->num_routers = 4;
        fprintf(stderr, "Number of routers per group not specified, setting to %d\n",
                p->num_routers);
    }

    configuration_get_value_int(&config, "PARAMS", "num_vcs", anno,
            &p->num_vcs);
    if(p->num_vcs <= 0) {
        p->num_vcs = 1;
        fprintf(stderr, "Number of virtual channels not specified, setting to %d\n", p->num_vcs);
    }

    configuration_get_value_int(&config, "PARAMS", "local_vc_size", anno, &p->local_vc_size);
    if(p->local_vc_size <= 0) {
        p->local_vc_size = 1024;
        fprintf(stderr, "Buffer size of local channels not specified, setting to %d\n", p->local_vc_size);
    }

    configuration_get_value_int(&config, "PARAMS", "global_vc_size", anno, &p->global_vc_size);
    if(p->global_vc_size <= 0) {
        p->global_vc_size = 2048;
        fprintf(stderr, "Buffer size of global channels not specified, setting to %d\n", p->global_vc_size);
    }

    configuration_get_value_int(&config, "PARAMS", "cn_vc_size", anno, &p->cn_vc_size);
    if(p->cn_vc_size <= 0) {
        p->cn_vc_size = 1024;
        fprintf(stderr, "Buffer size of compute node channels not specified, setting to %d\n", p->cn_vc_size);
    }

    configuration_get_value_int(&config, "PARAMS", "chunk_size", anno, &p->chunk_size);
    if(p->chunk_size <= 0) {
        p->chunk_size = 64;
        fprintf(stderr, "Chunk size for packets is not specified, setting to %d\n", p->chunk_size);
    }

    configuration_get_value_double(&config, "PARAMS", "local_bandwidth", anno, &p->local_bandwidth);
    if(p->local_bandwidth <= 0) {
        p->local_bandwidth = 5.25;
        fprintf(stderr, "Bandwidth of local channels not specified, setting to %lf\n", p->local_bandwidth);
    }

    configuration_get_value_double(&config, "PARAMS", "global_bandwidth", anno, &p->global_bandwidth);
    if(p->global_bandwidth <= 0) {
        p->global_bandwidth = 4.7;
        fprintf(stderr, "Bandwidth of global channels not specified, setting to %lf\n", p->global_bandwidth);
    }

    configuration_get_value_double(&config, "PARAMS", "cn_bandwidth", anno, &p->cn_bandwidth);
    if(p->cn_bandwidth <= 0) {
        p->cn_bandwidth = 5.25;
        fprintf(stderr, "Bandwidth of compute node channels not specified, setting to %lf\n", p->cn_bandwidth);
    }

    /* start from an empty string so the comparisons below are well
     * defined even if the lookup does not write anything */
    char routing_str[MAX_NAME_LENGTH];
    routing_str[0] = '\0';
    configuration_get_value(&config, "PARAMS", "routing", anno, routing_str,
            MAX_NAME_LENGTH);
    if(strcmp(routing_str, "minimal") == 0)
        routing = MINIMAL;
    else if(strcmp(routing_str, "nonminimal")==0 || strcmp(routing_str,"non-minimal")==0)
        routing = NON_MINIMAL;
    else if (strcmp(routing_str, "adaptive") == 0)
        routing = ADAPTIVE;
    else if (strcmp(routing_str, "prog-adaptive") == 0)
        routing = PROG_ADAPTIVE;
    else
    {
        fprintf(stderr, 
                "No routing protocol specified, setting to minimal routing\n");
        /* do what the message announces instead of leaving an
         * out-of-range value behind */
        routing = MINIMAL;
    }

    // set the derived parameters
    p->num_cn = p->num_routers/2;
    p->num_global_channels = p->num_routers/2;
    p->num_groups = p->num_routers * p->num_cn + 1;
    p->radix = p->num_vcs *
        (p->num_routers + p->num_global_channels + p->num_cn);
    p->total_routers = p->num_groups * p->num_routers;
    p->total_terminals = p->total_routers * p->num_cn;

    /* report the resulting topology once, on node 0 */
    if(!g_tw_mynode)
        printf("\n Total nodes %d routers %d groups %d radix %d num_vc %d ", p->num_cn * p->total_routers,
                p->total_routers,
                p->num_groups,
                p->radix,
                p->num_vcs);
}

453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
/* Time in ns needed to move `bytes` bytes over a link whose bandwidth
 * is GB_p_s, given in GiB/s (1 GiB = 1024^3 bytes). */
static tw_stime bytes_to_ns(uint64_t bytes, double GB_p_s)
{
    const double bytes_per_gib = 1024.0*1024.0*1024.0;

    /* bytes to GiB */
    tw_stime gib = ((double)bytes) / bytes_per_gib;
    /* GiB to s */
    tw_stime seconds = gib / GB_p_s;

    /* s to ns */
    return seconds * 1000.0 * 1000.0 * 1000.0;
}

/* reverse computation for msg ready event */
469
470
471
472
static void dragonfly_configure(){
    anno_map = codes_mapping_get_lp_anno_map(LP_CONFIG_NM);
    assert(anno_map);
    num_params = anno_map->num_annos + (anno_map->has_unanno_lp > 0);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
473
    all_params = calloc(num_params, sizeof(*all_params));
474
475

    for (uint64_t i = 0; i < anno_map->num_annos; i++){
476
        const char * anno = anno_map->annotations[i].ptr;
477
478
479
480
481
        dragonfly_read_config(anno, &all_params[i]);
    }
    if (anno_map->has_unanno_lp > 0){
        dragonfly_read_config(NULL, &all_params[anno_map->num_annos]);
    }
482
483
484
485
486
487
488
}

/* report dragonfly statistics like average and maximum packet latency, average number of hops traversed */
static void dragonfly_report_stats()
{
   long long avg_hops, total_finished_packets;
   tw_stime avg_time, max_time;
489
   long total_term_events, total_router_events;
490
   int total_minimal_packets, total_nonmin_packets, total_completed_packets;
491
492
493
494
495

   MPI_Reduce( &total_hops, &avg_hops, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &N_finished_packets, &total_finished_packets, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &dragonfly_total_time, &avg_time, 1,MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &dragonfly_max_latency, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
496
497
498
   MPI_Reduce( &term_ecount, &total_term_events, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &router_ecount, &total_router_events, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);

499
   if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
500
501
502
    {
	MPI_Reduce(&minimal_count, &total_minimal_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
 	MPI_Reduce(&nonmin_count, &total_nonmin_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
503
 	MPI_Reduce(&completed_packets, &total_completed_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
504
    }
505
506
   /* print statistics */
   if(!g_tw_mynode)
507
   {	
508
509
      printf("\n total finished packets %lld ", total_finished_packets);
      printf(" Average number of hops traversed %f average message latency %lf us maximum message latency %lf us\n", (float)avg_hops/total_finished_packets, avg_time/(total_finished_packets*1000), max_time/1000);
510
     if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
511
              printf("\n ADAPTIVE ROUTING STATS: %d packets routed minimally %d packets routed non-minimally completed packets %d ", total_minimal_packets, total_nonmin_packets, total_completed_packets);
512
 
513
     printf("\n Max terminal occupancy %d max router occupancy %d ", max_term_occupancy, max_router_occupancy);
514
     printf("\n Event population: total committed terminal events:%ld router events: %ld ", total_term_events, total_router_events);
515
  }
516
517
   return;
}
518

519
520
521
/* Place the terminal in the TREE_DEGREE-ary tree used for collectives.
 *
 * The tree node ID is derived from the LP's repetition index and offset
 * within its mapping group. The function fills in node_id,
 * parent_node_id, is_root, is_leaf, num_children and allocates and fills
 * the children array (TREE_DEGREE entries; unused slots hold (tw_lpid)-1).
 *
 * s:  terminal state to initialize.
 * lp: the terminal LP. */
void dragonfly_collective_init(terminal_state * s,
           		   tw_lp * lp)
{
    // TODO: be annotation-aware
    codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
            &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
    int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM,
            NULL, 1);
    int num_reps = codes_mapping_get_group_reps(lp_group_name);
    s->node_id = (mapping_rep_id * num_lps) + mapping_offset;

    int i;
    /* handle collective operations by forming a tree of all the LPs */
    /* special condition for root of the tree */
    if( s->node_id == 0)
    {
        /* the root has no parent; (tw_lpid)-1 is the "none" marker */
        s->parent_node_id = (tw_lpid)-1;
        s->is_root = 1;
    }
    else
    {
        s->parent_node_id = (s->node_id - ((s->node_id - 1) % TREE_DEGREE)) / TREE_DEGREE;
        s->is_root = 0;
    }

    s->children = malloc(TREE_DEGREE * sizeof(*s->children));
    if(!s->children)
        tw_error(TW_LOC, "\n Unable to allocate the collective children array");

    /* assume a leaf until a child is found */
    s->is_leaf = 1;
    s->num_children = 0;

    /* calculate the children of the current node; slots whose node ID
       falls outside the tree are marked with (tw_lpid)-1 */
    const tw_lpid total_nodes = num_lps * num_reps;
    for( i = 0; i < TREE_DEGREE; i++ )
    {
        tw_lpid next_child = (TREE_DEGREE * s->node_id) + i + 1;
        if(next_child < total_nodes)
        {
            s->num_children++;
            s->is_leaf = 0;
            s->children[i] = next_child;
        }
        else
            s->children[i] = (tw_lpid)-1;
    }

#if DRAGONFLY_COLLECTIVE_DEBUG == 1
    /* casts keep the arguments in line with the %ld conversions */
    printf("\n LP %ld parent node id ", (long)s->node_id);

    for( i = 0; i < TREE_DEGREE; i++ )
        printf(" child node ID %ld ", (long)s->children[i]);
    printf("\n");

    if(s->is_leaf)
        printf("\n LP %ld is leaf ", (long)s->node_id);
#endif
}

577
/* dragonfly packet event , generates a dragonfly packet on the compute node */
578
static tw_stime dragonfly_packet_event(char const * category, tw_lpid final_dest_lp, tw_lpid dest_mn_lp, uint64_t packet_size, int is_pull, uint64_t pull_size, tw_stime offset, const mn_sched_params *sched_params, int remote_event_size, const void* remote_event, int self_event_size, const void* self_event, tw_lpid src_lp, tw_lp *sender, int is_last_pckt)
579
580
581
582
583
584
{
    tw_event * e_new;
    tw_stime xfer_to_nic_time;
    terminal_message * msg;
    char* tmp_ptr;

585
    xfer_to_nic_time = codes_local_latency(sender); /* Throws an error of found last KP time > current event time otherwise when LPs of one type are placed together*/
586
587
    e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
            sender, DRAGONFLY, (void**)&msg, (void**)&tmp_ptr);
588
589
    strcpy(msg->category, category);
    msg->final_dest_gid = final_dest_lp;
590
    msg->dest_terminal_id = dest_mn_lp;
591
    msg->sender_lp=src_lp;
592
    msg->sender_mn_lp = sender->gid;
593
594
595
596
    msg->packet_size = packet_size;
    msg->remote_event_size_bytes = 0;
    msg->local_event_size_bytes = 0;
    msg->type = T_GENERATE;
597
    msg->magic = terminal_magic_num;
598
599
    msg->is_pull = is_pull;
    msg->pull_size = pull_size;
600
    msg->chunk_id = 0;
601
    msg->packet_ID = 0;
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618

    if(is_last_pckt) /* Its the last packet so pass in remote and local event information*/
      {
	if(remote_event_size > 0)
	 {
		msg->remote_event_size_bytes = remote_event_size;
		memcpy(tmp_ptr, remote_event, remote_event_size);
		tmp_ptr += remote_event_size;
	}
	if(self_event_size > 0)
	{
		msg->local_event_size_bytes = self_event_size;
		memcpy(tmp_ptr, self_event, self_event_size);
		tmp_ptr += self_event_size;
	}
     }
    tw_event_send(e_new);
619
    return xfer_to_nic_time;
620
621
622
623
624
625
626
627
628
}

/* reverse handler of dragonfly_packet_event: rolls back the local
 * latency that the forward call drew on the sender LP */
static void dragonfly_packet_event_rc(tw_lp *sender)
{
    codes_local_latency_reverse(sender);
}

629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
/* Forward a packet from this router to its next stop.
 *
 * s:           router state (output/credit times, occupancy, history).
 * bf:          bit field; c3 and c2 are set here to record which state
 *              was overwritten (presumably consumed by the reverse
 *              handler - confirm against it).
 * msg:         the packet being forwarded; also receives the saved
 *              values needed for reverse computation.
 * lp:          the router LP.
 * output_chan: output virtual channel to use.
 * next_stop:   LP the packet is sent to; the destination terminal when
 *              it equals msg->dest_terminal_id, another router otherwise.
 * event_data:  remote event payload to attach; when NULL the payload is
 *              taken from the bytes directly behind msg.
 *
 * A credit is sent back to the previous hop before anything else. */
void send_packet_from_router(router_state * s,
                             tw_bf * bf,
                             terminal_message * msg,
                             tw_lp * lp,
                             int output_chan,
                             int next_stop,
                             char * event_data)
{
       router_credit_send(s, bf, msg, lp); 

       tw_stime ts;
       tw_event *e;
       terminal_message *m;
      
       int global=0;
       int output_port = output_chan / s->params->num_vcs;
       double bandwidth = s->params->local_bandwidth;
       // Ports in [num_routers, num_routers + num_global_channels) are global channels
      if(output_port >= s->params->num_routers && 
              output_port < s->params->num_routers + s->params->num_global_channels)
      {
         global = 1;
         bandwidth = s->params->global_bandwidth;
      }

      // Save the port's previous availability, then occupy the port for one chunk transfer plus noise
	  msg->saved_available_time = s->next_output_available_time[output_port];
	  ts = g_tw_lookahead + bytes_to_ns(s->params->chunk_size, bandwidth) + tw_rand_exponential(lp->rng, noise);

	  s->next_output_available_time[output_port] = maxd(s->next_output_available_time[output_port], tw_now(lp));
	  s->next_output_available_time[output_port] += ts;
	  // dest can be a router or a terminal, so we must check
	  void * m_data;
	  if (next_stop == msg->dest_terminal_id){
	      e = model_net_method_event_new(next_stop, 
		      s->next_output_available_time[output_port] - tw_now(lp), lp,
		      DRAGONFLY, (void**)&m, &m_data);
	  }
	  else{
	      e = tw_event_new(next_stop, s->next_output_available_time[output_port] - tw_now(lp), lp);
	      m = tw_event_data(e);
	      m_data = m+1;
	  }
	  memcpy(m, msg, sizeof(terminal_message));
	  /* attach the remote event payload: from behind msg, or from event_data when given */
	  if (msg->remote_event_size_bytes){
          if(!event_data)
	        memcpy(m_data, msg+1, msg->remote_event_size_bytes);
	      else
              memcpy(m_data, event_data, msg->remote_event_size_bytes);
      }
	  if(global)
	    m->last_hop=GLOBAL;
	  else
	    m->last_hop = LOCAL;

	  m->local_id = s->router_id;
	  /* for reverse computation */
	  msg->new_vc = output_chan;
	  /* for sending back credit in forward event handler */
	  m->saved_vc = output_chan;
	  m->intm_lp_id = lp->gid;
	  s->vc_occupancy[output_chan]++;
	 
      /* track the largest channel occupancy, remembering the old maximum */
      if(s->vc_occupancy[output_chan] > s->max_router_vc_occupancy)
	   {
		bf->c3 = 1;
		msg->saved_occupancy = s->max_router_vc_occupancy;
		s->max_router_vc_occupancy = s->vc_occupancy[output_chan];
 	   }
	 /* progressive adaptive routing: count packets per channel in history windows of WINDOW_LENGTH */
	 if(routing == PROG_ADAPTIVE)
	  {
		  if(tw_now(lp) - s->cur_hist_start_time[output_chan] >= WINDOW_LENGTH)
		  {
			/* window elapsed: save the old values, then start a new window */
			bf->c2 = 1;
			msg->saved_hist_num = s->prev_hist_num[output_chan];
			msg->saved_hist_start_time = s->cur_hist_start_time[output_chan];

			s->prev_hist_num[output_chan] = s->cur_hist_num[output_chan];

			s->cur_hist_start_time[output_chan] = tw_now(lp);
			s->cur_hist_num[output_chan] = 1;
		  }
		  else
		  {
			s->cur_hist_num[output_chan]++;
		  }
	  }
	  /* Determine the event type. If the packet has arrived at the final destination
	     router then it should arrive at the destination terminal next. */
	  if(next_stop == msg->dest_terminal_id)
	  {
	    m->type = T_ARRIVE;
	    m->magic = terminal_magic_num;

	    /* channel is full: it has to wait for a credit */
	    if(s->vc_occupancy[output_chan] >= s->params->cn_vc_size)
	      s->output_vc_state[output_chan] = VC_CREDIT;
	  }
	  else
	  {
	    /* The packet has to be sent to another router */
	    m->type = R_FORWARD;
	    m->magic = router_magic_num;
	   /* If this is a global channel then the buffer space is different */
	   if( global )
	   {
	     if(s->vc_occupancy[output_chan] >= s->params->global_vc_size)
	       s->output_vc_state[output_chan] = VC_CREDIT;
	   }
	  else
	    {
	     /* local channels are checked against local_vc_size */
	     if( s->vc_occupancy[output_chan] >= s->params->local_vc_size)
		    s->output_vc_state[output_chan] = VC_CREDIT;
	    }
	  }
	  tw_event_send(e);
	  return;

}
748
749
750
751
/* Given two group IDs, find the router of src_gid that connects to dest_gid.
 *
 * dest_gid:    destination group.
 * src_gid:     source group.
 * num_routers: routers per group. Values below 4 make num_routers / 4
 *              zero, which is then used as a divisor.
 *
 * Returns a router ID within [src_gid * num_routers,
 * src_gid * num_routers + num_routers - 1] for distinct groups. */
tw_lpid getRouterFromGroupID(int dest_gid, 
		    int src_gid,
		    int num_routers)
{
  /* first and last router ID of the source group */
  const int src_first = src_gid * num_routers;
  const int src_last = (src_gid * num_routers) + num_routers-1;
  const int dest_first = dest_gid * num_routers;

  /* distance between the two groups, as an absolute value */
  int distance;
  if(dest_first < src_first)
    distance = (src_first - dest_first) / num_routers;
  else
    distance = (dest_first - src_first) / num_routers;

  const int half_channel = num_routers / 4;
  const int span = half_channel * num_routers;

  /* which pass over the group's routers the connection falls in, and
     the position within that pass */
  const int pass = (distance - 1) / span;
  const int pos = (distance - 1) % span;

  /* odd passes count down from the end of the group, even passes count
     up from its beginning */
  if(pass % 2 != 0)
    return src_last - (pos / half_channel);

  return src_first + (pos / half_channel);
}

/*When a packet is sent from the current router and a buffer slot becomes available, a credit is sent back to schedule another packet event*/
777
static void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
778
779
780
781
782
{
  tw_event * buf_e;
  tw_stime ts;
  terminal_message * buf_msg;

783
  int dest=0, type = R_BUFFER;
784
  int is_terminal = 0;
785
  int found_magic = router_magic_num;
786
  tw_stime credit_delay = 0.0;
787

788
  const dragonfly_param *p = s->params;
789
  int sender_radix;
790
791
792
793
 // Notify sender terminal about available buffer space
  if(msg->last_hop == TERMINAL)
  {
   dest = msg->src_terminal_id;
794
   sender_radix = msg->local_id % p->num_cn;  
795
   //determine the time in ns to transfer the credit
796
   credit_delay = bytes_to_ns(CREDIT_SIZE, p->cn_bandwidth);
797
   type = T_BUFFER;
798
   is_terminal = 1;
799
   found_magic = terminal_magic_num;
800
801
802
803
  }
   else if(msg->last_hop == GLOBAL)
   {
     dest = msg->intm_lp_id;
804
     sender_radix = p->num_cn + (msg->local_id % p->num_global_channels);
805
     credit_delay = bytes_to_ns(CREDIT_SIZE, p->global_bandwidth);
806
807
808
809
   }
    else if(msg->last_hop == LOCAL)
     {
        dest = msg->intm_lp_id;
810
        sender_radix = p->num_cn + p->num_global_channels + (msg->local_id % p->num_routers);
811
     	credit_delay = bytes_to_ns(CREDIT_SIZE, p->local_bandwidth) * CREDIT_SIZE;
812
813
814
815
     }
    else
      printf("\n Invalid message type");

816
817
    msg->sender_radix = sender_radix;

818
    assert(sender_radix < s->params->radix );
819

Jonathan Jenkins's avatar
Jonathan Jenkins committed
820
    msg->saved_credit_time = s->next_credit_available_time[sender_radix];
821
    s->next_credit_available_time[sender_radix] = maxd(tw_now(lp), s->next_credit_available_time[sender_radix]);
822
    ts = credit_delay + 0.1 + tw_rand_exponential(lp->rng, (double)credit_delay/1000);
823
	
824
    s->next_credit_available_time[sender_radix]+=ts;
825
826
    if (is_terminal){
        buf_e = model_net_method_event_new(dest, 
827
                s->next_credit_available_time[sender_radix] - tw_now(lp), lp,
828
829
830
                DRAGONFLY, (void**)&buf_msg, NULL);
    }
    else{
831
        buf_e = tw_event_new(dest, s->next_credit_available_time[sender_radix] - tw_now(lp) , lp);
832
833
        buf_msg = tw_event_data(buf_e);
    }
834
    buf_msg->origin_router_id = s->router_id;
835
836
    buf_msg->vc_index = msg->saved_vc;
    buf_msg->type=type;
837
    buf_msg->magic = found_magic;
838
839
840
841
842
843
844
845
    buf_msg->last_hop = msg->last_hop;
    buf_msg->packet_ID=msg->packet_ID;

    tw_event_send(buf_e);

    return;
}

846
static void packet_generate_send_rc(terminal_state * s, 
847
848
849
			    tw_bf * bf, 
			    terminal_message * msg, 
			    tw_lp * lp)
850
{
851
852
853
   term_rev_ecount++;
   term_ecount--;

854
   tw_rand_reverse_unif(lp->rng);
855
	 
856
857
858
859
860
861
862
   s->terminal_available_time = msg->saved_available_time;
   tw_rand_reverse_unif(lp->rng);
   int vc = msg->saved_vc;
   s->vc_occupancy[vc]--;
   s->packet_counter--;
   s->output_vc_state[vc] = VC_IDLE;

863
864
   //if(bf->c2)
   //  s->max_term_vc_occupancy = msg->saved_occupancy;
865

866
867
868
   if (msg->chunk_id == (msg->num_chunks-1)){
     codes_local_latency_reverse(lp);
   }
869
870
871
872

    if(bf->c1)
        codes_local_latency_reverse(lp);

873
     struct mn_stats* stat;
874
875
876
877
878
     stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
     stat->send_count--;
     stat->send_bytes -= msg->packet_size;
     stat->send_time -= (1/s->params->cn_bandwidth) * msg->packet_size;
}
879

880
/* generates packet at the current dragonfly compute node */
881
static void packet_generate_send(terminal_state * s, 
882
883
884
885
			    tw_bf * bf, 
			    terminal_message * msg, 
			    tw_lp * lp)
{
886
	bf->c1 = 0;
887
	term_ecount++;
888
	term_rev_ecount++;
889

890
        const dragonfly_param *p = s->params;
891

892
	tw_stime ts, travel_start_time;
893
	tw_event *e;
894
        tw_lpid router_id;
895
896
897
898
899
900
	terminal_message *m;
	int i, total_event_size;

	uint64_t num_chunks = msg->packet_size / p->chunk_size;
	if (msg->packet_size % s->params->chunk_size)
	  num_chunks++;
901

902
903
	if(!num_chunks)
	   num_chunks = 1;
904

905
906
	if(!msg->packet_ID)
	    msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter + tw_rand_integer(lp->rng, 0, lp->gid + g_tw_nlp * s->packet_counter);
907

908
909
	//  Each packet is broken into chunks and then sent over the channel
	msg->saved_available_time = s->terminal_available_time;
910
911
912
	tw_stime head_delay = bytes_to_ns(s->params->chunk_size, s->params->cn_bandwidth);
	ts = head_delay + tw_rand_exponential(lp->rng, noise);
	//printf("\n ts %f calculated %f ", ts, s->params->chunk_size * (1/s->params->cn_bandwidth));
913
914
	s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp));
	s->terminal_available_time += ts;
915

916
	
917
918
919
	int chan = -1, j;
	for(j = 0; j < p->num_vcs; j++)
	 {
920
	     if(s->vc_occupancy[j] < p->cn_vc_size)
921
922
923
924
925
	      {
	       chan=j;
	       break;
	      }
	 }
926
927
928
929
	  
	/* for reverse computation */
	msg->saved_vc = chan;
  
930
    /* simulation should exit */
931
932
933
934
935
936
937
938
939
940
941
	if(chan == -1)
	    tw_error(TW_LOC, "\n No terminal buffers available, increase buffer size");
	   
        //TODO: be annotation-aware
        codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
		   &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
	   
	codes_mapping_get_lp_id(lp_group_name, "dragonfly_router", NULL, 1,
		   s->router_id/num_routers_per_mgrp, 
		   s->router_id % num_routers_per_mgrp, &router_id);
   
942
    e = tw_event_new(router_id, s->terminal_available_time - tw_now(lp), lp);
943
944
	m = tw_event_data(e);
	memcpy(m, msg, sizeof(terminal_message));
945
946
        
	if(msg->chunk_id == 0)
947
948
949
            travel_start_time = tw_now(lp);
	else
	    travel_start_time = msg->travel_start_time;
950
951

	m->num_chunks = num_chunks;	
952
953
	m->magic = router_magic_num;
	m->origin_router_id = s->router_id;
954
        m->type = R_FORWARD;
955
956
957
958
        m->src_terminal_id = lp->gid;
        m->chunk_id = msg->chunk_id;
        m->last_hop = TERMINAL;
        m->intm_group_id = -1;
959
960
        m->travel_start_time = travel_start_time;
	m->path_type = -1;
961
962
        m->local_event_size_bytes = 0;
        m->local_id = s->terminal_id;
963

964
965
966
967
968
969
	 if (msg->remote_event_size_bytes){
		memcpy(m+1, model_net_method_get_edata(DRAGONFLY, msg),
			msg->remote_event_size_bytes);
	   }
   
	tw_event_send(e);
970

971
	if(msg->packet_ID == TRACK && msg->chunk_id == num_chunks - 1)
972
973
974
975
976
	   printf("\n packet %d generated chunk id %d reach at time %lf ", 
			msg->packet_ID, 
			msg->chunk_id, 
			s->terminal_available_time - tw_now(lp));	

977
         if(msg->chunk_id == num_chunks - 1) 
978
	{
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
		// now that message is sent, issue an "idle" event to tell the scheduler
		// when I'm next available
		model_net_method_idle_event(codes_local_latency(lp) +
		      s->terminal_available_time - tw_now(lp), 0, lp);

		/* local completion message */
		if(msg->local_event_size_bytes > 0)
		 {
		   tw_event* e_new;
		   terminal_message* m_new;
		   void* local_event = 
		       (char*)model_net_method_get_edata(DRAGONFLY, msg) + 
		       msg->remote_event_size_bytes;
		   ts = g_tw_lookahead + (1/s->params->cn_bandwidth) * msg->local_event_size_bytes;
		   e_new = tw_event_new(msg->sender_lp, ts, lp);
		   m_new = tw_event_data(e_new);
		   memcpy(m_new, local_event, msg->local_event_size_bytes);
		   tw_event_send(e_new);
		}
998
	}
999
1000
	s->packet_counter++;
	s->vc_occupancy[chan]++;
For faster browsing, not all history is shown. View entire blame