dragonfly.c 101 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

7 8 9 10
// Local router ID: 0 --- total_router-1
// Router LP ID 
// Terminal LP ID

11 12
#include <ross.h>

13
#define DEBUG_LP 892
14
#include "codes/jenkins-hash.h"
15 16 17 18
#include "codes/codes_mapping.h"
#include "codes/codes.h"
#include "codes/model-net.h"
#include "codes/model-net-method.h"
19 20
#include "codes/model-net-lp.h"
#include "codes/net/dragonfly.h"
21
#include "sys/file.h"
22
#include "codes/quickhash.h"
23
#include "codes/rc-stack.h"
24 25 26 27

#define CREDIT_SIZE 8
#define MEAN_PROCESS 1.0

28 29 30
/* collective specific parameters */
#define TREE_DEGREE 4
#define LEVEL_DELAY 1000
31
#define DRAGONFLY_COLLECTIVE_DEBUG 0
32 33 34
#define NUM_COLLECTIVES  1
#define COLLECTIVE_COMPUTATION_DELAY 5700
#define DRAGONFLY_FAN_OUT_DELAY 20.0
35
#define WINDOW_LENGTH 0
36
#define DFLY_HASH_TABLE_SIZE 262144
37

38
// debugging parameters
39 40
#define TRACK -1
#define TRACK_PKT -1
41
#define TRACK_MSG -1
42
#define PRINT_ROUTER_TABLE 1
43
#define DEBUG 0
44
#define USE_DIRECT_SCHEME 1
45
#define MAX_STATS 65536
46

47 48 49 50
#define LP_CONFIG_NM_TERM (model_net_lp_config_names[DRAGONFLY])
#define LP_METHOD_NM_TERM (model_net_method_names[DRAGONFLY])
#define LP_CONFIG_NM_ROUT (model_net_lp_config_names[DRAGONFLY_ROUTER])
#define LP_METHOD_NM_ROUT (model_net_method_names[DRAGONFLY_ROUTER])
51

52
int debug_slot_count = 0;
53
long term_ecount, router_ecount, term_rev_ecount, router_rev_ecount;
54
long packet_gen = 0, packet_fin = 0;
55

56 57
static double maxd(double a, double b) { return a < b ? b : a; }

58
/* minimal and non-minimal packet counts for adaptive routing*/
59
static int minimal_count=0, nonmin_count=0;
60

61 62 63 64 65 66
typedef struct dragonfly_param dragonfly_param;
/* annotation-specific parameters (unannotated entry occurs at the 
 * last index) */
static uint64_t                  num_params = 0;
static dragonfly_param         * all_params = NULL;
static const config_anno_map_t * anno_map   = NULL;
67 68

/* global variables for codes mapping */
69
static char lp_group_name[MAX_NAME_LENGTH];
70 71
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;

72 73 74 75 76 77
/* router magic number */
int router_magic_num = 0;

/* terminal magic number */
int terminal_magic_num = 0;

78 79
FILE * dragonfly_log = NULL;

80
int sample_bytes_written = 0;
81
int sample_rtr_bytes_written = 0;
82

83 84 85
char cn_sample_file[MAX_NAME_LENGTH];
char router_sample_file[MAX_NAME_LENGTH];

86 87 88 89 90 91 92
typedef struct terminal_message_list terminal_message_list;
struct terminal_message_list {
    terminal_message msg;
    char* event_data;
    terminal_message_list *next;
    terminal_message_list *prev;
};
93

94 95 96 97 98 99 100
void init_terminal_message_list(terminal_message_list *this, 
    terminal_message *inmsg) {
    this->msg = *inmsg;
    this->event_data = NULL;
    this->next = NULL;
    this->prev = NULL;
}
101

102 103 104 105
void delete_terminal_message_list(terminal_message_list *this) {
    if(this->event_data != NULL) free(this->event_data);
    free(this);
}
106

107 108 109 110 111 112 113 114 115 116 117 118 119 120
struct dragonfly_param
{
    // configuration parameters
    int num_routers; /*Number of routers in a group*/
    double local_bandwidth;/* bandwidth of the router-router channels within a group */
    double global_bandwidth;/* bandwidth of the inter-group router connections */
    double cn_bandwidth;/* bandwidth of the compute node channels connected to routers */
    int num_vcs; /* number of virtual channels */
    int local_vc_size; /* buffer size of the router-router channels */
    int global_vc_size; /* buffer size of the global channels */
    int cn_vc_size; /* buffer size of the compute node channels */
    int chunk_size; /* full-sized packets are broken into smaller chunks.*/
    // derived parameters
    int num_cn;
121
    int num_groups;
122 123
    int radix;
    int total_routers;
124
    int total_terminals;
125
    int num_global_channels;
126 127 128 129
    double cn_delay;
    double local_delay;
    double global_delay;
    double credit_delay;
130
    double router_delay;
131 132
};

133 134 135 136 137 138
struct dfly_hash_key
{
    uint64_t message_id;
    tw_lpid sender_id;
};

139 140 141 142 143 144 145 146 147
struct dfly_router_sample
{
    tw_lpid router_id;
    tw_stime* busy_time;
    int64_t* link_traffic;
    tw_stime end_time;
};

struct dfly_cn_sample
148 149 150 151 152 153 154 155 156 157
{
   tw_lpid terminal_id;
   long fin_chunks_sample;
   long data_size_sample;
   double fin_hops_sample;
   tw_stime fin_chunks_time;
   tw_stime busy_time_sample;
   tw_stime end_time;
};

158 159 160 161 162 163 164 165 166
struct dfly_qhash_entry
{
   struct dfly_hash_key key;
   char * remote_event_data;
   int num_chunks;
   int remote_event_size;
   struct qhash_head hash_link;
};

167 168 169 170 171 172 173 174
/* handles terminal and router events like packet generate/send/receive/buffer */
typedef enum event_t event_t;
typedef struct terminal_state terminal_state;
typedef struct router_state router_state;

/* dragonfly compute node data structure */
struct terminal_state
{
175
   uint64_t packet_counter;
176

177 178 179
   int packet_gen;
   int packet_fin;

180
   // Dragonfly specific parameters
181 182
   unsigned int router_id;
   unsigned int terminal_id;
183 184 185

   // Each terminal will have an input and output channel with the router
   int* vc_occupancy; // NUM_VC
186
   int num_vcs;
187
   tw_stime terminal_available_time;
188 189 190
   terminal_message_list **terminal_msgs;
   terminal_message_list **terminal_msgs_tail;
   int in_send_loop;
191 192 193 194
// Terminal generate, sends and arrival T_SEND, T_ARRIVAL, T_GENERATE
// Router-Router Intra-group sends and receives RR_LSEND, RR_LARRIVE
// Router-Router Inter-group sends and receives RR_GSEND, RR_GARRIVE
   struct mn_stats dragonfly_stats_array[CATEGORY_MAX];
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
  /* collective init time */
  tw_stime collective_init_time;

  /* node ID in the tree */ 
   tw_lpid node_id;

   /* messages sent & received in collectives may get interchanged several times so we have to save the 
     origin server information in the node's state */
   tw_lpid origin_svr; 
  
  /* parent node ID of the current node */
   tw_lpid parent_node_id;
   /* array of children to be allocated in terminal_init*/
   tw_lpid* children;

   /* children of a node can be less than or equal to the tree degree */
   int num_children;

   short is_root;
   short is_leaf;

216
   struct rc_stack * st;
217 218
   int issueIdle;
   int terminal_length;
219

220 221 222
   /* to maintain a count of child nodes that have fanned in at the parent during the collective
      fan-in phase*/
   int num_fan_nodes;
223 224 225

   const char * anno;
   const dragonfly_param *params;
226

227 228 229
   struct qhash_table *rank_tbl;
   uint64_t rank_tbl_pop;

230
   tw_stime   total_time;
231
   long total_msg_size;
232
   double total_hops;
233
   long finished_msgs;
234
   long finished_chunks;
235
   long finished_packets;
236

237 238
   tw_stime last_buf_full;
   tw_stime busy_time;
239
   char output_buf[4096];
240 241
   /* For LP suspend functionality */
   int error_ct;
242 243 244 245 246 247 248 249 250

   /* For sampling */
   long fin_chunks_sample;
   long data_size_sample;
   double fin_hops_sample;
   tw_stime fin_chunks_time;
   tw_stime busy_time_sample;

   char sample_buf[4096];
251
   struct dfly_cn_sample * sample_stat;
252 253
   int op_arr_size;
   int max_arr_size;
254
};
255

256 257 258 259 260
/* terminal event type (1-4) */
enum event_t
{
  T_GENERATE=1,
  T_ARRIVE,
261
  T_SEND,
262
  T_BUFFER,
263 264
  R_SEND,
  R_ARRIVE,
265 266 267 268
  R_BUFFER,
  D_COLLECTIVE_INIT,
  D_COLLECTIVE_FAN_IN,
  D_COLLECTIVE_FAN_OUT
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
};
/* status of a virtual channel can be idle, active, allocated or wait for credit */
enum vc_status
{
   VC_IDLE,
   VC_ACTIVE,
   VC_ALLOC,
   VC_CREDIT
};

/* whether the last hop of a packet was global, local or a terminal */
enum last_hop
{
   GLOBAL,
   LOCAL,
   TERMINAL
};

/* three forms of routing algorithms available, adaptive routing is not
 * accurate and fully functional in the current version as the formulas
 * for detecting load on global channels are not very accurate */
enum ROUTING_ALGO
{
292 293
    MINIMAL = 0,
    NON_MINIMAL,
294 295
    ADAPTIVE,
    PROG_ADAPTIVE
296 297 298 299 300
};

struct router_state
{
   unsigned int router_id;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
301
   int group_id;
302 303 304
   int op_arr_size;
   int max_arr_size;

305
   int* global_channel; 
306
   
307
   tw_stime* next_output_available_time;
308
   tw_stime* cur_hist_start_time;
309
   tw_stime* last_buf_full;
310

311
   tw_stime* busy_time;
312
   tw_stime* busy_time_sample;
313

314 315 316 317 318
   terminal_message_list ***pending_msgs;
   terminal_message_list ***pending_msgs_tail;
   terminal_message_list ***queued_msgs;
   terminal_message_list ***queued_msgs_tail;
   int *in_send_loop;
319
   int *queued_count;
320
   struct rc_stack * st;
321
   
322
   int** vc_occupancy;
323
   int64_t* link_traffic;
324
   int64_t * link_traffic_sample;
325 326 327

   const char * anno;
   const dragonfly_param *params;
328 329 330

   int* prev_hist_num;
   int* cur_hist_num;
331
   
332
   char output_buf[4096];
333
   char output_buf2[4096];
334 335

   struct dfly_router_sample * rsamples;
336 337 338 339
};

static short routing = MINIMAL;

340 341
static tw_stime         dragonfly_total_time = 0;
static tw_stime         dragonfly_max_latency = 0;
342
static tw_stime         max_collective = 0;
343

344

345 346
static long long       total_hops = 0;
static long long       N_finished_packets = 0;
347 348 349
static long long       total_msg_sz = 0;
static long long       N_finished_msgs = 0;
static long long       N_finished_chunks = 0;
350

351 352 353 354
static int dragonfly_rank_hash_compare(
        void *key, struct qhash_head *link)
{
    struct dfly_hash_key *message_key = (struct dfly_hash_key *)key;
355
    struct dfly_qhash_entry *tmp = NULL;
356 357 358 359 360 361 362 363 364

    tmp = qhash_entry(link, struct dfly_qhash_entry, hash_link);
    
    if (tmp->key.message_id == message_key->message_id
            && tmp->key.sender_id == message_key->sender_id)
        return 1;

    return 0;
}
365 366
static int dragonfly_hash_func(void *k, int table_size)
{
367
    struct dfly_hash_key *tmp = (struct dfly_hash_key *)k;
368 369 370
    uint32_t pc = 0, pb = 0;	
    bj_hashlittle2(tmp, sizeof(*tmp), &pc, &pb);
    /*uint64_t key = (~tmp->message_id) + (tmp->message_id << 18);
371 372 373
    key = key * 21;
    key = ~key ^ (tmp->sender_id >> 4);
    key = key * tmp->sender_id; 
374 375
    return (int)(key & (table_size - 1));*/
    return (int)(pc % (table_size - 1));
376 377
}

378 379 380 381 382 383 384 385 386 387 388 389 390 391
/* convert GiB/s and bytes to ns */
static tw_stime bytes_to_ns(uint64_t bytes, double GB_p_s)
{
    tw_stime time;

    /* bytes to GB */
    time = ((double)bytes)/(1024.0*1024.0*1024.0);
    /* MB to s */
    time = time / GB_p_s;
    /* s to ns */
    time = time * 1000.0 * 1000.0 * 1000.0;

    return(time);
}
392

393 394
/* returns the dragonfly message size */
static int dragonfly_get_msg_sz(void)
395
{
396 397
	   return sizeof(terminal_message);
}
398

399 400 401 402 403 404
static void free_tmp(void * ptr)
{
    struct dfly_qhash_entry * dfly = ptr; 
    free(dfly->remote_event_data);
    free(dfly);
}
405 406 407 408 409 410 411 412 413 414 415 416
static void append_to_terminal_message_list(  
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index, 
        terminal_message_list *msg) {
    if(thisq[index] == NULL) {
        thisq[index] = msg;
    } else {
        thistail[index]->next = msg;
        msg->prev = thistail[index];
    } 
    thistail[index] = msg;
417 418
}

419 420 421 422 423 424 425 426 427 428 429 430 431
static void prepend_to_terminal_message_list(  
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index, 
        terminal_message_list *msg) {
    if(thisq[index] == NULL) {
        thistail[index] = msg;
    } else {
        thisq[index]->prev = msg;
        msg->next = thisq[index];
    } 
    thisq[index] = msg;
}
432

433 434 435 436 437 438 439 440 441 442 443 444 445 446 447
static terminal_message_list* return_head(
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index) {
    terminal_message_list *head = thisq[index];
    if(head != NULL) {
        thisq[index] = head->next;
        if(head->next != NULL) {
            head->next->prev = NULL;
            head->next = NULL;
        } else {
            thistail[index] = NULL;
        }
    }
    return head;
448 449
}

450 451 452 453 454
static terminal_message_list* return_tail(
        terminal_message_list ** thisq,
        terminal_message_list ** thistail,
        int index) {
    terminal_message_list *tail = thistail[index];
455
    assert(tail);
456 457 458 459 460 461 462 463 464
    if(tail->prev != NULL) {
        tail->prev->next = NULL;
        thistail[index] = tail->prev;
        tail->prev = NULL;
    } else {
        thistail[index] = NULL;
        thisq[index] = NULL;
    }
    return tail;
465 466
}

467 468 469
static void dragonfly_read_config(const char * anno, dragonfly_param *params){
    // shorthand
    dragonfly_param *p = params;
470

471
    int rc = configuration_get_value_int(&config, "PARAMS", "num_routers", anno,
472
            &p->num_routers);
473
    if(rc) {
474 475 476 477 478
        p->num_routers = 4;
        fprintf(stderr, "Number of dimensions not specified, setting to %d\n",
                p->num_routers);
    }

479
    p->num_vcs = 3;
480

481 482
    rc = configuration_get_value_int(&config, "PARAMS", "local_vc_size", anno, &p->local_vc_size);
    if(rc) {
483 484 485 486
        p->local_vc_size = 1024;
        fprintf(stderr, "Buffer size of local channels not specified, setting to %d\n", p->local_vc_size);
    }

487 488
    rc = configuration_get_value_int(&config, "PARAMS", "global_vc_size", anno, &p->global_vc_size);
    if(rc) {
489 490 491 492
        p->global_vc_size = 2048;
        fprintf(stderr, "Buffer size of global channels not specified, setting to %d\n", p->global_vc_size);
    }

493 494
    rc = configuration_get_value_int(&config, "PARAMS", "cn_vc_size", anno, &p->cn_vc_size);
    if(rc) {
495 496 497 498
        p->cn_vc_size = 1024;
        fprintf(stderr, "Buffer size of compute node channels not specified, setting to %d\n", p->cn_vc_size);
    }

499 500
    rc = configuration_get_value_int(&config, "PARAMS", "chunk_size", anno, &p->chunk_size);
    if(rc) {
501
        p->chunk_size = 512;
502
        fprintf(stderr, "Chunk size for packets is specified, setting to %d\n", p->chunk_size);
503 504
    }

505 506
    rc = configuration_get_value_double(&config, "PARAMS", "local_bandwidth", anno, &p->local_bandwidth);
    if(rc) {
507 508 509 510
        p->local_bandwidth = 5.25;
        fprintf(stderr, "Bandwidth of local channels not specified, setting to %lf\n", p->local_bandwidth);
    }

511 512
    rc = configuration_get_value_double(&config, "PARAMS", "global_bandwidth", anno, &p->global_bandwidth);
    if(rc) {
513 514 515 516
        p->global_bandwidth = 4.7;
        fprintf(stderr, "Bandwidth of global channels not specified, setting to %lf\n", p->global_bandwidth);
    }

517 518
    rc = configuration_get_value_double(&config, "PARAMS", "cn_bandwidth", anno, &p->cn_bandwidth);
    if(rc) {
519 520 521 522
        p->cn_bandwidth = 5.25;
        fprintf(stderr, "Bandwidth of compute node channels not specified, setting to %lf\n", p->cn_bandwidth);
    }

523 524 525 526
    p->router_delay = 50;
    configuration_get_value_double(&config, "PARAMS", "router_delay", anno,
            &p->router_delay);

527 528 529 530 531
    configuration_get_value(&config, "PARAMS", "cn_sample_file", anno, cn_sample_file,
            MAX_NAME_LENGTH);
    configuration_get_value(&config, "PARAMS", "rt_sample_file", anno, router_sample_file,
            MAX_NAME_LENGTH);
    
532 533
    char routing_str[MAX_NAME_LENGTH];
    configuration_get_value(&config, "PARAMS", "routing", anno, routing_str,
534
            MAX_NAME_LENGTH);
535 536
    if(strcmp(routing_str, "minimal") == 0)
        routing = MINIMAL;
537 538
    else if(strcmp(routing_str, "nonminimal")==0 || 
            strcmp(routing_str,"non-minimal")==0)
539 540 541 542 543
        routing = NON_MINIMAL;
    else if (strcmp(routing_str, "adaptive") == 0)
        routing = ADAPTIVE;
    else if (strcmp(routing_str, "prog-adaptive") == 0)
	routing = PROG_ADAPTIVE;
544 545 546 547
    else
    {
        fprintf(stderr, 
                "No routing protocol specified, setting to minimal routing\n");
548
        routing = -1;
549 550 551 552 553 554
    }

    // set the derived parameters
    p->num_cn = p->num_routers/2;
    p->num_global_channels = p->num_routers/2;
    p->num_groups = p->num_routers * p->num_cn + 1;
555
    p->radix = (p->num_global_channels + p->num_routers + p->num_cn);
556
    p->total_routers = p->num_groups * p->num_routers;
557
    p->total_terminals = p->total_routers * p->num_cn;
558 559 560 561 562 563 564
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if(!rank) {
        printf("\n Total nodes %d routers %d groups %d radix %d \n",
                p->num_cn * p->total_routers, p->total_routers, p->num_groups,
                p->radix);
    }
565
    
566 567 568 569
    p->cn_delay = bytes_to_ns(p->chunk_size, p->cn_bandwidth);
    p->local_delay = bytes_to_ns(p->chunk_size, p->local_bandwidth);
    p->global_delay = bytes_to_ns(p->chunk_size, p->global_bandwidth);
    p->credit_delay = bytes_to_ns(8.0, p->local_bandwidth); //assume 8 bytes packet
570 571 572

}

573
static void dragonfly_configure(){
574
    anno_map = codes_mapping_get_lp_anno_map(LP_CONFIG_NM_TERM);
575 576
    assert(anno_map);
    num_params = anno_map->num_annos + (anno_map->has_unanno_lp > 0);
577
    all_params = malloc(num_params * sizeof(*all_params));
578

Jonathan Jenkins's avatar
Jonathan Jenkins committed
579
    for (int i = 0; i < anno_map->num_annos; i++){
580
        const char * anno = anno_map->annotations[i].ptr;
581 582 583 584 585
        dragonfly_read_config(anno, &all_params[i]);
    }
    if (anno_map->has_unanno_lp > 0){
        dragonfly_read_config(NULL, &all_params[anno_map->num_annos]);
    }
586 587 588 589 590
}

/* report dragonfly statistics like average and maximum packet latency, average number of hops traversed */
static void dragonfly_report_stats()
{
591 592
   long long avg_hops, total_finished_packets, total_finished_chunks;
   long long total_finished_msgs, final_msg_sz;
593
   tw_stime avg_time, max_time;
594
   int total_minimal_packets, total_nonmin_packets;
595
   long total_gen, total_fin;
596 597 598

   MPI_Reduce( &total_hops, &avg_hops, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &N_finished_packets, &total_finished_packets, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
599 600 601
   MPI_Reduce( &N_finished_msgs, &total_finished_msgs, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &N_finished_chunks, &total_finished_chunks, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &total_msg_sz, &final_msg_sz, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
602 603
   MPI_Reduce( &dragonfly_total_time, &avg_time, 1,MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &dragonfly_max_latency, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
604 605 606
   
   MPI_Reduce( &packet_gen, &total_gen, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
   MPI_Reduce( &packet_fin, &total_fin, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
607
   if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
608 609 610 611
    {
	MPI_Reduce(&minimal_count, &total_minimal_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
 	MPI_Reduce(&nonmin_count, &total_nonmin_packets, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
    }
612

613 614
   /* print statistics */
   if(!g_tw_mynode)
615
   {	
616 617
      printf(" Average number of hops traversed %f average chunk latency %lf us maximum chunk latency %lf us avg message size %lf bytes finished messages %lld finished chunks %lld \n", 
              (float)avg_hops/total_finished_chunks, avg_time/(total_finished_chunks*1000), max_time/1000, (float)final_msg_sz/total_finished_msgs, total_finished_msgs, total_finished_chunks);
618
     if(routing == ADAPTIVE || routing == PROG_ADAPTIVE)
619 620
              printf("\n ADAPTIVE ROUTING STATS: %d chunks routed minimally %d chunks routed non-minimally completed packets %lld \n", 
                      total_minimal_packets, total_nonmin_packets, total_finished_chunks);
621
 
622
      printf("\n Total packets generated %ld finished %ld \n", total_gen, total_fin);
623
   }
624 625
   return;
}
626

627 628 629
void dragonfly_collective_init(terminal_state * s,
           		   tw_lp * lp)
{
630 631 632
    // TODO: be annotation-aware
    codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
            &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
633
    int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM_TERM,
634
            NULL, 1);
635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662
    int num_reps = codes_mapping_get_group_reps(lp_group_name);
    s->node_id = (mapping_rep_id * num_lps) + mapping_offset;

    int i;
   /* handle collective operations by forming a tree of all the LPs */
   /* special condition for root of the tree */
   if( s->node_id == 0)
    {
        s->parent_node_id = -1;
        s->is_root = 1;
   }
   else
   {
       s->parent_node_id = (s->node_id - ((s->node_id - 1) % TREE_DEGREE)) / TREE_DEGREE;
       s->is_root = 0;
   }
   s->children = (tw_lpid*)malloc(TREE_DEGREE * sizeof(tw_lpid));

   /* set the isleaf to zero by default */
   s->is_leaf = 1;
   s->num_children = 0;

   /* calculate the children of the current node. If its a leaf, no need to set children,
      only set isleaf and break the loop*/

   for( i = 0; i < TREE_DEGREE; i++ )
    {
        tw_lpid next_child = (TREE_DEGREE * s->node_id) + i + 1;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
663
        if(next_child < ((tw_lpid)num_lps * (tw_lpid)num_reps))
664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684
        {
            s->num_children++;
            s->is_leaf = 0;
            s->children[i] = next_child;
        }
        else
           s->children[i] = -1;
    }

#if DRAGONFLY_COLLECTIVE_DEBUG == 1
   printf("\n LP %ld parent node id ", s->node_id);

   for( i = 0; i < TREE_DEGREE; i++ )
        printf(" child node ID %ld ", s->children[i]);
   printf("\n");

   if(s->is_leaf)
        printf("\n LP %ld is leaf ", s->node_id);
#endif
}

685 686 687 688 689
/* initialize a dragonfly compute node terminal */
void 
terminal_init( terminal_state * s, 
	       tw_lp * lp )
{
690 691 692
    s->packet_gen = 0;
    s->packet_fin = 0;

693
    uint32_t h1 = 0, h2 = 0; 
694
    bj_hashlittle2(LP_METHOD_NM_TERM, strlen(LP_METHOD_NM_TERM), &h1, &h2);
695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713
    terminal_magic_num = h1 + h2;
    
    int i;
    char anno[MAX_NAME_LENGTH];

    // Assign the global router ID
    // TODO: be annotation-aware
    codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
            &mapping_type_id, anno, &mapping_rep_id, &mapping_offset);
    if (anno[0] == '\0'){
        s->anno = NULL;
        s->params = &all_params[num_params-1];
    }
    else{
        s->anno = strdup(anno);
        int id = configuration_get_annotation_index(anno, anno_map);
        s->params = &all_params[id];
    }

714
   int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM_TERM,
715 716 717
           s->anno, 0);

   s->terminal_id = (mapping_rep_id * num_lps) + mapping_offset;  
718
   
719 720 721
   s->router_id=(int)s->terminal_id / (s->params->num_routers/2);
   s->terminal_available_time = 0.0;
   s->packet_counter = 0;
722
   
723
   s->finished_msgs = 0;
724 725 726
   s->finished_chunks = 0;
   s->finished_packets = 0;
   s->total_time = 0.0;
727
   s->total_msg_size = 0;
728

729 730 731
   s->last_buf_full = 0.0;
   s->busy_time = 0.0;

732
   rc_stack_create(&s->st);
733 734 735 736 737 738 739 740
   s->num_vcs = 1;
   s->vc_occupancy = (int*)malloc(s->num_vcs * sizeof(int));

   for( i = 0; i < s->num_vcs; i++ )
    {
      s->vc_occupancy[i]=0;
    }

741
   s->rank_tbl = qhash_init(dragonfly_rank_hash_compare, dragonfly_hash_func, DFLY_HASH_TABLE_SIZE);
742 743 744 745

   if(!s->rank_tbl)
       tw_error(TW_LOC, "\n Hash table not initialized! ");

746 747 748 749 750 751
   s->terminal_msgs = 
       (terminal_message_list**)malloc(1*sizeof(terminal_message_list*));
   s->terminal_msgs_tail = 
       (terminal_message_list**)malloc(1*sizeof(terminal_message_list*));
   s->terminal_msgs[0] = NULL;
   s->terminal_msgs_tail[0] = NULL;
752
   s->terminal_length = 0;
753
   s->in_send_loop = 0;
754
   s->issueIdle = 0;
755 756 757 758 759 760 761 762 763

   dragonfly_collective_init(s, lp);
   return;
}


/* sets up the router virtual channels, global channels, 
 * local channels, compute node channels */
void router_setup(router_state * r, tw_lp * lp)
764
{
765
    uint32_t h1 = 0, h2 = 0; 
766
    bj_hashlittle2(LP_METHOD_NM_ROUT, strlen(LP_METHOD_NM_ROUT), &h1, &h2);
767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790
    router_magic_num = h1 + h2;
    
    char anno[MAX_NAME_LENGTH];
    codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
            &mapping_type_id, anno, &mapping_rep_id, &mapping_offset);

    if (anno[0] == '\0'){
        r->anno = NULL;
        r->params = &all_params[num_params-1];
    } else{
        r->anno = strdup(anno);
        int id = configuration_get_annotation_index(anno, anno_map);
        r->params = &all_params[id];
    }

    // shorthand
    const dragonfly_param *p = r->params;

   r->router_id=mapping_rep_id + mapping_offset;
   r->group_id=r->router_id/p->num_routers;

   r->global_channel = (int*)malloc(p->num_global_channels * sizeof(int));
   r->next_output_available_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
   r->cur_hist_start_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
791
   r->link_traffic = (int64_t*)malloc(p->radix * sizeof(int64_t));
792
   r->link_traffic_sample = (int64_t*)malloc(p->radix * sizeof(int64_t));
793 794 795 796 797 798 799 800 801 802 803 804 805
   r->cur_hist_num = (int*)malloc(p->radix * sizeof(int));
   r->prev_hist_num = (int*)malloc(p->radix * sizeof(int));
   
   r->vc_occupancy = (int**)malloc(p->radix * sizeof(int*));
   r->in_send_loop = (int*)malloc(p->radix * sizeof(int));
   r->pending_msgs = 
    (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**));
   r->pending_msgs_tail = 
    (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**));
   r->queued_msgs = 
    (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**));
   r->queued_msgs_tail = 
    (terminal_message_list***)malloc(p->radix * sizeof(terminal_message_list**));
806
   r->queued_count = (int*)malloc(p->radix * sizeof(int));
807 808
   r->last_buf_full = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
   r->busy_time = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
809
   r->busy_time_sample = (tw_stime*)malloc(p->radix * sizeof(tw_stime));
810

811
   rc_stack_create(&r->st);
812
   for(int i=0; i < p->radix; i++)
813 814
    {
       // Set credit & router occupancy
815 816
    r->last_buf_full[i] = 0.0;
    r->busy_time[i] = 0.0;
817
    r->busy_time_sample[i] = 0.0;
818 819
	r->next_output_available_time[i]=0;
	r->cur_hist_start_time[i] = 0;
820
    r->link_traffic[i]=0;
821
    r->link_traffic_sample[i] = 0;
822 823
	r->cur_hist_num[i] = 0;
	r->prev_hist_num[i] = 0;
824
    r->queued_count[i] = 0;    
825 826 827 828 829 830 831 832 833 834
    r->in_send_loop[i] = 0;
    r->vc_occupancy[i] = (int*)malloc(p->num_vcs * sizeof(int));
    r->pending_msgs[i] = (terminal_message_list**)malloc(p->num_vcs * 
        sizeof(terminal_message_list*));
    r->pending_msgs_tail[i] = (terminal_message_list**)malloc(p->num_vcs * 
        sizeof(terminal_message_list*));
    r->queued_msgs[i] = (terminal_message_list**)malloc(p->num_vcs * 
        sizeof(terminal_message_list*));
    r->queued_msgs_tail[i] = (terminal_message_list**)malloc(p->num_vcs * 
        sizeof(terminal_message_list*));
835
        for(int j = 0; j < p->num_vcs; j++) {
836 837 838 839 840 841 842 843 844
            r->vc_occupancy[i][j] = 0;
            r->pending_msgs[i][j] = NULL;
            r->pending_msgs_tail[i][j] = NULL;
            r->queued_msgs[i][j] = NULL;
            r->queued_msgs_tail[i][j] = NULL;
        }
    }

#if DEBUG == 1
845
//   printf("\n LP ID %d VC occupancy radix %d Router %d is connected to ", lp->gid, p->radix, r->router_id);
846 847 848
#endif 
   //round the number of global channels to the nearest even number
#if USE_DIRECT_SCHEME
849
       int first = r->router_id % p->num_routers;
850
       for(int i=0; i < p->num_global_channels; i++)
851
        {
852
            int target_grp = first;
853 854 855
            if(target_grp == r->group_id) {
                target_grp = p->num_groups - 1;
            }
856
            int my_pos = r->group_id % p->num_routers;
857 858 859 860 861 862 863
            if(r->group_id == p->num_groups - 1) {
                my_pos = target_grp % p->num_routers;
            }
            r->global_channel[i] = target_grp * p->num_routers + my_pos;
            first += p->num_routers;
        }
#else
864 865 866
   int router_offset = (r->router_id % p->num_routers) * 
    (p->num_global_channels / 2) + 1;
   for(int i=0; i < p->num_global_channels; i++)
867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894
    {
      if(i % 2 != 0)
          {
             r->global_channel[i]=(r->router_id + (router_offset * p->num_routers))%p->total_routers;
             router_offset++;
          }
          else
           {
             r->global_channel[i]=r->router_id - ((router_offset) * p->num_routers);
           }
        if(r->global_channel[i]<0)
         {
           r->global_channel[i]=p->total_routers+r->global_channel[i]; 
	 }
#if DEBUG == 1
    printf("\n channel %d ", r->global_channel[i]);
#endif 
    }
#endif

#if DEBUG == 1
   printf("\n");
#endif
   return;
}	


/* dragonfly packet event , generates a dragonfly packet on the compute node */
895 896 897 898 899 900 901 902 903 904
static tw_stime dragonfly_packet_event(
        model_net_request const * req,
        uint64_t message_offset,
        uint64_t packet_size,
        tw_stime offset,
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
        tw_lp *sender,
        int is_last_pckt)
905
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
906 907
    (void)message_offset;
    (void)sched_params;
908 909 910 911 912
    tw_event * e_new;
    tw_stime xfer_to_nic_time;
    terminal_message * msg;
    char* tmp_ptr;

913 914 915
    xfer_to_nic_time = codes_local_latency(sender); 
    //e_new = tw_event_new(sender->gid, xfer_to_nic_time+offset, sender);
    //msg = tw_event_data(e_new);
916 917
    e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
            sender, DRAGONFLY, (void**)&msg, (void**)&tmp_ptr);
918 919
    strcpy(msg->category, req->category);
    msg->final_dest_gid = req->final_dest_lp;
920
    msg->total_size = req->msg_size;
921
    msg->sender_lp=req->src_lp;
922
    msg->sender_mn_lp = sender->gid;
923
    msg->packet_size = packet_size;
924
    msg->travel_start_time = tw_now(sender);
925 926 927
    msg->remote_event_size_bytes = 0;
    msg->local_event_size_bytes = 0;
    msg->type = T_GENERATE;
928
    msg->dest_terminal_id = req->dest_mn_lp;
929
    msg->message_id = req->msg_id;
930 931
    msg->is_pull = req->is_pull;
    msg->pull_size = req->pull_size;
932
    msg->magic = terminal_magic_num; 
933 934
    msg->msg_start_time = req->msg_start_time;

935 936
    if(is_last_pckt) /* Its the last packet so pass in remote and local event information*/
      {
937
	if(req->remote_event_size > 0)
938
	 {
939 940 941
		msg->remote_event_size_bytes = req->remote_event_size;
		memcpy(tmp_ptr, remote_event, req->remote_event_size);
		tmp_ptr += req->remote_event_size;
942
	}
943
	if(req->self_event_size > 0)
944
	{
945 946 947
		msg->local_event_size_bytes = req->self_event_size;
		memcpy(tmp_ptr, self_event, req->self_event_size);
		tmp_ptr += req->self_event_size;
948 949
	}
     }
950
	   //printf("\n dragonfly remote event %d local event %d last packet %d %lf ", msg->remote_event_size_bytes, msg->local_event_size_bytes, is_last_pckt, xfer_to_nic_time);
951
    tw_event_send(e_new);
952
    return xfer_to_nic_time;
953 954 955 956 957 958 959 960 961
}

/* dragonfly packet event reverse handler */
static void dragonfly_packet_event_rc(tw_lp *sender)
{
	  codes_local_latency_reverse(sender);
	    return;
}

962
/* given two group IDs, find the router of the src_gid that connects to the dest_gid*/
963
tw_lpid getRouterFromGroupID(int dest_gid, 
964
		    int src_gid,
965
		    int num_routers,
966
            int total_groups)
967
{
968 969 970 971 972 973 974
#if USE_DIRECT_SCHEME
  int dest = dest_gid;
  if(dest == total_groups - 1) {
      dest = src_gid;
  }
  return src_gid * num_routers + (dest % num_routers);
#else
975 976 977
  int group_begin = src_gid * num_routers;
  int group_end = (src_gid * num_routers) + num_routers-1;
  int offset = (dest_gid * num_routers - group_begin) / num_routers;
978
  
979 980
  if((dest_gid * num_routers) < group_begin)
    offset = (group_begin - dest_gid * num_routers) / num_routers; // take absolute value
981
  
982 983
  int half_channel = num_routers / 4;
  int index = (offset - 1)/(half_channel * num_routers);
984
  
985
  offset=(offset - 1) % (half_channel * num_routers);
986 987

  // If the destination router is in the same group
988
  tw_lpid router_id;
989 990 991 992 993 994 995

  if(index % 2 != 0)
    router_id = group_end - (offset / half_channel); // start from the end
  else
    router_id = group_begin + (offset / half_channel);

  return router_id;
996
#endif
997 998 999
}	

/*When a packet is sent from the current router and a buffer slot becomes available, a credit is sent back to schedule another packet event*/
Jonathan Jenkins's avatar
Jonathan Jenkins committed
1000
void router_credit_send(router_state * s, terminal_message * msg, 
1001
  tw_lp * lp, int sq) {
1002 1003 1004 1005
  tw_event * buf_e;
  tw_stime ts;
  terminal_message * buf_msg;

1006
  int dest = 0,  type = R_BUFFER;
1007
  int is_terminal = 0;
1008

1009
  const dragonfly_param *p = s->params;
1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022
 
  // Notify sender terminal about available buffer space
  if(msg->last_hop == TERMINAL) {
    dest = msg->src_terminal_id;
    type = T_BUFFER;
    is_terminal = 1;
  } else if(msg->last_hop == GLOBAL) {
    dest = msg->intm_lp_id;
  } else if(msg->last_hop == LOCAL) {
    dest = msg->intm_lp_id;
  } else
    printf("\n Invalid message type");

1023
  ts = g_tw_lookahead + p->credit_delay +  tw_rand_unif(lp->rng);
1024
	
1025 1026 1027 1028 1029
  if (is_terminal) {
    buf_e = model_net_method_event_new(dest, ts, lp, DRAGONFLY, 
      (void**)&buf_msg, NULL);
    buf_msg->magic = terminal_magic_num;
  } else {
1030 1031
    buf_e = model_net_method_event_new(dest, ts, lp, DRAGONFLY_ROUTER,
            (void**)&buf_msg, NULL);
1032 1033 1034 1035 1036 1037 1038
    buf_msg->magic = router_magic_num;
  }
 
  if(sq == -1) {
    buf_msg->vc_index = msg->vc_index;
    buf_msg->output_chan = msg->output_chan;
  } else {
1039
    buf_msg->vc_index = msg->saved_vc;
1040 1041 1042 1043
    buf_msg->output_chan = msg->saved_channel;
  }
  
  buf_msg->type = type;
1044

1045 1046
  tw_event_send(buf_e);
  return;
1047 1048
}

1049
void packet_generate_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
1050
{
1051 1052 1053 1054
        
   s->packet_gen--;
   packet_gen--;
   
1055
   tw_rand_reverse_unif(lp->rng);
1056

1057 1058 1059
   int num_chunks = msg->packet_size/s->params->chunk_size;
   if(msg->packet_size % s->params->chunk_size)
       num_chunks++;
1060

1061
   if(!num_chunks)
1062
       num_chunks = 1;
1063

1064 1065 1066 1067
   int i;
   for(i = 0; i < num_chunks; i++) {
        delete_terminal_message_list(return_tail(s->terminal_msgs, 
          s->terminal_msgs_tail, 0));
1068
        s->terminal_length -= s->params->chunk_size;
1069 1070
   }
    if(bf->c5) {
1071
        codes_local_latency_reverse(lp);
1072 1073
        s->in_send_loop = 0;
    }
1074 1075
      if(bf->c11) {
        s->issueIdle = 0;
1076
        s->last_buf_full = msg->saved_busy_time;
1077
      }
1078 1079
     struct mn_stats* stat;
     stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
1080 1081 1082 1083
     stat->send_count--;
     stat->send_bytes -= msg->packet_size;
     stat->send_time -= (1/s->params->cn_bandwidth) * msg->packet_size;
}
1084

1085
/* generates packet at the current dragonfly compute node */
1086 1087
void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg, 
  tw_lp * lp) {
1088 1089
  packet_gen++;
  s->packet_gen++;
1090

1091
  tw_stime ts, nic_ts;
1092

1093
  assert(lp->gid != msg->dest_terminal_id);
1094
  const dragonfly_param *p = s->params;
1095

Jonathan Jenkins's avatar
Jonathan Jenkins committed
1096 1097
  int total_event_size;
  uint64_t num_chunks = msg->packet_size / p->chunk_size;
1098 1099
  if (msg->packet_size % s->params->chunk_size) 
      num_chunks++;
1100 1101 1102 1103

  if(!num_chunks)
    num_chunks = 1;

1104 1105
  nic_ts = g_tw_lookahead + s->params->cn_delay * msg->packet_size + tw_rand_unif(lp->rng);
  
1106
  msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter;
1107 1108 1109 1110
  msg->my_N_hop = 0;
  msg->my_l_hop = 0;
  msg->my_g_hop = 0;
  msg->intm_group_id = -1;
1111

1112
  //if(msg->dest_terminal_id == TRACK)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
1113
  if(msg->packet_ID == LLU(TRACK_PKT))
Jonathan Jenkins's avatar
Jonathan Jenkins committed
1114 1115 1116
    printf("\n Packet %llu generated at terminal %d dest %llu size %llu num chunks %llu ", 
            msg->packet_ID, s->terminal_id, LLU(msg->dest_terminal_id),
            LLU(msg->packet_size), LLU(num_chunks));
1117

Jonathan Jenkins's avatar
Jonathan Jenkins committed
1118
  for(uint64_t i = 0; i < num_chunks; i++)
1119 1120 1121
  {
    terminal_message_list *cur_chunk = (terminal_message_list*)malloc(
      sizeof(terminal_message_list));
1122
    msg->origin_router_id = s->router_id;
1123
    init_terminal_message_list(cur_chunk, msg);
1124
  
1125

1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139
    if(msg->remote_event_size_bytes + msg->local_event_size_bytes > 0) {
      cur_chunk->event_data = (char*)malloc(
          msg->remote_event_size_bytes + msg->local_event_size_bytes);
    }
    
    void * m_data_src = model_net_method_get_edata(DRAGONFLY, msg);
    if (msg->remote_event_size_bytes){
      memcpy(cur_chunk->event_data, m_data_src, msg->remote_event_size_bytes);
    }
    if (msg->local_event_size_bytes){ 
      m_data_src = (char*)m_data_src + msg->remote_event_size_bytes;
      memcpy((char*)cur_chunk->event_data + msg->remote_event_size_bytes, 
          m_data_src, msg->local_event_size_bytes);
    }
1140

1141
    cur_chunk->msg.chunk_id = i;
1142
    cur_chunk->msg.origin_router_id = s->router_id;
1143 1144
    append_to_terminal_message_list(s->terminal_msgs, s->terminal_msgs_tail,
      0, cur_chunk);
1145
    s->terminal_length += s->params->chunk_size;
1146
  }
1147

1148 1149 1150 1151 1152
  if(s->terminal_length < 2 * s->params->cn_vc_size) {
    model_net_method_idle_event(nic_ts, 0, lp);
  } else {
    bf->c11 = 1;
    s->issueIdle = 1;
1153 1154
    msg->saved_busy_time = s->last_buf_full;
    s->last_buf_full = tw_now(lp);
1155
  }
1156
  
1157 1158
  if(s->in_send_loop == 0) {
    bf->c5 = 1;
1159
    ts = codes_local_latency(lp);
1160 1161 1162 1163 1164 1165 1166 1167
    terminal_message *m;
    tw_event* e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY, 
      (void**)&m, NULL);
    m->type = T_SEND;
    m->magic = terminal_magic_num;
    s->in_send_loop = 1;
    tw_event_send(e);
  }
1168

1169 1170 1171 1172 1173 1174 1175 1176
  total_event_size = model_net_get_msg_sz(DRAGONFLY) + 
      msg->remote_event_size_bytes + msg->local_event_size_bytes;
  mn_stats* stat;
  stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
  stat->send_count++;
  stat->send_bytes += msg->packet_size;
  stat->send_time += (1/p->cn_bandwidth) * msg->packet_size;
  if(stat->max_event_size < total_event_size)
1177
	  stat->max_event_size = total_event_size;
1178

1179 1180 1181
  return;
}

1182 1183
void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg,
        tw_lp * lp)
1184
{
1185 1186
      if(bf->c1) {
        s->in_send_loop = 1;
1187
        s->last_buf_full = msg->saved_busy_time;
1188 1189 1190 1191
        return;
      }
      
      tw_rand_reverse_unif(lp->rng);
1192
      s->terminal_available_time = msg->saved_available_time;
1193 1194 1195 1196
      if(bf->c2) {
        codes_local_latency_reverse(lp);
      }
     
1197
      s->terminal_length += s->params->chunk_size;
1198 1199
      s->packet_counter--;
      s->vc_occupancy[0] -= s->params->chunk_size;
1200

1201 1202 1203 1204
      terminal_message_list* cur_entry = rc_stack_pop(s->st);

      prepend_to_terminal_message_list(s->terminal_msgs, 
              s->terminal_msgs_tail, 0, cur_entry);
1205 1206 1207 1208 1209 1210
      if(bf->c3) {
        tw_rand_reverse_unif(lp->rng);
      }
      if(bf->c4) {
        s->in_send_loop = 1;
      }
1211
      /*if(bf->c5)
1212
      {
1213
          codes_local_latency_reverse(lp);
1214
          s->issueIdle = 1;
1215
      }*/
1216
      return;
1217 1218 1219 1220 1221 1222 1223 1224 1225
}
/* sends the packet from the current dragonfly compute node to the attached router */
void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg, 
  tw_lp * lp) {
  
  tw_stime ts;
  tw_event *e;
  terminal_message *m;
  tw_lpid router_id;
1226

1227 1228 1229 1230 1231 1232
  terminal_message_list* cur_entry = s->terminal_msgs[0];

  if(s->vc_occupancy[0] + s->params->chunk_size > s->params->cn_vc_size 
      || cur_entry == NULL) {
    bf->c1 = 1;
    s->in_send_loop = 0;
1233 1234 1235

    msg->saved_busy_time = s->last_buf_full;
    s->last_buf_full = tw_now(lp);
1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246
    return;
  }

  msg->saved_available_time = s->terminal_available_time;
  ts = g_tw_lookahead + s->params->cn_delay + tw_rand_unif(lp->rng);
  s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp));
  s->terminal_available_time += ts;

  //TODO: be annotation-aware
  codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
      &mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
1247
  codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM_ROUT, NULL, 1,
1248 1249
      s->router_id, 0, &router_id);
  // we are sending an event to the router, so no method_event here
1250 1251 1252
  void * remote_event;
  e = model_net_method_event_new(router_id, s->terminal_available_time - tw_now(lp), lp,
          DRAGONFLY_ROUTER, (void**)&m, &remote_event);
1253 1254
  memcpy(m, &cur_entry->msg, sizeof(terminal_message));
  if (m->remote_event_size_bytes){
1255
    memcpy(remote_event, cur_entry->event_data, m->remote_event_size_bytes);
1256
  }
1257

1258 1259 1260 1261 1262 1263 1264 1265 1266 1267
  m->type = R_ARRIVE;
  m->src_terminal_id = lp->gid;
  m->vc_index = 0;
  m->last_hop = TERMINAL;
  m->intm_group_id = -1;
  m->magic = router_magic_num;
  m->path_type = -1;
  m->local_event_size_bytes = 0;
  tw_event_send(e);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
1268
  uint64_t num_chunks = cur_entry->msg.packet_size/s->params->chunk_size;
1269 1270 1271
  if(cur_entry->msg.packet_size % s->params->chunk_size)
    num_chunks++;

1272 1273 1274
  if(!num_chunks)
      num_chunks = 1;

1275 1276 1277 1278 1279
  if(cur_entry->msg.chunk_id == num_chunks - 1 && 
      (cur_entry->msg.local_event_size_bytes > 0)) {
    bf->c2 = 1;
    ts = codes_local_latency(lp); 
    tw_event *e_new = tw_event_new(cur_entry->msg.sender_lp, ts, lp);
1280
    void * m_new = tw_event_data(e_new);
1281 1282 1283 1284 1285 1286 1287 1288
    void *local_event = (char*)cur_entry->event_data + 
      cur_entry->msg.remote_event_size_bytes;
    memcpy(m_new, local_event, cur_entry->msg.local_event_size_bytes);
    tw_event_send(e_new);
  }
  s->packet_counter++;
  s->vc_occupancy[0] += s->params->chunk_size;
  cur_entry = return_head(s->terminal_msgs, s->terminal_msgs_tail, 0); 
1289
  rc_stack_push(lp, cur_entry, free, s->st);
1290
  s->terminal_length -= s->params->chunk_size;
1291 1292

  cur_entry = s->terminal_msgs[0];
1293 1294

  /* if there is another packet inline then schedule another send event */
1295 1296 1297
  if(cur_entry != NULL &&
    s->vc_occupancy[0] + s->params->chunk_size <= s->params->cn_vc_size) {
    bf->c3 = 1;
1298
    terminal_message *m_new;
1299
    ts = g_tw_lookahead + s->params->cn_delay + tw_rand_unif(lp->rng);
1300 1301 1302 1303
    e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY, 
      (void**)&m_new, NULL);
    m_new->type = T_SEND;
    m_new->magic = terminal_magic_num;
1304 1305
    tw_event_send(e);
  } else {