model-net-mpi-replay.c 51.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */
#include <ross.h>
#include <inttypes.h>

#include "codes/codes-workload.h"
#include "codes/codes.h"
#include "codes/configuration.h"
#include "codes/codes_mapping.h"
#include "codes/model-net.h"
#include "codes/rc-stack.h"
#include "codes/quicklist.h"
16
#include "codes/codes-jobmap.h"
17

18
/* presumably the model-net LP type name - confirm against its users */
#define MN_LP_NM "modelnet_dragonfly_custom"

/* Debug selectors. Turning on track lp will generate a lot of output
 * messages; -1 is compared against an LP id to decide whether to print. */
#define TRACK_LP (-1)
#define TRACE (-1)

/* Number of request ID slots in struct pending_waits. */
#define MAX_WAIT_REQS 512

/* Increment (in entries) used when the per-LP sample array is grown. */
#define MAX_STATS 65536

/* lprintf() only prints when CS_LP_DBG is non-zero. */
#define CS_LP_DBG 0
#define lprintf(_fmt, ...)                 \
        do {                               \
            if (CS_LP_DBG)                 \
                printf(_fmt, __VA_ARGS__); \
        } while (0)
28 29 30 31 32 33

char workload_type[128];
char workload_file[8192];
char offset_file[8192];
static int wrkld_id;
static int num_net_traces = 0;
34
static int alloc_spec = 0;
35
static double self_overhead = 10.0;
36 37 38 39 40 41 42

/* Doing LP IO*/
static char lp_io_dir[256] = {'\0'};
static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;

43 44 45 46 47 48 49 50 51 52 53
/* variables for loading multiple applications */
/* Xu's additions start */
char workloads_conf_file[8192];
char alloc_file[8192];
int num_traces_of_job[5];
char file_name_of_job[5][8192];

struct codes_jobmap_ctx *jobmap_ctx;
struct codes_jobmap_params_list jobmap_p;
/* Xu's additions end */

54 55
/* Variables for Cortex Support */
/* Matthieu's additions start */
56
#ifdef ENABLE_CORTEX_PYTHON
57 58 59
static char cortex_file[512] = "\0";
static char cortex_class[512] = "\0";
static char cortex_gen[512] = "\0";
60
#endif
61 62
/* Matthieu's additions end */

63 64
typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
65
typedef int32_t dumpi_req_id;
66 67 68

static int net_id = 0;
static float noise = 5.0;
69
static int num_net_lps = 0, num_mpi_lps = 0;
70

71 72 73 74 75
FILE * workload_log = NULL;
FILE * workload_agg_log = NULL;
FILE * workload_meta_log = NULL;

static uint64_t sample_bytes_written = 0;
76

77 78 79 80 81 82 83 84 85 86 87 88
long long num_bytes_sent=0;
long long num_bytes_recvd=0;

double max_time = 0,  max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;

/* global variables for codes mapping */
static char lp_group_name[MAX_NAME_LENGTH], lp_type_name[MAX_NAME_LENGTH], annotation[MAX_NAME_LENGTH];
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;

/* runtime option for disabling computation time simulation */
static int disable_delay = 0;
89 90 91
static int enable_sampling = 0;
static double sampling_interval = 5000000;
static double sampling_end_time = 3000000000;
92
static int enable_debug = 0;
93

94 95 96
/* set group context */
struct codes_mctx group_ratio;

97
/* Event types handled by the MPI replay LP.
 * MPI_OP_GET_NEXT is for getting the next MPI operation when the previous
 *   operation completes.
 * MPI_SEND_ARRIVED is issued when an MPI message arrives at its destination
 *   (the message is transported by model-net and an event is invoked when it
 *   arrives).
 * MPI_SEND_ARRIVED_CB is for tracking message times on the sender.
 * MPI_SEND_POSTED is issued when an MPI message has left the source LP
 *   (message is transported via model-net). */
enum MPI_NW_EVENTS
{
    MPI_OP_GET_NEXT = 1,
    MPI_SEND_ARRIVED = 2,
    MPI_SEND_ARRIVED_CB = 3,
    MPI_SEND_POSTED = 4,
};

108 109 110 111
/* One sampling-interval record kept per LP when enable_sampling is set. */
struct mpi_workload_sample
{
    /* identity of the LP / application the sample belongs to */
    int nw_id;
    int app_id;

    /* counters accumulated while this sample is the current one */
    unsigned long num_sends_sample;
    unsigned long num_bytes_sample;
    unsigned long num_waits_sample;

    /* end of the interval, copied from nw_state.cur_interval_end */
    double sample_end_time;
};
118 119 120 121 122 123 124
/* stores pointers of pending MPI operations to be matched with their respective sends/receives. */
struct mpi_msgs_queue
{
    int op_type;
    int tag;
    int source_rank;
    int dest_rank;
125
    uint64_t num_bytes;
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
    tw_stime req_init_time;
	dumpi_req_id req_id;
    struct qlist_head ql;
};

/* List entry holding the request ID of one completed MPI operation
 * (Isend or Irecv). */
struct completed_requests
{
    dumpi_req_id req_id;
    struct qlist_head ql;
};

/* for wait operations, store the pending operation and number of completed waits so far. */
struct pending_waits
{
    int op_type;
142
    int32_t req_ids[MAX_WAIT_REQS];
143
	int num_completed;
144 145
	int count;
    tw_stime start_time;
146 147 148 149 150 151 152 153 154 155 156 157 158
    struct qlist_head ql;
};

typedef struct mpi_msgs_queue mpi_msgs_queue;
typedef struct completed_requests completed_requests;
typedef struct pending_waits pending_waits;

/* state of the network LP. It contains the pointers to send/receive lists */
struct nw_state
{
	long num_events_per_lp;
	tw_lpid nw_id;
	short wrkld_end;
159 160
    int app_id;
    int local_rank;
161 162

    struct rc_stack * processed_ops;
163
    struct rc_stack * matched_reqs;
164 165 166 167 168 169 170 171 172 173

    /* count of sends, receives, collectives and delays */
	unsigned long num_sends;
	unsigned long num_recvs;
	unsigned long num_cols;
	unsigned long num_delays;
	unsigned long num_wait;
	unsigned long num_waitall;
	unsigned long num_waitsome;

174

175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
	/* time spent by the LP in executing the app trace*/
	double start_time;
	double elapsed_time;
	/* time spent in compute operations */
	double compute_time;
	/* time spent in message send/isend */
	double send_time;
	/* time spent in message receive */
	double recv_time;
	/* time spent in wait operation */
	double wait_time;
	/* FIFO for isend messages arrived on destination */
	struct qlist_head arrival_queue;
	/* FIFO for irecv messages posted but not yet matched with send operations */
	struct qlist_head pending_recvs_queue;
	/* List of completed send/receive requests */
	struct qlist_head completed_reqs;
192

193 194
    tw_stime cur_interval_end;

195 196
    /* Pending wait operation */
    struct pending_waits * wait_op;
197 198 199 200

    unsigned long num_bytes_sent;
    unsigned long num_bytes_recvd;

201 202 203 204
    /* For sampling data */
    int sampling_indx;
    int max_arr_size;
    struct mpi_workload_sample * mpi_wkld_samples;
205
    char output_buf[512];
206 207 208 209
};

/* data for handling reverse computation.
* saved_matched_req holds the request ID of matched receives/sends for wait operations.
210
* ptr_match_op holds the matched MPI operation which are removed from the queues when a send is matched with the receive in forward event handler.
211 212 213
* network event being sent. op is the MPI operation issued by the network workloads API. rv_data holds the data for reverse computation (TODO: Fill this data structure only when the simulation runs in optimistic mode). */
struct nw_message
{
214
   // forward message handler
215
   int msg_type;
216
   int op_type;
217
   model_net_event_return event_rc;
218

219 220 221
   struct
   {
       tw_lpid src_rank;
222
       int dest_rank;
223
       int64_t num_bytes;
224 225 226 227 228
       int num_matched;
       int data_type;
       double sim_start_time;
       // for callbacks - time message was received
       double msg_send_time;
229
       int16_t req_id;
230
       int tag;
231
       int app_id;
232 233 234 235 236 237 238 239 240
       int found_match;
       short wait_completed;
   } fwd;
   struct
   {
       double saved_send_time;
       double saved_recv_time;
       double saved_wait_time;
       double saved_delay;
241
       int16_t saved_num_bytes;
242
       struct codes_workload_op * saved_op;
243
   } rc;
244 245 246 247
};

/* executes MPI isend and send operations */
static void codes_exec_mpi_send(
248
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
249 250
/* execute MPI irecv operation */
static void codes_exec_mpi_recv(
251
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, struct codes_workload_op * mpi_op);
252 253
/* reverse of mpi recv function. */
static void codes_exec_mpi_recv_rc(
254
        nw_state* s, tw_bf * bf, nw_message* m, tw_lp* lp);
255 256
/* execute the computational delay */
static void codes_exec_comp_delay(
257
        nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
/* gets the next MPI operation from the network-workloads API. */
static void get_next_mpi_operation(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* reverse handler of get next mpi operation. */
static void get_next_mpi_operation_rc(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* Makes a call to get_next_mpi_operation. */
static void codes_issue_next_event(tw_lp* lp);
/* reverse handler of next operation */
static void codes_issue_next_event_rc(tw_lp* lp);


///////////////////// HELPER FUNCTIONS FOR MPI MESSAGE QUEUE HANDLING ///////////////
/* upon arrival of local completion message, inserts operation in completed send queue */
/* upon arrival of an isend operation, updates the arrival queue of the network */
273 274 275 276 277 278 279 280
static void update_completed_queue(
        nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp, dumpi_req_id req_id);
/* reverse of the above function */
static void update_completed_queue_rc(
        nw_state*s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp);
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
static void update_arrival_queue(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse of the above function */
static void update_arrival_queue_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* callback to a message sender for computing message time */
static void update_message_time(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse for computing message time */
static void update_message_time_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);

/* conversion from seconds to eanaoseconds */
static tw_stime s_to_ns(tw_stime ns);

296
/* Debugging functions, may generate unused function warning */

/* Prints the first `count` request IDs of `reqs` to stdout. */
static void print_waiting_reqs(int32_t * reqs, int count)
{
    int idx = 0;

    printf("\n Waiting reqs: ");
    while(idx < count)
    {
        printf(" %d ", reqs[idx]);
        idx++;
    }
}
304 305 306 307 308 309 310 311 312 313 314 315
/* Debug helper: prints every entry of a message queue to stdout.
 *
 * head    - list of mpi_msgs_queue entries (linked through `ql`)
 * is_send - non-zero labels the output as the send queue, zero as the
 *           receive queue; it only affects the heading
 */
static void print_msgs_queue(struct qlist_head * head, int is_send)
{
    if(is_send)
        printf("\n Send msgs queue: ");
    else
        printf("\n Recv msgs queue: ");

    struct qlist_head * ent = NULL;
    mpi_msgs_queue * current = NULL;
    qlist_for_each(ent, head)
    {
        current = qlist_entry(ent, mpi_msgs_queue, ql);
        /* num_bytes is uint64_t, so it needs PRIu64 rather than %llu */
        printf(" \n Source %d Dest %d bytes %" PRIu64 " tag %d ",
                current->source_rank, current->dest_rank,
                current->num_bytes, current->tag);
    }
}
319 320 321 322 323 324 325 326
/* Debug helper: prints the request ID of every entry in a list of
 * completed_requests to stdout. */
static void print_completed_queue(struct qlist_head * head)
{
    struct qlist_head * pos = NULL;

    printf("\n Completed queue: ");
    qlist_for_each(pos, head)
    {
        struct completed_requests * done =
            qlist_entry(pos, completed_requests, ql);
        printf(" %d ", done->req_id);
    }
}
330
static int clear_completed_reqs(nw_state * s,
331
        tw_lp * lp,
332
        int32_t * reqs, int count)
333
{
334
    int i, matched = 0;
335 336 337 338 339
    for( i = 0; i < count; i++)
    {
      struct qlist_head * ent = NULL;
      qlist_for_each(ent, &s->completed_reqs)
       {
340
            struct completed_requests* current =
341 342 343
                qlist_entry(ent, completed_requests, ql);
            if(current->req_id == reqs[i])
            {
344
                ++matched;
345 346 347 348 349
                qlist_del(&current->ql);
                rc_stack_push(lp, current, free, s->matched_reqs);
            }
       }
    }
350
    return matched;
351
}
352
static void add_completed_reqs(nw_state * s,
353 354
        tw_lp * lp,
        int count)
355 356 357 358
{
    int i;
    for( i = 0; i < count; i++)
    {
359 360
       struct completed_requests * req = rc_stack_pop(s->matched_reqs);
       qlist_add(&req->ql, &s->completed_reqs);
361 362
    }
}
363

364 365 366 367 368 369
/* helper function - maps an MPI rank to the LP id of the matching "nw-lp" */
static tw_lpid rank_to_lpid(int rank)
{
    tw_lpid lpid = codes_mapping_get_lpid_from_relative(rank, NULL, "nw-lp", NULL, 0);
    return lpid;
}

370
static int notify_posted_wait(nw_state* s,
371
        tw_bf * bf, nw_message * m, tw_lp * lp,
372
        dumpi_req_id completed_req)
373
{
374 375
    struct pending_waits* wait_elem = s->wait_op;
    int wait_completed = 0;
376

377
    m->fwd.wait_completed = 0;
378

379 380
    if(!wait_elem)
        return 0;
381

382
    int op_type = wait_elem->op_type;
383

384 385 386 387 388
    if(op_type == CODES_WK_WAIT &&
            (wait_elem->req_ids[0] == completed_req))
    {
            wait_completed = 1;
    }
389 390
    else if(op_type == CODES_WK_WAITALL
            || op_type == CODES_WK_WAITANY
391 392 393 394 395 396
            || op_type == CODES_WK_WAITSOME)
    {
        int i;
        for(i = 0; i < wait_elem->count; i++)
        {
            if(wait_elem->req_ids[i] == completed_req)
397
            {
398
                wait_elem->num_completed++;
399
                if(wait_elem->num_completed > wait_elem->count)
400
                    printf("\n Num completed %d count %d LP %llu ",
401 402 403
                            wait_elem->num_completed,
                            wait_elem->count,
                            lp->gid);
404 405
//                if(wait_elem->num_completed > wait_elem->count)
//                    tw_lp_suspend(lp, 1, 0);
406

407
                if(wait_elem->num_completed == wait_elem->count)
408
                {
409
                    if(enable_debug)
410
                        fprintf(workload_log, "\n(%lf) APP ID %d MPI WAITALL COMPLETED AT %llu ", tw_now(lp), s->app_id, s->nw_id);
411
                    wait_completed = 1;
412
                }
413

414
                m->fwd.wait_completed = 1;
415
            }
416
        }
417
    }
418
    return wait_completed;
419
}
420

421
/* reverse handler of MPI wait operation */
422
static void codes_exec_mpi_wait_rc(nw_state* s, tw_lp* lp)
423
{
424
    if(s->wait_op)
425
     {
426 427 428
         struct pending_waits * wait_op = s->wait_op;
         free(wait_op);
         s->wait_op = NULL;
429 430 431 432
     }
   else
    {
        codes_issue_next_event_rc(lp);
433
        completed_requests * qi = rc_stack_pop(s->processed_ops);
434
        qlist_add(&qi->ql, &s->completed_reqs);
435
    }
436
    return;
437
}
438

439
/* execute MPI wait operation */
440
static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op)
441
{
442 443
    /* check in the completed receives queue if the request ID has already been completed.*/
    assert(!s->wait_op);
444
    dumpi_req_id req_id = mpi_op->u.wait.req_id;
445
    struct completed_requests* current = NULL;
446

447 448 449 450 451 452 453
    struct qlist_head * ent = NULL;
    qlist_for_each(ent, &s->completed_reqs)
    {
        current = qlist_entry(ent, completed_requests, ql);
        if(current->req_id == req_id)
        {
            qlist_del(&current->ql);
454
            rc_stack_push(lp, current, free, s->processed_ops);
455 456 457 458
            codes_issue_next_event(lp);
            return;
        }
    }
459 460 461 462 463
    /* If not, add the wait operation in the pending 'waits' list. */
    struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
    wait_op->op_type = mpi_op->op_type;
    wait_op->req_ids[0] = req_id;
    wait_op->count = 1;
464 465
    wait_op->num_completed = 0;
    wait_op->start_time = tw_now(lp);
466
    s->wait_op = wait_op;
467

468
    return;
469 470
}

471
static void codes_exec_mpi_wait_all_rc(
472
        nw_state* s,
473 474
        tw_bf * bf,
        nw_message * m,
475
        tw_lp* lp)
476
{
477 478 479 480 481 482 483 484 485 486 487
  if(bf->c1)
  {
    int sampling_indx = s->sampling_indx;
    s->mpi_wkld_samples[sampling_indx].num_waits_sample--;

    if(bf->c2)
    {
        s->cur_interval_end -= sampling_interval;
        s->sampling_indx--;
    }
  }
488 489 490 491 492 493 494 495
  if(s->wait_op)
  {
      struct pending_waits * wait_op = s->wait_op;
      free(wait_op);
      s->wait_op = NULL;
  }
  else
  {
496
      add_completed_reqs(s, lp, m->fwd.num_matched);
497 498 499
      codes_issue_next_event_rc(lp);
  }
  return;
500 501
}
static void codes_exec_mpi_wait_all(
502
        nw_state* s,
503 504
        tw_bf * bf,
        nw_message * m,
505
        tw_lp* lp,
506
        struct codes_workload_op * mpi_op)
507
{
508
  if(enable_debug)
509
    fprintf(workload_log, "\n MPI WAITALL POSTED AT %llu ", s->nw_id);
510

511 512 513 514 515 516 517 518
  if(enable_sampling)
  {
    bf->c1 = 1;
    if(tw_now(lp) >= s->cur_interval_end)
    {
        bf->c2 = 1;
        int indx = s->sampling_indx;
        s->mpi_wkld_samples[indx].nw_id = s->nw_id;
519
        s->mpi_wkld_samples[indx].app_id = s->app_id;
520 521 522 523 524 525
        s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
        s->cur_interval_end += sampling_interval;
        s->sampling_indx++;
    }
    if(s->sampling_indx >= MAX_STATS)
    {
526
        struct mpi_workload_sample * tmp = calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
527 528 529 530 531 532 533 534
        memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
        free(s->mpi_wkld_samples);
        s->mpi_wkld_samples = tmp;
        s->max_arr_size += MAX_STATS;
    }
    int indx = s->sampling_indx;
    s->mpi_wkld_samples[indx].num_waits_sample++;
  }
535
  int count = mpi_op->u.waits.count;
536 537
  /* If the count is not less than max wait reqs then stop */
  assert(count < MAX_WAIT_REQS);
538

539
  int i = 0, num_matched = 0;
540
  m->fwd.num_matched = 0;
541

542
  /*if(lp->gid == TRACK)
543
  {
544
      printf("\n MPI Wait all posted ");
545 546
      print_waiting_reqs(mpi_op->u.waits.req_ids, count);
      print_completed_queue(&s->completed_reqs);
547
  }*/
548
      /* check number of completed irecvs in the completion queue */
549 550 551 552 553 554 555
  for(i = 0; i < count; i++)
  {
      dumpi_req_id req_id = mpi_op->u.waits.req_ids[i];
      struct qlist_head * ent = NULL;
      struct completed_requests* current = NULL;
      qlist_for_each(ent, &s->completed_reqs)
       {
556
            current = qlist_entry(ent, struct completed_requests, ql);
557 558 559 560
            if(current->req_id == req_id)
                num_matched++;
       }
  }
561

562
  m->fwd.found_match = num_matched;
563 564 565 566
  if(num_matched == count)
  {
    /* No need to post a MPI Wait all then, issue next event */
      /* Remove all completed requests from the list */
567 568 569
      m->fwd.num_matched = clear_completed_reqs(s, lp, mpi_op->u.waits.req_ids, count);
      struct pending_waits* wait_op = s->wait_op;
      free(wait_op);
570 571
      s->wait_op = NULL;
      codes_issue_next_event(lp);
572 573
  }
  else
574 575 576 577 578 579 580 581 582 583 584
  {
      /* If not, add the wait operation in the pending 'waits' list. */
	  struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
	  wait_op->count = count;
      wait_op->op_type = mpi_op->op_type;
      assert(count < MAX_WAIT_REQS);

      for(i = 0; i < count; i++)
          wait_op->req_ids[i] =  mpi_op->u.waits.req_ids[i];

	  wait_op->num_completed = num_matched;
585
	  wait_op->start_time = tw_now(lp);
586
      s->wait_op = wait_op;
587
  }
588 589
  return;
}
590

591 592
/* search for a matching mpi operation and remove it from the list.
 * Record the index in the list from where the element got deleted.
593
 * Index is used for inserting the element once again in the queue for reverse computation. */
594
static int rm_matching_rcv(nw_state * ns,
595
        tw_bf * bf,
596 597
        nw_message * m,
        tw_lp * lp,
598
        mpi_msgs_queue * qitem)
599 600
{
    int matched = 0;
601
    int index = 0;
602 603
    struct qlist_head *ent = NULL;
    mpi_msgs_queue * qi = NULL;
604

605 606
    qlist_for_each(ent, &ns->pending_recvs_queue){
        qi = qlist_entry(ent, mpi_msgs_queue, ql);
607 608 609
        if(//(qi->num_bytes == qitem->num_bytes)
                //&& 
                ((qi->tag == qitem->tag) || qi->tag == -1)
610
                && ((qi->source_rank == qitem->source_rank) || qi->source_rank == -1))
611 612 613 614
        {
            matched = 1;
            break;
        }
615
        ++index;
616
    }
617

618 619
    if(matched)
    {
620
        m->rc.saved_recv_time = ns->recv_time;
621
        ns->recv_time += (tw_now(lp) - qi->req_init_time);
622

623 624
        if(qi->op_type == CODES_WK_IRECV)
            update_completed_queue(ns, bf, m, lp, qi->req_id);
625 626
        else if(qi->op_type == CODES_WK_RECV)
            codes_issue_next_event(lp);
627

628
        qlist_del(&qi->ql);
629

630
        rc_stack_push(lp, qi, free, ns->processed_ops);
631
        return index;
632 633 634 635
    }
    return -1;
}

636
static int rm_matching_send(nw_state * ns,
637 638 639
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp, mpi_msgs_queue * qitem)
640 641 642 643 644
{
    int matched = 0;
    struct qlist_head *ent = NULL;
    mpi_msgs_queue * qi = NULL;

645
    int index = 0;
646 647
    qlist_for_each(ent, &ns->arrival_queue){
        qi = qlist_entry(ent, mpi_msgs_queue, ql);
648 649 650
        if(//(qi->num_bytes == qitem->num_bytes) // it is not a requirement in MPI that the send and receive sizes match
                // && 
		(qi->tag == qitem->tag || qitem->tag == -1)
651 652
                && ((qi->source_rank == qitem->source_rank) || qitem->source_rank == -1))
        {
653
            qitem->num_bytes = qi->num_bytes;
654 655 656
            matched = 1;
            break;
        }
657
        ++index;
658 659 660 661
    }

    if(matched)
    {
662
        m->rc.saved_recv_time = ns->recv_time;
663 664 665 666 667
        ns->recv_time += (tw_now(lp) - qitem->req_init_time);

        if(qitem->op_type == CODES_WK_IRECV)
            update_completed_queue(ns, bf, m, lp, qitem->req_id);

668
        qlist_del(&qi->ql);
669

670
        return index;
671 672 673 674 675
    }
    return -1;
}
/* Reverse of codes_issue_next_event: gives back the random number that the
 * forward call drew from the LP's generator. */
static void codes_issue_next_event_rc(tw_lp * lp)
{
    tw_rand_reverse_unif(lp->rng);
}

/* Trigger getting next event at LP: sends this LP an MPI_OP_GET_NEXT event
 * after the lookahead plus a small exponentially distributed delay */
static void codes_issue_next_event(tw_lp* lp)
{
   /* exactly one random draw per call (undone by the _rc counterpart) */
   tw_stime offset = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);

   tw_event * next_ev = tw_event_new(lp->gid, offset, lp);
   nw_message * next_msg = tw_event_data(next_ev);
   next_msg->msg_type = MPI_OP_GET_NEXT;

   tw_event_send(next_ev);
}

/* Simulate delays between MPI operations */
static void codes_exec_comp_delay(
697
        nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op)
698 699 700 701 702
{
	tw_event* e;
	tw_stime ts;
	nw_message* msg;

703
    m->rc.saved_delay = s->compute_time;
704 705
    s->compute_time += s_to_ns(mpi_op->u.delay.seconds);
    ts = s_to_ns(mpi_op->u.delay.seconds);
706 707

	ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
708

709 710 711
	e = tw_event_new( lp->gid, ts , lp );
	msg = tw_event_data(e);
	msg->msg_type = MPI_OP_GET_NEXT;
712 713
	tw_event_send(e);

714 715 716
}

/* reverse computation operation for MPI irecv */
717
static void codes_exec_mpi_recv_rc(
718 719 720
        nw_state* ns,
        tw_bf * bf,
        nw_message* m,
721
        tw_lp* lp)
722
{
723 724
	ns->recv_time = m->rc.saved_recv_time;
	if(m->fwd.found_match >= 0)
725
	  {
726
		ns->recv_time = m->rc.saved_recv_time;
727 728 729 730
        int queue_count = qlist_count(&ns->arrival_queue);

        mpi_msgs_queue * qi = rc_stack_pop(ns->processed_ops);

731
        if(!m->fwd.found_match)
732 733 734
        {
            qlist_add(&qi->ql, &ns->arrival_queue);
        }
735
        else if(m->fwd.found_match >= queue_count)
736 737 738
        {
            qlist_add_tail(&qi->ql, &ns->arrival_queue);
        }
739
        else if(m->fwd.found_match > 0 && m->fwd.found_match < queue_count)
740
        {
741 742 743 744
            int index = 1;
            struct qlist_head * ent = NULL;
            qlist_for_each(ent, &ns->arrival_queue)
            {
745
               if(index == m->fwd.found_match)
746 747 748 749
               {
                 qlist_add(&qi->ql, ent);
                 break;
               }
750
               index++;
751
            }
752
        }
753
        if(qi->op_type == CODES_WK_IRECV)
754
        {
755
            update_completed_queue_rc(ns, bf, m, lp);
756
        }
757 758
        codes_issue_next_event_rc(lp);
      }
759
	else if(m->fwd.found_match < 0)
760
	    {
761
            struct qlist_head * ent = qlist_pop_back(&ns->pending_recvs_queue);
762 763
            mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql);
            free(qi);
764

765
            if(m->op_type == CODES_WK_IRECV)
766
                codes_issue_next_event_rc(lp);
767 768 769
	    }
}

770
/* Execute MPI Irecv operation (non-blocking receive) */
771
static void codes_exec_mpi_recv(
772
        nw_state* s,
773
        tw_bf * bf,
774 775
        nw_message * m,
        tw_lp* lp,
776
        struct codes_workload_op * mpi_op)
777 778 779 780 781
{
/* Once an irecv is posted, list of completed sends is checked to find a matching isend.
   If no matching isend is found, the receive operation is queued in the pending queue of
   receive operations. */

782
	m->rc.saved_recv_time = s->recv_time;
783 784
    m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;

785 786 787 788 789 790 791 792 793
    mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
    recv_op->req_init_time = tw_now(lp);
    recv_op->op_type = mpi_op->op_type;
    recv_op->source_rank = mpi_op->u.recv.source_rank;
    recv_op->dest_rank = mpi_op->u.recv.dest_rank;
    recv_op->num_bytes = mpi_op->u.recv.num_bytes;
    recv_op->tag = mpi_op->u.recv.tag;
    recv_op->req_id = mpi_op->u.recv.req_id;

794 795
    if(s->nw_id == (tw_lpid)TRACK_LP)
        printf("\n Receive op posted num bytes %llu source %d ", recv_op->num_bytes,
796 797
                recv_op->source_rank);

798
	int found_matching_sends = rm_matching_send(s, bf, m, lp, recv_op);
799 800 801 802

	/* save the req id inserted in the completed queue for reverse computation. */
	if(found_matching_sends < 0)
	  {
803
	   	  m->fwd.found_match = -1;
804
          qlist_add_tail(&recv_op->ql, &s->pending_recvs_queue);
805

806 807 808
	       /* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */
		if(mpi_op->op_type == CODES_WK_IRECV)
		   {
809
			codes_issue_next_event(lp);
810 811
			return;
		   }
812
      }
813 814
	else
	  {
815
        m->fwd.found_match = found_matching_sends;
816
        codes_issue_next_event(lp);
817 818
	    rc_stack_push(lp, recv_op, free, s->processed_ops);
      }
819 820
}

821 822 823 824 825 826 827 828
/* Translates a rank local to job `app_id` into a global rank using the
 * job map context. */
int get_global_id_of_job_rank(tw_lpid job_rank, int app_id)
{
    struct codes_jobmap_id local_id;

    local_id.job = app_id;
    local_id.rank = job_rank;

    return codes_jobmap_to_global_id(local_id, jobmap_ctx);
}
829
/* executes MPI send and isend operations */
830
static void codes_exec_mpi_send(nw_state* s,
831 832
        tw_bf * bf,
        nw_message * m,
833
        tw_lp* lp,
834
        struct codes_workload_op * mpi_op)
835
{
836 837
	/* model-net event */
    int global_dest_rank = mpi_op->u.send.dest_rank;
838

839 840 841 842 843
    if(alloc_spec)
    {
        global_dest_rank = get_global_id_of_job_rank(mpi_op->u.send.dest_rank, s->app_id);
    }

844
    //printf("\n Sender rank %d global dest rank %d ", s->nw_id, global_dest_rank);
845
    m->rc.saved_num_bytes = mpi_op->u.send.num_bytes;
846
	/* model-net event */
847 848 849
	tw_lpid dest_rank = codes_mapping_get_lpid_from_relative(global_dest_rank, NULL, "nw-lp", NULL, 0);
	
    num_bytes_sent += mpi_op->u.send.num_bytes;
850
    s->num_bytes_sent += mpi_op->u.send.num_bytes;
851

852 853 854 855 856 857 858
    if(enable_sampling)
    {
        if(tw_now(lp) >= s->cur_interval_end)
        {
            bf->c1 = 1;
            int indx = s->sampling_indx;
            s->mpi_wkld_samples[indx].nw_id = s->nw_id;
859
            s->mpi_wkld_samples[indx].app_id = s->app_id;
860 861
            s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
            s->sampling_indx++;
862
            s->cur_interval_end += sampling_interval;
863 864 865
        }
        if(s->sampling_indx >= MAX_STATS)
        {
866
            struct mpi_workload_sample * tmp = calloc((MAX_STATS + s->max_arr_size), sizeof(struct mpi_workload_sample));
867 868 869 870 871 872 873 874 875
            memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx);
            free(s->mpi_wkld_samples);
            s->mpi_wkld_samples = tmp;
            s->max_arr_size += MAX_STATS;
        }
        int indx = s->sampling_indx;
        s->mpi_wkld_samples[indx].num_sends_sample++;
        s->mpi_wkld_samples[indx].num_bytes_sample += mpi_op->u.send.num_bytes;
    }
876 877 878
	nw_message local_m;
	nw_message remote_m;

879 880 881
    local_m.fwd.sim_start_time = tw_now(lp);
    local_m.fwd.dest_rank = mpi_op->u.send.dest_rank;
    local_m.fwd.src_rank = mpi_op->u.send.source_rank;
882
    local_m.op_type = mpi_op->op_type;
883
    local_m.msg_type = MPI_SEND_POSTED;
884 885 886
    local_m.fwd.tag = mpi_op->u.send.tag;
    local_m.fwd.num_bytes = mpi_op->u.send.num_bytes;
    local_m.fwd.req_id = mpi_op->u.send.req_id;
887
    local_m.fwd.app_id = s->app_id;
888 889 890 891

    remote_m = local_m;
	remote_m.msg_type = MPI_SEND_ARRIVED;

892 893
	m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio, 
            "test", dest_rank, mpi_op->u.send.num_bytes, self_overhead,
894 895
	    sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);

896 897 898 899
    if(enable_debug)
    {
        if(mpi_op->op_type == CODES_WK_ISEND)
        {
900
            fprintf(workload_log, "\n (%lf) APP %d MPI ISEND SOURCE %llu DEST %d TAG %d BYTES %llu ",
901
                    tw_now(lp), s->app_id, s->nw_id, global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes);
902 903
        }
        else
904
            fprintf(workload_log, "\n (%lf) APP ID %d MPI SEND SOURCE %llu DEST %d TAG %d BYTES %llu ",
905 906
                    tw_now(lp), s->app_id, s->nw_id, global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes);
    }
907
	/* isend executed, now get next MPI operation from the queue */
908 909 910 911 912 913 914 915 916 917
	if(mpi_op->op_type == CODES_WK_ISEND)
	   codes_issue_next_event(lp);
}

/* convert seconds to ns (the argument, despite its name, is in seconds) */
static tw_stime s_to_ns(tw_stime ns)
{
    const double ns_per_second = 1000.0 * 1000.0 * 1000.0;

    return ns * ns_per_second;
}

918 919
/*
 * Reverse handler for update_completed_queue().
 *
 *  - bf->c0 set: the forward handler appended an entry to the tail of
 *    s->completed_reqs; that entry is removed and freed.
 *  - bf->c1 set: the forward handler parked the wait operation on
 *    s->processed_ops and issued the next event; the wait operation and
 *    the saved wait time are restored, the requests that were cleared are
 *    re-added and the issued event is rolled back.
 *
 * Independently of the branch taken, a positive m->fwd.wait_completed
 * causes num_completed of the current wait operation to be decremented.
 */
static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(bf->c0)
    {
        /* undo the qlist_add_tail() done in forward mode */
        struct qlist_head * tail = qlist_pop_back(&s->completed_reqs);
        completed_requests * done = qlist_entry(tail, completed_requests, ql);

        free(done);
    }
    else if(bf->c1)
    {
        /* the wait operation must be back in place before anything
         * below touches s->wait_op */
        s->wait_op = rc_stack_pop(s->processed_ops);
        s->wait_time = m->rc.saved_wait_time;

        add_completed_reqs(s, lp, m->fwd.num_matched);
        codes_issue_next_event_rc(lp);
    }

    if(m->fwd.wait_completed > 0)
        s->wait_op->num_completed--;
}

945
static void update_completed_queue(nw_state* s,
946 947 948 949 950 951 952
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp,
        dumpi_req_id req_id)
{
    bf->c0 = 0;
    bf->c1 = 0;
953
    m->fwd.num_matched = 0;
954

955 956
    int waiting = 0;
    waiting = notify_posted_wait(s, bf, m, lp, req_id);
957

958 959 960 961 962 963
    if(!waiting)
    {
        bf->c0 = 1;
        completed_requests * req = malloc(sizeof(completed_requests));
        req->req_id = req_id;
        qlist_add_tail(&req->ql, &s->completed_reqs);
964

965 966 967 968 969
        /*if(lp->gid == TRACK)
        {
            printf("\n Forward mode adding %ld ", req_id);
            print_completed_queue(&s->completed_reqs);
        }*/
970
    }
971
    else
972 973
     {
            bf->c1 = 1;
974 975
            m->fwd.num_matched = clear_completed_reqs(s, lp, s->wait_op->req_ids, s->wait_op->count);
            m->rc.saved_wait_time = s->wait_time;
976
            s->wait_time += (tw_now(lp) - s->wait_op->start_time);
977 978 979

            struct pending_waits* wait_elem = s->wait_op;
            rc_stack_push(lp, wait_elem, free, s->processed_ops);
980
            s->wait_op = NULL;
981
            codes_issue_next_event(lp);
982 983 984
     }
}

985
/* reverse handler for updating arrival queue function */
986 987
static void update_arrival_queue_rc(nw_state* s,
        tw_bf * bf,
988
        nw_message * m, tw_lp * lp)
989
{
990
	s->recv_time = m->rc.saved_recv_time;
991
    s->num_bytes_recvd -= m->fwd.num_bytes;
992
    num_bytes_recvd -= m->fwd.num_bytes;
993

994
    codes_local_latency_reverse(lp);
995

996
    if(m->fwd.found_match >= 0)
997
	{
998
        mpi_msgs_queue * qi = rc_stack_pop(s->processed_ops);
999
        int queue_count = qlist_count(&s->pending_recvs_queue);
1000

1001
        if(!m->fwd.found_match)
1002 1003 1004
        {
            qlist_add(&qi->ql, &s->pending_recvs_queue);
        }
1005
        else if(m->fwd.found_match >= queue_count)
1006
        {
1007 1008
            qlist_add_tail(&qi->ql, &s->pending_recvs_queue);
        }
1009
        else if(m->fwd.found_match > 0 && m->fwd.found_match < queue_count)
1010 1011 1012 1013 1014
        {
            int index = 1;
            struct qlist_head * ent = NULL;
            qlist_for_each(ent, &s->pending_recvs_queue)
            {
1015
               if(index == m->fwd.found_match)
1016 1017 1018 1019
               {
                 qlist_add(&qi->ql, ent);
                 break;
               }
1020
               index++;
1021
            }
1022
        }
1023 1024
        if(qi->op_type == CODES_WK_IRECV)
            update_completed_queue_rc(s, bf, m, lp);
1025 1026
        else if(qi->op_type == CODES_WK_RECV)
            codes_issue_next_event_rc(lp);
1027
    }
1028
	else if(m->fwd.found_match < 0)
1029
	{
1030
	    struct qlist_head * ent = qlist_pop_back(&s->arrival_queue);
1031 1032 1033 1034 1035 1036 1037 1038
        mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql);
        free(qi);
    }
}

/*
 * Forward handler for a send that has arrived at this (destination) LP.
 *
 * The per-LP and global received-byte counters are updated and a
 * MPI_SEND_ARRIVED_CB event, carrying the time elapsed since the send was
 * started, is sent back to the LP of the source rank. The arrival is then
 * rebuilt as an mpi_msgs_queue item and handed to rm_matching_rcv():
 *  - a negative result means no matching receive was found; the item is
 *    appended to s->arrival_queue and m->fwd.found_match is set to -1;
 *  - otherwise the result is stored in m->fwd.found_match and the item is
 *    freed.
 *
 * m->rc.saved_recv_time and m->fwd.found_match are consumed by
 * update_arrival_queue_rc(). The message must belong to this LP's
 * application (asserted). The process is aborted if the queue item cannot
 * be allocated.
 */
static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(s->app_id != m->fwd.app_id)
        printf("\n Received message for app %d my id %d my rank %llu ",
                m->fwd.app_id, s->app_id, (unsigned long long)s->nw_id);
    assert(s->app_id == m->fwd.app_id);

    m->rc.saved_recv_time = s->recv_time;
    s->num_bytes_recvd += m->fwd.num_bytes;
    num_bytes_recvd += m->fwd.num_bytes;

    /* send a callback to the sender to increment times;
     * first find the global id of the source */
    int global_src_id = m->fwd.src_rank;
    if(alloc_spec)
    {
        global_src_id = get_global_id_of_job_rank(m->fwd.src_rank, s->app_id);
    }
    tw_event *e_callback =
        tw_event_new(rank_to_lpid(global_src_id),
                codes_local_latency(lp), lp);
    nw_message *m_callback = tw_event_data(e_callback);
    m_callback->msg_type = MPI_SEND_ARRIVED_CB;
    m_callback->fwd.msg_send_time = tw_now(lp) - m->fwd.sim_start_time;
    tw_event_send(e_callback);

    /* Now reconstruct the queue item */
    mpi_msgs_queue * arrived_op = malloc(sizeof(*arrived_op));
    if(arrived_op == NULL)
    {
        /* Nothing sensible can be done without the queue item; terminate
         * the same way a failed assert() would. */
        fprintf(stderr, "\n update_arrival_queue: out of memory while queueing an arrived send ");
        abort();
    }
    arrived_op->req_init_time = m->fwd.sim_start_time;
    arrived_op->op_type = m->op_type;
    arrived_op->source_rank = m->fwd.src_rank;
    arrived_op->dest_rank = m->fwd.dest_rank;
    arrived_op->num_bytes = m->fwd.num_bytes;
    arrived_op->tag = m->fwd.tag;

    if(s->nw_id == (tw_lpid)TRACK_LP)
        printf("\n Send op arrived source rank %d num bytes %llu ", arrived_op->source_rank,
                (unsigned long long)arrived_op->num_bytes);

    int found_matching_recv = rm_matching_rcv(s, bf, m, lp, arrived_op);

    if(found_matching_recv < 0)
    {
        /* no receive posted yet: keep the arrival until one shows up */
        m->fwd.found_match = -1;
        qlist_add_tail(&arrived_op->ql, &s->arrival_queue);
    }
    else
    {
        m->fwd.found_match = found_matching_recv;
        free(arrived_op);
    }
}
/*
 * Add the send time carried by the message (m->fwd.msg_send_time) to the
 * LP's accumulated send time. The previous total is saved in
 * m->rc.saved_send_time for update_message_time_rc().
 * bf and lp are not used.
 */
static void update_message_time(
        nw_state * s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp)
{
    (void)bf;
    (void)lp;

    /* snapshot first, then accumulate */
    m->rc.saved_send_time = s->send_time;
    s->send_time = s->send_time + m->fwd.msg_send_time;
}

/*
 * Reverse handler for update_message_time(): put back the accumulated send
 * time that was saved in m->rc.saved_send_time.
 * bf and lp are not used.
 */
static void update_message_time_rc(
        nw_state * s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp)
{
    (void)bf;
    (void)lp;

    s->send_time = m->rc.saved_send_time;
}

/* initializes the network node LP, loads the trace file in the structs, calls the first MPI operation to be executed */
void nw_test_init(nw_state* s, tw_lp* lp)
{
   /* initialize the LP's and load the data */
   char * params = NULL;
   dumpi_trace_params params_d;
1116

1117
   memset(s, 0, sizeof(*s));
1118
   s->nw_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);
1119
   s->mpi_wkld_samples = calloc(MAX_STATS, sizeof(struct mpi_workload_sample));
1120
   s->sampling_indx = 0;
1121

1122
   if(!num_net_traces)
1123
	num_net_traces = num_mpi_lps;
1124

1125
   assert(num_net_traces <= num_mpi_lps);
1126 1127

   struct codes_jobmap_id lid;
1128

1129 1130
   if(alloc_spec)
   {
1131
        lid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx);
1132

1133 1134 1135 1136 1137 1138 1139 1140 1141
        if(lid.job == -1)
        {
            s->app_id = -1;
            s->local_rank = -1;
            return;
        }
   }
   else
   {
1142
       /* Only one job running with contiguous mapping */
1143 1144 1145
       lid.job = 0;
       lid.rank = s->nw_id;
       s->app_id = 0;
1146

1147
       if((int)s->nw_id >= num_net_traces)
1148
	        return;
1149
   }
1150

Matthieu Dorier's avatar
Matthieu Dorier committed
1151
   if (strcmp(workload_type, "dumpi") == 0){
1152 1153
       strcpy(params_d.file_name, file_name_of_job[lid.job]);
       params_d.num_net_traces = num_traces_of_job[lid.job];
1154
       params = (char*)&params_d;
1155
       s->app_id = lid.job;
1156
       s->local_rank = lid.rank;
1157 1158
//       printf("network LP nw id %d app id %d local rank %d generating events, lp gid is %ld \n", s->nw_id, 
//               s->app_id, s->local_rank, lp->gid);
1159
#ifdef ENABLE_CORTEX_PYTHON
1160 1161
	strcpy(params_d.cortex_script, cortex_file);
	strcpy(params_d.cortex_class, cortex_class);
1162
	strcpy(params_d.cortex_gen, cortex_gen);
1163
#endif
Matthieu Dorier's avatar
Matthieu Dorier committed
1164
   }
1165

1166 1167
   wrkld_id = codes_workload_load("dumpi-trace-workload", params, s->app_id, s->local_rank);

1168 1169 1170 1171 1172 1173
   double overhead;
   int rc = configuration_get_value_double(&config, "PARAMS", "self_msg_overhead", NULL, &overhead);

   if(overhead)
       self_overhead = overhead;

1174 1175 1176
   INIT_QLIST_HEAD(&s->arrival_queue);
   INIT_QLIST_HEAD(&s->pending_recvs_queue);
   INIT_QLIST_HEAD(&s->completed_reqs);
1177 1178 1179

   /* Initialize the RC stack */
   rc_stack_create(&s->processed_ops);
1180
   rc_stack_create(&s->matched_reqs);
1181 1182

   assert(s->processed_ops != NULL);
1183
   assert(s->matched_reqs != NULL);
1184 1185


1186
   /* clock starts ticking when the first event is processed */
1187 1188
   s->start_time = tw_now(lp);
   codes_issue_next_event(lp);
1189 1190
   s->num_bytes_sent = 0;
   s->num_bytes_recvd = 0;
1191 1192
   s->compute_time = 0;
   s->elapsed_time = 0;
1193

1194 1195 1196 1197 1198 1199
   if(enable_sampling && sampling_interval > 0)
   {
       s->max_arr_size = MAX_STATS;
       s->cur_interval_end = sampling_interval;
       if(!g_tw_mynode && !s->nw_id)
       {
1200
           fprintf(workload_meta_log, "\n mpi_proc_id app_id num_waits "