model-net-mpi-replay.c 75.9 KB
Newer Older
1
2
3
4
5
6
7
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */
#include <ross.h>
#include <inttypes.h>
8
#include <sys/stat.h>
9
10
11
12
13
14
15
16

#include "codes/codes-workload.h"
#include "codes/codes.h"
#include "codes/configuration.h"
#include "codes/codes_mapping.h"
#include "codes/model-net.h"
#include "codes/rc-stack.h"
#include "codes/quicklist.h"
17
#include "codes/quickhash.h"
18
#include "codes/codes-jobmap.h"
/* NOTE: turning on LP tracking (see TRACK_LP) generates a lot of output messages. */

/* LP name of the model-net network model (presumably the transport used by
 * this replay layer; not referenced in this part of the file). */
#define MN_LP_NM "modelnet_dragonfly_custom"
/* LP name passed to codes_mapping lookups for the MPI replay LPs. */
#define NW_LP_NM "nw-lp"

#define CONTROL_MSG_SZ 64
#define TRACE (-1)
/* Capacity of pending_waits::req_ids; a wait may track fewer requests. */
#define MAX_WAIT_REQS 512
/* Non-zero enables lprintf() debug output. */
#define CS_LP_DBG 1
#define EAGER_THRESHOLD 8192

/* Table size passed to qhash_init() for the per-LP message-size table. */
#define RANK_HASH_TABLE_SZ 2000
/* Mean of the exponential jitter added between synthetic messages. */
#define NOISE 3.0

/* Debug printf, active only when CS_LP_DBG is non-zero.
 * Needs at least one argument after the format string. */
#define lprintf(_fmt, ...) \
        do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
/* Growth step (in entries) of the per-LP sampling array. */
#define MAX_STATS 65536
/* Size in bytes of each synthetic background-traffic message. */
#define PAYLOAD_SZ 1024
static int msg_size_hash_compare(
            void *key, struct qhash_head *link);

38
/* NOTE: Message tracking works in sequential mode only! */
39
int enable_msg_tracking = 0;
40
tw_lpid TRACK_LP = 0;
41
42

int unmatched = 0;
43
44
45
46
47
char workload_type[128];
char workload_file[8192];
char offset_file[8192];
static int wrkld_id;
static int num_net_traces = 0;
48
49
static int num_dumpi_traces = 0;

50
static int alloc_spec = 0;
51
52
static tw_stime self_overhead = 10.0;
static tw_stime mean_interval = 100000;
53
54
55

/* Doing LP IO*/
static char lp_io_dir[256] = {'\0'};
56
static char sampling_dir[32] = {'\0'};
57
58
59
60
static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;

61
62
63
64
/* variables for loading multiple applications */
char workloads_conf_file[8192];
char alloc_file[8192];
int num_traces_of_job[5];
65
66
67
tw_stime soft_delay_mpi = 2500;
tw_stime nic_delay = 1000;
tw_stime copy_per_byte_eager = 0.55;
68
69
70
71
72
char file_name_of_job[5][8192];

struct codes_jobmap_ctx *jobmap_ctx;
struct codes_jobmap_params_list jobmap_p;

73
74
/* Variables for Cortex Support */
/* Matthieu's additions start */
75
#ifdef ENABLE_CORTEX_PYTHON
76
77
78
static char cortex_file[512] = "\0";
static char cortex_class[512] = "\0";
static char cortex_gen[512] = "\0";
79
#endif
80
81
/* Matthieu's additions end */

82
83
typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
84
typedef int32_t dumpi_req_id;
85
86
87

static int net_id = 0;
static float noise = 5.0;
88
89
90
static int num_nw_lps = 0, num_mpi_lps = 0;

static int num_syn_clients;
91

92
FILE * workload_log = NULL;
93
FILE * msg_size_log = NULL;
94
95
96
97
FILE * workload_agg_log = NULL;
FILE * workload_meta_log = NULL;

static uint64_t sample_bytes_written = 0;
98

99
100
101
long long num_bytes_sent=0;
long long num_bytes_recvd=0;

102
103
104
long long num_syn_bytes_sent = 0;
long long num_syn_bytes_recvd = 0;

105
106
107
108
109
110
double max_time = 0,  max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;


/* runtime option for disabling computation time simulation */
static int disable_delay = 0;
111
112
113
static int enable_sampling = 0;
static double sampling_interval = 5000000;
static double sampling_end_time = 3000000000;
114
static int enable_debug = 0;
115

116
117
118
/* set group context */
struct codes_mctx group_ratio;

119
/* Event types (nw_message::msg_type) handled by the MPI replay LPs.
 *
 * MPI_OP_GET_NEXT      get the next MPI operation when the previous one completes.
 * MPI_SEND_ARRIVED     an MPI message, transported by model-net, arrived at its
 *                      destination.
 * MPI_SEND_ARRIVED_CB  callback to the sender, for tracking message times.
 * MPI_SEND_POSTED      an MPI message has left the source LP (transported via
 *                      model-net).
 * MPI_REND_ARRIVED / MPI_REND_ACK_ARRIVED
 *                      presumably the rendezvous-protocol handshake events.
 * CLI_BCKGND_FIN       stop synthetic background traffic (sent by
 *                      notify_background_traffic()).
 * CLI_BCKGND_ARRIVE    a synthetic background message arrived.
 * CLI_BCKGND_GEN       generate the next synthetic background message.
 * CLI_NBR_FINISH       the preceding rank reports its workload completed.
 *
 * Values are explicit so the numbering stays stable if entries are added. */
enum MPI_NW_EVENTS
{
    MPI_OP_GET_NEXT      = 1,
    MPI_SEND_ARRIVED     = 2,
    MPI_SEND_ARRIVED_CB  = 3, // for tracking message times on sender
    MPI_SEND_POSTED      = 4,
    MPI_REND_ARRIVED     = 5,
    MPI_REND_ACK_ARRIVED = 6,
    CLI_BCKGND_FIN       = 7,
    CLI_BCKGND_ARRIVE    = 8,
    CLI_BCKGND_GEN       = 9,
    CLI_NBR_FINISH       = 10,
};
/* Per-rank MPI activity recorded for one sampling interval.
 * Field order is part of the layout; do not reorder. */
struct mpi_workload_sample
{
    /* Sampling data */
    int nw_id;
    int app_id;
    unsigned long num_sends_sample;
    unsigned long num_bytes_sample;
    unsigned long num_waits_sample;
    /* end time of the interval this record covers */
    double sample_end_time;
};
/* stores pointers of pending MPI operations to be matched with their respective sends/receives. */
struct mpi_msgs_queue
{
    int op_type;
    int tag;
    int source_rank;
    int dest_rank;
153
    uint64_t num_bytes;
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
    tw_stime req_init_time;
	dumpi_req_id req_id;
    struct qlist_head ql;
};

/* stores request IDs of completed MPI operations (Isends or Irecvs) */
struct completed_requests
{
	dumpi_req_id req_id;
    struct qlist_head ql;
};

/* for wait operations, store the pending operation and number of completed waits so far. */
struct pending_waits
{
    int op_type;
170
    int32_t req_ids[MAX_WAIT_REQS];
171
	int num_completed;
172
173
	int count;
    tw_stime start_time;
174
175
176
    struct qlist_head ql;
};

177
178
179
180
181
182
183
184
185
struct msg_size_info
{
    int64_t msg_size;
    int num_msgs;
    tw_stime agg_latency;
    tw_stime avg_latency;
    struct qhash_head * hash_link;
    struct qlist_head ql; 
};
186
187
188
189
190
191
192
193
194
195
typedef struct mpi_msgs_queue mpi_msgs_queue;
typedef struct completed_requests completed_requests;
typedef struct pending_waits pending_waits;

/* state of the network LP. It contains the pointers to send/receive lists */
struct nw_state
{
	long num_events_per_lp;
	tw_lpid nw_id;
	short wrkld_end;
196
197
    int app_id;
    int local_rank;
198

199
200
201
    int is_finished;
    int neighbor_completed;

202
    struct rc_stack * processed_ops;
203
    struct rc_stack * processed_wait_op;
204
    struct rc_stack * matched_reqs;
205
206
207
208
209
210
211
212
213
214

    /* count of sends, receives, collectives and delays */
	unsigned long num_sends;
	unsigned long num_recvs;
	unsigned long num_cols;
	unsigned long num_delays;
	unsigned long num_wait;
	unsigned long num_waitall;
	unsigned long num_waitsome;

215

216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
	/* time spent by the LP in executing the app trace*/
	double start_time;
	double elapsed_time;
	/* time spent in compute operations */
	double compute_time;
	/* time spent in message send/isend */
	double send_time;
	/* time spent in message receive */
	double recv_time;
	/* time spent in wait operation */
	double wait_time;
	/* FIFO for isend messages arrived on destination */
	struct qlist_head arrival_queue;
	/* FIFO for irecv messages posted but not yet matched with send operations */
	struct qlist_head pending_recvs_queue;
	/* List of completed send/receive requests */
	struct qlist_head completed_reqs;
233

234
235
    tw_stime cur_interval_end;

236
237
    /* Pending wait operation */
    struct pending_waits * wait_op;
238

239
240
241
242
243
244
    /* Message size latency information */
    struct qhash_table * msg_sz_table;
    struct qlist_head msg_sz_list;

    /* quick hash for maintaining message latencies */

245
246
247
    unsigned long num_bytes_sent;
    unsigned long num_bytes_recvd;

248
249
250
    unsigned long syn_data;
    unsigned long gen_data;
    
251
252
253
254
    /* For sampling data */
    int sampling_indx;
    int max_arr_size;
    struct mpi_workload_sample * mpi_wkld_samples;
255
    char output_buf[512];
256
257
258
259
};

/* data for handling reverse computation.
* saved_matched_req holds the request ID of matched receives/sends for wait operations.
260
* ptr_match_op holds the matched MPI operation which are removed from the queues when a send is matched with the receive in forward event handler.
261
262
263
* network event being sent. op is the MPI operation issued by the network workloads API. rv_data holds the data for reverse computation (TODO: Fill this data structure only when the simulation runs in optimistic mode). */
struct nw_message
{
264
   // forward message handler
265
   int msg_type;
266
   int op_type;
267
   model_net_event_return event_rc;
268

269
270
271
   struct
   {
       tw_lpid src_rank;
272
       int dest_rank;
273
       int64_t num_bytes;
274
275
276
277
278
       int num_matched;
       int data_type;
       double sim_start_time;
       // for callbacks - time message was received
       double msg_send_time;
279
       int32_t req_id;
280
       int tag;
281
       int app_id;
282
283
284
285
286
287
288
289
290
       int found_match;
       short wait_completed;
   } fwd;
   struct
   {
       double saved_send_time;
       double saved_recv_time;
       double saved_wait_time;
       double saved_delay;
291
       int32_t saved_num_bytes;
292
   } rc;
293
294
};

295
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op);
296
297

static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
298
299
/* executes MPI isend and send operations */
static void codes_exec_mpi_send(
300
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op, int is_rend);
301
302
/* execute MPI irecv operation */
static void codes_exec_mpi_recv(
303
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, struct codes_workload_op * mpi_op);
304
305
/* reverse of mpi recv function. */
static void codes_exec_mpi_recv_rc(
306
        nw_state* s, tw_bf * bf, nw_message* m, tw_lp* lp);
307
308
/* execute the computational delay */
static void codes_exec_comp_delay(
309
        nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
/* gets the next MPI operation from the network-workloads API. */
static void get_next_mpi_operation(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* reverse handler of get next mpi operation. */
static void get_next_mpi_operation_rc(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* Makes a call to get_next_mpi_operation. */
static void codes_issue_next_event(tw_lp* lp);
/* reverse handler of next operation */
static void codes_issue_next_event_rc(tw_lp* lp);


///////////////////// HELPER FUNCTIONS FOR MPI MESSAGE QUEUE HANDLING ///////////////
/* upon arrival of local completion message, inserts operation in completed send queue */
/* upon arrival of an isend operation, updates the arrival queue of the network */
325
326
327
328
329
330
331
332
static void update_completed_queue(
        nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp, dumpi_req_id req_id);
/* reverse of the above function */
static void update_completed_queue_rc(
        nw_state*s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp);
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
static void update_arrival_queue(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse of the above function */
static void update_arrival_queue_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* callback to a message sender for computing message time */
static void update_message_time(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse for computing message time */
static void update_message_time_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);

/* conversion from seconds to eanaoseconds */
static tw_stime s_to_ns(tw_stime ns);

348
/*static void update_message_size_rc(
349
350
351
352
        struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
353
{*/
354
/*TODO: Complete reverse handler */
355
/*    (void)ns;
356
357
358
    (void)lp;
    (void)bf;
    (void)m;
359
}*/
360
361
362
363
364
365
366
367
368
369
/* update the message size */
static void update_message_size(
        struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m,
        mpi_msgs_queue * qitem,
        int is_eager,
        int is_send)
{
370
371
372
            (void)bf;
            (void)is_eager;

373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
            struct qhash_head * hash_link = NULL;
            tw_stime msg_init_time = qitem->req_init_time;
        
            if(!ns->msg_sz_table)
                ns->msg_sz_table = qhash_init(msg_size_hash_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SZ); 
            
            hash_link = qhash_search(ns->msg_sz_table, &(qitem->num_bytes));

            if(is_send)
                msg_init_time = m->fwd.sim_start_time;
            
            /* update hash table */
            if(!hash_link)
            {
                struct msg_size_info * msg_info = malloc(sizeof(struct msg_size_info));
                msg_info->msg_size = qitem->num_bytes;
                msg_info->num_msgs = 1;
390
                msg_info->agg_latency = tw_now(lp) - msg_init_time;
391
                msg_info->avg_latency = msg_info->agg_latency;
392
                qhash_add(ns->msg_sz_table, &(msg_info->msg_size), msg_info->hash_link);
393
394
395
396
397
398
399
                qlist_add(&msg_info->ql, &ns->msg_sz_list);
                //printf("\n Msg size %d aggregate latency %f num messages %d ", m->fwd.num_bytes, msg_info->agg_latency, msg_info->num_msgs);
            }
            else
            {
                struct msg_size_info * tmp = qhash_entry(hash_link, struct msg_size_info, hash_link);
                tmp->num_msgs++;
400
                tmp->agg_latency += tw_now(lp) - msg_init_time;  
401
402
403
404
405
406
407
408
409
410
                tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs);
//                printf("\n Msg size %d aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
            }
}
/* Reverse handler of notify_background_traffic(): undoes its RNG draws.
 *
 * The forward handler draws one exponential variate for every job other than
 * this LP's own job, so exactly that many draws are reversed here.  (The
 * events it sent are cancelled by ROSS, not by this handler.) */
static void notify_background_traffic_rc(
	    struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
    (void)bf;
    (void)m;

    struct codes_jobmap_id jid = codes_jobmap_to_local_id(ns->nw_id, jobmap_ctx);
    int num_jobs = codes_jobmap_get_num_jobs(jobmap_ctx);

    for(int other_id = 0; other_id < num_jobs; other_id++)
    {
        /* same skip condition as the forward loop */
        if(other_id == jid.job)
            continue;

        /* tw_rand_exponential() consumes one uniform draw */
        tw_rand_reverse_unif(lp->rng);
    }
}

/* Tells every rank of every other job to stop its background traffic.
 *
 * For each job other than this LP's own, one exponential delay (mean
 * mean_interval/10000) is drawn, offset to exceed g_tw_lookahead, and a
 * CLI_BCKGND_FIN event with that delay is sent to every rank of the job.
 * The number of RNG draws equals the number of other jobs, which
 * notify_background_traffic_rc() must mirror. */
static void notify_background_traffic(
	    struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
    (void)bf;
    (void)m;

    struct codes_jobmap_id jid;
    jid = codes_jobmap_to_local_id(ns->nw_id, jobmap_ctx);

    int num_jobs = codes_jobmap_get_num_jobs(jobmap_ctx);

    for(int other_id = 0; other_id < num_jobs; other_id++)
    {
        if(other_id == jid.job)
            continue;

        struct codes_jobmap_id other_jid;
        other_jid.job = other_id;

        int num_other_ranks = codes_jobmap_get_num_ranks(other_id, jobmap_ctx);

        lprintf("\n Other ranks %d ", num_other_ranks);

        /* one delay per job, shared by all of that job's ranks */
        tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);

        for(int k = 0; k < num_other_ranks; k++)
        {
            other_jid.rank = k;
            int intm_dest_id = codes_jobmap_to_global_id(other_jid, jobmap_ctx);
            tw_lpid global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);

            tw_event * e = tw_event_new(global_dest_id, ts, lp);
            struct nw_message * m_new = tw_event_data(e);
            m_new->msg_type = CLI_BCKGND_FIN;
            tw_event_send(e);
        }
    }
    return;
}
/* Reverse handler of notify_neighbor().
 *
 * bf->c0: the background traffic was notified -> reverse that notification.
 * bf->c1: the next rank was notified -> give back the single RNG draw used
 *         for the notification delay.
 * The two paths are exclusive; c0 takes precedence. */
static void notify_neighbor_rc(
	    struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
    if(bf->c0)
        notify_background_traffic_rc(ns, lp, bf, m);
    else if(bf->c1)
        tw_rand_reverse_unif(lp->rng);
}
/* Advances the completion-notification chain across the ranks of a job.
 *
 * A rank takes part once its own workload is finished (is_finished == 1) and
 * its predecessor has reported completion (neighbor_completed == 1).  Rank 0
 * has no predecessor, so it only needs to be finished.  The last rank
 * (local_rank == num_dumpi_traces - 1) then notifies the background traffic
 * and sets bf->c0.  Any other rank sends CLI_NBR_FINISH to local_rank + 1
 * after one exponential RNG draw and sets bf->c1.
 *
 * Treating rank 0 as "predecessor done" in both branches means a single-rank
 * job goes straight to the background notification.  Before this, such a job
 * tried to notify a nonexistent rank 1.
 *
 * NOTE(review): the last-rank test uses the global num_dumpi_traces, not the
 * rank count of this LP's job — confirm this is intended for multi-job runs. */
static void notify_neighbor(
	    struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
    int predecessor_done = (ns->neighbor_completed == 1 || ns->local_rank == 0);

    if(ns->is_finished != 1 || !predecessor_done)
        return;

    if(ns->local_rank == num_dumpi_traces - 1)
    {
        printf("\n All workloads completed, notifying background traffic ");
        bf->c0 = 1;
        notify_background_traffic(ns, lp, bf, m);
        return;
    }

    bf->c1 = 1;

    printf("\n Local rank %d notifying neighbor %d ", ns->local_rank, ns->local_rank+1);
    tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);

    struct codes_jobmap_id nbr_jid;
    nbr_jid.job = ns->app_id;
    nbr_jid.rank = ns->local_rank + 1;

    /* Send a notification to the neighbor about completion */
    int intm_dest_id = codes_jobmap_to_global_id(nbr_jid, jobmap_ctx);
    tw_lpid global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);

    tw_event * e = tw_event_new(global_dest_id, ts, lp);
    struct nw_message * m_new = tw_event_data(e);
    m_new->msg_type = CLI_NBR_FINISH;
    tw_event_send(e);
}
/* Reverse handler of finish_bckgnd_traffic().
 *
 * Clears is_finished only if the forward handler actually set it (recorded in
 * b->c0).  Clearing it unconditionally would wrongly reset an LP that was
 * already finished, e.g. when CLI_BCKGND_FIN arrives more than once (each
 * job's last rank notifies every other job) or reaches a workload rank that
 * had finished on its own. */
void finish_bckgnd_traffic_rc(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
    (void)msg;
    (void)lp;

    if(b->c0)
        ns->is_finished = 0;
    return;
}

/* Marks this LP finished so that gen_synthetic_tr() stops generating traffic
 * (presumably dispatched for CLI_BCKGND_FIN events).  b->c0 records whether
 * the flag changed, so the reverse handler can restore it exactly. */
void finish_bckgnd_traffic(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
    (void)msg;

    if(ns->is_finished != 1)
    {
        b->c0 = 1;
        ns->is_finished = 1;
    }
    /* tw_lpid's underlying type varies across ROSS versions; cast for %llu */
    lprintf("\n LP %llu completed sending data %lu completed at time %lf ",
            (unsigned long long)lp->gid, ns->gen_data, tw_now(lp));
    return;
}

/* Reverse handler of finish_nbr_wkld(): clears the neighbor-completed flag
 * and rolls back whatever notify_neighbor() did (selected by the bf bits). */
void finish_nbr_wkld_rc(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
    /* the forward handler sets this flag unconditionally */
    ns->neighbor_completed = 0;

    notify_neighbor_rc(ns, lp, b, msg);
    return;
}

/* Records that the preceding rank has completed its workload, then lets
 * notify_neighbor() decide whether to pass the notification along. */
void finish_nbr_wkld(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
    printf("\n Workload completed, notifying neighbor ");

    /* mark predecessor as done before evaluating the chain */
    ns->neighbor_completed = 1;
    notify_neighbor(ns, lp, b, msg);
    return;
}
/* Reverse handler of gen_synthetic_tr().
 *
 * When bf->c0 is set the forward handler did nothing.  Otherwise it sent one
 * model-net message, accounted PAYLOAD_SZ bytes and made two RNG draws
 * (destination choice and next-event delay); all of that is undone here. */
static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(!bf->c0)
    {
        model_net_event_rc2(lp, &m->event_rc);
        s->gen_data -= PAYLOAD_SZ;
        num_syn_bytes_sent -= PAYLOAD_SZ;

        for(int draw = 0; draw < 2; draw++)
            tw_rand_reverse_unif(lp->rng);
    }
}

/* generate synthetic traffic
 *
 * Does nothing (and sets bf->c0) once the LP is finished.  Otherwise:
 *   - picks a random destination rank within this LP's job, bumped to the
 *     next rank if it picked itself;
 *   - sends PAYLOAD_SZ bytes to it through model-net as a CLI_BCKGND_ARRIVE
 *     message;
 *   - schedules the next CLI_BCKGND_GEN event after mean_interval plus an
 *     exponential(NOISE) jitter.
 * Exactly two RNG draws are made, matching gen_synthetic_tr_rc(). */
static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(s->is_finished == 1)
    {
        bf->c0 = 1;
        return;
    }

    /* Get job information */
    tw_lpid global_dest_id;

    struct codes_jobmap_id jid;
    jid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx);

    int num_clients = codes_jobmap_get_num_ranks(jid.job, jobmap_ctx);
    int dest_svr = tw_rand_integer(lp->rng, 0, num_clients - 1);

    /* avoid sending to self (not possible for a single-rank job) */
    if(dest_svr == s->local_rank)
    {
       dest_svr = (s->local_rank + 1) % num_clients;
    }

    jid.rank = dest_svr;

    int intm_dest_id = codes_jobmap_to_global_id(jid, jobmap_ctx);
    global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);

    /* Zero the payload so that fields not set below are not shipped to the
     * receiver as indeterminate stack bytes. */
    nw_message remote_m = {0};
    remote_m.fwd.sim_start_time = tw_now(lp);
    remote_m.fwd.dest_rank = dest_svr;
    remote_m.msg_type = CLI_BCKGND_ARRIVE;
    remote_m.fwd.num_bytes = PAYLOAD_SZ;
    remote_m.fwd.app_id = s->app_id;
    remote_m.fwd.src_rank = s->local_rank;

    m->event_rc = model_net_event(net_id, "synthetic-tr", global_dest_id, PAYLOAD_SZ, 0.0,
            sizeof(nw_message), (const void*)&remote_m,
            0, NULL, lp);

    s->gen_data += PAYLOAD_SZ;
    num_syn_bytes_sent += PAYLOAD_SZ;

    /* New event after MEAN_INTERVAL */
    tw_stime ts = mean_interval  + tw_rand_exponential(lp->rng, NOISE);
    tw_event * e;
    nw_message * m_new;
    e = tw_event_new(lp->gid, ts, lp);
    m_new = tw_event_data(e);
    m_new->msg_type = CLI_BCKGND_GEN;
    tw_event_send(e);
}

/* Reverse handler of arrive_syn_tr(): subtracts the arrived payload from the
 * per-LP and global synthetic-traffic byte counters. */
void arrive_syn_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    (void)bf;
    (void)lp;

    /* fwd.num_bytes is int64_t; keep the full width */
    int64_t data = m->fwd.num_bytes;
    s->syn_data -= data;
    num_syn_bytes_recvd -= data;
}
/* Accounts a synthetic background message that arrived at this LP, in both
 * the per-LP (syn_data) and global (num_syn_bytes_recvd) byte counters. */
void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    (void)bf;
    (void)lp;

    /* fwd.num_bytes is int64_t; keep the full width */
    int64_t data = m->fwd.num_bytes;
    s->syn_data += data;
    num_syn_bytes_recvd += data;
}
/* Debugging functions, may generate unused function warning */

/* Prints (via lprintf) the `count` request IDs a wait operation waits on. */
static void print_waiting_reqs(const int32_t * reqs, int count)
{
    lprintf("\n Waiting reqs: %d count", count);

    int i;
    for(i = 0; i < count; i++ )
        lprintf(" %d ", reqs[i]);
}
/* Prints every entry of an mpi_msgs_queue list, labelled as the send or the
 * receive queue depending on is_send. */
static void print_msgs_queue(struct qlist_head * head, int is_send)
{
    if(is_send)
        printf("\n Send msgs queue: ");
    else
        printf("\n Recv msgs queue: ");

    struct qlist_head * ent = NULL;
    mpi_msgs_queue * current = NULL;
    qlist_for_each(ent, head)
    {
        current = qlist_entry(ent, mpi_msgs_queue, ql);
        /* num_bytes is uint64_t, which is not unsigned long long everywhere;
         * cast so it matches %llu */
        printf(" \n Source %d Dest %d bytes %llu tag %d ",
                current->source_rank, current->dest_rank,
                (unsigned long long)current->num_bytes, current->tag);
    }
}
/* Prints the request IDs held in a completed_requests list. */
static void print_completed_queue(struct qlist_head * head)
{
    printf("\n Completed queue: ");

    struct qlist_head * ent = NULL;
    struct completed_requests* current = NULL;
    qlist_for_each(ent, head)
    {
        current = qlist_entry(ent, completed_requests, ql);
        printf(" %d ", current->req_id);
    }
}
static int clear_completed_reqs(nw_state * s,
688
        tw_lp * lp,
689
        int32_t * reqs, int count)
690
{
691
692
693
    (void)s;
    (void)lp;

694
    int i, matched = 0;
695

696
697
698
    for( i = 0; i < count; i++)
    {
      struct qlist_head * ent = NULL;
699
700
701
      struct completed_requests * current = NULL;
      struct completed_requests * prev = NULL;

702
703
      qlist_for_each(ent, &s->completed_reqs)
       {
704
705
706
707
708
            current = qlist_entry(ent, completed_requests, ql);
            
            if(prev)
              rc_stack_push(lp, prev, free, s->matched_reqs);
            
709
710
            if(current->req_id == reqs[i])
            {
711
                ++matched;
712
                qlist_del(&current->ql);
713
                prev = current;
714
            }
715
716
            else
                prev = NULL;
717
       }
718
719
720

      if(prev)
          rc_stack_push(lp, prev, free, s->matched_reqs);
721
    }
722
    return matched;
723
}
724
static void add_completed_reqs(nw_state * s,
725
726
        tw_lp * lp,
        int count)
727
{
728
    (void)lp;
729
730
731
    int i;
    for( i = 0; i < count; i++)
    {
732
733
       struct completed_requests * req = rc_stack_pop(s->matched_reqs);
       qlist_add(&req->ql, &s->completed_reqs);
734
735
    }
}
736

737
738
739
740
741
742
/* Maps an MPI rank (relative index of an "nw-lp" LP) to its global LP id. */
static tw_lpid rank_to_lpid(int rank)
{
    tw_lpid lpid = codes_mapping_get_lpid_from_relative(rank, NULL, "nw-lp", NULL, 0);
    return lpid;
}

743
static int notify_posted_wait(nw_state* s,
744
        tw_bf * bf, nw_message * m, tw_lp * lp,
745
        dumpi_req_id completed_req)
746
{
747
748
    (void)bf;

749
750
    struct pending_waits* wait_elem = s->wait_op;
    int wait_completed = 0;
751

752
    m->fwd.wait_completed = 0;
753

754
755
    if(!wait_elem)
        return 0;
756

757
    int op_type = wait_elem->op_type;
758

759
760
761
762
763
    if(op_type == CODES_WK_WAIT &&
            (wait_elem->req_ids[0] == completed_req))
    {
            wait_completed = 1;
    }
764
765
    else if(op_type == CODES_WK_WAITALL
            || op_type == CODES_WK_WAITANY
766
767
768
769
770
771
            || op_type == CODES_WK_WAITSOME)
    {
        int i;
        for(i = 0; i < wait_elem->count; i++)
        {
            if(wait_elem->req_ids[i] == completed_req)
772
            {
773
                wait_elem->num_completed++;
774
                if(wait_elem->num_completed > wait_elem->count)
775
                    printf("\n Num completed %d count %d LP %llu ",
776
777
778
                            wait_elem->num_completed,
                            wait_elem->count,
                            lp->gid);
779
780
//                if(wait_elem->num_completed > wait_elem->count)
//                    tw_lp_suspend(lp, 1, 0);
781

782
                if(wait_elem->num_completed >= wait_elem->count)
783
                {
784
                    if(enable_debug)
785
                        fprintf(workload_log, "\n(%lf) APP ID %d MPI WAITALL COMPLETED AT %llu ", tw_now(lp), s->app_id, s->nw_id);
786
                    wait_completed = 1;
787
                }
788

789
                m->fwd.wait_completed = 1;
790
            }
791
        }
792
    }
793
    return wait_completed;
794
}
795

796
/* reverse handler of MPI wait operation */
797
static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp)
798
{
799
    if(bf->c2)
800
     {
801
802
803
         struct pending_waits * wait_op = s->wait_op;
         free(wait_op);
         s->wait_op = NULL;
804
     }
805
   if(bf->c1)
806
807
    {
        codes_issue_next_event_rc(lp);
808
        completed_requests * qi = rc_stack_pop(s->processed_ops);
809
        qlist_add(&qi->ql, &s->completed_reqs);
810
    }
811
    return;
812
}
813

814
/* execute MPI wait operation */
815
static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, tw_lp* lp, struct codes_workload_op * mpi_op)
816
{
817
818
    /* check in the completed receives queue if the request ID has already been completed.*/
    assert(!s->wait_op);
819
    dumpi_req_id req_id = mpi_op->u.wait.req_id;
820

821
    struct completed_requests* current = NULL;
822

823
824
825
826
827
828
    struct qlist_head * ent = NULL;
    qlist_for_each(ent, &s->completed_reqs)
    {
        current = qlist_entry(ent, completed_requests, ql);
        if(current->req_id == req_id)
        {
829
            bf->c1=1;
830
            qlist_del(&current->ql);
831
            rc_stack_push(lp, current, free, s->processed_ops);
832
833
834
835
            codes_issue_next_event(lp);
            return;
        }
    }
836
837

    bf->c2 = 1;
838
839
840
841
842
    /* If not, add the wait operation in the pending 'waits' list. */
    struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
    wait_op->op_type = mpi_op->op_type;
    wait_op->req_ids[0] = req_id;
    wait_op->count = 1;
843
844
    wait_op->num_completed = 0;
    wait_op->start_time = tw_now(lp);
845
    s->wait_op = wait_op;
846

847
    return;
848
849
}

850
static void codes_exec_mpi_wait_all_rc(
851
        nw_state* s,
852
853
        tw_bf * bf,
        nw_message * m,
854
        tw_lp* lp)
855
{
856
857
858
859
860
861
862
863
864
865
866
  if(bf->c1)
  {
    int sampling_indx = s->sampling_indx;
    s->mpi_wkld_samples[sampling_indx].num_waits_sample--;

    if(bf->c2)
    {
        s->cur_interval_end -= sampling_interval;
        s->sampling_indx--;
    }
  }
867
868
869
870
871
872
873
874
  if(s->wait_op)
  {
      struct pending_waits * wait_op = s->wait_op;
      free(wait_op);
      s->wait_op = NULL;
  }
  else
  {
875
      add_completed_reqs(s, lp, m->fwd.num_matched);
876
877
878
      codes_issue_next_event_rc(lp);
  }
  return;
879
}
880

881
static void codes_exec_mpi_wait_all(
882
        nw_state* s,
883
884
        tw_bf * bf,
        nw_message * m,
885
        tw_lp* lp,
886
        struct codes_workload_op * mpi_op)
887
{
888
  if(enable_debug)
889
    fprintf(workload_log, "\n MPI WAITALL POSTED AT %llu ", s->nw_id);
890

891
892
893
894
895
896
897
898
  if(enable_sampling)
  {
    bf->c1 = 1;
    if(tw_now(lp) >= s->cur_interval_end)
    {
        bf->c2 = 1;
        int indx = s->sampling_indx;
        s->mpi_wkld_samples[indx].nw_id = s->nw_id;
899
        s->mpi_wkld_samples[indx].app_id = s->app_id;
900
901
902
903
904
905
        s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
        s->cur_interval_end += sampling_interval;
        s->sampling_indx++;
    }
    if(s->sampling_indx >= MAX_STATS)
    {
906
        struct mpi_workload_sample * tmp = calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
907
908
909
910
911
912
913
914
        memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
        free(s->mpi_wkld_samples);
        s->mpi_wkld_samples = tmp;
        s->max_arr_size += MAX_STATS;
    }
    int indx = s->sampling_indx;
    s->mpi_wkld_samples[indx].num_waits_sample++;
  }
915
  int count = mpi_op->u.waits.count;
916
917
  /* If the count is not less than max wait reqs then stop */
  assert(count < MAX_WAIT_REQS);
918

919
  int i = 0, num_matched = 0;
920
  m->fwd.num_matched = 0;
921

922
  if(lp->gid == TRACK_LP)
923
  {
924
      printf("\n MPI Wait all posted ");
925
926
      print_waiting_reqs(mpi_op->u.waits.req_ids, count);
      print_completed_queue(&s->completed_reqs);
927
  }
928
      /* check number of completed irecvs in the completion queue */
929
930
931
932
933
934
935
  for(i = 0; i < count; i++)
  {
      dumpi_req_id req_id = mpi_op->u.waits.req_ids[i];
      struct qlist_head * ent = NULL;
      struct completed_requests* current = NULL;
      qlist_for_each(ent, &s->completed_reqs)
       {
936
            current = qlist_entry(ent, struct completed_requests, ql);
937
938
939
940
            if(current->req_id == req_id)
                num_matched++;
       }
  }
941

942
  m->fwd.found_match = num_matched;
943
944
945
946
  if(num_matched == count)
  {
    /* No need to post a MPI Wait all then, issue next event */
      /* Remove all completed requests from the list */
947
948
949
      m->fwd.num_matched = clear_completed_reqs(s, lp, mpi_op->u.waits.req_ids, count);
      struct pending_waits* wait_op = s->wait_op;
      free(wait_op);
950
951
      s->wait_op = NULL;
      codes_issue_next_event(lp);
952
953
  }
  else
954
955
956
957
958
959
960
961
962
963
964
  {
      /* If not, add the wait operation in the pending 'waits' list. */
	  struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
	  wait_op->count = count;
      wait_op->op_type = mpi_op->op_type;
      assert(count < MAX_WAIT_REQS);

      for(i = 0; i < count; i++)
          wait_op->req_ids[i] =  mpi_op->u.waits.req_ids[i];

	  wait_op->num_completed = num_matched;
965
	  wait_op->start_time = tw_now(lp);
966
      s->wait_op = wait_op;
967
  }
968
969
  return;
}
970

971
972
/* search for a matching mpi operation and remove it from the list.
 * Record the index in the list from where the element got deleted.
973
 * Index is used for inserting the element once again in the queue for reverse computation. */
974
static int rm_matching_rcv(nw_state * ns,
975
        tw_bf * bf,
976
977
        nw_message * m,
        tw_lp * lp,
978
        mpi_msgs_queue * qitem)
979
980
{
    int matched = 0;
981
    int index = 0;
982
983
    struct qlist_head *ent = NULL;
    mpi_msgs_queue * qi = NULL;
984

985
986
    qlist_for_each(ent, &ns->pending_recvs_queue){
        qi = qlist_entry(ent, mpi_msgs_queue, ql);
987
988
        if(//(qi->num_bytes == qitem->num_bytes)
                //&& 
989
               ((qi->tag == qitem->tag) || qi->tag == -1)
990
                && ((qi->source_rank == qitem->source_rank) || qi->source_rank == -1))
991
992
        {
            matched = 1;
993
            qitem->num_bytes = qi->num_bytes;
994
995
            break;
        }
996
        ++index;
997
    }
998

999
1000
    if(matched)
    {
For faster browsing, not all history is shown. View entire blame