model-net-mpi-replay.c 86.9 KB
Newer Older
1 2 3 4 5 6 7
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */
#include <ross.h>
#include <inttypes.h>
8
#include <sys/stat.h>
9 10 11 12 13 14 15 16

#include "codes/codes-workload.h"
#include "codes/codes.h"
#include "codes/configuration.h"
#include "codes/codes_mapping.h"
#include "codes/model-net.h"
#include "codes/rc-stack.h"
#include "codes/quicklist.h"
17
#include "codes/quickhash.h"
18
#include "codes/codes-jobmap.h"
19

20
/* turning on track lp will generate a lot of output messages */
/* Name of the model-net LP type this replay is configured against. */
#define MN_LP_NM "modelnet_dragonfly_custom"
#define CONTROL_MSG_SZ 64
#define TRACE (-1)
/* Upper bound on request IDs tracked by one pending wait (pending_waits.req_ids). */
#define MAX_WAIT_REQS 512
/* Non-zero enables lprintf() output. */
#define CS_LP_DBG 1
/* Bucket count of the per-LP message-size hash table. */
#define RANK_HASH_TABLE_SZ 2000
/* LP name used for codes_mapping lookups of network (MPI rank) LPs. */
#define NW_LP_NM "nw-lp"

/* Debug printf, active only when CS_LP_DBG is non-zero. Requires at least
 * one argument after the format string. */
#define lprintf(_fmt, ...) \
        do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
#define MAX_STATS 65536
/* Capacity of the per-LP collective latency / message-size arrays. */
#define MAX_MSGS 50

/* qhash comparison callback for the per-LP message-size table. */
static int msg_size_hash_compare(
            void *key, struct qhash_head *link);

/* NOTE: Message tracking works in sequential mode only! */
Xin Wang's avatar
Xin Wang committed
37
static int debug_cols = 0;
Xin Wang's avatar
Xin Wang committed
38
static int enable_col_overhead = 0;
Xin Wang's avatar
Xin Wang committed
39

40 41 42 43 44
/* Turning on this option slows down optimistic mode substantially. Only turn
 * on if you get issues with wait-all completion with traces. */
static int preserve_wait_ordering = 0;
static int enable_msg_tracking = 0;
static int is_synthetic = 0;
45
tw_lpid TRACK_LP = -1;
46

Xin Wang's avatar
Xin Wang committed
47
static double total_syn_data = 0;
48
static int unmatched = 0;
Xin Wang's avatar
Xin Wang committed
49

50 51 52 53 54
char workload_type[128];
char workload_file[8192];
char offset_file[8192];
static int wrkld_id;
static int num_net_traces = 0;
55
static int num_dumpi_traces = 0;
Xin Wang's avatar
Xin Wang committed
56
static uint64_t EAGER_THRESHOLD = 8192;
57

58
static int alloc_spec = 0;
59 60
static tw_stime self_overhead = 10.0;
static tw_stime mean_interval = 100000;
Xin Wang's avatar
Xin Wang committed
61
static int payload_sz = 1024;
62 63 64

/* Doing LP IO*/
static char lp_io_dir[256] = {'\0'};
65
static char sampling_dir[32] = {'\0'};
66 67 68 69
static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;

70 71 72 73
/* variables for loading multiple applications */
char workloads_conf_file[8192];
char alloc_file[8192];
int num_traces_of_job[5];
Xin's avatar
Xin committed
74 75
tw_stime soft_delay_mpi = 1250;
tw_stime nic_delay = 250;
76
tw_stime copy_per_byte_eager = 0.55;
77 78
tw_stime col_overhead_per_byte = 0.7;
tw_stime col_soft_overhead = 6600;
79 80 81 82 83
char file_name_of_job[5][8192];

struct codes_jobmap_ctx *jobmap_ctx;
struct codes_jobmap_params_list jobmap_p;

84 85
/* Variables for Cortex Support */
/* Matthieu's additions start */
86
#ifdef ENABLE_CORTEX_PYTHON
87 88 89
static char cortex_file[512] = "\0";
static char cortex_class[512] = "\0";
static char cortex_gen[512] = "\0";
90
#endif
91 92
/* Matthieu's additions end */

93 94
typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
95
typedef int dumpi_req_id;
96 97

static int net_id = 0;
Xin Wang's avatar
Xin Wang committed
98
static float noise = 2.0;
99 100 101
static int num_nw_lps = 0, num_mpi_lps = 0;

static int num_syn_clients;
102

103
FILE * workload_log = NULL;
104
FILE * msg_size_log = NULL;
105 106 107 108
FILE * workload_agg_log = NULL;
FILE * workload_meta_log = NULL;

static uint64_t sample_bytes_written = 0;
109

110 111 112
long long num_bytes_sent=0;
long long num_bytes_recvd=0;

113 114 115
long long num_syn_bytes_sent = 0;
long long num_syn_bytes_recvd = 0;

116 117 118 119 120 121
double max_time = 0,  max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;


/* runtime option for disabling computation time simulation */
static int disable_delay = 0;
122 123 124
static int enable_sampling = 0;
static double sampling_interval = 5000000;
static double sampling_end_time = 3000000000;
125
static int enable_debug = 0;
126

127 128 129
/* set group context */
struct codes_mctx group_ratio;

130
/* Event types handled by the network LP.
 * MPI_OP_GET_NEXT is for getting the next MPI operation when the previous operation completes.
 * MPI_SEND_ARRIVED is issued when an MPI message arrives at its destination (the message is
 * transported by model-net and an event is invoked when it arrives).
 * MPI_SEND_POSTED is issued when an MPI message has left the source LP (message is transported via model-net).
 * CLI_BCKGND_FIN is sent to the ranks of other jobs once a job's workload has completed.
 * CLI_BCKGND_ARRIVE is a synthetic-traffic message arriving at its destination.
 * CLI_BCKGND_GEN is the self-event that triggers the next synthetic-traffic burst.
 * CLI_NBR_FINISH tells rank r+1 that rank r (and its predecessors) have finished. */
enum MPI_NW_EVENTS
{
    MPI_OP_GET_NEXT=1,
    MPI_SEND_ARRIVED,
    MPI_SEND_ARRIVED_CB, // for tracking message times on sender
    MPI_SEND_POSTED,
    MPI_REND_ARRIVED,
    MPI_REND_ACK_ARRIVED,
    CLI_BCKGND_FIN,
    CLI_BCKGND_ARRIVE,
    CLI_BCKGND_GEN,
    CLI_NBR_FINISH,
};

/* type of synthetic traffic */
enum TRAFFIC
{
    UNIFORM = 1,          /* sends message to a randomly selected node */
    NEAREST_NEIGHBOR = 2, /* sends message to the next node (potentially connected to the same router) */
    ALLTOALL = 3,         /* sends message to all other nodes */
    STENCIL = 4           /* sends message to 4 nearby neighbors */
};

/* Sampling record of per-LP MPI activity (presumably one per sampling
 * interval ending at sample_end_time — confirm at the sampling code). */
struct mpi_workload_sample
{
    /* Sampling data */
    int nw_id;
    int app_id;
    unsigned long num_sends_sample;
    unsigned long num_bytes_sample;
    unsigned long num_waits_sample;
    double sample_end_time;
};
/* stores pointers of pending MPI operations to be matched with their respective sends/receives. */
struct mpi_msgs_queue
{
    int op_type;
    int tag;
    int source_rank;
    int dest_rank;
173
    uint64_t num_bytes;
174
    tw_stime req_init_time;
Xin Wang's avatar
Xin Wang committed
175
	dumpi_req_id req_id;
176 177 178 179 180 181
    struct qlist_head ql;
};

/* stores request IDs of completed MPI operations (Isends or Irecvs) */
struct completed_requests
{
182
	int req_id;
183
    struct qlist_head ql;
184
    int index;
185 186 187 188 189 190
};

/* for wait operations, store the pending operation and number of completed waits so far. */
struct pending_waits
{
    int op_type;
191
    int req_ids[MAX_WAIT_REQS];
Xin Wang's avatar
Xin Wang committed
192 193
	int num_completed;
	int count;
194
    tw_stime start_time;
195 196 197
    struct qlist_head ql;
};

198 199 200 201 202 203 204 205 206
struct msg_size_info
{
    int64_t msg_size;
    int num_msgs;
    tw_stime agg_latency;
    tw_stime avg_latency;
    struct qhash_head * hash_link;
    struct qlist_head ql; 
};
207 208 209 210 211 212 213
typedef struct mpi_msgs_queue mpi_msgs_queue;
typedef struct completed_requests completed_requests;
typedef struct pending_waits pending_waits;

/* state of the network LP. It contains the pointers to send/receive lists */
struct nw_state
{
Xin Wang's avatar
Xin Wang committed
214 215 216
	long num_events_per_lp;
	tw_lpid nw_id;
	short wrkld_end;
217 218
    int app_id;
    int local_rank;
219
    int synthetic_pattern;
Xin Wang's avatar
Xin Wang committed
220
    int is_collective;
221

222 223 224
    int is_finished;
    int neighbor_completed;

225
    struct rc_stack * processed_ops;
226
    struct rc_stack * processed_wait_op;
227
    struct rc_stack * matched_reqs;
228
//    struct rc_stack * indices;
229 230

    /* count of sends, receives, collectives and delays */
Xin Wang's avatar
Xin Wang committed
231 232 233 234 235 236 237
	unsigned long num_sends;
	unsigned long num_recvs;
	unsigned long num_cols;
	unsigned long num_delays;
	unsigned long num_wait;
	unsigned long num_waitall;
	unsigned long num_waitsome;
238

239

Xin Wang's avatar
Xin Wang committed
240 241
	/* time spent by the LP in executing the app trace*/
	double start_time;
242 243 244 245 246 247 248 249

    double col_time;

    double reduce_time;
    int num_reduce;

    double all_reduce_time;
    int num_all_reduce;
Xin Wang's avatar
Xin Wang committed
250

251 252 253 254
    double col_latency[MAX_MSGS];
    uint64_t col_msizes[MAX_MSGS];
    int num_msg_sizes;

Xin Wang's avatar
Xin Wang committed
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
	double elapsed_time;
	/* time spent in compute operations */
	double compute_time;
	/* time spent in message send/isend */
	double send_time;
	/* time spent in message receive */
	double recv_time;
	/* time spent in wait operation */
	double wait_time;
	/* FIFO for isend messages arrived on destination */
	struct qlist_head arrival_queue;
	/* FIFO for irecv messages posted but not yet matched with send operations */
	struct qlist_head pending_recvs_queue;
	/* List of completed send/receive requests */
	struct qlist_head completed_reqs;
270

271
    tw_stime cur_interval_end;
272
    
273 274
    /* Pending wait operation */
    struct pending_waits * wait_op;
275

276 277 278 279 280 281
    /* Message size latency information */
    struct qhash_table * msg_sz_table;
    struct qlist_head msg_sz_list;

    /* quick hash for maintaining message latencies */

282 283 284
    unsigned long num_bytes_sent;
    unsigned long num_bytes_recvd;

285 286 287
    unsigned long syn_data;
    unsigned long gen_data;
    
288 289 290 291
    /* For sampling data */
    int sampling_indx;
    int max_arr_size;
    struct mpi_workload_sample * mpi_wkld_samples;
292
    char output_buf[512];
293
    char col_stats[100*MAX_MSGS];
Xin Wang's avatar
Xin Wang committed
294

295 296 297 298
};

/* data for handling reverse computation.
* saved_matched_req holds the request ID of matched receives/sends for wait operations.
299
* ptr_match_op holds the matched MPI operation which are removed from the queues when a send is matched with the receive in forward event handler.
300 301 302
* network event being sent. op is the MPI operation issued by the network workloads API. rv_data holds the data for reverse computation (TODO: Fill this data structure only when the simulation runs in optimistic mode). */
struct nw_message
{
Xin Wang's avatar
Xin Wang committed
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
   // forward message handler
   int msg_type;
   int op_type;
   model_net_event_return event_rc;

   struct
   {
       tw_lpid src_rank;
       int dest_rank;
       uint64_t num_bytes;
       int num_matched;
       int data_type;
       double sim_start_time;
       // for callbacks - time message was received
       double msg_send_time;
318
       int req_id;
Xin Wang's avatar
Xin Wang committed
319 320 321 322 323 324 325 326 327 328 329 330 331
       int tag;
       int app_id;
       int found_match;
       short wait_completed;
   } fwd;
   struct
   {
       double saved_send_time;
       double saved_recv_time;
       double saved_wait_time;
       double saved_delay;
       int32_t saved_num_bytes;
   } rc;
332 333
};

334
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op);
335 336

static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
337 338
/* executes MPI isend and send operations */
static void codes_exec_mpi_send(
339
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op, int is_rend);
340 341
/* execute MPI irecv operation */
static void codes_exec_mpi_recv(
342
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, struct codes_workload_op * mpi_op);
343 344
/* reverse of mpi recv function. */
static void codes_exec_mpi_recv_rc(
345
        nw_state* s, tw_bf * bf, nw_message* m, tw_lp* lp);
346 347
/* execute the computational delay */
static void codes_exec_comp_delay(
348
        nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
/* gets the next MPI operation from the network-workloads API. */
static void get_next_mpi_operation(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* reverse handler of get next mpi operation. */
static void get_next_mpi_operation_rc(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* Makes a call to get_next_mpi_operation. */
static void codes_issue_next_event(tw_lp* lp);
/* reverse handler of next operation */
static void codes_issue_next_event_rc(tw_lp* lp);


///////////////////// HELPER FUNCTIONS FOR MPI MESSAGE QUEUE HANDLING ///////////////
/* upon arrival of local completion message, inserts operation in completed send queue */
/* upon arrival of an isend operation, updates the arrival queue of the network */
364 365 366 367 368 369 370 371
static void update_completed_queue(
        nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp, dumpi_req_id req_id);
/* reverse of the above function */
static void update_completed_queue_rc(
        nw_state*s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp);
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
static void update_arrival_queue(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse of the above function */
static void update_arrival_queue_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* callback to a message sender for computing message time */
static void update_message_time(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse for computing message time */
static void update_message_time_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);

/* conversion from seconds to eanaoseconds */
static tw_stime s_to_ns(tw_stime ns);

387
/*static void update_message_size_rc(
388 389 390 391
        struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
392
{*/
393
/*TODO: Complete reverse handler */
394
/*    (void)ns;
395 396 397
    (void)lp;
    (void)bf;
    (void)m;
398
}*/
399 400 401 402 403 404 405 406 407 408
/* update the message size */
static void update_message_size(
        struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m,
        mpi_msgs_queue * qitem,
        int is_eager,
        int is_send)
{
Xin Wang's avatar
Xin Wang committed
409 410
            (void)bf;
            (void)is_eager;
411

Xin Wang's avatar
Xin Wang committed
412 413 414 415 416 417 418
            struct qhash_head * hash_link = NULL;
            tw_stime msg_init_time = qitem->req_init_time;
        
            if(!ns->msg_sz_table)
                ns->msg_sz_table = qhash_init(msg_size_hash_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SZ); 
            
            hash_link = qhash_search(ns->msg_sz_table, &(qitem->num_bytes));
419

Xin Wang's avatar
Xin Wang committed
420 421 422 423 424 425
            if(is_send)
                msg_init_time = m->fwd.sim_start_time;
            
            /* update hash table */
            if(!hash_link)
            {
426
                struct msg_size_info * msg_info = (struct msg_size_info*)malloc(sizeof(struct msg_size_info));
Xin Wang's avatar
Xin Wang committed
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
                msg_info->msg_size = qitem->num_bytes;
                msg_info->num_msgs = 1;
                msg_info->agg_latency = tw_now(lp) - msg_init_time;
                msg_info->avg_latency = msg_info->agg_latency;
                qhash_add(ns->msg_sz_table, &(msg_info->msg_size), msg_info->hash_link);
                qlist_add(&msg_info->ql, &ns->msg_sz_list);
                //printf("\n Msg size %d aggregate latency %f num messages %d ", m->fwd.num_bytes, msg_info->agg_latency, msg_info->num_msgs);
            }
            else
            {
                struct msg_size_info * tmp = qhash_entry(hash_link, struct msg_size_info, hash_link);
                tmp->num_msgs++;
                tmp->agg_latency += tw_now(lp) - msg_init_time;  
                tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs);
//                printf("\n Msg size %d aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
            }
443 444
}
static void notify_background_traffic_rc(
Xin Wang's avatar
Xin Wang committed
445
	    struct nw_state * ns,
446 447 448 449
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
450 451 452
    (void)ns;
    (void)bf;
    (void)m;
Xin Wang's avatar
Xin Wang committed
453 454 455 456 457
        
    int num_jobs = codes_jobmap_get_num_jobs(jobmap_ctx); 
    
    for(int i = 0; i < num_jobs - 1; i++)
        tw_rand_reverse_unif(lp->rng); 
458 459 460
}

static void notify_background_traffic(
Xin Wang's avatar
Xin Wang committed
461
	    struct nw_state * ns,
462 463 464 465
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
Xin Wang's avatar
Xin Wang committed
466 467
        (void)bf;
        (void)m;
468

Xin Wang's avatar
Xin Wang committed
469 470 471 472 473 474 475 476 477
        struct codes_jobmap_id jid; 
        jid = codes_jobmap_to_local_id(ns->nw_id, jobmap_ctx);
        
        int num_jobs = codes_jobmap_get_num_jobs(jobmap_ctx); 
        
        for(int other_id = 0; other_id < num_jobs; other_id++)
        {
            if(other_id == jid.job)
                continue;
478

Xin Wang's avatar
Xin Wang committed
479 480
            struct codes_jobmap_id other_jid;
            other_jid.job = other_id;
481

Xin Wang's avatar
Xin Wang committed
482
            int num_other_ranks = codes_jobmap_get_num_ranks(other_id, jobmap_ctx);
483

Xin Wang's avatar
Xin Wang committed
484 485 486 487 488 489 490 491 492 493 494 495 496
            lprintf("\n Other ranks %d ", num_other_ranks);
            tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
            tw_lpid global_dest_id;
     
            for(int k = 0; k < num_other_ranks; k++)    
            {
                other_jid.rank = k;
                int intm_dest_id = codes_jobmap_to_global_id(other_jid, jobmap_ctx); 
                global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);

                tw_event * e;
                struct nw_message * m_new;  
                e = tw_event_new(global_dest_id, ts, lp);
497
                m_new = (struct nw_message*)tw_event_data(e);
Xin Wang's avatar
Xin Wang committed
498 499 500
                m_new->msg_type = CLI_BCKGND_FIN;
                tw_event_send(e);   
            }
501
        }
Xin Wang's avatar
Xin Wang committed
502
        return;
503 504
}
static void notify_neighbor_rc(
Xin Wang's avatar
Xin Wang committed
505
	    struct nw_state * ns,
506 507 508 509
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
Xin Wang's avatar
Xin Wang committed
510 511 512 513 514
       if(bf->c0)
       {
            notify_background_traffic_rc(ns, lp, bf, m);
            return;
       }
515
   
Xin Wang's avatar
Xin Wang committed
516 517 518 519
       if(bf->c1)
       {
          tw_rand_reverse_unif(lp->rng); 
       }
520 521
} 
static void notify_neighbor(
Xin Wang's avatar
Xin Wang committed
522
	    struct nw_state * ns,
523 524 525 526 527 528 529 530
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
    if(ns->local_rank == num_dumpi_traces - 1 
            && ns->is_finished == 1
            && ns->neighbor_completed == 1)
    {
Xin Wang's avatar
Xin Wang committed
531
//        printf("\n All workloads completed, notifying background traffic ");
532 533 534 535 536 537 538 539 540 541 542 543 544
        bf->c0 = 1;
        notify_background_traffic(ns, lp, bf, m);
        return;
    }
    
    struct codes_jobmap_id nbr_jid;
    nbr_jid.job = ns->app_id;
    tw_lpid global_dest_id;

    if(ns->is_finished == 1 && (ns->neighbor_completed == 1 || ns->local_rank == 0))
    {
        bf->c1 = 1;

Xin Wang's avatar
Xin Wang committed
545
//        printf("\n Local rank %d notifying neighbor %d ", ns->local_rank, ns->local_rank+1);
546 547 548 549 550 551 552 553 554 555
        tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
        nbr_jid.rank = ns->local_rank + 1;
        
        /* Send a notification to the neighbor about completion */
        int intm_dest_id = codes_jobmap_to_global_id(nbr_jid, jobmap_ctx); 
        global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);
       
        tw_event * e;
        struct nw_message * m_new;  
        e = tw_event_new(global_dest_id, ts, lp);
556
        m_new = (struct nw_message*)tw_event_data(e); 
557 558 559 560 561 562 563 564 565 566
        m_new->msg_type = CLI_NBR_FINISH;
        tw_event_send(e);   
    }
}
/* Reverse handler for finish_bckgnd_traffic(): clears is_finished only if
 * the forward event was the one that set it (recorded in b->c0). */
void finish_bckgnd_traffic_rc(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
    (void)msg;
    (void)lp;

    if(b->c0)
        ns->is_finished = 0;
}

/* Handles CLI_BCKGND_FIN: marks this LP finished, which stops
 * gen_synthetic_tr() from generating further traffic.
 * notify_background_traffic() sends this event to every rank of every other
 * job, so an LP can receive it while already finished (one per other job,
 * or after its own workload ended). b->c0 records whether this event actually
 * changed the flag, so rollback does not clear a finish set elsewhere. */
void finish_bckgnd_traffic(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
    (void)msg;

    if(ns->is_finished != 1)
    {
        b->c0 = 1;
        ns->is_finished = 1;
    }
    lprintf("\n LP %" PRIu64 " completed sending data %lu completed at time %lf ", lp->gid, ns->gen_data, tw_now(lp));
}

/* Reverse handler for CLI_NBR_FINISH: undo the neighbor notification first,
 * then forget that the preceding rank had completed. */
void finish_nbr_wkld_rc(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
    notify_neighbor_rc(ns, lp, b, msg);
    ns->neighbor_completed = 0;
}

/* Handles CLI_NBR_FINISH: the preceding rank has finished, so record that
 * and try to advance the completion chain via notify_neighbor(). */
void finish_nbr_wkld(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
    struct nw_state * self = ns;

    self->neighbor_completed = 1;
    notify_neighbor(self, lp, b, msg);
}
/* Number of destinations gen_synthetic_tr() sends to per burst for the LP's
 * traffic pattern, given num_clients ranks in its job (0 for an unknown
 * pattern or a single-rank all-to-all). Shared by the forward and reverse
 * handlers so the reverse undoes exactly what the forward did. */
static int syn_num_dests(nw_state * s, int num_clients)
{
    int n = 0;
    switch(s->synthetic_pattern)
    {
        case UNIFORM:
        case NEAREST_NEIGHBOR:
            n = 1;
            break;
        case ALLTOALL:
            n = num_clients - 1;
            break;
        case STENCIL:
            n = 4;
            break;
        default:
            n = 0;
            break;
    }
    return n > 0 ? n : 0;
}

/* Reverse handler for gen_synthetic_tr(). Undoes every model-net send (their
 * RNG roll-back counts were accumulated in m->event_rc), the byte counters
 * for each destination, the exponential draw for the next CLI_BCKGND_GEN,
 * and the destination draw made only by the UNIFORM pattern. */
static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(bf->c0)
        return;

    struct codes_jobmap_id jid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx);
    int length = syn_num_dests(s, codes_jobmap_get_num_ranks(jid.job, jobmap_ctx));

    /* forward sent nothing, drew no random numbers and scheduled no event */
    if(length == 0)
        return;

    model_net_event_rc2(lp, &m->event_rc);

    s->gen_data -= (unsigned long)length * payload_sz;
    num_syn_bytes_sent -= (long long)length * payload_sz;

    /* exponential delay of the next CLI_BCKGND_GEN */
    tw_rand_reverse_unif(lp->rng);
    /* random destination (UNIFORM only) */
    if(s->synthetic_pattern == UNIFORM)
        tw_rand_reverse_unif(lp->rng);
}

/* generate synthetic traffic
 * Sends one payload_sz message to each destination chosen by the LP's
 * synthetic_pattern, then schedules the next CLI_BCKGND_GEN after
 * mean_interval plus exponential noise. Does nothing once is_finished is set
 * (bf->c0). */
static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(s->is_finished == 1)
    {
        bf->c0 = 1;
        return;
    }

    /* Get job information */
    struct codes_jobmap_id jid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx);
    int num_clients = codes_jobmap_get_num_ranks(jid.job, jobmap_ctx);

    int length = syn_num_dests(s, num_clients);
    if(length == 0)
        return;

    int * dest_svr = calloc(length, sizeof(*dest_svr));
    if(!dest_svr)
    {
        fprintf(stderr, "\n gen_synthetic_tr: out of memory ");
        abort();
    }

    /* Find destination */
    switch(s->synthetic_pattern)
    {
        case UNIFORM:
            dest_svr[0] = tw_rand_integer(lp->rng, 0, num_clients - 1);
            if(dest_svr[0] == s->local_rank)
                dest_svr[0] = (s->local_rank + 1) % num_clients;
            break;
        case NEAREST_NEIGHBOR:
            dest_svr[0] = (s->local_rank + 1) % num_clients;
            break;
        case ALLTOALL:
        {
            int index = 0;
            for(int i = 0; i < num_clients && index < length; i++)
            {
                if(i != s->local_rank)
                    dest_svr[index++] = i;
            }
        }
        break;
        case STENCIL:
        {
            int digits, x = 1, y, row, col, temp = num_clients;
            for(digits = 0; temp > 0; temp >>= 1)
                digits++;
            digits = digits / 2;
            for(int i = 0; i < digits; i++)
                x = x * 2;
            y = num_clients / x;
            row = s->local_rank / y;
            col = s->local_rank % y;

            dest_svr[0] = row * y + ((col-1+y)%y);   /* left neighbor */
            dest_svr[1] = row * y + ((col+1+y)%y);   /* right neighbor */
            dest_svr[2] = ((row-1+x)%x) * y + col;   /* bottom neighbor */
            dest_svr[3] = ((row+1+x)%x) * y + col;   /* up neighbor */
        }
        break;
    }

    /* Each send's roll-back count is accumulated so the reverse handler can
     * undo all of them with a single model_net_event_rc2() call. */
    m->event_rc = 0;
    for(int i = 0; i < length; i++)
    {
        /* Generate synthetic traffic */
        nw_message remote_m = {0};

        jid.rank = dest_svr[i];
        int intm_dest_id = codes_jobmap_to_global_id(jid, jobmap_ctx);
        tw_lpid global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);

        remote_m.fwd.sim_start_time = tw_now(lp);
        remote_m.fwd.dest_rank = dest_svr[i];
        remote_m.msg_type = CLI_BCKGND_ARRIVE;
        remote_m.fwd.num_bytes = payload_sz;
        remote_m.fwd.app_id = s->app_id;
        remote_m.fwd.src_rank = s->local_rank;

        m->event_rc += model_net_event(net_id, "synthetic-tr", global_dest_id, payload_sz, 0.0,
                sizeof(nw_message), (const void*)&remote_m,
                0, NULL, lp);

        s->gen_data += payload_sz;
        num_syn_bytes_sent += payload_sz;
    }

    /* New event after MEAN_INTERVAL */
    tw_stime ts = mean_interval + tw_rand_exponential(lp->rng, noise);
    tw_event * e = tw_event_new(lp->gid, ts, lp);
    nw_message * m_new = tw_event_data(e);
    m_new->msg_type = CLI_BCKGND_GEN;
    tw_event_send(e);

    free(dest_svr);
}

/* Reverse handler for arrive_syn_tr(): subtracts the message's byte count
 * from the per-LP and global synthetic receive counters. */
void arrive_syn_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    (void)bf;
    (void)lp;

    int data = m->fwd.num_bytes;
    s->syn_data -= data;
    num_syn_bytes_recvd -= data;
}
/* Handles CLI_BCKGND_ARRIVE: adds the synthetic message's byte count to the
 * per-LP and global synthetic receive counters. */
void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    (void)bf;
    (void)lp;

    int data = m->fwd.num_bytes;
    s->syn_data += data;
    num_syn_bytes_recvd += data;
}
/* Debugging functions, may generate unused function warning */

/* Prints (via lprintf) the count and IDs of the requests a wait is blocked on. */
static void print_waiting_reqs(uint32_t * reqs, int count)
{
    lprintf("\n Waiting reqs: %d count", count);
    for(int i = 0; i < count; i++)
        lprintf(" %" PRIu32 " ", reqs[i]);
}
static void print_msgs_queue(struct qlist_head * head, int is_send)
{
    if(is_send)
        printf("\n Send msgs queue: ");
    else
        printf("\n Recv msgs queue: ");

    struct qlist_head * ent = NULL;
    mpi_msgs_queue * current = NULL;
    qlist_for_each(ent, head)
Xin Wang's avatar
Xin Wang committed
778 779 780 781
       {
            current = qlist_entry(ent, mpi_msgs_queue, ql);
            printf(" \n Source %d Dest %d bytes %llu tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag);
       }
782
}
783
/* Debug dump of the request IDs in a completed_requests list via tw_output. */
static void print_completed_queue(tw_lp * lp, struct qlist_head * head)
{
    struct qlist_head * ent = NULL;

    tw_output(lp, "\n");
    qlist_for_each(ent, head)
    {
        struct completed_requests * current = qlist_entry(ent, completed_requests, ql);
        /* req_id is an int */
        tw_output(lp, " %d ", current->req_id);
    }
}
static int clear_completed_reqs(nw_state * s,
796
        tw_lp * lp,
797
        int * reqs, int count)
798
{
799 800 801
    (void)s;
    (void)lp;

802
    int i, matched = 0;
803

804 805
    for( i = 0; i < count; i++)
    {
Xin Wang's avatar
Xin Wang committed
806 807 808
      struct qlist_head * ent = NULL;
      struct completed_requests * current = NULL;
      struct completed_requests * prev = NULL;
809

810
      int index = 0;
Xin Wang's avatar
Xin Wang committed
811 812
      qlist_for_each(ent, &s->completed_reqs)
       {
813 814
           if(prev)
           {
815
              rc_stack_push(lp, prev, free, s->matched_reqs);
816 817
              prev = NULL;
           }
818
            
819 820
           current = qlist_entry(ent, completed_requests, ql);
           current->index = index; 
821 822
            if(current->req_id == reqs[i])
            {
823
                ++matched;
824
                qlist_del(&current->ql);
825
                prev = current;
826
            }
827
            ++index;
Xin Wang's avatar
Xin Wang committed
828
       }
829

Xin Wang's avatar
Xin Wang committed
830
      if(prev)
831 832 833 834
      {
         rc_stack_push(lp, prev, free, s->matched_reqs);
         prev = NULL;
      }
835
    }
836
    return matched;
837
}
838
static void add_completed_reqs(nw_state * s,
839 840
        tw_lp * lp,
        int count)
841
{
842
    (void)lp;
843
    for(int i = 0; i < count; i++)
844
    {
845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871
       struct completed_requests * req = (struct completed_requests*)rc_stack_pop(s->matched_reqs);

       // turn on only if wait-all unmatched error arises in optimistic mode.
       if(preserve_wait_ordering)
       {
            if(req->index == 0)
            {
                qlist_add(&req->ql, &s->completed_reqs);
            }
            else
            {
                int index = 1;
                struct qlist_head * ent = NULL;
                qlist_for_each(ent, &s->completed_reqs) 
                {
                    if(index == req->index)
                    {
                        qlist_add(&req->ql, ent);
                    }
                }//end qlist
            }// end else*/
       }
       else
       {
               qlist_add(&req->ql, &s->completed_reqs);
       }
    }//end for
872
}
873

874 875 876 877 878 879
/* helper function - maps an MPI rank to an LP id */
static tw_lpid rank_to_lpid(int rank)
{
    return codes_mapping_get_lpid_from_relative(rank, NULL, "nw-lp", NULL, 0);
}

880
static int notify_posted_wait(nw_state* s,
881
        tw_bf * bf, nw_message * m, tw_lp * lp,
882
        int completed_req)
883
{
884 885
    (void)bf;

886 887
    struct pending_waits* wait_elem = s->wait_op;
    int wait_completed = 0;
888

889
    m->fwd.wait_completed = 0;
890

891 892
    if(!wait_elem)
        return 0;
893

894
    int op_type = wait_elem->op_type;
895

896 897 898
    if(op_type == CODES_WK_WAIT &&
            (wait_elem->req_ids[0] == completed_req))
    {
899
            m->fwd.wait_completed = 1;
Xin Wang's avatar
Xin Wang committed
900
            wait_completed = 1;
901
    }
902 903
    else if(op_type == CODES_WK_WAITALL
            || op_type == CODES_WK_WAITANY
904 905 906 907 908 909
            || op_type == CODES_WK_WAITSOME)
    {
        int i;
        for(i = 0; i < wait_elem->count; i++)
        {
            if(wait_elem->req_ids[i] == completed_req)
910
            {
911
                wait_elem->num_completed++;
912
                if(wait_elem->num_completed > wait_elem->count)
913
                    printf("\n Num completed %d count %d LP %llu ",
914 915 916
                            wait_elem->num_completed,
                            wait_elem->count,
                            lp->gid);
Xin Wang's avatar
Xin Wang committed
917 918
//                if(wait_elem->num_completed > wait_elem->count)
//                    tw_lp_suspend(lp, 1, 0);
919

920
                if(wait_elem->num_completed >= wait_elem->count)
921
                {
922
                    if(enable_debug)
923
                        fprintf(workload_log, "\n(%lf) APP ID %d MPI WAITALL COMPLETED AT %llu ", tw_now(lp), s->app_id, s->nw_id);
924
                    wait_completed = 1;
925
                }
926

927
                m->fwd.wait_completed = 1;
928
            }
929
        }
930
    }
931
    return wait_completed;
932
}
933

934
/* reverse handler of MPI wait operation */
935
static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp, nw_message * m)
936
{
Xin Wang's avatar
Xin Wang committed
937
   if(bf->c1)
938
    {
939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957
        completed_requests * qi = (completed_requests*)rc_stack_pop(s->processed_ops);
        if(m->fwd.found_match == 0)
        {
            qlist_add(&qi->ql, &s->completed_reqs);
        }
        else
        {
           int index = 1;
           struct qlist_head * ent = NULL;
           qlist_for_each(ent, &s->completed_reqs)
           {
                if(index == m->fwd.found_match)
                {
                    qlist_add(&qi->ql, ent);
                    break;
                }
                index++;
           }
        }
958
        codes_issue_next_event_rc(lp);
959
        return;
960
    }
961 962 963
         struct pending_waits * wait_op = s->wait_op;
         free(wait_op);
         s->wait_op = NULL;
964
}
965

966
/* execute MPI wait operation */
967
static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op)
968
{
969 970
    /* check in the completed receives queue if the request ID has already been completed.*/
    assert(!s->wait_op);
971
    int req_id = mpi_op->u.wait.req_id;
972

973
    struct completed_requests* current = NULL;
974

975
    struct qlist_head * ent = NULL;
976
    int index = 0;
977 978 979 980 981
    qlist_for_each(ent, &s->completed_reqs)
    {
        current = qlist_entry(ent, completed_requests, ql);
        if(current->req_id == req_id)
        {
982
            bf->c1=1;
983
            qlist_del(&current->ql);
984
            rc_stack_push(lp, current, free, s->processed_ops);
985
            codes_issue_next_event(lp);
986 987 988 989 990 991
            m->fwd.found_match = index;
            /*if(s->nw_id == (tw_lpid)TRACK_LP)
            {
                tw_output(lp, "\n wait matched at post %d ", req_id);
                print_completed_queue(lp, &s->completed_reqs);
            }*/
992 993
            return;
        }
994
        ++index;
995
    }
996

997 998 999 1000 1001
    /*if(s->nw_id == (tw_lpid)TRACK_LP)
    {
        tw_output(lp, "\n wait posted %llu ", req_id);
        print_completed_queue(lp, &s->completed_reqs);
    }*/
1002
    /* If not, add the wait operation in the pending 'waits' list. */
1003
    struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
1004 1005 1006
    wait_op->op_type = mpi_op->op_type;
    wait_op->req_ids[0] = req_id;
    wait_op->count = 1;
1007 1008
    wait_op->num_completed = 0;
    wait_op->start_time = tw_now(lp);
1009
    s->wait_op = wait_op;
1010

1011
    return;
1012 1013
}

1014
static void codes_exec_mpi_wait_all_rc(
1015
        nw_state* s,
1016 1017
        tw_bf * bf,
        nw_message * m,
1018
        tw_lp* lp)
1019
{
Xin Wang's avatar
Xin Wang committed
1020 1021 1022 1023
  if(bf->c1)
  {
    int sampling_indx = s->sampling_indx;
    s->mpi_wkld_samples[sampling_indx].num_waits_sample--;
1024

Xin Wang's avatar
Xin Wang committed
1025
    if(bf->c2)
1026
    {
Xin Wang's avatar
Xin Wang committed
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042
        s->cur_interval_end -= sampling_interval;
        s->sampling_indx--;
    }
  }
  if(s->wait_op)
  {
      struct pending_waits * wait_op = s->wait_op;
      free(wait_op);
      s->wait_op = NULL;
  }
  else
  {
      add_completed_reqs(s, lp, m->fwd.num_matched);
      codes_issue_next_event_rc(lp);
  }
  return;
1043
}
1044

1045
static void codes_exec_mpi_wait_all(
1046
        nw_state* s,
1047 1048
        tw_bf * bf,
        nw_message * m,
1049
        tw_lp* lp,
1050
        struct codes_workload_op * mpi_op)
1051
{
Xin Wang's avatar
Xin Wang committed
1052 1053
  if(enable_debug)
    fprintf(workload_log, "\n MPI WAITALL POSTED AT %llu ", s->nw_id);
1054

Xin Wang's avatar
Xin Wang committed
1055 1056 1057 1058
  if(enable_sampling)
  {
    bf->c1 = 1;
    if(tw_now(lp) >= s->cur_interval_end)
1059
    {
Xin Wang's avatar
Xin Wang committed
1060
        bf->c2 = 1;
1061
        int indx = s->sampling_indx;
Xin Wang's avatar
Xin Wang committed
1062 1063 1064 1065 1066 1067 1068 1069
        s->mpi_wkld_samples[indx].nw_id = s->nw_id;
        s->mpi_wkld_samples[indx].app_id = s->app_id;
        s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
        s->cur_interval_end += sampling_interval;
        s->sampling_indx++;
    }
    if(s->sampling_indx >= MAX_STATS)
    {
1070
        struct mpi_workload_sample * tmp = (struct mpi_workload_sample*)calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
Xin Wang's avatar
Xin Wang committed
1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085
        memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
        free(s->mpi_wkld_samples);
        s->mpi_wkld_samples = tmp;
        s->max_arr_size += MAX_STATS;
    }
    int indx = s->sampling_indx;
    s->mpi_wkld_samples[indx].num_waits_sample++;
  }
  int count = mpi_op->u.waits.count;
  /* If the count is not less than max wait reqs then stop */
  assert(count < MAX_WAIT_REQS);

  int i = 0, num_matched = 0;
  m->fwd.num_matched = 0;

1086
  /*if(lp->gid == TRACK_LP)
Xin Wang's avatar
Xin Wang committed
1087 1088 1089
  {
      printf("\n MPI Wait all posted ");
      print_waiting_reqs(mpi_op->u.waits.req_ids, count);
1090 1091
      print_completed_queue(lp, &s->completed_reqs);
  }*/
Xin Wang's avatar
Xin Wang committed
1092 1093 1094
      /* check number of completed irecvs in the completion queue */
  for(i = 0; i < count; i++)
  {
1095
      int req_id = mpi_op->u.waits.req_ids[i];
Xin Wang's avatar
Xin Wang committed
1096 1097 1098 1099
      struct qlist_head * ent = NULL;
      struct completed_requests* current = NULL;
      qlist_for_each(ent, &s->completed_reqs)
       {
1100
            current = qlist_entry(ent, struct completed_requests, ql);
1101 1102
            if(current->req_id == req_id)
                num_matched++;
Xin Wang's avatar
Xin Wang committed
1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119
       }
  }

  m->fwd.found_match = num_matched;
  if(num_matched == count)
  {
    /* No need to post a MPI Wait all then, issue next event */
      /* Remove all completed requests from the list */
      m->fwd.num_matched = clear_completed_reqs(s, lp, mpi_op->u.waits.req_ids, count);
      struct pending_waits* wait_op = s->wait_op;
      free(wait_op);
      s->wait_op = NULL;
      codes_issue_next_event(lp);
  }
  else
  {
      /* If not, add the wait operation in the pending 'waits' list. */
1120
	  struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
Xin Wang's avatar
Xin Wang committed
1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132
	  wait_op->count = count;
      wait_op->op_type = mpi_op->op_type;
      assert(count < MAX_WAIT_REQS);

      for(i = 0; i < count; i++)
          wait_op->req_ids[i] =  mpi_op->u.waits.req_ids[i];

	  wait_op->num_completed = num_matched;
	  wait_op->start_time = tw_now(lp);
      s->wait_op = wait_op;
  }
  return;
1133
}
1134

1135 1136
/* search for a matching mpi operation and remove it from the list.
 * Record the index in the list from where the element got deleted.