/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */
#include <ross.h>
#include <inttypes.h>

#include "codes/codes-workload.h"
#include "codes/codes.h"
#include "codes/configuration.h"
#include "codes/codes_mapping.h"
#include "codes/model-net.h"
#include "codes/rc-stack.h"
#include "codes/quicklist.h"
#include "codes/quickhash.h"
#include "codes/codes-jobmap.h"

/* turning on track lp will generate a lot of output messages */
#define MN_LP_NM "modelnet_dragonfly_custom"

#define CONTROL_MSG_SZ 64
#define TRACK_LP -1
#define TRACE -1
#define MAX_WAIT_REQS 512
#define CS_LP_DBG 0
#define EAGER_THRESHOLD 81920000

#define RANK_HASH_TABLE_SZ 2000
#define NOISE 3.0
#define NW_LP_NM "nw-lp"

#define lprintf(_fmt, ...) \
        do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
#define MAX_STATS 65536
#define PAYLOAD_SZ 1024

static int msg_size_hash_compare(
            void *key, struct qhash_head *link);

int enable_msg_tracking = 0;

int unmatched = 0;

char workload_type[128];
char workload_file[8192];
char offset_file[8192];
static int wrkld_id;
static int num_net_traces = 0;
static int num_dumpi_traces = 0;

static int alloc_spec = 0;

static tw_stime self_overhead = 10.0;
static tw_stime mean_interval = 100000;

/* Doing LP IO*/
static char lp_io_dir[256] = {'\0'};
static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;

/* variables for loading multiple applications */
char workloads_conf_file[8192];
char alloc_file[8192];
int num_traces_of_job[5];

tw_stime soft_delay_mpi = 2500;
tw_stime nic_delay = 1000;
tw_stime copy_per_byte_eager = 0.55;
char file_name_of_job[5][8192];

struct codes_jobmap_ctx *jobmap_ctx;
struct codes_jobmap_params_list jobmap_p;

/* Variables for Cortex Support */
/* Matthieu's additions start */
#ifdef ENABLE_CORTEX_PYTHON
static char cortex_file[512] = "\0";
static char cortex_class[512] = "\0";
static char cortex_gen[512] = "\0";
#endif
/* Matthieu's additions end */

typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
typedef int32_t dumpi_req_id;

static int net_id = 0;
static float noise = 5.0;
static int num_nw_lps = 0, num_mpi_lps = 0;

static int num_syn_clients;

FILE * workload_log = NULL;
FILE * msg_size_log = NULL;
FILE * workload_agg_log = NULL;
FILE * workload_meta_log = NULL;

static uint64_t sample_bytes_written = 0;

long long num_bytes_sent=0;
long long num_bytes_recvd=0;

long long num_syn_bytes_sent = 0;
long long num_syn_bytes_recvd = 0;

double max_time = 0,  max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;

/* global variables for codes mapping */
static char lp_group_name[MAX_NAME_LENGTH], lp_type_name[MAX_NAME_LENGTH], annotation[MAX_NAME_LENGTH];
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;

/* runtime option for disabling computation time simulation */
static int disable_delay = 0;
static int enable_sampling = 0;
static double sampling_interval = 5000000;
static double sampling_end_time = 3000000000;
static int enable_debug = 0;

/* set group context */
struct codes_mctx group_ratio;

/* MPI_OP_GET_NEXT is issued to fetch the next MPI operation from the workload
 * once the previous operation completes.
 * MPI_SEND_ARRIVED is issued when an MPI message arrives at its destination
 * (the message is transported by model-net and an event is invoked when it arrives).
 * MPI_SEND_POSTED is issued when an MPI message has left the source LP
 * (the message is transported via model-net). */
enum MPI_NW_EVENTS
{
    MPI_OP_GET_NEXT=1,
    MPI_SEND_ARRIVED,
    MPI_SEND_ARRIVED_CB, // for tracking message times on sender
    MPI_SEND_POSTED,
    MPI_REND_ARRIVED,
    MPI_REND_ACK_ARRIVED,
    CLI_BCKGND_FIN,
    CLI_BCKGND_ARRIVE,
    CLI_BCKGND_GEN,
    CLI_NBR_FINISH,
};

struct mpi_workload_sample
{
    /* Sampling data */
    int nw_id;
    int app_id;
    unsigned long num_sends_sample;
    unsigned long num_bytes_sample;
    unsigned long num_waits_sample;
    double sample_end_time;
};

/* stores pointers of pending MPI operations to be matched with their respective sends/receives. */
struct mpi_msgs_queue
{
    int op_type;
    int tag;
    int source_rank;
    int dest_rank;
    uint64_t num_bytes;
    tw_stime req_init_time;
    dumpi_req_id req_id;
    struct qlist_head ql;
};

/* stores request IDs of completed MPI operations (Isends or Irecvs) */
struct completed_requests
{
	dumpi_req_id req_id;
    struct qlist_head ql;
};

/* for wait operations, store the pending operation and number of completed waits so far. */
struct pending_waits
{
    int op_type;
    int32_t req_ids[MAX_WAIT_REQS];
    int num_completed;
    int count;
    tw_stime start_time;
    struct qlist_head ql;
};

struct msg_size_info
{
    int64_t msg_size;
    int num_msgs;
    tw_stime agg_latency;
    tw_stime avg_latency;
    struct qhash_head * hash_link;
    struct qlist_head ql;
};

typedef struct mpi_msgs_queue mpi_msgs_queue;
typedef struct completed_requests completed_requests;
typedef struct pending_waits pending_waits;

/* state of the network LP. It contains the pointers to send/receive lists */
struct nw_state
{
	long num_events_per_lp;
	tw_lpid nw_id;
	short wrkld_end;
    int app_id;
    int local_rank;

    int is_finished;
    int neighbor_completed;

    struct rc_stack * processed_ops;
    struct rc_stack * matched_reqs;

    /* count of sends, receives, collectives and delays */
	unsigned long num_sends;
	unsigned long num_recvs;
	unsigned long num_cols;
	unsigned long num_delays;
	unsigned long num_wait;
	unsigned long num_waitall;
	unsigned long num_waitsome;

	/* time spent by the LP in executing the app trace*/
	double start_time;
	double elapsed_time;
	/* time spent in compute operations */
	double compute_time;
	/* time spent in message send/isend */
	double send_time;
	/* time spent in message receive */
	double recv_time;
	/* time spent in wait operation */
	double wait_time;
	/* FIFO for isend messages arrived on destination */
	struct qlist_head arrival_queue;
	/* FIFO for irecv messages posted but not yet matched with send operations */
	struct qlist_head pending_recvs_queue;
	/* List of completed send/receive requests */
	struct qlist_head completed_reqs;

    tw_stime cur_interval_end;

    /* Pending wait operation */
    struct pending_waits * wait_op;

    /* Message size latency information (quick hash for maintaining message latencies) */
    struct qhash_table * msg_sz_table;
    struct qlist_head msg_sz_list;

    unsigned long num_bytes_sent;
    unsigned long num_bytes_recvd;

    unsigned long syn_data;
    unsigned long gen_data;

    /* For sampling data */
    int sampling_indx;
    int max_arr_size;
    struct mpi_workload_sample * mpi_wkld_samples;
    char output_buf[512];
};

/* data for handling reverse computation.
 * saved_matched_req holds the request ID of matched receives/sends for wait operations.
 * ptr_match_op holds the matched MPI operation which is removed from the queues when a send
 * is matched with a receive in the forward event handler.
 * op is the MPI operation issued by the network workloads API. rc holds the data for reverse
 * computation (TODO: fill this data structure only when the simulation runs in optimistic mode). */
struct nw_message
{
   // forward message handler
   int msg_type;
   int op_type;
   model_net_event_return event_rc;

   struct
   {
       tw_lpid src_rank;
       int dest_rank;
       int64_t num_bytes;
       int num_matched;
       int data_type;
       double sim_start_time;
       // for callbacks - time message was received
       double msg_send_time;
       int16_t req_id;
       int tag;
       int app_id;
       int found_match;
       short wait_completed;
   } fwd;
   struct
   {
       double saved_send_time;
       double saved_recv_time;
       double saved_wait_time;
       double saved_delay;
       int16_t saved_num_bytes;
       struct codes_workload_op * saved_op;
   } rc;
};

static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op);

static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);

/* executes MPI isend and send operations */
static void codes_exec_mpi_send(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op, int is_rend);

/* execute MPI irecv operation */
static void codes_exec_mpi_recv(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, struct codes_workload_op * mpi_op);

/* reverse of mpi recv function. */
static void codes_exec_mpi_recv_rc(
        nw_state* s, tw_bf * bf, nw_message* m, tw_lp* lp);

/* execute the computational delay */
static void codes_exec_comp_delay(
        nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);

/* gets the next MPI operation from the network-workloads API. */
static void get_next_mpi_operation(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* reverse handler of get next mpi operation. */
static void get_next_mpi_operation_rc(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* Makes a call to get_next_mpi_operation. */
static void codes_issue_next_event(tw_lp* lp);
/* reverse handler of next operation */
static void codes_issue_next_event_rc(tw_lp* lp);


///////////////////// HELPER FUNCTIONS FOR MPI MESSAGE QUEUE HANDLING ///////////////
/* upon arrival of local completion message, inserts operation in completed send queue */
/* upon arrival of an isend operation, updates the arrival queue of the network */
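/* Note: each forward event handler below has a corresponding *_rc reverse handler
 * so that ROSS can roll back LP state when the simulation runs in optimistic mode. */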
static void update_completed_queue(
        nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp, dumpi_req_id req_id);
/* reverse of the above function */
static void update_completed_queue_rc(
        nw_state*s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp);
static void update_arrival_queue(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse of the above function */
static void update_arrival_queue_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* callback to a message sender for computing message time */
static void update_message_time(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse for computing message time */
static void update_message_time_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);

/* conversion from seconds to nanoseconds */
static tw_stime s_to_ns(tw_stime ns);
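
/* A minimal sketch of that conversion, assuming the usual 1 s = 1e9 ns scaling
 * (the actual definition of s_to_ns appears further down in this file):
 *
 *     static tw_stime s_to_ns(tw_stime s)
 *     {
 *         return s * (1000.0 * 1000.0 * 1000.0);
 *     }
 */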

static void update_message_size_rc(
        struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
    /* no-op: message size statistics are not rolled back */
}
/* update the message size */
static void update_message_size(
        struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m,
        mpi_msgs_queue * qitem,
        int is_eager,
        int is_send)
{
    struct qhash_head * hash_link = NULL;
    tw_stime msg_init_time = qitem->req_init_time;

    if(!ns->msg_sz_table)
        ns->msg_sz_table = qhash_init(msg_size_hash_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SZ);

    hash_link = qhash_search(ns->msg_sz_table, &(qitem->num_bytes));

    if(is_send)
        msg_init_time = m->fwd.sim_start_time;

    /* update hash table */
    if(!hash_link)
    {
        struct msg_size_info * msg_info = malloc(sizeof(struct msg_size_info));
        msg_info->msg_size = qitem->num_bytes;
        msg_info->num_msgs = 1;
        msg_info->agg_latency = tw_now(lp) - msg_init_time;
        msg_info->avg_latency = msg_info->agg_latency;
        qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link));
        qlist_add(&msg_info->ql, &ns->msg_sz_list);
        //printf("\n Msg size %d aggregate latency %f num messages %d ", m->fwd.num_bytes, msg_info->agg_latency, msg_info->num_msgs);
    }
    else
    {
        struct msg_size_info * tmp = qhash_entry(hash_link, struct msg_size_info, hash_link);
        tmp->num_msgs++;
        tmp->agg_latency += tw_now(lp) - msg_init_time;
        tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs);
//      printf("\n Msg size %d aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
    }
}
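
/* The helpers below coordinate completion across jobs when trace-driven ranks run
 * alongside synthetic background traffic: each trace rank notifies the next local
 * rank once it finishes, and the last rank tells the background-traffic LPs of the
 * other job(s) to stop generating load. */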
static void notify_background_traffic_rc(
	    struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
    tw_rand_reverse_unif(lp->rng); 
}

static void notify_background_traffic(
	    struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
        struct codes_jobmap_id jid; 
        jid = codes_jobmap_to_local_id(ns->nw_id, jobmap_ctx);
        
        int num_jobs = codes_jobmap_get_num_jobs(jobmap_ctx); 
        
        for(int other_id = 0; other_id < num_jobs; other_id++)
        {
            if(other_id == jid.job)
                continue;

            struct codes_jobmap_id other_jid;
            other_jid.job = other_id;

            int num_other_ranks = codes_jobmap_get_num_ranks(other_id, jobmap_ctx);

            lprintf("\n Other ranks %ld ", num_other_ranks);
            tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
            tw_lpid global_dest_id;
     
            for(int k = 0; k < num_other_ranks; k++)    
            {
                other_jid.rank = k;
                int intm_dest_id = codes_jobmap_to_global_id(other_jid, jobmap_ctx); 
                global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);

                tw_event * e;
                struct nw_message * m_new;  
                e = tw_event_new(global_dest_id, ts, lp);
                m_new = tw_event_data(e);
                m_new->msg_type = CLI_BCKGND_FIN;
                tw_event_send(e);   
            }
        }
        return;
}
static void notify_neighbor_rc(
	    struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
       if(bf->c0)
       {
            notify_background_traffic_rc(ns, lp, bf, m);
            return;
       }
   
       if(bf->c1)
       {
          tw_rand_reverse_unif(lp->rng); 
       }
} 
static void notify_neighbor(
	    struct nw_state * ns,
        tw_lp * lp,
        tw_bf * bf,
        struct nw_message * m)
{
    if(ns->local_rank == num_dumpi_traces - 1 
            && ns->is_finished == 1
            && ns->neighbor_completed == 1)
    {
        printf("\n All workloads completed, notifying background traffic ");
        bf->c0 = 1;
        notify_background_traffic(ns, lp, bf, m);
        return;
    }
    
    struct codes_jobmap_id nbr_jid;
    nbr_jid.job = ns->app_id;
    tw_lpid global_dest_id;

    if(ns->is_finished == 1 && (ns->neighbor_completed == 1 || ns->local_rank == 0))
    {
        bf->c1 = 1;

        printf("\n Local rank %d notifying neighbor %d ", ns->local_rank, ns->local_rank+1);
        tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
        nbr_jid.rank = ns->local_rank + 1;
        
        /* Send a notification to the neighbor about completion */
        int intm_dest_id = codes_jobmap_to_global_id(nbr_jid, jobmap_ctx); 
        global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);
       
        tw_event * e;
        struct nw_message * m_new;  
        e = tw_event_new(global_dest_id, ts, lp);
        m_new = tw_event_data(e); 
        m_new->msg_type = CLI_NBR_FINISH;
        tw_event_send(e);   
    }
}
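/* marks a background-traffic LP as finished (the reverse handler un-marks it) */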
void finish_bckgnd_traffic_rc(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
        ns->is_finished = 0;
        return;
}
void finish_bckgnd_traffic(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
        ns->is_finished = 1;
        lprintf("\n LP %llu completed sending data %lld completed at time %lf ", lp->gid, ns->gen_data, tw_now(lp));
        return;
}

void finish_nbr_wkld_rc(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
    ns->neighbor_completed = 0;
    
    notify_neighbor_rc(ns, lp, b, msg);
}

void finish_nbr_wkld(
    struct nw_state * ns,
    tw_bf * b,
    struct nw_message * msg,
    tw_lp * lp)
{
    printf("\n Workload completed, notifying neighbor ");
    ns->neighbor_completed = 1;

    notify_neighbor(ns, lp, b, msg);
}
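/* reverse handler for gen_synthetic_tr: undoes the payload accounting and rolls back
 * the two RNG calls (destination pick and next-interval draw) */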
static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(bf->c0)
        return;

    model_net_event_rc2(lp, &m->event_rc);
    s->gen_data -= PAYLOAD_SZ;

    num_syn_bytes_sent -= PAYLOAD_SZ;
    tw_rand_reverse_unif(lp->rng);
    tw_rand_reverse_unif(lp->rng);

}

/* generate synthetic traffic */
static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(s->is_finished == 1)
    {
        bf->c0 = 1;
        return;
    }

    /* Get job information */
    tw_lpid global_dest_id;

    struct codes_jobmap_id jid;
    jid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx); 

    int num_clients = codes_jobmap_get_num_ranks(jid.job, jobmap_ctx);
    int dest_svr = tw_rand_integer(lp->rng, 0, num_clients - 1);

    if(dest_svr == s->local_rank)
    {
       dest_svr = (s->local_rank + 1) % num_clients;
    }
   
    jid.rank = dest_svr;

    int intm_dest_id = codes_jobmap_to_global_id(jid, jobmap_ctx); 
    global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);

    nw_message remote_m;
    remote_m.fwd.sim_start_time = tw_now(lp);
    remote_m.fwd.dest_rank = dest_svr;
    remote_m.msg_type = CLI_BCKGND_ARRIVE;
    remote_m.fwd.num_bytes = PAYLOAD_SZ;
    remote_m.fwd.app_id = s->app_id;
    remote_m.fwd.src_rank = s->local_rank;

    m->event_rc = model_net_event(net_id, "synthetic-tr", global_dest_id, PAYLOAD_SZ, 0.0, 
            sizeof(nw_message), (const void*)&remote_m, 
            0, NULL, lp);
    
    s->gen_data += PAYLOAD_SZ;
    num_syn_bytes_sent += PAYLOAD_SZ; 

    /* New event after MEAN_INTERVAL */  
    tw_stime ts = mean_interval  + tw_rand_exponential(lp->rng, NOISE); 
    tw_event * e;
    nw_message * m_new;
    e = tw_event_new(lp->gid, ts, lp);
    m_new = tw_event_data(e);
    m_new->msg_type = CLI_BCKGND_GEN;
    tw_event_send(e);
}

void arrive_syn_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
//    printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
    int data = m->fwd.num_bytes;
    s->syn_data -= data;
    num_syn_bytes_recvd -= data;
}
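/* forward handler: accumulate the bytes of synthetic background traffic received at this LP */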
void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
//    printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
    int data = m->fwd.num_bytes;
    s->syn_data += data;
    num_syn_bytes_recvd += data;
}
/* Debugging functions, may generate unused function warning */
static void print_waiting_reqs(int32_t * reqs, int count)
{
    printf("\n Waiting reqs: ");
    int i;
    for(i = 0; i < count; i++ )
        printf(" %d ", reqs[i]);
}

static void print_msgs_queue(struct qlist_head * head, int is_send)
{
    if(is_send)
        printf("\n Send msgs queue: ");
    else
        printf("\n Recv msgs queue: ");

    struct qlist_head * ent = NULL;
    mpi_msgs_queue * current = NULL;
    qlist_for_each(ent, head)
       {
            current = qlist_entry(ent, mpi_msgs_queue, ql);
            printf(" \n Source %d Dest %d bytes %llu tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag);
       }
}

static void print_completed_queue(struct qlist_head * head)
{
    printf("\n Completed queue: ");
      struct qlist_head * ent = NULL;
      struct completed_requests* current = NULL;
      qlist_for_each(ent, head)
       {
            current = qlist_entry(ent, completed_requests, ql);
            printf(" %d ", current->req_id);
       }
}

/* removes the given request IDs from the completed-requests queue; removed entries are
 * pushed on the matched_reqs rc stack so they can be restored on reverse computation */
static int clear_completed_reqs(nw_state * s,
        tw_lp * lp,
        int32_t * reqs, int count)
{
    int i, matched = 0;

    for( i = 0; i < count; i++)
    {
      struct qlist_head * ent = NULL;
      struct completed_requests * current = NULL;
      struct completed_requests * prev = NULL;

      qlist_for_each(ent, &s->completed_reqs)
       {
            current = qlist_entry(ent, completed_requests, ql);

            if(prev)
              rc_stack_push(lp, prev, free, s->matched_reqs);

            if(current->req_id == reqs[i])
            {
                ++matched;
                qlist_del(&current->ql);
                prev = current;
            }
            else
                prev = NULL;
       }

      if(prev)
          rc_stack_push(lp, prev, free, s->matched_reqs);
    }
    return matched;
}
/* re-inserts the requests popped by clear_completed_reqs back into the completed-requests
 * queue (used during reverse computation) */
static void add_completed_reqs(nw_state * s,
        tw_lp * lp,
        int count)
{
    int i;
    for( i = 0; i < count; i++)
    {
       struct completed_requests * req = rc_stack_pop(s->matched_reqs);
       qlist_add(&req->ql, &s->completed_reqs);
    }
}

/* helper function - maps an MPI rank to an LP id */
static tw_lpid rank_to_lpid(int rank)
{
    return codes_mapping_get_lpid_from_relative(rank, NULL, "nw-lp", NULL, 0);
}

/* checks whether a newly completed request satisfies the wait operation (if any)
 * currently posted on this rank; returns 1 if the wait can now complete */
static int notify_posted_wait(nw_state* s,
        tw_bf * bf, nw_message * m, tw_lp * lp,
        dumpi_req_id completed_req)
{
    struct pending_waits* wait_elem = s->wait_op;
    int wait_completed = 0;

    m->fwd.wait_completed = 0;

    if(!wait_elem)
        return 0;

    int op_type = wait_elem->op_type;

    if(op_type == CODES_WK_WAIT &&
            (wait_elem->req_ids[0] == completed_req))
    {
        wait_completed = 1;
    }
    else if(op_type == CODES_WK_WAITALL
            || op_type == CODES_WK_WAITANY
            || op_type == CODES_WK_WAITSOME)
    {
        int i;
        for(i = 0; i < wait_elem->count; i++)
        {
            if(wait_elem->req_ids[i] == completed_req)
            {
                wait_elem->num_completed++;
                if(wait_elem->num_completed > wait_elem->count)
                    printf("\n Num completed %d count %d LP %llu ",
                            wait_elem->num_completed,
                            wait_elem->count,
                            lp->gid);
//                if(wait_elem->num_completed > wait_elem->count)
//                    tw_lp_suspend(lp, 1, 0);

                if(wait_elem->num_completed == wait_elem->count)
                {
                    if(enable_debug)
                        fprintf(workload_log, "\n(%lf) APP ID %d MPI WAITALL COMPLETED AT %llu ", tw_now(lp), s->app_id, s->nw_id);
                    wait_completed = 1;
                }

                m->fwd.wait_completed = 1;
            }
        }
    }
    return wait_completed;
}

/* reverse handler of MPI wait operation */
static void codes_exec_mpi_wait_rc(nw_state* s, tw_lp* lp)
{
    if(s->wait_op)
    {
        struct pending_waits * wait_op = s->wait_op;
        free(wait_op);
        s->wait_op = NULL;
    }
    else
    {
        codes_issue_next_event_rc(lp);
        completed_requests * qi = rc_stack_pop(s->processed_ops);
        qlist_add(&qi->ql, &s->completed_reqs);
    }
    return;
}

/* execute MPI wait operation */
static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op)
{
    /* check in the completed receives queue if the request ID has already been completed. */
    assert(!s->wait_op);
    dumpi_req_id req_id = mpi_op->u.wait.req_id;
    struct completed_requests* current = NULL;

    struct qlist_head * ent = NULL;
    qlist_for_each(ent, &s->completed_reqs)
    {
        current = qlist_entry(ent, completed_requests, ql);
        if(current->req_id == req_id)
        {
            qlist_del(&current->ql);
            rc_stack_push(lp, current, free, s->processed_ops);
            codes_issue_next_event(lp);
            return;
        }
    }

    /* If not, add the wait operation in the pending 'waits' list. */
    struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
    wait_op->op_type = mpi_op->op_type;
    wait_op->req_ids[0] = req_id;
    wait_op->count = 1;
    wait_op->num_completed = 0;
    wait_op->start_time = tw_now(lp);
    s->wait_op = wait_op;

    return;
}

/* reverse handler of codes_exec_mpi_wait_all */
static void codes_exec_mpi_wait_all_rc(
        nw_state* s,
        tw_bf * bf,
        nw_message * m,
        tw_lp* lp)
{
  if(bf->c1)
  {
    int sampling_indx = s->sampling_indx;
    s->mpi_wkld_samples[sampling_indx].num_waits_sample--;

    if(bf->c2)
    {
        s->cur_interval_end -= sampling_interval;
        s->sampling_indx--;
    }
  }
  if(s->wait_op)
  {
      struct pending_waits * wait_op = s->wait_op;
      free(wait_op);
      s->wait_op = NULL;
  }
  else
  {
      add_completed_reqs(s, lp, m->fwd.num_matched);
      codes_issue_next_event_rc(lp);
  }
  return;
}

static void codes_exec_mpi_wait_all(
        nw_state* s,
        tw_bf * bf,
        nw_message * m,
        tw_lp* lp,
        struct codes_workload_op * mpi_op)
{
  if(enable_debug)
    fprintf(workload_log, "\n MPI WAITALL POSTED AT %llu ", s->nw_id);

  if(enable_sampling)
  {
    bf->c1 = 1;
    if(tw_now(lp) >= s->cur_interval_end)
    {
        bf->c2 = 1;
        int indx = s->sampling_indx;
        s->mpi_wkld_samples[indx].nw_id = s->nw_id;
        s->mpi_wkld_samples[indx].app_id = s->app_id;
        s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
        s->cur_interval_end += sampling_interval;
        s->sampling_indx++;
    }
    if(s->sampling_indx >= MAX_STATS)
    {
        /* grow the sampling array and copy over the samples recorded so far */
        struct mpi_workload_sample * tmp = calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
        memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx * sizeof(struct mpi_workload_sample));
        free(s->mpi_wkld_samples);
        s->mpi_wkld_samples = tmp;
        s->max_arr_size += MAX_STATS;
    }
    int indx = s->sampling_indx;
    s->mpi_wkld_samples[indx].num_waits_sample++;
  }
  int count = mpi_op->u.waits.count;
  /* the number of requests waited on must stay below MAX_WAIT_REQS */
  assert(count < MAX_WAIT_REQS);

  int i = 0, num_matched = 0;
  m->fwd.num_matched = 0;

  /*if(lp->gid == TRACK)
  {
      printf("\n MPI Wait all posted ");
      print_waiting_reqs(mpi_op->u.waits.req_ids, count);
      print_completed_queue(&s->completed_reqs);
  }*/
  /* check how many of the requests are already in the completion queue */
  for(i = 0; i < count; i++)
  {
      dumpi_req_id req_id = mpi_op->u.waits.req_ids[i];
      struct qlist_head * ent = NULL;
      struct completed_requests* current = NULL;
      qlist_for_each(ent, &s->completed_reqs)
       {
            current = qlist_entry(ent, struct completed_requests, ql);
            if(current->req_id == req_id)
                num_matched++;
       }
  }

  m->fwd.found_match = num_matched;
  if(num_matched == count)
  {
      /* no need to post the waitall; remove all matched requests from the
       * completed list and issue the next event */
      m->fwd.num_matched = clear_completed_reqs(s, lp, mpi_op->u.waits.req_ids, count);
      struct pending_waits* wait_op = s->wait_op;
      free(wait_op);
      s->wait_op = NULL;
      codes_issue_next_event(lp);
  }
  else
  {
      /* otherwise, add the wait operation to the pending 'waits' list */
      struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
      wait_op->count = count;
      wait_op->op_type = mpi_op->op_type;
      assert(count < MAX_WAIT_REQS);

      for(i = 0; i < count; i++)
          wait_op->req_ids[i] =  mpi_op->u.waits.req_ids[i];

      wait_op->num_completed = num_matched;
      wait_op->start_time = tw_now(lp);
      s->wait_op = wait_op;
  }
  return;
}

/* search for a matching mpi operation and remove it from the list.
 * Record the index in the list from where the element got deleted.
 * Index is used for inserting the element once again in the queue for reverse computation. */
static int rm_matching_rcv(nw_state * ns,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp,
        mpi_msgs_queue * qitem)
{
    int matched = 0;
    int index = 0;
    struct qlist_head *ent = NULL;
    mpi_msgs_queue * qi = NULL;

    qlist_for_each(ent, &ns->pending_recvs_queue){
        qi = qlist_entry(ent, mpi_msgs_queue, ql);
        if(//(qi->num_bytes == qitem->num_bytes)
                //&&
               ((qi->tag == qitem->tag) || qi->tag == -1)
                && ((qi->source_rank == qitem->source_rank) || qi->source_rank == -1))
        {
            matched = 1;
            //qitem->num_bytes = qi->num_bytes;
            break;
        }
        ++index;
    }

    if(matched)
    {
        if(enable_msg_tracking && qitem->num_bytes < EAGER_THRESHOLD)
        {
            update_message_size(ns, lp, bf, m, qitem, 1, 1);
        }
        if(qitem->num_bytes >= EAGER_THRESHOLD)
        {
            /* Matching receive found, need to notify the sender to transmit
             * the data (only works in sequential mode) */
            bf->c10 = 1;
            send_ack_back(ns, bf, m, lp, qitem);
        }
        m->rc.saved_recv_time = ns->recv_time;
        ns->recv_time += (tw_now(lp) - m->fwd.sim_start_time);

        if(qi->op_type == CODES_WK_IRECV)
            update_completed_queue(ns, bf, m, lp, qi->req_id);
        else if(qi->op_type == CODES_WK_RECV)
            codes_issue_next_event(lp);

        qlist_del(&qi->ql);

        rc_stack_push(lp, qi, free, ns->processed_ops);
        return index;
    }
    return -1;
}

static int rm_matching_send(nw_state * ns,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp, mpi_msgs_queue * qitem)
{
    int matched = 0;
    struct qlist_head *ent = NULL;