model-net-mpi-replay.c 51.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */
#include <ross.h>
#include <inttypes.h>

#include "codes/codes-workload.h"
#include "codes/codes.h"
#include "codes/configuration.h"
#include "codes/codes_mapping.h"
#include "codes/model-net.h"
#include "codes/rc-stack.h"
#include "codes/quicklist.h"
16
#include "codes/codes-jobmap.h"
17

18
/* turning on track lp will generate a lot of output messages */
19
#define MN_LP_NM "modelnet_dragonfly_custom"
20

21
#define TRACK_LP -1
22
#define TRACE -1
23
#define MAX_WAIT_REQS 512
24
25
26
27
/* Compile-time switch for lprintf(): 0 disables the debug output. */
#define CS_LP_DBG 0
/* printf wrapper that only prints when CS_LP_DBG is non-zero. Because it
 * expands a bare __VA_ARGS__, at least one argument must follow the format. */
#define lprintf(_fmt, ...) \
        do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
/* Threshold and growth step (in entries) used when the per-LP array of
 * mpi_workload_sample records is enlarged. */
#define MAX_STATS 65536
28
29
30
31
32
33

/* Workload selection state. NOTE(review): these are presumably filled from
 * command-line options outside this part of the file - confirm. */
char workload_type[128];
char workload_file[8192];
char offset_file[8192];
static int wrkld_id;
static int num_net_traces = 0;
34
static int alloc_spec = 0;
35
static double self_overhead = 10.0;
36
37
38
39
40
41
42

/* Doing LP IO*/
static char lp_io_dir[256] = {'\0'};
static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;

43
44
45
46
47
48
49
50
51
52
53
/* variables for loading multiple applications */
/* Xu's additions start */
char workloads_conf_file[8192];
char alloc_file[8192];
/* per-job tables; the fixed dimension limits these to 5 jobs */
int num_traces_of_job[5];
char file_name_of_job[5][8192];

/* job map context, used by get_global_id_of_job_rank() to translate a
 * (job, rank) pair into a global rank */
struct codes_jobmap_ctx *jobmap_ctx;
struct codes_jobmap_params_list jobmap_p;
/* Xu's additions end */

54
55
56
57
58
59
/* Variables for Cortex Support */
/* Matthieu's additions start */
static char cortex_file[512];
static char cortex_class[512];
/* Matthieu's additions end */

60
61
typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
62
typedef int32_t dumpi_req_id;
63
64
65

static int net_id = 0;
static float noise = 5.0;
66
static int num_net_lps = 0, num_mpi_lps = 0;
67

68
69
70
71
72
/* Log streams. workload_log receives the per-operation trace lines that the
 * handlers print when enable_debug is set. NOTE(review): where these streams
 * are opened is not visible here - confirm they are non-NULL before use. */
FILE * workload_log = NULL;
FILE * workload_agg_log = NULL;
FILE * workload_meta_log = NULL;

static uint64_t sample_bytes_written = 0;
73

74
75
76
77
78
79
80
81
82
83
84
85
/* Process-wide byte counters: incremented by the send/recv forward handlers
 * and decremented again by the recv reverse handler. */
long long num_bytes_sent=0;
long long num_bytes_recvd=0;

/* Aggregate timing statistics. NOTE(review): presumably reduced across LPs at
 * the end of the run; the code that fills them is outside this view. */
double max_time = 0,  max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;

/* global variables for codes mapping */
static char lp_group_name[MAX_NAME_LENGTH], lp_type_name[MAX_NAME_LENGTH], annotation[MAX_NAME_LENGTH];
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;

/* runtime option for disabling computation time simulation */
static int disable_delay = 0;
86
87
88
/* Non-zero turns on the periodic per-LP workload sampling in the handlers. */
static int enable_sampling = 0;
/* Amount by which an LP's cur_interval_end advances each time a sample closes. */
static double sampling_interval = 5000000;
/* NOTE(review): presumably the simulated time after which sampling stops;
 * it is not referenced in this part of the file - confirm. */
static double sampling_end_time = 3000000000;
89
static int enable_debug = 0;
90

91
92
93
/* set group context */
struct codes_mctx group_ratio;

94
/* Event types exchanged between the MPI replay LPs. */
enum MPI_NW_EVENTS
{
    /* fetch the next MPI operation once the previous operation completes */
    MPI_OP_GET_NEXT = 1,
    /* an MPI message, transported by model-net, arrived at its destination */
    MPI_SEND_ARRIVED = 2,
    /* for tracking message times on sender */
    MPI_SEND_ARRIVED_CB = 3,
    /* an MPI message has left the source LP (transported via model-net) */
    MPI_SEND_POSTED = 4,
};

105
106
107
108
/* One sampling-interval record of an LP's MPI activity. */
struct mpi_workload_sample
{
    int nw_id;                       /* id of the LP that produced the sample */
    int app_id;                      /* application (job) id of that LP */
    unsigned long num_sends_sample;  /* sends counted in this interval */
    unsigned long num_bytes_sample;  /* bytes sent in this interval */
    unsigned long num_waits_sample;  /* wait-all operations counted in this interval */
    double sample_end_time;          /* end time of the interval */
};
115
116
117
118
119
120
121
/* stores pointers of pending MPI operations to be matched with their respective sends/receives. */
struct mpi_msgs_queue
{
    int op_type;
    int tag;
    int source_rank;
    int dest_rank;
122
    uint64_t num_bytes;
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
    tw_stime req_init_time;
	dumpi_req_id req_id;
    struct qlist_head ql;
};

/* List node holding the request id of one completed non-blocking MPI
 * operation (Isend or Irecv). */
struct completed_requests
{
    dumpi_req_id req_id;   /* id of the completed request */
    struct qlist_head ql;  /* linkage into nw_state.completed_reqs */
};

/* for wait operations, store the pending operation and number of completed waits so far. */
struct pending_waits
{
    int op_type;
139
    int32_t req_ids[MAX_WAIT_REQS];
140
	int num_completed;
141
142
	int count;
    tw_stime start_time;
143
144
145
146
147
148
149
150
151
152
153
154
155
    struct qlist_head ql;
};

/* Short names for the queue/bookkeeping structures defined above. */
typedef struct mpi_msgs_queue mpi_msgs_queue;
typedef struct completed_requests completed_requests;
typedef struct pending_waits pending_waits;

/* state of the network LP. It contains the pointers to send/receive lists */
struct nw_state
{
	long num_events_per_lp;
	tw_lpid nw_id;
	short wrkld_end;
156
157
    int app_id;
    int local_rank;
158
159

    struct rc_stack * processed_ops;
160
    struct rc_stack * matched_reqs;
161
162
163
164
165
166
167
168
169
170

    /* count of sends, receives, collectives and delays */
	unsigned long num_sends;
	unsigned long num_recvs;
	unsigned long num_cols;
	unsigned long num_delays;
	unsigned long num_wait;
	unsigned long num_waitall;
	unsigned long num_waitsome;

171

172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
	/* time spent by the LP in executing the app trace*/
	double start_time;
	double elapsed_time;
	/* time spent in compute operations */
	double compute_time;
	/* time spent in message send/isend */
	double send_time;
	/* time spent in message receive */
	double recv_time;
	/* time spent in wait operation */
	double wait_time;
	/* FIFO for isend messages arrived on destination */
	struct qlist_head arrival_queue;
	/* FIFO for irecv messages posted but not yet matched with send operations */
	struct qlist_head pending_recvs_queue;
	/* List of completed send/receive requests */
	struct qlist_head completed_reqs;
189

190
191
    tw_stime cur_interval_end;

192
193
    /* Pending wait operation */
    struct pending_waits * wait_op;
194
195
196
197

    unsigned long num_bytes_sent;
    unsigned long num_bytes_recvd;

198
199
200
201
    /* For sampling data */
    int sampling_indx;
    int max_arr_size;
    struct mpi_workload_sample * mpi_wkld_samples;
202
    char output_buf[512];
203
204
205
206
};

/* data for handling reverse computation.
* saved_matched_req holds the request ID of matched receives/sends for wait operations.
207
* ptr_match_op holds the matched MPI operation which are removed from the queues when a send is matched with the receive in forward event handler.
208
209
210
* network event being sent. op is the MPI operation issued by the network workloads API. rv_data holds the data for reverse computation (TODO: Fill this data structure only when the simulation runs in optimistic mode). */
struct nw_message
{
211
   // forward message handler
212
   int msg_type;
213
   int op_type;
214
   model_net_event_return event_rc;
215

216
217
218
   struct
   {
       tw_lpid src_rank;
219
       int dest_rank;
220
       int64_t num_bytes;
221
222
223
224
225
       int num_matched;
       int data_type;
       double sim_start_time;
       // for callbacks - time message was received
       double msg_send_time;
226
       int16_t req_id;
227
       int tag;
228
       int app_id;
229
230
231
232
233
234
235
236
237
       int found_match;
       short wait_completed;
   } fwd;
   struct
   {
       double saved_send_time;
       double saved_recv_time;
       double saved_wait_time;
       double saved_delay;
238
       int16_t saved_num_bytes;
239
       struct codes_workload_op * saved_op;
240
   } rc;
241
242
243
244
};

/* executes MPI isend and send operations */
static void codes_exec_mpi_send(
245
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
246
247
/* execute MPI irecv operation */
static void codes_exec_mpi_recv(
248
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, struct codes_workload_op * mpi_op);
249
250
/* reverse of mpi recv function. */
static void codes_exec_mpi_recv_rc(
251
        nw_state* s, tw_bf * bf, nw_message* m, tw_lp* lp);
252
253
/* execute the computational delay */
static void codes_exec_comp_delay(
254
        nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
/* gets the next MPI operation from the network-workloads API. */
static void get_next_mpi_operation(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* reverse handler of get next mpi operation. */
static void get_next_mpi_operation_rc(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* Makes a call to get_next_mpi_operation. */
static void codes_issue_next_event(tw_lp* lp);
/* reverse handler of next operation */
static void codes_issue_next_event_rc(tw_lp* lp);


///////////////////// HELPER FUNCTIONS FOR MPI MESSAGE QUEUE HANDLING ///////////////
/* upon arrival of local completion message, inserts operation in completed send queue */
/* upon arrival of an isend operation, updates the arrival queue of the network */
270
271
272
273
274
275
276
277
static void update_completed_queue(
        nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp, dumpi_req_id req_id);
/* reverse of the above function */
static void update_completed_queue_rc(
        nw_state*s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp);
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
static void update_arrival_queue(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse of the above function */
static void update_arrival_queue_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* callback to a message sender for computing message time */
static void update_message_time(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse for computing message time */
static void update_message_time_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);

/* conversion from seconds to nanoseconds */
static tw_stime s_to_ns(tw_stime ns);

293
/* Debugging functions, may generate unused function warning */
294
/* Debug helper: prints the first `count` request ids of `reqs` to stdout. */
static void print_waiting_reqs(int32_t * reqs, int count)
{
    int idx = 0;

    printf("\n Waiting reqs: ");
    while(idx < count)
    {
        printf(" %d ", reqs[idx]);
        idx++;
    }
}
301
302
303
304
305
306
307
308
309
310
311
312
/* Debug helper: dumps every entry of an MPI message queue to stdout.
 * head    - list of mpi_msgs_queue entries linked through their `ql` member
 * is_send - non-zero prints the "Send" heading, zero the "Recv" heading */
static void print_msgs_queue(struct qlist_head * head, int is_send)
{
    struct qlist_head * ent = NULL;
    mpi_msgs_queue * current = NULL;

    if(is_send)
        printf("\n Send msgs queue: ");
    else
        printf("\n Recv msgs queue: ");

    qlist_for_each(ent, head)
    {
        current = qlist_entry(ent, mpi_msgs_queue, ql);
        /* num_bytes is uint64_t; the cast makes the argument match %llu
         * regardless of how uint64_t is defined on the platform */
        printf(" \n Source %d Dest %d bytes %llu tag %d ", current->source_rank, current->dest_rank, (unsigned long long)current->num_bytes, current->tag);
    }
}
316
317
318
319
320
321
322
323
/* Debug helper: prints the request id of every entry in a list of
 * completed_requests nodes. */
static void print_completed_queue(struct qlist_head * head)
{
    struct qlist_head * node = NULL;

    printf("\n Completed queue: ");
    qlist_for_each(node, head)
    {
        struct completed_requests * done = qlist_entry(node, completed_requests, ql);
        printf(" %d ", done->req_id);
    }
}
327
static int clear_completed_reqs(nw_state * s,
328
        tw_lp * lp,
329
        int32_t * reqs, int count)
330
{
331
    int i, matched = 0;
332
333
334
335
336
    for( i = 0; i < count; i++)
    {
      struct qlist_head * ent = NULL;
      qlist_for_each(ent, &s->completed_reqs)
       {
337
            struct completed_requests* current =
338
339
340
                qlist_entry(ent, completed_requests, ql);
            if(current->req_id == reqs[i])
            {
341
                ++matched;
342
343
344
345
346
                qlist_del(&current->ql);
                rc_stack_push(lp, current, free, s->matched_reqs);
            }
       }
    }
347
    return matched;
348
}
349
static void add_completed_reqs(nw_state * s,
350
351
        tw_lp * lp,
        int count)
352
353
354
355
{
    int i;
    for( i = 0; i < count; i++)
    {
356
357
       struct completed_requests * req = rc_stack_pop(s->matched_reqs);
       qlist_add(&req->ql, &s->completed_reqs);
358
359
    }
}
360

361
362
363
364
365
366
/* Helper: translates an MPI rank into the id of the corresponding "nw-lp" LP. */
static tw_lpid rank_to_lpid(int rank)
{
    tw_lpid lpid = codes_mapping_get_lpid_from_relative(rank, NULL, "nw-lp", NULL, 0);
    return lpid;
}

367
static int notify_posted_wait(nw_state* s,
368
        tw_bf * bf, nw_message * m, tw_lp * lp,
369
        dumpi_req_id completed_req)
370
{
371
372
    struct pending_waits* wait_elem = s->wait_op;
    int wait_completed = 0;
373

374
    m->fwd.wait_completed = 0;
375

376
377
    if(!wait_elem)
        return 0;
378

379
    int op_type = wait_elem->op_type;
380

381
382
383
384
385
    if(op_type == CODES_WK_WAIT &&
            (wait_elem->req_ids[0] == completed_req))
    {
            wait_completed = 1;
    }
386
387
    else if(op_type == CODES_WK_WAITALL
            || op_type == CODES_WK_WAITANY
388
389
390
391
392
393
            || op_type == CODES_WK_WAITSOME)
    {
        int i;
        for(i = 0; i < wait_elem->count; i++)
        {
            if(wait_elem->req_ids[i] == completed_req)
394
            {
395
                wait_elem->num_completed++;
396
                if(wait_elem->num_completed > wait_elem->count)
397
                    printf("\n Num completed %d count %d LP %llu ",
398
399
400
                            wait_elem->num_completed,
                            wait_elem->count,
                            lp->gid);
401
402
//                if(wait_elem->num_completed > wait_elem->count)
//                    tw_lp_suspend(lp, 1, 0);
403

404
                if(wait_elem->num_completed == wait_elem->count)
405
                {
406
                    if(enable_debug)
407
                        fprintf(workload_log, "\n(%lf) APP ID %d MPI WAITALL COMPLETED AT %llu ", tw_now(lp), s->app_id, s->nw_id);
408
                    wait_completed = 1;
409
                }
410

411
                m->fwd.wait_completed = 1;
412
            }
413
        }
414
    }
415
    return wait_completed;
416
}
417

418
/* reverse handler of MPI wait operation */
419
static void codes_exec_mpi_wait_rc(nw_state* s, tw_lp* lp)
420
{
421
    if(s->wait_op)
422
     {
423
424
425
         struct pending_waits * wait_op = s->wait_op;
         free(wait_op);
         s->wait_op = NULL;
426
427
428
429
     }
   else
    {
        codes_issue_next_event_rc(lp);
430
        completed_requests * qi = rc_stack_pop(s->processed_ops);
431
        qlist_add(&qi->ql, &s->completed_reqs);
432
    }
433
    return;
434
}
435

436
/* execute MPI wait operation */
437
static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op)
438
{
439
440
    /* check in the completed receives queue if the request ID has already been completed.*/
    assert(!s->wait_op);
441
    dumpi_req_id req_id = mpi_op->u.wait.req_id;
442
    struct completed_requests* current = NULL;
443

444
445
446
447
448
449
450
    struct qlist_head * ent = NULL;
    qlist_for_each(ent, &s->completed_reqs)
    {
        current = qlist_entry(ent, completed_requests, ql);
        if(current->req_id == req_id)
        {
            qlist_del(&current->ql);
451
            rc_stack_push(lp, current, free, s->processed_ops);
452
453
454
455
            codes_issue_next_event(lp);
            return;
        }
    }
456
457
458
459
460
    /* If not, add the wait operation in the pending 'waits' list. */
    struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
    wait_op->op_type = mpi_op->op_type;
    wait_op->req_ids[0] = req_id;
    wait_op->count = 1;
461
462
    wait_op->num_completed = 0;
    wait_op->start_time = tw_now(lp);
463
    s->wait_op = wait_op;
464

465
    return;
466
467
}

468
static void codes_exec_mpi_wait_all_rc(
469
        nw_state* s,
470
471
        tw_bf * bf,
        nw_message * m,
472
        tw_lp* lp)
473
{
474
475
476
477
478
479
480
481
482
483
484
  if(bf->c1)
  {
    int sampling_indx = s->sampling_indx;
    s->mpi_wkld_samples[sampling_indx].num_waits_sample--;

    if(bf->c2)
    {
        s->cur_interval_end -= sampling_interval;
        s->sampling_indx--;
    }
  }
485
486
487
488
489
490
491
492
  if(s->wait_op)
  {
      struct pending_waits * wait_op = s->wait_op;
      free(wait_op);
      s->wait_op = NULL;
  }
  else
  {
493
      add_completed_reqs(s, lp, m->fwd.num_matched);
494
495
496
      codes_issue_next_event_rc(lp);
  }
  return;
497
498
}
static void codes_exec_mpi_wait_all(
499
        nw_state* s,
500
501
        tw_bf * bf,
        nw_message * m,
502
        tw_lp* lp,
503
        struct codes_workload_op * mpi_op)
504
{
505
  if(enable_debug)
506
    fprintf(workload_log, "\n MPI WAITALL POSTED AT %llu ", s->nw_id);
507

508
509
510
511
512
513
514
515
  if(enable_sampling)
  {
    bf->c1 = 1;
    if(tw_now(lp) >= s->cur_interval_end)
    {
        bf->c2 = 1;
        int indx = s->sampling_indx;
        s->mpi_wkld_samples[indx].nw_id = s->nw_id;
516
        s->mpi_wkld_samples[indx].app_id = s->app_id;
517
518
519
520
521
522
        s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
        s->cur_interval_end += sampling_interval;
        s->sampling_indx++;
    }
    if(s->sampling_indx >= MAX_STATS)
    {
523
        struct mpi_workload_sample * tmp = calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
524
525
526
527
528
529
530
531
        memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
        free(s->mpi_wkld_samples);
        s->mpi_wkld_samples = tmp;
        s->max_arr_size += MAX_STATS;
    }
    int indx = s->sampling_indx;
    s->mpi_wkld_samples[indx].num_waits_sample++;
  }
532
  int count = mpi_op->u.waits.count;
533
534
  /* If the count is not less than max wait reqs then stop */
  assert(count < MAX_WAIT_REQS);
535

536
  int i = 0, num_matched = 0;
537
  m->fwd.num_matched = 0;
538

539
  /*if(lp->gid == TRACK)
540
  {
541
      printf("\n MPI Wait all posted ");
542
543
      print_waiting_reqs(mpi_op->u.waits.req_ids, count);
      print_completed_queue(&s->completed_reqs);
544
  }*/
545
      /* check number of completed irecvs in the completion queue */
546
547
548
549
550
551
552
  for(i = 0; i < count; i++)
  {
      dumpi_req_id req_id = mpi_op->u.waits.req_ids[i];
      struct qlist_head * ent = NULL;
      struct completed_requests* current = NULL;
      qlist_for_each(ent, &s->completed_reqs)
       {
553
            current = qlist_entry(ent, struct completed_requests, ql);
554
555
556
557
            if(current->req_id == req_id)
                num_matched++;
       }
  }
558

559
  m->fwd.found_match = num_matched;
560
561
562
563
  if(num_matched == count)
  {
    /* No need to post a MPI Wait all then, issue next event */
      /* Remove all completed requests from the list */
564
565
566
      m->fwd.num_matched = clear_completed_reqs(s, lp, mpi_op->u.waits.req_ids, count);
      struct pending_waits* wait_op = s->wait_op;
      free(wait_op);
567
568
      s->wait_op = NULL;
      codes_issue_next_event(lp);
569
570
  }
  else
571
572
573
574
575
576
577
578
579
580
581
  {
      /* If not, add the wait operation in the pending 'waits' list. */
	  struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
	  wait_op->count = count;
      wait_op->op_type = mpi_op->op_type;
      assert(count < MAX_WAIT_REQS);

      for(i = 0; i < count; i++)
          wait_op->req_ids[i] =  mpi_op->u.waits.req_ids[i];

	  wait_op->num_completed = num_matched;
582
	  wait_op->start_time = tw_now(lp);
583
      s->wait_op = wait_op;
584
  }
585
586
  return;
}
587

588
589
/* search for a matching mpi operation and remove it from the list.
 * Record the index in the list from where the element got deleted.
590
 * Index is used for inserting the element once again in the queue for reverse computation. */
591
static int rm_matching_rcv(nw_state * ns,
592
        tw_bf * bf,
593
594
        nw_message * m,
        tw_lp * lp,
595
        mpi_msgs_queue * qitem)
596
597
{
    int matched = 0;
598
    int index = 0;
599
600
    struct qlist_head *ent = NULL;
    mpi_msgs_queue * qi = NULL;
601

602
603
    qlist_for_each(ent, &ns->pending_recvs_queue){
        qi = qlist_entry(ent, mpi_msgs_queue, ql);
604
        if((qi->num_bytes == qitem->num_bytes)
605
606
                && ((qi->tag == qitem->tag) || qi->tag == -1)
                && ((qi->source_rank == qitem->source_rank) || qi->source_rank == -1))
607
608
609
610
        {
            matched = 1;
            break;
        }
611
        ++index;
612
    }
613

614
615
    if(matched)
    {
616
        m->rc.saved_recv_time = ns->recv_time;
617
        ns->recv_time += (tw_now(lp) - qi->req_init_time);
618

619
620
        if(qi->op_type == CODES_WK_IRECV)
            update_completed_queue(ns, bf, m, lp, qi->req_id);
621
622
        else if(qi->op_type == CODES_WK_RECV)
            codes_issue_next_event(lp);
623

624
        qlist_del(&qi->ql);
625

626
        rc_stack_push(lp, qi, free, ns->processed_ops);
627
        return index;
628
629
630
631
    }
    return -1;
}

632
static int rm_matching_send(nw_state * ns,
633
634
635
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp, mpi_msgs_queue * qitem)
636
637
638
639
640
{
    int matched = 0;
    struct qlist_head *ent = NULL;
    mpi_msgs_queue * qi = NULL;

641
    int index = 0;
642
643
    qlist_for_each(ent, &ns->arrival_queue){
        qi = qlist_entry(ent, mpi_msgs_queue, ql);
644
        if((qi->num_bytes == qitem->num_bytes)
645
646
647
648
649
650
                && (qi->tag == qitem->tag || qitem->tag == -1)
                && ((qi->source_rank == qitem->source_rank) || qitem->source_rank == -1))
        {
            matched = 1;
            break;
        }
651
        ++index;
652
653
654
655
    }

    if(matched)
    {
656
        m->rc.saved_recv_time = ns->recv_time;
657
658
659
660
661
        ns->recv_time += (tw_now(lp) - qitem->req_init_time);

        if(qitem->op_type == CODES_WK_IRECV)
            update_completed_queue(ns, bf, m, lp, qitem->req_id);

662
        qlist_del(&qi->ql);
663

664
        return index;
665
666
667
668
669
    }
    return -1;
}
/* Reverse of codes_issue_next_event(): rolls back the one random number that
 * the forward call drew from the LP's generator. */
static void codes_issue_next_event_rc(tw_lp * lp)
{
    tw_rand_reverse_unif(lp->rng);
    return;
}

/* Schedules an MPI_OP_GET_NEXT event on this LP itself, after the lookahead
 * plus 0.1 plus an exponentially distributed delay with mean `noise`. */
static void codes_issue_next_event(tw_lp* lp)
{
   tw_stime jitter = tw_rand_exponential(lp->rng, noise);
   tw_stime offset = g_tw_lookahead + 0.1 + jitter;

   tw_event * next = tw_event_new( lp->gid, offset, lp );
   nw_message * payload = tw_event_data(next);

   payload->msg_type = MPI_OP_GET_NEXT;
   tw_event_send(next);
}

/* Simulate delays between MPI operations */
static void codes_exec_comp_delay(
691
        nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op)
692
693
694
695
696
{
	tw_event* e;
	tw_stime ts;
	nw_message* msg;

697
    m->rc.saved_delay = s->compute_time;
698
699
    s->compute_time += s_to_ns(mpi_op->u.delay.seconds);
    ts = s_to_ns(mpi_op->u.delay.seconds);
700
701

	ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
702

703
704
705
	e = tw_event_new( lp->gid, ts , lp );
	msg = tw_event_data(e);
	msg->msg_type = MPI_OP_GET_NEXT;
706
707
	tw_event_send(e);

708
709
710
}

/* reverse computation operation for MPI irecv */
711
static void codes_exec_mpi_recv_rc(
712
713
714
        nw_state* ns,
        tw_bf * bf,
        nw_message* m,
715
        tw_lp* lp)
716
{
717
	num_bytes_recvd -= m->rc.saved_num_bytes;
718
719
	ns->recv_time = m->rc.saved_recv_time;
	if(m->fwd.found_match >= 0)
720
	  {
721
		ns->recv_time = m->rc.saved_recv_time;
722
723
724
725
        int queue_count = qlist_count(&ns->arrival_queue);

        mpi_msgs_queue * qi = rc_stack_pop(ns->processed_ops);

726
        if(!m->fwd.found_match)
727
728
729
        {
            qlist_add(&qi->ql, &ns->arrival_queue);
        }
730
        else if(m->fwd.found_match >= queue_count)
731
732
733
        {
            qlist_add_tail(&qi->ql, &ns->arrival_queue);
        }
734
        else if(m->fwd.found_match > 0 && m->fwd.found_match < queue_count)
735
        {
736
737
738
739
            int index = 1;
            struct qlist_head * ent = NULL;
            qlist_for_each(ent, &ns->arrival_queue)
            {
740
               if(index == m->fwd.found_match)
741
742
743
744
               {
                 qlist_add(&qi->ql, ent);
                 break;
               }
745
               index++;
746
            }
747
        }
748
        if(qi->op_type == CODES_WK_IRECV)
749
        {
750
            update_completed_queue_rc(ns, bf, m, lp);
751
        }
752
753
        codes_issue_next_event_rc(lp);
      }
754
	else if(m->fwd.found_match < 0)
755
	    {
756
            struct qlist_head * ent = qlist_pop_back(&ns->pending_recvs_queue);
757
758
            mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql);
            free(qi);
759

760
            if(m->op_type == CODES_WK_IRECV)
761
                codes_issue_next_event_rc(lp);
762
763
764
	    }
}

765
/* Execute MPI Irecv operation (non-blocking receive) */
766
static void codes_exec_mpi_recv(
767
        nw_state* s,
768
        tw_bf * bf,
769
770
        nw_message * m,
        tw_lp* lp,
771
        struct codes_workload_op * mpi_op)
772
773
774
775
776
{
/* Once an irecv is posted, list of completed sends is checked to find a matching isend.
   If no matching isend is found, the receive operation is queued in the pending queue of
   receive operations. */

777
	m->rc.saved_recv_time = s->recv_time;
778
779
    m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;

780
781
782
783
784
785
786
787
788
789
790
	num_bytes_recvd += mpi_op->u.recv.num_bytes;

    mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
    recv_op->req_init_time = tw_now(lp);
    recv_op->op_type = mpi_op->op_type;
    recv_op->source_rank = mpi_op->u.recv.source_rank;
    recv_op->dest_rank = mpi_op->u.recv.dest_rank;
    recv_op->num_bytes = mpi_op->u.recv.num_bytes;
    recv_op->tag = mpi_op->u.recv.tag;
    recv_op->req_id = mpi_op->u.recv.req_id;

791
792
    if(s->nw_id == (tw_lpid)TRACK_LP)
        printf("\n Receive op posted num bytes %llu source %d ", recv_op->num_bytes,
793
794
                recv_op->source_rank);

795
	int found_matching_sends = rm_matching_send(s, bf, m, lp, recv_op);
796
797
798
799

	/* save the req id inserted in the completed queue for reverse computation. */
	if(found_matching_sends < 0)
	  {
800
	   	  m->fwd.found_match = -1;
801
          qlist_add_tail(&recv_op->ql, &s->pending_recvs_queue);
802

803
804
805
	       /* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */
		if(mpi_op->op_type == CODES_WK_IRECV)
		   {
806
			codes_issue_next_event(lp);
807
808
			return;
		   }
809
      }
810
811
	else
	  {
812
        m->fwd.found_match = found_matching_sends;
813
        codes_issue_next_event(lp);
814
815
	    rc_stack_push(lp, recv_op, free, s->processed_ops);
      }
816
817
}

818
819
820
821
822
823
824
825
/* Translates rank `job_rank` of application `app_id` into a global rank
 * using the job map context in jobmap_ctx. */
int get_global_id_of_job_rank(tw_lpid job_rank, int app_id)
{
    struct codes_jobmap_id local_id;

    local_id.rank = job_rank;
    local_id.job = app_id;

    return codes_jobmap_to_global_id(local_id, jobmap_ctx);
}
826
/* executes MPI send and isend operations */
827
static void codes_exec_mpi_send(nw_state* s,
828
829
        tw_bf * bf,
        nw_message * m,
830
        tw_lp* lp,
831
        struct codes_workload_op * mpi_op)
832
{
833
834
	/* model-net event */
    int global_dest_rank = mpi_op->u.send.dest_rank;
835

836
837
838
839
840
    if(alloc_spec)
    {
        global_dest_rank = get_global_id_of_job_rank(mpi_op->u.send.dest_rank, s->app_id);
    }

841
    //printf("\n Sender rank %d global dest rank %d ", s->nw_id, global_dest_rank);
842
    m->rc.saved_num_bytes = mpi_op->u.send.num_bytes;
843
	/* model-net event */
844
845
846
	tw_lpid dest_rank = codes_mapping_get_lpid_from_relative(global_dest_rank, NULL, "nw-lp", NULL, 0);
	
    num_bytes_sent += mpi_op->u.send.num_bytes;
847
    s->num_bytes_sent += mpi_op->u.send.num_bytes;
848

849
850
851
852
853
854
855
    if(enable_sampling)
    {
        if(tw_now(lp) >= s->cur_interval_end)
        {
            bf->c1 = 1;
            int indx = s->sampling_indx;
            s->mpi_wkld_samples[indx].nw_id = s->nw_id;
856
            s->mpi_wkld_samples[indx].app_id = s->app_id;
857
858
            s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
            s->sampling_indx++;
859
            s->cur_interval_end += sampling_interval;
860
861
862
        }
        if(s->sampling_indx >= MAX_STATS)
        {
863
            struct mpi_workload_sample * tmp = calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
864
865
866
867
868
869
870
871
872
            memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
            free(s->mpi_wkld_samples);
            s->mpi_wkld_samples = tmp;
            s->max_arr_size += MAX_STATS;
        }
        int indx = s->sampling_indx;
        s->mpi_wkld_samples[indx].num_sends_sample++;
        s->mpi_wkld_samples[indx].num_bytes_sample += mpi_op->u.send.num_bytes;
    }
873
874
875
	nw_message local_m;
	nw_message remote_m;

876
877
878
    local_m.fwd.sim_start_time = tw_now(lp);
    local_m.fwd.dest_rank = mpi_op->u.send.dest_rank;
    local_m.fwd.src_rank = mpi_op->u.send.source_rank;
879
    local_m.op_type = mpi_op->op_type;
880
    local_m.msg_type = MPI_SEND_POSTED;
881
882
883
    local_m.fwd.tag = mpi_op->u.send.tag;
    local_m.fwd.num_bytes = mpi_op->u.send.num_bytes;
    local_m.fwd.req_id = mpi_op->u.send.req_id;
884
    local_m.fwd.app_id = s->app_id;
885
886
887
888

    remote_m = local_m;
	remote_m.msg_type = MPI_SEND_ARRIVED;

889
890
	m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio, 
            "test", dest_rank, mpi_op->u.send.num_bytes, self_overhead,
891
892
	    sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);

893
894
895
896
    if(enable_debug)
    {
        if(mpi_op->op_type == CODES_WK_ISEND)
        {
897
            fprintf(workload_log, "\n (%lf) APP %d MPI ISEND SOURCE %llu DEST %d TAG %d BYTES %llu ",
898
                    tw_now(lp), s->app_id, s->nw_id, global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes);
899
900
        }
        else
901
            fprintf(workload_log, "\n (%lf) APP ID %d MPI SEND SOURCE %llu DEST %d TAG %d BYTES %llu ",
902
903
                    tw_now(lp), s->app_id, s->nw_id, global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes);
    }
904
	/* isend executed, now get next MPI operation from the queue */
905
906
907
908
909
910
911
912
913
914
	if(mpi_op->op_type == CODES_WK_ISEND)
	   codes_issue_next_event(lp);
}

/* Converts a duration in seconds to nanoseconds. Despite its name, the
 * parameter `ns` carries the input value in seconds. */
static tw_stime s_to_ns(tw_stime ns)
{
    const double ns_per_second = 1000.0 * 1000.0 * 1000.0;

    return ns * ns_per_second;
}

915
916
/* Reverse handler of update_completed_queue().
 * bf->c0: the forward handler appended a completed request; remove it.
 * bf->c1: the forward handler completed the pending wait; restore the wait,
 *         its saved wait time and the consumed requests, and undo the event.
 * Finally undoes the num_completed increment made by notify_posted_wait(). */
static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(bf->c0)
    {
        struct qlist_head * tail = qlist_pop_back(&s->completed_reqs);
        completed_requests * appended = qlist_entry(tail, completed_requests, ql);
        free(appended);
    }
    else if(bf->c1)
    {
        s->wait_op = rc_stack_pop(s->processed_ops);
        s->wait_time = m->rc.saved_wait_time;
        add_completed_reqs(s, lp, m->fwd.num_matched);
        codes_issue_next_event_rc(lp);
    }

    if(m->fwd.wait_completed > 0)
        s->wait_op->num_completed--;
}

942
static void update_completed_queue(nw_state* s,
943
944
945
946
947
948
949
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp,
        dumpi_req_id req_id)
{
    bf->c0 = 0;
    bf->c1 = 0;
950
    m->fwd.num_matched = 0;
951

952
953
    int waiting = 0;
    waiting = notify_posted_wait(s, bf, m, lp, req_id);
954

955
956
957
958
959
960
    if(!waiting)
    {
        bf->c0 = 1;
        completed_requests * req = malloc(sizeof(completed_requests));
        req->req_id = req_id;
        qlist_add_tail(&req->ql, &s->completed_reqs);
961

962
963
964
965
966
        /*if(lp->gid == TRACK)
        {
            printf("\n Forward mode adding %ld ", req_id);
            print_completed_queue(&s->completed_reqs);
        }*/
967
    }
968
    else
969
970
     {
            bf->c1 = 1;
971
972
            m->fwd.num_matched = clear_completed_reqs(s, lp, s->wait_op->req_ids, s->wait_op->count);
            m->rc.saved_wait_time = s->wait_time;
973
            s->wait_time += (tw_now(lp) - s->wait_op->start_time);
974
975
976

            struct pending_waits* wait_elem = s->wait_op;
            rc_stack_push(lp, wait_elem, free, s->processed_ops);
977
            s->wait_op = NULL;
978
            codes_issue_next_event(lp);
979
980
981
     }
}

982
/* reverse handler for updating arrival queue function */
983
984
static void update_arrival_queue_rc(nw_state* s,
        tw_bf * bf,
985
        nw_message * m, tw_lp * lp)
986
{
987
	s->recv_time = m->rc.saved_recv_time;
988
989
    s->num_bytes_recvd -= m->fwd.num_bytes;

990
    codes_local_latency_reverse(lp);
991

992
    if(m->fwd.found_match >= 0)
993
	{
994
        mpi_msgs_queue * qi = rc_stack_pop(s->processed_ops);
995
        int queue_count = qlist_count(&s->pending_recvs_queue);
996

997
        if(!m->fwd.found_match)
998
999
1000
        {
            qlist_add(&qi->ql, &s->pending_recvs_queue);
        }
1001
        else if(m->fwd.found_match >= queue_count)
1002
        {
1003
1004
            qlist_add_tail(&qi->ql, &s->pending_recvs_queue);
        }
1005
        else if(m->fwd.found_match > 0 && m->fwd.found_match < queue_count)
1006
1007
1008
1009
1010
        {
            int index = 1;
            struct qlist_head * ent = NULL;
            qlist_for_each(ent, &s->pending_recvs_queue)
            {