dragonfly.c 89 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1
2
3
4
5
6
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

7
8
9
10
// Local router ID: 0 --- total_router-1
// Router LP ID 
// Terminal LP ID

11
12
#include <ross.h>

13
#include "codes/jenkins-hash.h"
14
15
16
17
#include "codes/codes_mapping.h"
#include "codes/codes.h"
#include "codes/model-net.h"
#include "codes/model-net-method.h"
18
19
#include "codes/model-net-lp.h"
#include "codes/net/dragonfly.h"
20
#include "sys/file.h"
21
#include "codes/quickhash.h"
22
#include "codes/rc-stack.h"
23
24
25
26

#define CREDIT_SIZE 8
#define MEAN_PROCESS 1.0

27
28
29
/* collective specific parameters */
#define TREE_DEGREE 4
#define LEVEL_DELAY 1000
30
#define DRAGONFLY_COLLECTIVE_DEBUG 0
31
32
33
#define NUM_COLLECTIVES  1
#define COLLECTIVE_COMPUTATION_DELAY 5700
#define DRAGONFLY_FAN_OUT_DELAY 20.0
34
#define WINDOW_LENGTH 0
35
#define DFLY_HASH_TABLE_SIZE 262144
36

37
// debugging parameters
38
39
#define TRACK 2
#define TRACK_PKT 45543
40
#define TRACK_MSG -1
41
#define PRINT_ROUTER_TABLE 1
Misbah Mubarak's avatar
Misbah Mubarak committed
42
#define DEBUG 0
43
#define USE_DIRECT_SCHEME 1
44

45
46
47
#define LP_CONFIG_NM (model_net_lp_config_names[DRAGONFLY])
#define LP_METHOD_NM (model_net_method_names[DRAGONFLY])

48
long term_ecount, router_ecount, term_rev_ecount, router_rev_ecount;
49
long packet_gen = 0, packet_fin = 0;
50

51
52
static double maxd(double a, double b) { return a < b ? b : a; }

53
/* minimal and non-minimal packet counts for adaptive routing*/
54
static unsigned int minimal_count=0, nonmin_count=0;
55

56
57
58
59
60
61
typedef struct dragonfly_param dragonfly_param;
/* annotation-specific parameters (unannotated entry occurs at the 
 * last index) */
static uint64_t                  num_params = 0;
static dragonfly_param         * all_params = NULL;
static const config_anno_map_t * anno_map   = NULL;
62
63

/* global variables for codes mapping */
64
static char lp_group_name[MAX_NAME_LENGTH];
65
66
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;

67
68
69
70
71
72
/* router magic number */
int router_magic_num = 0;

/* terminal magic number */
int terminal_magic_num = 0;

73
74
FILE * dragonfly_log = NULL;

75
76
77
78
79
80
81
typedef struct terminal_message_list terminal_message_list;
struct terminal_message_list {
    terminal_message msg;
    char* event_data;
    terminal_message_list *next;
    terminal_message_list *prev;
};
82

83
84
85
86
87
88
89
void init_terminal_message_list(terminal_message_list *this, 
    terminal_message *inmsg) {
    this->msg = *inmsg;
    this->event_data = NULL;
    this->next = NULL;
    this->prev = NULL;
}
90

91
92
93
94
void delete_terminal_message_list(terminal_message_list *this) {
    if(this->event_data != NULL) free(this->event_data);
    free(this);
}
95

96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
struct dragonfly_param
{
    // configuration parameters
    int num_routers; /*Number of routers in a group*/
    double local_bandwidth;/* bandwidth of the router-router channels within a group */
    double global_bandwidth;/* bandwidth of the inter-group router connections */
    double cn_bandwidth;/* bandwidth of the compute node channels connected to routers */
    int num_vcs; /* number of virtual channels */
    int local_vc_size; /* buffer size of the router-router channels */
    int global_vc_size; /* buffer size of the global channels */
    int cn_vc_size; /* buffer size of the compute node channels */
    int chunk_size; /* full-sized packets are broken into smaller chunks.*/
    // derived parameters
    int num_cn;
    int num_groups;
    int radix;
    int total_routers;
113
    int total_terminals;
114
    int num_global_channels;
115
116
117
118
    double cn_delay;
    double local_delay;
    double global_delay;
    double credit_delay;
119
    double router_delay;
120
121
};

122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
struct dfly_hash_key
{
    uint64_t message_id;
    tw_lpid sender_id;
};

struct dfly_qhash_entry
{
   struct dfly_hash_key key;
   char * remote_event_data;
   int num_chunks;
   int remote_event_size;
   struct qhash_head hash_link;
};

137
138
139
140
141
142
143
144
/* handles terminal and router events like packet generate/send/receive/buffer */
typedef enum event_t event_t;
typedef struct terminal_state terminal_state;
typedef struct router_state router_state;

/* dragonfly compute node data structure */
struct terminal_state
{
145
   uint64_t packet_counter;
146

147
148
149
   int packet_gen;
   int packet_fin;

150
   // Dragonfly specific parameters
151
152
   unsigned int router_id;
   unsigned int terminal_id;
153
154
155

   // Each terminal will have an input and output channel with the router
   int* vc_occupancy; // NUM_VC
156
   int num_vcs;
157
   tw_stime terminal_available_time;
158
159
160
   terminal_message_list **terminal_msgs;
   terminal_message_list **terminal_msgs_tail;
   int in_send_loop;
161
162
163
164
// Terminal generate, sends and arrival T_SEND, T_ARRIVAL, T_GENERATE
// Router-Router Intra-group sends and receives RR_LSEND, RR_LARRIVE
// Router-Router Inter-group sends and receives RR_GSEND, RR_GARRIVE
   struct mn_stats dragonfly_stats_array[CATEGORY_MAX];
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
  /* collective init time */
  tw_stime collective_init_time;

  /* node ID in the tree */ 
   tw_lpid node_id;

   /* messages sent & received in collectives may get interchanged several times so we have to save the 
     origin server information in the node's state */
   tw_lpid origin_svr; 
  
  /* parent node ID of the current node */
   tw_lpid parent_node_id;
   /* array of children to be allocated in terminal_init*/
   tw_lpid* children;

   /* children of a node can be less than or equal to the tree degree */
   int num_children;

   short is_root;
   short is_leaf;

186
   struct rc_stack * st;
187
188
   int issueIdle;
   int terminal_length;
189

190
191
192
   /* to maintain a count of child nodes that have fanned in at the parent during the collective
      fan-in phase*/
   int num_fan_nodes;
193
194
195

   const char * anno;
   const dragonfly_param *params;
196

197
198
199
   struct qhash_table *rank_tbl;
   uint64_t rank_tbl_pop;

Misbah Mubarak's avatar
Misbah Mubarak committed
200
   tw_stime   total_time;
201
   long total_msg_size;
202
   double total_hops;
203
   long finished_msgs;
204
   double finished_chunks;
205
   long finished_packets;
206

207
208
209
   tw_stime last_buf_full;
   tw_stime busy_time;

210
   char output_buf[4096];
211
};
212

213
214
215
216
217
/* terminal event type (1-4) */
enum event_t
{
  T_GENERATE=1,
  T_ARRIVE,
218
  T_SEND,
219
  T_BUFFER,
220
221
  R_SEND,
  R_ARRIVE,
222
223
224
225
  R_BUFFER,
  D_COLLECTIVE_INIT,
  D_COLLECTIVE_FAN_IN,
  D_COLLECTIVE_FAN_OUT
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
};
/* status of a virtual channel can be idle, active, allocated or wait for credit */
enum vc_status
{
   VC_IDLE,
   VC_ACTIVE,
   VC_ALLOC,
   VC_CREDIT
};

/* whether the last hop of a packet was global, local or a terminal */
enum last_hop
{
   GLOBAL,
   LOCAL,
   TERMINAL
};

/* three forms of routing algorithms available, adaptive routing is not
 * accurate and fully functional in the current version as the formulas
 * for detecting load on global channels are not very accurate */
enum ROUTING_ALGO
{
249
250
    MINIMAL = 0,
    NON_MINIMAL,
251
252
    ADAPTIVE,
    PROG_ADAPTIVE
253
254
255
256
257
258
};

struct router_state
{
   unsigned int router_id;
   unsigned int group_id;
259
260
  
   int* global_channel; 
261
   
262
   tw_stime* next_output_available_time;
263
   tw_stime* cur_hist_start_time;
264
265
266
   tw_stime* last_buf_full;
   tw_stime* busy_time;

267
268
269
270
271
   terminal_message_list ***pending_msgs;
   terminal_message_list ***pending_msgs_tail;
   terminal_message_list ***queued_msgs;
   terminal_message_list ***queued_msgs_tail;
   int *in_send_loop;
272
   int *queued_count;
273
   struct rc_stack * st;
274
   
275
   int** vc_occupancy;
276
   int64_t* link_traffic;
277
278
279

   const char * anno;
   const dragonfly_param *params;
280
281
282

   int* prev_hist_num;
   int* cur_hist_num;
283
   
284
   char output_buf[4096];
285
286
287
288
};

static short routing = MINIMAL;

289
290
static tw_stime         dragonfly_total_time = 0;
static tw_stime         dragonfly_max_latency = 0;
291
static tw_stime         max_collective = 0;
292

293

294
295
static long long       total_hops = 0;
static long long       N_finished_packets = 0;
296
297
298
static long long       total_msg_sz = 0;
static long long       N_finished_msgs = 0;
static long long       N_finished_chunks = 0;
299

300
301
302
303
304
305
306
307
308
309
310
311
312
313
static int dragonfly_rank_hash_compare(
        void *key, struct qhash_head *link)
{
    struct dfly_hash_key *message_key = (struct dfly_hash_key *)key;
    struct dfly_qhash_entry *tmp;

    tmp = qhash_entry(link, struct dfly_qhash_entry, hash_link);
    
    if (tmp->key.message_id == message_key->message_id
            && tmp->key.sender_id == message_key->sender_id)
        return 1;

    return 0;
}
314
315
316
static int dragonfly_hash_func(void *k, int table_size)
{
	struct dfly_hash_key *tmp = (struct dfly_hash_key *)k;
317
318
    uint32_t pc = 0, pb = 0;
	bj_hashlittle2(tmp, sizeof(*tmp), &pc, &pb);
319
    return (int)(pc % (uint32_t)(table_size - 1));	
320
321
}

322
323
324
325
326
327
328
329
330
331
332
333
334
335
/* convert GiB/s and bytes to ns */
static tw_stime bytes_to_ns(uint64_t bytes, double GB_p_s)
{
    tw_stime time;

    /* bytes to GB */
    time = ((double)bytes)/(1024.0*1024.0*1024.0);
    /* MB to s */
    time = time / GB_p_s;
    /* s to ns */
    time = time * 1000.0 * 1000.0 * 1000.0;

    return(time);
}
336

337
338
/* returns the dragonfly message size */
static int dragonfly_get_msg_sz(void)
339
{
340
341
	   return sizeof(terminal_message);
}
342

343
344
345
346
347
348
static void free_tmp(void * ptr)
{
    struct dfly_qhash_entry * dfly = ptr; 
    free(dfly->remote_event_data);
    free(dfly);
}
349
350
351
352
353
354
355
356
357
358
359
360
static void append_to_terminal_message_list(  
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index, 
        terminal_message_list *msg) {
    if(thisq[index] == NULL) {
        thisq[index] = msg;
    } else {
        thistail[index]->next = msg;
        msg->prev = thistail[index];
    } 
    thistail[index] = msg;
361
362
}

363
364
365
366
367
368
369
370
371
372
373
374
375
static void prepend_to_terminal_message_list(  
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index, 
        terminal_message_list *msg) {
    if(thisq[index] == NULL) {
        thistail[index] = msg;
    } else {
        thisq[index]->prev = msg;
        msg->next = thisq[index];
    } 
    thisq[index] = msg;
}
376

377
378
379
380
static void create_prepend_to_terminal_message_list(
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index, 
381
        terminal_message * msg) {
382
383
384
385
386
    terminal_message_list* new_entry = (terminal_message_list*)malloc(
        sizeof(terminal_message_list));
    init_terminal_message_list(new_entry, msg);
    if(msg->remote_event_size_bytes) {
        void *m_data = model_net_method_get_edata(DRAGONFLY, msg);
387
388
389
        size_t s = msg->remote_event_size_bytes + msg->local_event_size_bytes;
        new_entry->event_data = (void*)malloc(s);
        memcpy(new_entry->event_data, m_data, s);
390
    }
391
    prepend_to_terminal_message_list( thisq, thistail, index, new_entry);
392
393
}

394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
static terminal_message_list* return_head(
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index) {
    terminal_message_list *head = thisq[index];
    if(head != NULL) {
        thisq[index] = head->next;
        if(head->next != NULL) {
            head->next->prev = NULL;
            head->next = NULL;
        } else {
            thistail[index] = NULL;
        }
    }
    return head;
409
410
}

411
412
413
414
415
static terminal_message_list* return_tail(
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index) {
    terminal_message_list *tail = thistail[index];
416
    assert(tail);
417
418
419
420
421
422
423
424
425
    if(tail->prev != NULL) {
        tail->prev->next = NULL;
        thistail[index] = tail->prev;
        tail->prev = NULL;
    } else {
        thistail[index] = NULL;
        thisq[index] = NULL;
    }
    return tail;
426
427
}

428
429
430
static void dragonfly_read_config(const char * anno, dragonfly_param *params){
    // shorthand
    dragonfly_param *p = params;
431

432
433
434
435
436
437
438
439
    configuration_get_value_int(&config, "PARAMS", "num_routers", anno,
            &p->num_routers);
    if(p->num_routers <= 0) {
        p->num_routers = 4;
        fprintf(stderr, "Number of dimensions not specified, setting to %d\n",
                p->num_routers);
    }

440
    p->num_vcs = 3;
441
442

    configuration_get_value_int(&config, "PARAMS", "local_vc_size", anno, &p->local_vc_size);
443
    if(!p->local_vc_size) {
444
445
446
447
448
        p->local_vc_size = 1024;
        fprintf(stderr, "Buffer size of local channels not specified, setting to %d\n", p->local_vc_size);
    }

    configuration_get_value_int(&config, "PARAMS", "global_vc_size", anno, &p->global_vc_size);
449
    if(!p->global_vc_size) {
450
451
452
453
454
        p->global_vc_size = 2048;
        fprintf(stderr, "Buffer size of global channels not specified, setting to %d\n", p->global_vc_size);
    }

    configuration_get_value_int(&config, "PARAMS", "cn_vc_size", anno, &p->cn_vc_size);
455
    if(!p->cn_vc_size) {
456
457
458
459
460
        p->cn_vc_size = 1024;
        fprintf(stderr, "Buffer size of compute node channels not specified, setting to %d\n", p->cn_vc_size);
    }

    configuration_get_value_int(&config, "PARAMS", "chunk_size", anno, &p->chunk_size);
461
    if(!p->chunk_size) {
462
        p->chunk_size = 512;
463
        fprintf(stderr, "Chunk size for packets is specified, setting to %d\n", p->chunk_size);
464
465
466
    }

    configuration_get_value_double(&config, "PARAMS", "local_bandwidth", anno, &p->local_bandwidth);
467
    if(!p->local_bandwidth) {
468
469
470
471
472
        p->local_bandwidth = 5.25;
        fprintf(stderr, "Bandwidth of local channels not specified, setting to %lf\n", p->local_bandwidth);
    }

    configuration_get_value_double(&config, "PARAMS", "global_bandwidth", anno, &p->global_bandwidth);
473
    if(!p->global_bandwidth) {
474
475
476
477
478
        p->global_bandwidth = 4.7;
        fprintf(stderr, "Bandwidth of global channels not specified, setting to %lf\n", p->global_bandwidth);
    }

    configuration_get_value_double(&config, "PARAMS", "cn_bandwidth", anno, &p->cn_bandwidth);
479
    if(!p->cn_bandwidth) {
480
481
482
483
        p->cn_bandwidth = 5.25;
        fprintf(stderr, "Bandwidth of compute node channels not specified, setting to %lf\n", p->cn_bandwidth);
    }

484
485
486
487
    p->router_delay = 50;
    configuration_get_value_double(&config, "PARAMS", "router_delay", anno,
            &p->router_delay);

488
489
    char routing_str[MAX_NAME_LENGTH];
    configuration_get_value(&config, "PARAMS", "routing", anno, routing_str,
490
            MAX_NAME_LENGTH);
491
492
    if(strcmp(routing_str, "minimal") == 0)
        routing = MINIMAL;
493
494
    else if(strcmp(routing_str, "nonminimal")==0 || 
            strcmp(routing_str,"non-minimal")==0)
495
496
497
498
499
        routing = NON_MINIMAL;
    else if (strcmp(routing_str, "adaptive") == 0)
        routing = ADAPTIVE;
    else if (strcmp(routing_str, "prog-adaptive") == 0)
	routing = PROG_ADAPTIVE;
500
501
502
503
    else
    {
        fprintf(stderr, 
                "No routing protocol specified, setting to minimal routing\n");
504
        routing = -1;
505
506
507
508
509
510
    }

    // set the derived parameters
    p->num_cn = p->num_routers/2;
    p->num_global_channels = p->num_routers/2;
    p->num_groups = p->num_routers * p->num_cn + 1;
511
    p->radix = (p->num_global_channels + p->num_routers + p->num_cn);
512
    p->total_routers = p->num_groups * p->num_routers;
513
    p->total_terminals = p->total_routers * p->num_cn;
514
515
516
517
518
519
520
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if(!rank) {
        printf("\n Total nodes %d routers %d groups %d radix %d \n",
                p->num_cn * p->total_routers, p->total_routers, p->num_groups,
                p->radix);
    }
521
    
522
523
524
525
    p->cn_delay = bytes_to_ns(p->chunk_size, p->cn_bandwidth);
    p->local_delay = bytes_to_ns(p->chunk_size, p->local_bandwidth);
    p->global_delay = bytes_to_ns(p->chunk_size, p->global_bandwidth);
    p->credit_delay = bytes_to_ns(8.0, p->local_bandwidth); //assume 8 bytes packet
526
527
528

}

529
530
531
532
static void dragonfly_configure(){
    anno_map = codes_mapping_get_lp_anno_map(LP_CONFIG_NM);
    assert(anno_map);
    num_params = anno_map->num_annos + (anno_map->has_unanno_lp > 0);
533
    all_params = malloc(num_params * sizeof(*all_params));
534
535

    for (uint64_t i = 0; i < anno_map->num_annos; i++){
536
        const char * anno = anno_map->annotations[i].ptr;
537
538
539
540
541
        dragonfly_read_config(anno, &all_params[i]);
    }
    if (anno_map->has_unanno_lp > 0){
        dragonfly_read_config(NULL, &all_params[anno_map->num_annos]);
    }
542
543
544
545
546
}

/* report dragonfly statistics like average and maximum packet latency, average number of hops traversed */
static void dragonfly_report_stats()
{
547
548
   long long avg_hops, total_finished_packets, total_finished_chunks;
   long long total_finished_msgs, final_msg_sz;
549
   tw_stime avg_time, max_time;
550
   int total_minimal_packets, total_nonmin_packets;
551
   long total_gen, total_fin;
552
553
554

   MPI_Reduce( &total_hops, &avg_hops, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &N_finished_packets, &total_finished_packets, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
555
556
557
   MPI_Reduce( &N_finished_msgs, &total_finished_msgs, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &N_finished_chunks, &total_finished_chunks, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &total_msg_sz, &final_msg_sz, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
558
559
   MPI_Reduce( &dragonfly_total_time, &avg_time, 1,MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &dragonfly_max_latency, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
560
561
562
   
   MPI_Reduce( &packet_gen, &total_gen, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &packet_fin, &total_fin, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
563
   if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
564
565
566
567
    {
	MPI_Reduce(&minimal_count, &total_minimal_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
 	MPI_Reduce(&nonmin_count, &total_nonmin_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
    }
568

569
570
   /* print statistics */
   if(!g_tw_mynode)
571
   {	
572
      printf(" Average number of hops traversed %f average chunk latency %lf us maximum chunk latency %lf us avg message size %lf bytes finished messages %ld finished chunks %ld \n", (float)avg_hops/total_finished_chunks, avg_time/(total_finished_chunks*1000), max_time/1000, (float)final_msg_sz/total_finished_msgs, total_finished_msgs, total_finished_chunks);
573
     if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
574
              printf("\n ADAPTIVE ROUTING STATS: %d chunks routed minimally %d chunks routed non-minimally completed packets %lld ", total_minimal_packets, total_nonmin_packets, total_finished_chunks);
575
 
576
577
  printf("\n Total packets generated %ld finished %ld ", total_gen, total_fin);
   }
578
579
   return;
}
580

581
582
583
void dragonfly_collective_init(terminal_state * s,
           		   tw_lp * lp)
{
584
585
586
587
588
    // TODO: be annotation-aware
    codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
            &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
    int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM,
            NULL, 1);
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
    int num_reps = codes_mapping_get_group_reps(lp_group_name);
    s->node_id = (mapping_rep_id * num_lps) + mapping_offset;

    int i;
   /* handle collective operations by forming a tree of all the LPs */
   /* special condition for root of the tree */
   if( s->node_id == 0)
    {
        s->parent_node_id = -1;
        s->is_root = 1;
   }
   else
   {
       s->parent_node_id = (s->node_id - ((s->node_id - 1) % TREE_DEGREE)) / TREE_DEGREE;
       s->is_root = 0;
   }
   s->children = (tw_lpid*)malloc(TREE_DEGREE * sizeof(tw_lpid));

   /* set the isleaf to zero by default */
   s->is_leaf = 1;
   s->num_children = 0;

   /* calculate the children of the current node. If its a leaf, no need to set children,
      only set isleaf and break the loop*/

   for( i = 0; i < TREE_DEGREE; i++ )
    {
        tw_lpid next_child = (TREE_DEGREE * s->node_id) + i + 1;
        if(next_child < (num_lps * num_reps))
        {
            s->num_children++;
            s->is_leaf = 0;
            s->children[i] = next_child;
        }
        else
           s->children[i] = -1;
    }

#if DRAGONFLY_COLLECTIVE_DEBUG == 1
   printf("\n LP %ld parent node id ", s->node_id);

   for( i = 0; i < TREE_DEGREE; i++ )
        printf(" child node ID %ld ", s->children[i]);
   printf("\n");

   if(s->is_leaf)
        printf("\n LP %ld is leaf ", s->node_id);
#endif
}

639
640
641
642
643
/* initialize a dragonfly compute node terminal */
void 
terminal_init( terminal_state * s, 
	       tw_lp * lp )
{
644
645
646
    s->packet_gen = 0;
    s->packet_fin = 0;

647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
    uint32_t h1 = 0, h2 = 0; 
    bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2);
    terminal_magic_num = h1 + h2;
    
    int i;
    char anno[MAX_NAME_LENGTH];

    // Assign the global router ID
    // TODO: be annotation-aware
    codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
            &mapping_type_id, anno, &mapping_rep_id, &mapping_offset);
    if (anno[0] == '\0'){
        s->anno = NULL;
        s->params = &all_params[num_params-1];
    }
    else{
        s->anno = strdup(anno);
        int id = configuration_get_annotation_index(anno, anno_map);
        s->params = &all_params[id];
    }

   int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM,
           s->anno, 0);

   s->terminal_id = (mapping_rep_id * num_lps) + mapping_offset;  
   s->router_id=(int)s->terminal_id / (s->params->num_routers/2);
   s->terminal_available_time = 0.0;
   s->packet_counter = 0;
675
   
676
   s->finished_msgs = 0;
Misbah Mubarak's avatar
Misbah Mubarak committed
677
678
679
   s->finished_chunks = 0;
   s->finished_packets = 0;
   s->total_time = 0.0;
680
   s->total_msg_size = 0;
681

682
683
684
   s->last_buf_full = 0.0;
   s->busy_time = 0.0;

685
   rc_stack_create(&s->st);
686
687
688
689
690
691
692
693
   s->num_vcs = 1;
   s->vc_occupancy = (int*)malloc(s->num_vcs * sizeof(int));

   for( i = 0; i < s->num_vcs; i++ )
    {
      s->vc_occupancy[i]=0;
    }

694
   s->rank_tbl = qhash_init(dragonfly_rank_hash_compare, dragonfly_hash_func, DFLY_HASH_TABLE_SIZE);
695
696
697
698

   if(!s->rank_tbl)
       tw_error(TW_LOC, "\n Hash table not initialized! ");

699
700
701
702
703
704
   s->terminal_msgs = 
       (terminal_message_list**)malloc(1*sizeof(terminal_message_list*));
   s->terminal_msgs_tail = 
       (terminal_message_list**)malloc(1*sizeof(terminal_message_list*));
   s->terminal_msgs[0] = NULL;
   s->terminal_msgs_tail[0] = NULL;
705
   s->terminal_length = 0;
706
   s->in_send_loop = 0;
707
   s->issueIdle = 0;
708
709
710
711
712
713
714
715
716

   dragonfly_collective_init(s, lp);
   return;
}


/* sets up the router virtual channels, global channels, 
 * local channels, compute node channels */
void router_setup(router_state * r, tw_lp * lp)
717
{
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
    uint32_t h1 = 0, h2 = 0; 
    bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2);
    router_magic_num = h1 + h2;
    
    char anno[MAX_NAME_LENGTH];
    codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
            &mapping_type_id, anno, &mapping_rep_id, &mapping_offset);

    if (anno[0] == '\0'){
        r->anno = NULL;
        r->params = &all_params[num_params-1];
    } else{
        r->anno = strdup(anno);
        int id = configuration_get_annotation_index(anno, anno_map);
        r->params = &all_params[id];
    }

    // shorthand
    const dragonfly_param *p = r->params;

   r->router_id=mapping_rep_id + mapping_offset;
   r->group_id=r->router_id/p->num_routers;

   r->global_channel = (int*)malloc(p->num_global_channels * sizeof(int));
   r->next_output_available_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
   r->cur_hist_start_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
744
   r->link_traffic = (int64_t*)malloc(p->radix * sizeof(int64_t));
745
746
747
748
749
750
751
752
753
754
755
756
757
   r->cur_hist_num = (int*)malloc(p->radix * sizeof(int));
   r->prev_hist_num = (int*)malloc(p->radix * sizeof(int));
   
   r->vc_occupancy = (int**)malloc(p->radix * sizeof(int*));
   r->in_send_loop = (int*)malloc(p->radix * sizeof(int));
   r->pending_msgs = 
    (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**));
   r->pending_msgs_tail = 
    (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**));
   r->queued_msgs = 
    (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**));
   r->queued_msgs_tail = 
    (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**));
758
   r->queued_count = (int*)malloc(p->radix * sizeof(int));
759
760
   r->last_buf_full = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
   r->busy_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
761

762
   rc_stack_create(&r->st);
763
   for(int i=0; i < p->radix; i++)
764
765
    {
       // Set credit & router occupancy
766
767
    r->last_buf_full[i] = 0.0;
    r->busy_time[i] = 0.0;
768
769
	r->next_output_available_time[i]=0;
	r->cur_hist_start_time[i] = 0;
770
    r->link_traffic[i]=0;
771
772
	r->cur_hist_num[i] = 0;
	r->prev_hist_num[i] = 0;
773
    r->queued_count[i] = 0;    
774
775
776
777
778
779
780
781
782
783
        r->in_send_loop[i] = 0;
        r->vc_occupancy[i] = (int*)malloc(p->num_vcs * sizeof(int));
        r->pending_msgs[i] = (terminal_message_list**)malloc(p->num_vcs * 
            sizeof(terminal_message_list*));
        r->pending_msgs_tail[i] = (terminal_message_list**)malloc(p->num_vcs * 
            sizeof(terminal_message_list*));
        r->queued_msgs[i] = (terminal_message_list**)malloc(p->num_vcs * 
            sizeof(terminal_message_list*));
        r->queued_msgs_tail[i] = (terminal_message_list**)malloc(p->num_vcs * 
            sizeof(terminal_message_list*));
784
        for(int j = 0; j < p->num_vcs; j++) {
785
786
787
788
789
790
791
792
793
            r->vc_occupancy[i][j] = 0;
            r->pending_msgs[i][j] = NULL;
            r->pending_msgs_tail[i][j] = NULL;
            r->queued_msgs[i][j] = NULL;
            r->queued_msgs_tail[i][j] = NULL;
        }
    }

#if DEBUG == 1
794
//   printf("\n LP ID %d VC occupancy radix %d Router %d is connected to ", lp->gid, p->radix, r->router_id);
795
796
797
798
#endif 
   //round the number of global channels to the nearest even number
#if USE_DIRECT_SCHEME
       int first = r->router_id % p->num_routers;
799
       for(int i=0; i < p->num_global_channels; i++)
800
801
802
803
804
805
806
807
808
809
810
811
812
        {
            int target_grp = first;
            if(target_grp == r->group_id) {
                target_grp = p->num_groups - 1;
            }
            int my_pos = r->group_id % p->num_routers;
            if(r->group_id == p->num_groups - 1) {
                my_pos = target_grp % p->num_routers;
            }
            r->global_channel[i] = target_grp * p->num_routers + my_pos;
            first += p->num_routers;
        }
#else
813
814
815
   int router_offset = (r->router_id % p->num_routers) * 
    (p->num_global_channels / 2) + 1;
   for(int i=0; i < p->num_global_channels; i++)
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
    {
      if(i % 2 != 0)
          {
             r->global_channel[i]=(r->router_id + (router_offset * p->num_routers))%p->total_routers;
             router_offset++;
          }
          else
           {
             r->global_channel[i]=r->router_id - ((router_offset) * p->num_routers);
           }
        if(r->global_channel[i]<0)
         {
           r->global_channel[i]=p->total_routers+r->global_channel[i]; 
	 }
#if DEBUG == 1
    printf("\n channel %d ", r->global_channel[i]);
#endif 
    }
#endif

#if DEBUG == 1
   printf("\n");
#endif
   return;
}	


/* dragonfly packet event , generates a dragonfly packet on the compute node */
844
845
846
847
848
849
850
851
852
853
static tw_stime dragonfly_packet_event(
        model_net_request const * req,
        uint64_t message_offset,
        uint64_t packet_size,
        tw_stime offset,
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
        tw_lp *sender,
        int is_last_pckt)
854
{
855
856
857
858
859
    tw_event * e_new;
    tw_stime xfer_to_nic_time;
    terminal_message * msg;
    char* tmp_ptr;

860
861
862
    xfer_to_nic_time = codes_local_latency(sender); 
    //e_new = tw_event_new(sender->gid, xfer_to_nic_time+offset, sender);
    //msg = tw_event_data(e_new);
863
864
    e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
            sender, DRAGONFLY, (void**)&msg, (void**)&tmp_ptr);
865
866
    strcpy(msg->category, req->category);
    msg->final_dest_gid = req->final_dest_lp;
867
    msg->total_size = req->msg_size;
868
    msg->sender_lp=req->src_lp;
869
    msg->sender_mn_lp = sender->gid;
870
    msg->packet_size = packet_size;
871
    msg->travel_start_time = tw_now(sender);
872
873
874
    msg->remote_event_size_bytes = 0;
    msg->local_event_size_bytes = 0;
    msg->type = T_GENERATE;
875
    msg->dest_terminal_id = req->dest_mn_lp;
876
    msg->message_id = req->msg_id;
877
878
    msg->is_pull = req->is_pull;
    msg->pull_size = req->pull_size;
879
    msg->magic = terminal_magic_num; 
880
881
    msg->msg_start_time = req->msg_start_time;

882
883
    if(is_last_pckt) /* Its the last packet so pass in remote and local event information*/
      {
884
	if(req->remote_event_size > 0)
885
	 {
886
887
888
		msg->remote_event_size_bytes = req->remote_event_size;
		memcpy(tmp_ptr, remote_event, req->remote_event_size);
		tmp_ptr += req->remote_event_size;
889
	}
890
	if(req->self_event_size > 0)
891
	{
892
893
894
		msg->local_event_size_bytes = req->self_event_size;
		memcpy(tmp_ptr, self_event, req->self_event_size);
		tmp_ptr += req->self_event_size;
895
896
	}
     }
897
	   //printf("\n dragonfly remote event %d local event %d last packet %d %lf ", msg->remote_event_size_bytes, msg->local_event_size_bytes, is_last_pckt, xfer_to_nic_time);
898
    tw_event_send(e_new);
899
    return xfer_to_nic_time;
900
901
902
903
904
905
906
907
908
}

/* dragonfly packet event reverse handler */
static void dragonfly_packet_event_rc(tw_lp *sender)
{
	  codes_local_latency_reverse(sender);
	    return;
}

909
910
911
/* given two group IDs, find the router of the src_gid that connects to the dest_gid*/
tw_lpid getRouterFromGroupID(int dest_gid, 
		    int src_gid,
912
913
		    int num_routers,
                    int total_groups)
914
{
915
916
917
918
919
920
921
#if USE_DIRECT_SCHEME
  int dest = dest_gid;
  if(dest == total_groups - 1) {
      dest = src_gid;
  }
  return src_gid * num_routers + (dest % num_routers);
#else
922
923
924
  int group_begin = src_gid * num_routers;
  int group_end = (src_gid * num_routers) + num_routers-1;
  int offset = (dest_gid * num_routers - group_begin) / num_routers;
925
  
926
927
  if((dest_gid * num_routers) < group_begin)
    offset = (group_begin - dest_gid * num_routers) / num_routers; // take absolute value
928
  
929
930
  int half_channel = num_routers / 4;
  int index = (offset - 1)/(half_channel * num_routers);
931
  
932
  offset=(offset - 1) % (half_channel * num_routers);
933
934
935
936
937
938
939
940
941
942

  // If the destination router is in the same group
  tw_lpid router_id;

  if(index % 2 != 0)
    router_id = group_end - (offset / half_channel); // start from the end
  else
    router_id = group_begin + (offset / half_channel);

  return router_id;
943
#endif
944
945
946
}	

/*When a packet is sent from the current router and a buffer slot becomes available, a credit is sent back to schedule another packet event*/
947
948
void router_credit_send(router_state * s, tw_bf * bf, terminal_message * msg, 
  tw_lp * lp, int sq) {
949
950
951
952
  tw_event * buf_e;
  tw_stime ts;
  terminal_message * buf_msg;

953
  int dest = 0,  type = R_BUFFER;
954
  int is_terminal = 0;
955

956
  const dragonfly_param *p = s->params;
957
958
959
960
961
962
963
964
965
966
967
968
969
 
  // Notify sender terminal about available buffer space
  if(msg->last_hop == TERMINAL) {
    dest = msg->src_terminal_id;
    type = T_BUFFER;
    is_terminal = 1;
  } else if(msg->last_hop == GLOBAL) {
    dest = msg->intm_lp_id;
  } else if(msg->last_hop == LOCAL) {
    dest = msg->intm_lp_id;
  } else
    printf("\n Invalid message type");

970
  ts = g_tw_lookahead + p->credit_delay +  tw_rand_unif(lp->rng);
971
	
972
973
974
975
976
977
978
979
980
981
982
983
984
985
  if (is_terminal) {
    buf_e = model_net_method_event_new(dest, ts, lp, DRAGONFLY, 
      (void**)&buf_msg, NULL);
    buf_msg->magic = terminal_magic_num;
  } else {
    buf_e = tw_event_new(dest, ts , lp);
    buf_msg = tw_event_data(buf_e);
    buf_msg->magic = router_magic_num;
  }
 
  if(sq == -1) {
    buf_msg->vc_index = msg->vc_index;
    buf_msg->output_chan = msg->output_chan;
  } else {
986
    buf_msg->vc_index = msg->saved_vc;
987
988
989
990
    buf_msg->output_chan = msg->saved_channel;
  }
  
  buf_msg->type = type;
991

992
993
  tw_event_send(buf_e);
  return;
994
995
}

996
void packet_generate_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
997
{
998
999
1000
1001
        
   s->packet_gen--;
   packet_gen--;
   
1002
   tw_rand_reverse_unif(lp->rng);
1003

1004
1005
1006
   int num_chunks = msg->packet_size/s->params->chunk_size;
   if(msg->packet_size % s->params->chunk_size)
       num_chunks++;
1007

1008
   if(!num_chunks)
1009
       num_chunks = 1;
1010

1011
1012
1013
1014
   int i;
   for(i = 0; i < num_chunks; i++) {
        delete_terminal_message_list(return_tail(s->terminal_msgs, 
          s->terminal_msgs_tail, 0));
1015
        s->terminal_length -= s->params->chunk_size;
1016
1017
   }
    if(bf->c5) {
1018
        codes_local_latency_reverse(lp);
1019
1020
        s->in_send_loop = 0;
    }
1021
1022
      if(bf->c11) {
        s->issueIdle = 0;
1023
        s->last_buf_full = msg->saved_busy_time;
1024
      }
1025
1026
     struct mn_stats* stat;
     stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
1027
1028
1029
1030
     stat->send_count--;
     stat->send_bytes -= msg->packet_size;
     stat->send_time -= (1/s->params->cn_bandwidth) * msg->packet_size;
}
1031

1032
/* generates packet at the current dragonfly compute node */
1033
1034
void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg, 
  tw_lp * lp) {
1035
1036
  packet_gen++;
  s->packet_gen++;
1037

1038
  tw_stime ts, nic_ts;
1039

1040
  assert(lp->gid != msg->dest_terminal_id);
1041
  const dragonfly_param *p = s->params;
1042

1043
1044
  int i, total_event_size;
  int num_chunks = msg->packet_size / p->chunk_size;
1045
1046
  if (msg->packet_size % s->params->chunk_size) 
      num_chunks++;
1047
1048
1049
1050

  if(!num_chunks)
    num_chunks = 1;

1051
1052
  nic_ts = g_tw_lookahead + s->params->cn_delay * msg->packet_size + tw_rand_unif(lp->rng);
  
1053
  msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter;
1054
1055
1056
1057
  msg->my_N_hop = 0;
  msg->my_l_hop = 0;
  msg->my_g_hop = 0;
  msg->intm_group_id = -1;
1058

1059
1060
1061
1062
1063
  //if(msg->dest_terminal_id == TRACK)
  if(msg->packet_ID == TRACK_PKT)
    printf("\n Packet %ld generated at terminal %d dest %ld size %d num chunks %d ", 
            msg->packet_ID, s->terminal_id, msg->dest_terminal_id,
            msg->packet_size, num_chunks);
1064

1065
1066
1067
1068
  for(i = 0; i < num_chunks; i++)
  {
    terminal_message_list *cur_chunk = (terminal_message_list*)malloc(
      sizeof(terminal_message_list));
1069
    msg->origin_router_id = s->router_id;
1070
    init_terminal_message_list(cur_chunk, msg);
1071
  
1072

1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
    if(msg->remote_event_size_bytes + msg->local_event_size_bytes > 0) {
      cur_chunk->event_data = (char*)malloc(
          msg->remote_event_size_bytes + msg->local_event_size_bytes);
    }
    
    void * m_data_src = model_net_method_get_edata(DRAGONFLY, msg);
    if (msg->remote_event_size_bytes){
      memcpy(cur_chunk->event_data, m_data_src, msg->remote_event_size_bytes);
    }
    if (msg->local_event_size_bytes){ 
      m_data_src = (char*)m_data_src + msg->remote_event_size_bytes;
      memcpy((char*)cur_chunk->event_data + msg->remote_event_size_bytes, 
          m_data_src, msg->local_event_size_bytes);
    }
1087

1088
    cur_chunk->msg.chunk_id = i;
1089
    cur_chunk->msg.origin_router_id = s->router_id;
1090
1091
    append_to_terminal_message_list(s->terminal_msgs, s->terminal_msgs_tail,
      0, cur_chunk);
1092
    s->terminal_length += s->params->chunk_size;
1093
  }
1094

1095
1096
1097
1098
1099
  if(s->terminal_length < 2 * s->params->cn_vc_size) {
    model_net_method_idle_event(nic_ts, 0, lp);
  } else {
    bf->c11 = 1;
    s->issueIdle = 1;
1100
1101
    msg->saved_busy_time = s->last_buf_full;
    s->last_buf_full = tw_now(lp);
1102
  }
1103
  
1104
1105
  if(s->in_send_loop == 0) {
    bf->c5 = 1;
1106
    ts = codes_local_latency(lp);
1107
1108
1109
1110
1111
1112
1113
1114
    terminal_message *m;
    tw_event* e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY, 
      (void**)&m, NULL);
    m->type = T_SEND;
    m->magic = terminal_magic_num;
    s->in_send_loop = 1;
    tw_event_send(e);
  }
1115

1116
1117
1118
1119
1120
1121
1122
1123
  total_event_size = model_net_get_msg_sz(DRAGONFLY) + 
      msg->remote_event_size_bytes + msg->local_event_size_bytes;
  mn_stats* stat;
  stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
  stat->send_count++;
  stat->send_bytes += msg->packet_size;
  stat->send_time += (1/p->cn_bandwidth) * msg->packet_size;
  if(stat->max_event_size < total_event_size)
1124
	  stat->max_event_size = total_event_size;
1125

1126
1127
1128
  return;
}

1129
1130
void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg,
        tw_lp * lp)
1131
{
1132
1133
      if(bf->c1) {
        s->in_send_loop = 1;
1134
        s->last_buf_full = msg->saved_busy_time;
1135
1136
1137
1138
        return;
      }
      
      tw_rand_reverse_unif(lp->rng);
1139
      s->terminal_available_time = msg->saved_available_time;