model-net-mpi-replay.c 41 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */
#include <ross.h>
#include <inttypes.h>

#include "codes/codes-workload.h"
#include "codes/codes.h"
#include "codes/configuration.h"
#include "codes/codes_mapping.h"
#include "codes/model-net.h"
#include "codes/rc-stack.h"
#include "codes/quicklist.h"

17 18
/* Debug switches. TRACK_LP is compared against an LP's relative id (nw_id)
 * to enable verbose per-LP printing; turning it on will generate a lot of
 * output messages. The expansions are parenthesized so they stay a single
 * operand wherever they are used. */
#define TRACK_LP (-1)
#define TRACE (-1)
/* capacity of pending_waits.req_ids */
#define MAX_WAIT_REQS 512
#define DBG_MPI_SIM 0

/* Workload selection: workload_type is compared against "dumpi" and
 * workload_file is copied into the trace parameters in nw_test_init(). */
char workload_type[128];
char workload_file[8192];
char offset_file[8192];
/* id returned by codes_workload_load() */
static int wrkld_id;
/* number of traces; nw_test_init() defaults it to num_net_lps when it is 0 */
static int num_net_traces = 0;

/* Doing LP IO */
static char lp_io_dir[256] = {'\0'};
static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;

typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
typedef int32_t dumpi_req_id;

/* network model id handed to model_net_event() */
static int net_id = 0;
/* second argument of tw_rand_exponential() when scheduling events */
static float noise = 5.0;
static int num_net_lps, num_nw_lps;

#define CS_LP_DBG 0
#define lprintf(_fmt, ...) \
        do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)

/* byte counters updated by the send/receive handlers */
long long num_bytes_sent=0;
long long num_bytes_recvd=0;

double max_time = 0,  max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;

/* global variables for codes mapping */
static char lp_group_name[MAX_NAME_LENGTH], lp_type_name[MAX_NAME_LENGTH], annotation[MAX_NAME_LENGTH];
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;

/* runtime option for disabling computation time simulation */
static int disable_delay = 0;
/* sampling options (not referenced in this part of the file) */
static int enable_sampling = 0;
static double sampling_interval = 5000000;
static double sampling_end_time = 3000000000;
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97

/* Event types carried in nw_message.msg_type:
 *  - MPI_OP_GET_NEXT: fetch the next MPI operation once the previous one
 *    completes.
 *  - MPI_SEND_ARRIVED: an MPI message, transported by model-net, has arrived
 *    at its destination LP.
 *  - MPI_SEND_ARRIVED_CB: callback to the sender, used for tracking message
 *    times on the sender.
 *  - MPI_SEND_POSTED: an MPI message has left the source LP (message is
 *    transported via model-net). */
enum MPI_NW_EVENTS
{
    MPI_OP_GET_NEXT     = 1,
    MPI_SEND_ARRIVED    = 2,
    MPI_SEND_ARRIVED_CB = 3,
    MPI_SEND_POSTED     = 4,
};

/* One queued MPI operation (an arrived send or a posted receive) that is
 * waiting to be matched with its counterpart. */
struct mpi_msgs_queue
{
    int op_type;              /* workload operation type of the queued op */
    int tag;                  /* -1 on a receive matches any tag */
    int source_rank;          /* -1 on a receive matches any source */
    int dest_rank;
    int num_bytes;
    tw_stime req_init_time;   /* time the operation was started */
    dumpi_req_id req_id;      /* request id, used for non-blocking receives */
    struct qlist_head ql;     /* list linkage */
};

/* List node recording the request id of one completed non-blocking MPI
 * operation (Isend or Irecv). */
struct completed_requests
{
    dumpi_req_id req_id;      /* id of the completed request */
    struct qlist_head ql;     /* linkage in nw_state.completed_reqs */
};

/* for wait operations, store the pending operation and number of completed waits so far. */
struct pending_waits
{
    int op_type;
98
    int32_t req_ids[MAX_WAIT_REQS];
99
	int num_completed;
100 101
	int count;
    tw_stime start_time;
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
    struct qlist_head ql;
};

typedef struct mpi_msgs_queue mpi_msgs_queue;
typedef struct completed_requests completed_requests;
typedef struct pending_waits pending_waits;

/* state of the network LP. It contains the pointers to send/receive lists */
struct nw_state
{
	long num_events_per_lp;
	tw_lpid nw_id;
	short wrkld_end;

    struct rc_stack * processed_ops;
117
    struct rc_stack * matched_reqs;
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144

    /* count of sends, receives, collectives and delays */
	unsigned long num_sends;
	unsigned long num_recvs;
	unsigned long num_cols;
	unsigned long num_delays;
	unsigned long num_wait;
	unsigned long num_waitall;
	unsigned long num_waitsome;

	/* time spent by the LP in executing the app trace*/
	double start_time;
	double elapsed_time;
	/* time spent in compute operations */
	double compute_time;
	/* time spent in message send/isend */
	double send_time;
	/* time spent in message receive */
	double recv_time;
	/* time spent in wait operation */
	double wait_time;
	/* FIFO for isend messages arrived on destination */
	struct qlist_head arrival_queue;
	/* FIFO for irecv messages posted but not yet matched with send operations */
	struct qlist_head pending_recvs_queue;
	/* List of completed send/receive requests */
	struct qlist_head completed_reqs;
145 146 147

    /* Pending wait operation */
    struct pending_waits * wait_op;
148 149 150 151 152

    unsigned long num_bytes_sent;
    unsigned long num_bytes_recvd;

    char output_buf[512];
153 154 155 156 157 158 159 160
};

/* data for handling reverse computation.
* saved_matched_req holds the request ID of matched receives/sends for wait operations.
* ptr_match_op holds the matched MPI operation which are removed from the queues when a send is matched with the receive in forward event handler. 
* network event being sent. op is the MPI operation issued by the network workloads API. rv_data holds the data for reverse computation (TODO: Fill this data structure only when the simulation runs in optimistic mode). */
struct nw_message
{
161
   // forward message handler 
162
   int msg_type;
163 164
   int op_type;
   
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
   struct
   {
       tw_lpid src_rank;
       tw_lpid dest_rank;
       int num_bytes;
       int num_matched;
       int data_type;
       double sim_start_time;
       // for callbacks - time message was received
       double msg_send_time;
       int16_t req_id;   
       int tag;
       int found_match;
       short wait_completed;
   } fwd;
   struct
   {
       double saved_send_time;
       double saved_recv_time;
       double saved_wait_time;
       double saved_delay;
       int saved_num_bytes;
187
       struct codes_workload_op * saved_op;
188
   } rc;
189 190 191 192
};

/* executes MPI isend and send operations */
static void codes_exec_mpi_send(
193
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
194 195
/* execute MPI irecv operation */
static void codes_exec_mpi_recv(
196
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, struct codes_workload_op * mpi_op);
197 198
/* reverse of mpi recv function. */
static void codes_exec_mpi_recv_rc(
199
        nw_state* s, tw_bf * bf, nw_message* m, tw_lp* lp);
200 201
/* execute the computational delay */
static void codes_exec_comp_delay(
202
        nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op);
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
/* gets the next MPI operation from the network-workloads API. */
static void get_next_mpi_operation(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* reverse handler of get next mpi operation. */
static void get_next_mpi_operation_rc(
        nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);
/* Makes a call to get_next_mpi_operation. */
static void codes_issue_next_event(tw_lp* lp);
/* reverse handler of next operation */
static void codes_issue_next_event_rc(tw_lp* lp);


///////////////////// HELPER FUNCTIONS FOR MPI MESSAGE QUEUE HANDLING ///////////////
/* upon arrival of local completion message, inserts operation in completed send queue */
/* upon arrival of an isend operation, updates the arrival queue of the network */
218 219 220 221 222 223 224 225
static void update_completed_queue(
        nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp, dumpi_req_id req_id);
/* reverse of the above function */
static void update_completed_queue_rc(
        nw_state*s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp);
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
static void update_arrival_queue(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse of the above function */
static void update_arrival_queue_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* callback to a message sender for computing message time */
static void update_message_time(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* reverse for computing message time */
static void update_message_time_rc(
        nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);

/* conversion from seconds to eanaoseconds */
static tw_stime s_to_ns(tw_stime ns);

241
/* Debugging functions, may generate unused function warning */

/* Prints the first `count` request ids of `reqs` to stdout, each surrounded
 * by single spaces, after a "Waiting reqs:" header. */
static void print_waiting_reqs(int32_t * reqs, int count)
{
    int i;

    printf("\n Waiting reqs: ");
    for(i = 0; i < count; i++)
        printf(" %" PRId32 " ", reqs[i]); /* PRId32 matches the int32_t argument */
}
/* Debug helper: prints the request id of every completed_requests entry
 * linked on `head` to stdout, after a "Completed queue:" header. */
static void print_completed_queue(struct qlist_head * head)
{
    struct qlist_head * ent = NULL;

    printf("\n Completed queue: ");
    qlist_for_each(ent, head)
    {
        struct completed_requests * current =
            qlist_entry(ent, completed_requests, ql);
        /* req_id is a dumpi_req_id (int32_t), hence PRId32 */
        printf(" %" PRId32 " ", current->req_id);
    }
}
260
static int clear_completed_reqs(nw_state * s,
261
        tw_lp * lp,
262
        int32_t * reqs, int count)
263
{
264
    int i, matched = 0;
265 266 267 268 269 270 271 272 273
    for( i = 0; i < count; i++)
    {
      struct qlist_head * ent = NULL;
      qlist_for_each(ent, &s->completed_reqs)
       {
            struct completed_requests* current = 
                qlist_entry(ent, completed_requests, ql);
            if(current->req_id == reqs[i])
            {
274
                ++matched;
275 276 277 278 279
                qlist_del(&current->ql);
                rc_stack_push(lp, current, free, s->matched_reqs);
            }
       }
    }
280
    return matched;
281 282
}
static void add_completed_reqs(nw_state * s, 
283 284
        tw_lp * lp,
        int count)
285 286 287 288 289 290 291 292
{
    int i;
    for( i = 0; i < count; i++)
    {
       struct completed_requests * req = rc_stack_pop(s->matched_reqs); 
       qlist_add(&req->ql, &s->completed_reqs); 
    }
}
293 294 295 296 297 298
/* helper function - looks up the LP id of the "nw-lp" LP whose relative
 * id is the given MPI rank */
static tw_lpid rank_to_lpid(int rank)
{
    tw_lpid lpid;

    lpid = codes_mapping_get_lpid_from_relative(rank, NULL, "nw-lp", NULL, 0);
    return lpid;
}

299 300 301
/* Tells the pending wait operation (if any) that request `completed_req`
 * has completed.
 *  - For CODES_WK_WAIT the wait is complete when its single request id
 *    matches.
 *  - For CODES_WK_WAITALL / WAITANY / WAITSOME every matching entry bumps
 *    num_completed, and the wait is complete once num_completed reaches
 *    count. m->fwd.wait_completed is set to 1 whenever an entry matched.
 * m->fwd.wait_completed is reset to 0 on entry.
 * Returns 1 if the pending wait is now complete, 0 otherwise (including when
 * no wait is pending). `bf` is not used. */
static int notify_posted_wait(nw_state* s,
        tw_bf * bf, nw_message * m, tw_lp * lp,
        dumpi_req_id completed_req)
{
    struct pending_waits* wait_elem = s->wait_op;
    int wait_completed = 0;
    int op_type;

    m->fwd.wait_completed = 0;

    if(!wait_elem)
        return 0;

    op_type = wait_elem->op_type;

    if(op_type == CODES_WK_WAIT &&
            (wait_elem->req_ids[0] == completed_req))
    {
        wait_completed = 1;
    }
    else if(op_type == CODES_WK_WAITALL
            || op_type == CODES_WK_WAITANY
            || op_type == CODES_WK_WAITSOME)
    {
        int i;
        for(i = 0; i < wait_elem->count; i++)
        {
            if(wait_elem->req_ids[i] == completed_req)
            {
                wait_elem->num_completed++;
                /* report the inconsistent state before the assert fires;
                 * the cast makes the argument match %llu whatever integer
                 * type tw_lpid is */
                if(wait_elem->num_completed > wait_elem->count)
                    printf("\n Num completed %d count %d LP %llu ",
                            wait_elem->num_completed,
                            wait_elem->count,
                            (unsigned long long)lp->gid);

                assert(wait_elem->num_completed <= wait_elem->count);
                if(wait_elem->num_completed == wait_elem->count)
                    wait_completed = 1;

                m->fwd.wait_completed = 1;
            }
        }
    }
    return wait_completed;
}
343

344
/* reverse handler of MPI wait operation */
345
static void codes_exec_mpi_wait_rc(nw_state* s, tw_lp* lp)
346
{
347
    if(s->wait_op)
348
     {
349 350 351
         struct pending_waits * wait_op = s->wait_op;
         free(wait_op);
         s->wait_op = NULL;
352 353 354 355
     }
   else
    {
        codes_issue_next_event_rc(lp);
356
        completed_requests * qi = rc_stack_pop(s->processed_ops);
357
        qlist_add(&qi->ql, &s->completed_reqs);
358
    }
359
    return;
360
}
361

362
/* execute MPI wait operation */
363
static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op)
364
{
365 366
    /* check in the completed receives queue if the request ID has already been completed.*/
    assert(!s->wait_op);
367
    dumpi_req_id req_id = mpi_op->u.wait.req_id;
368
    struct completed_requests* current = NULL;
369

370 371 372 373 374 375 376
    struct qlist_head * ent = NULL;
    qlist_for_each(ent, &s->completed_reqs)
    {
        current = qlist_entry(ent, completed_requests, ql);
        if(current->req_id == req_id)
        {
            qlist_del(&current->ql);
377
            rc_stack_push(lp, current, free, s->processed_ops);
378 379 380 381
            codes_issue_next_event(lp);
            return;
        }
    }
382 383 384 385 386
    /* If not, add the wait operation in the pending 'waits' list. */
    struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
    wait_op->op_type = mpi_op->op_type;
    wait_op->req_ids[0] = req_id;
    wait_op->count = 1;
387 388
    wait_op->num_completed = 0;
    wait_op->start_time = tw_now(lp);
389
    s->wait_op = wait_op;
390

391
    return;
392 393
}

394 395 396 397
static void codes_exec_mpi_wait_all_rc(
        nw_state* s, 
        tw_bf * bf,
        nw_message * m,
398
        tw_lp* lp)
399
{
400 401 402 403 404 405 406 407
  if(s->wait_op)
  {
      struct pending_waits * wait_op = s->wait_op;
      free(wait_op);
      s->wait_op = NULL;
  }
  else
  {
408
      add_completed_reqs(s, lp, m->fwd.num_matched);
409 410 411
      codes_issue_next_event_rc(lp);
  }
  return;
412 413
}
static void codes_exec_mpi_wait_all(
414 415 416 417 418
        nw_state* s, 
        tw_bf * bf,
        nw_message * m,
        tw_lp* lp, 
        struct codes_workload_op * mpi_op)
419 420
{
  int count = mpi_op->u.waits.count;
421 422
  /* If the count is not less than max wait reqs then stop */
  assert(count < MAX_WAIT_REQS);
423

424
  int i = 0, num_matched = 0;
425
  m->fwd.num_matched = 0;
426

427
  /*if(lp->gid == TRACK)
428
  {
429
      printf("\n MPI Wait all posted ");
430 431
      print_waiting_reqs(mpi_op->u.waits.req_ids, count);
      print_completed_queue(&s->completed_reqs);
432
  }*/
433 434 435 436 437 438 439 440 441 442 443 444 445
      /* check number of completed irecvs in the completion queue */ 
  for(i = 0; i < count; i++)
  {
      dumpi_req_id req_id = mpi_op->u.waits.req_ids[i];
      struct qlist_head * ent = NULL;
      struct completed_requests* current = NULL;
      qlist_for_each(ent, &s->completed_reqs)
       {
            current = qlist_entry(ent, completed_requests, ql);
            if(current->req_id == req_id)
                num_matched++;
       }
  }
446

447
  m->fwd.found_match = num_matched;
448 449 450 451
  if(num_matched == count)
  {
    /* No need to post a MPI Wait all then, issue next event */
      /* Remove all completed requests from the list */
452 453 454
      m->fwd.num_matched = clear_completed_reqs(s, lp, mpi_op->u.waits.req_ids, count);
      struct pending_waits* wait_op = s->wait_op;
      free(wait_op);
455 456
      s->wait_op = NULL;
      codes_issue_next_event(lp);
457 458
  }
  else
459 460 461 462 463 464 465 466 467 468 469
  {
      /* If not, add the wait operation in the pending 'waits' list. */
	  struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
	  wait_op->count = count;
      wait_op->op_type = mpi_op->op_type;
      assert(count < MAX_WAIT_REQS);

      for(i = 0; i < count; i++)
          wait_op->req_ids[i] =  mpi_op->u.waits.req_ids[i];

	  wait_op->num_completed = num_matched;
470
	  wait_op->start_time = tw_now(lp);
471
      s->wait_op = wait_op;
472
  }
473 474
  return;
}
475 476 477 478

/* search for a matching mpi operation and remove it from the list. 
 * Record the index in the list from where the element got deleted. 
 * Index is used for inserting the element once again in the queue for reverse computation. */
479 480 481 482 483
static int rm_matching_rcv(nw_state * ns, 
        tw_bf * bf,
        nw_message * m, 
        tw_lp * lp, 
        mpi_msgs_queue * qitem)
484 485
{
    int matched = 0;
486
    int index = 0;
487 488
    struct qlist_head *ent = NULL;
    mpi_msgs_queue * qi = NULL;
489

490 491
    qlist_for_each(ent, &ns->pending_recvs_queue){
        qi = qlist_entry(ent, mpi_msgs_queue, ql);
492
        if((qi->num_bytes == qitem->num_bytes)
493 494
                && ((qi->tag == qitem->tag) || qi->tag == -1)
                && ((qi->source_rank == qitem->source_rank) || qi->source_rank == -1))
495 496 497 498
        {
            matched = 1;
            break;
        }
499
        ++index;
500 501 502 503
    }
    
    if(matched)
    {
504
        m->rc.saved_recv_time = ns->recv_time;
505
        ns->recv_time += (tw_now(lp) - qi->req_init_time);
506 507 508
        
        if(qi->op_type == CODES_WK_IRECV)
            update_completed_queue(ns, bf, m, lp, qi->req_id);
509
        
510
        qlist_del(&qi->ql);
511 512
        
        rc_stack_push(lp, qi, free, ns->processed_ops);
513
        return index;
514 515 516 517
    }
    return -1;
}

518 519 520 521
/* Searches ns->arrival_queue for the first arrived send matching the
 * receive `qitem` and unlinks it from the list.
 * A send matches when the byte counts are equal, its tag equals the
 * receive's tag or the receive's tag is -1, and its source rank equals the
 * receive's source rank or the receive's source rank is -1.
 * On a match: ns->recv_time is saved in m->rc.saved_recv_time and then grows
 * by the time elapsed since the receive was started; for a CODES_WK_IRECV
 * the receive's request is reported through update_completed_queue().
 * Returns the index in the list of the unlinked send, or -1 if nothing
 * matched.
 * NOTE(review): the unlinked send item is neither freed nor pushed on an
 * rc stack here - confirm who owns it after this call. */
static int rm_matching_send(nw_state * ns,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp, mpi_msgs_queue * qitem)
{
    int matched = 0;
    int index = 0;
    struct qlist_head *ent = NULL;
    mpi_msgs_queue * qi = NULL;

    qlist_for_each(ent, &ns->arrival_queue){
        qi = qlist_entry(ent, mpi_msgs_queue, ql);
        if((qi->num_bytes == qitem->num_bytes)
                && (qi->tag == qitem->tag || qitem->tag == -1)
                && ((qi->source_rank == qitem->source_rank) || qitem->source_rank == -1))
        {
            matched = 1;
            break;
        }
        ++index;
    }

    if(!matched)
        return -1;

    m->rc.saved_recv_time = ns->recv_time;
    ns->recv_time += (tw_now(lp) - qitem->req_init_time);

    if(qitem->op_type == CODES_WK_IRECV)
        update_completed_queue(ns, bf, m, lp, qitem->req_id);

    qlist_del(&qi->ql);

    return index;
}
/* Reverse of codes_issue_next_event(): steps the LP's random number
 * generator back, undoing the draw made when the event was scheduled. */
static void codes_issue_next_event_rc(tw_lp * lp)
{
    tw_rand_reverse_unif(lp->rng);
}

/* Schedules an MPI_OP_GET_NEXT event on this same LP. The event offset is
 * the lookahead plus 0.1 plus an exponentially distributed random value
 * drawn with parameter `noise`. */
static void codes_issue_next_event(tw_lp* lp)
{
   tw_stime delay = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
   tw_event * next_ev = tw_event_new( lp->gid, delay, lp );
   nw_message * next_msg = tw_event_data(next_ev);

   next_msg->msg_type = MPI_OP_GET_NEXT;
   tw_event_send(next_ev);
}

/* Simulate delays between MPI operations */
static void codes_exec_comp_delay(
577
        nw_state* s, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op)
578 579 580 581 582
{
	tw_event* e;
	tw_stime ts;
	nw_message* msg;

583
    m->rc.saved_delay = s->compute_time;
584 585
    s->compute_time += s_to_ns(mpi_op->u.delay.seconds);
    ts = s_to_ns(mpi_op->u.delay.seconds);
586 587 588 589 590 591 592 593 594 595 596

	ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
	
	e = tw_event_new( lp->gid, ts , lp );
	msg = tw_event_data(e);
	msg->msg_type = MPI_OP_GET_NEXT;
	tw_event_send(e); 
                
}

/* reverse computation operation for MPI irecv */
597 598 599 600
static void codes_exec_mpi_recv_rc(
        nw_state* ns, 
        tw_bf * bf, 
        nw_message* m, 
601
        tw_lp* lp)
602
{
603
	num_bytes_recvd -= m->rc.saved_num_bytes;
604 605
	ns->recv_time = m->rc.saved_recv_time;
	if(m->fwd.found_match >= 0)
606
	  {
607
		ns->recv_time = m->rc.saved_recv_time;
608
        int queue_count = qlist_count(&ns->arrival_queue); 
609
        
610
        mpi_msgs_queue * qi = rc_stack_pop(ns->processed_ops);	
611
       
612
        if(!m->fwd.found_match)
613 614 615
        {
            qlist_add(&qi->ql, &ns->arrival_queue);
        }
616
        else if(m->fwd.found_match >= queue_count)
617 618 619
        {
            qlist_add_tail(&qi->ql, &ns->arrival_queue);
        }
620
        else if(m->fwd.found_match > 0 && m->fwd.found_match < queue_count) 
621
        {
622 623 624 625
            int index = 1;
            struct qlist_head * ent = NULL;
            qlist_for_each(ent, &ns->arrival_queue)
            {
626
               if(index == m->fwd.found_match)
627 628 629 630 631 632
               {
                 qlist_add(&qi->ql, ent);
                 break;
               }
               index++; 
            }
633
        }
634
        if(qi->op_type == CODES_WK_IRECV)
635
        {
636
            update_completed_queue_rc(ns, bf, m, lp);
637
        }
638 639
        codes_issue_next_event_rc(lp);
      }
640
	else if(m->fwd.found_match < 0)
641
	    {
642 643 644 645
            struct qlist_head * ent = qlist_pop_back(&ns->pending_recvs_queue); 
            mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql);
            free(qi);
            
646
            if(m->op_type == CODES_WK_IRECV)
647
                codes_issue_next_event_rc(lp);
648 649 650 651
	    }
}

/* Execute MPI Irecv operation (non-blocking receive) */ 
652 653 654 655 656 657
static void codes_exec_mpi_recv(
        nw_state* s, 
        tw_bf * bf,
        nw_message * m, 
        tw_lp* lp, 
        struct codes_workload_op * mpi_op)
658 659 660 661 662
{
/* Once an irecv is posted, list of completed sends is checked to find a matching isend.
   If no matching isend is found, the receive operation is queued in the pending queue of
   receive operations. */

663
	m->rc.saved_recv_time = s->recv_time;
664 665
    m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;

666 667 668 669 670 671 672 673 674 675 676
	num_bytes_recvd += mpi_op->u.recv.num_bytes;

    mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
    recv_op->req_init_time = tw_now(lp);
    recv_op->op_type = mpi_op->op_type;
    recv_op->source_rank = mpi_op->u.recv.source_rank;
    recv_op->dest_rank = mpi_op->u.recv.dest_rank;
    recv_op->num_bytes = mpi_op->u.recv.num_bytes;
    recv_op->tag = mpi_op->u.recv.tag;
    recv_op->req_id = mpi_op->u.recv.req_id;

677 678 679 680
    if(s->nw_id == TRACK_LP)
        printf("\n Receive op posted num bytes %d source %d ", recv_op->num_bytes,
                recv_op->source_rank);

681
	int found_matching_sends = rm_matching_send(s, bf, m, lp, recv_op);
682 683 684 685

	/* save the req id inserted in the completed queue for reverse computation. */
	if(found_matching_sends < 0)
	  {
686
	   	  m->fwd.found_match = -1;
687
          qlist_add_tail(&recv_op->ql, &s->pending_recvs_queue);
688 689 690 691 692 693 694
	
	       /* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */
		if(mpi_op->op_type == CODES_WK_IRECV)
		   {
			codes_issue_next_event(lp);	
			return;
		   }
695
      }
696 697
	else
	  {
698
        m->fwd.found_match = found_matching_sends;
699
        codes_issue_next_event(lp); 
700 701
	    rc_stack_push(lp, recv_op, free, s->processed_ops);
      }
702 703 704
}

/* executes MPI send and isend operations */
705 706 707 708 709
static void codes_exec_mpi_send(nw_state* s, 
        tw_bf * bf,
        nw_message * m,
        tw_lp* lp, 
        struct codes_workload_op * mpi_op)
710
{
711
    m->rc.saved_num_bytes = mpi_op->u.send.num_bytes;
712 713 714 715 716 717 718 719
	/* model-net event */
	tw_lpid dest_rank;
	codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, 
	    lp_type_name, &mapping_type_id, annotation, &mapping_rep_id, &mapping_offset);

	if(net_id == DRAGONFLY) /* special handling for the dragonfly case */
	{
		int num_routers, lps_per_rep, factor;
720 721
		num_routers = codes_mapping_get_lp_count(lp_group_name, 1,
                  "modelnet_dragonfly_router", NULL, 1);
722 723 724 725 726 727 728 729 730 731 732 733
	 	lps_per_rep = (2 * num_nw_lps) + num_routers;	
		factor = mpi_op->u.send.dest_rank / num_nw_lps;
		dest_rank = (lps_per_rep * factor) + (mpi_op->u.send.dest_rank % num_nw_lps);	
	}
	else
	{
		/* other cases like torus/simplenet/loggp etc. */
		codes_mapping_get_lp_id(lp_group_name, lp_type_name, NULL, 1,  
	    	  mpi_op->u.send.dest_rank, mapping_offset, &dest_rank);
	}

	num_bytes_sent += mpi_op->u.send.num_bytes;
734
    s->num_bytes_sent += mpi_op->u.send.num_bytes;
735 736 737 738

	nw_message local_m;
	nw_message remote_m;

739 740 741
    local_m.fwd.sim_start_time = tw_now(lp);
    local_m.fwd.dest_rank = mpi_op->u.send.dest_rank;
    local_m.fwd.src_rank = mpi_op->u.send.source_rank;
742
    local_m.op_type = mpi_op->op_type; 
743
    local_m.msg_type = MPI_SEND_POSTED;
744 745 746
    local_m.fwd.tag = mpi_op->u.send.tag;
    local_m.fwd.num_bytes = mpi_op->u.send.num_bytes;
    local_m.fwd.req_id = mpi_op->u.send.req_id;
747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764

    remote_m = local_m;
	remote_m.msg_type = MPI_SEND_ARRIVED;

	model_net_event(net_id, "test", dest_rank, mpi_op->u.send.num_bytes, 0.0, 
	    sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);

	/* isend executed, now get next MPI operation from the queue */ 
	if(mpi_op->op_type == CODES_WK_ISEND)
	   codes_issue_next_event(lp);
}

/* Converts a duration in seconds to nanoseconds. Despite its name, the
 * parameter `ns` is the input value in seconds. */
static tw_stime s_to_ns(tw_stime ns)
{
    const tw_stime ns_per_second = 1000.0 * 1000.0 * 1000.0;

    return ns * ns_per_second;
}

765 766
/* Reverse of update_completed_queue(), driven by the bit-field flags the
 * forward handler set:
 *  - bf->c0: the request had been appended to s->completed_reqs; remove the
 *    last entry and free it.
 *  - bf->c1: a pending wait had completed; restore it from
 *    s->processed_ops, restore s->wait_time, put the consumed completed
 *    requests back and undo the scheduling of the next operation.
 * Finally, if the forward pass counted the request against a pending wait
 * (m->fwd.wait_completed > 0), the wait's num_completed is decremented.
 * NOTE(review): that last step dereferences s->wait_op and so assumes a wait
 * is pending at that point - confirm. */
static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    if(bf->c0)
    {
        struct qlist_head * ent = qlist_pop_back(&s->completed_reqs);
        completed_requests * req = qlist_entry(ent, completed_requests, ql);

        free(req);
    }
    else if(bf->c1)
    {
        struct pending_waits* wait_elem = rc_stack_pop(s->processed_ops);

        s->wait_op = wait_elem;
        s->wait_time = m->rc.saved_wait_time;
        add_completed_reqs(s, lp, m->fwd.num_matched);
        codes_issue_next_event_rc(lp);
    }

    if(m->fwd.wait_completed > 0)
        s->wait_op->num_completed--;
}

/* Records the completion of request `req_id`.
 * The pending wait (if any) is notified first.
 *  - If that does not complete a wait, the request id is appended to
 *    s->completed_reqs and bf->c0 is set.
 *  - If it completes the pending wait, bf->c1 is set, the completed requests
 *    the wait covers are removed (their number is stored in
 *    m->fwd.num_matched), the waited time is added to s->wait_time (previous
 *    value saved in m->rc.saved_wait_time), the wait is moved to
 *    s->processed_ops for reverse computation, and the next operation is
 *    scheduled.
 * bf->c0, bf->c1 and m->fwd.num_matched are reset on entry. */
static void update_completed_queue(nw_state* s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp,
        dumpi_req_id req_id)
{
    int waiting;

    bf->c0 = 0;
    bf->c1 = 0;
    m->fwd.num_matched = 0;

    waiting = notify_posted_wait(s, bf, m, lp, req_id);

    if(!waiting)
    {
        bf->c0 = 1;
        completed_requests * req = malloc(sizeof(completed_requests));
        req->req_id = req_id;
        qlist_add_tail(&req->ql, &s->completed_reqs);
    }
    else
    {
        bf->c1 = 1;
        m->fwd.num_matched = clear_completed_reqs(s, lp, s->wait_op->req_ids, s->wait_op->count);
        m->rc.saved_wait_time = s->wait_time;
        s->wait_time += (tw_now(lp) - s->wait_op->start_time);

        struct pending_waits* wait_elem = s->wait_op;
        rc_stack_push(lp, wait_elem, free, s->processed_ops);
        s->wait_op = NULL;
        codes_issue_next_event(lp);
    }
}

832
/* reverse handler for updating arrival queue function */
833 834 835
static void update_arrival_queue_rc(nw_state* s, 
        tw_bf * bf, 
        nw_message * m, tw_lp * lp)
836
{
837
	s->recv_time = m->rc.saved_recv_time;
838 839
    s->num_bytes_recvd -= m->fwd.num_bytes;

840
    codes_local_latency_reverse(lp);
841
  
842
    if(m->fwd.found_match >= 0)
843
	{
844
        mpi_msgs_queue * qi = rc_stack_pop(s->processed_ops);
845 846
        int queue_count = qlist_count(&s->pending_recvs_queue); 

847
        if(!m->fwd.found_match)
848 849 850
        {
            qlist_add(&qi->ql, &s->pending_recvs_queue);
        }
851
        else if(m->fwd.found_match >= queue_count)
852
        {
853 854
            qlist_add_tail(&qi->ql, &s->pending_recvs_queue);
        }
855
        else if(m->fwd.found_match > 0 && m->fwd.found_match < queue_count)
856 857 858 859 860
        {
            int index = 1;
            struct qlist_head * ent = NULL;
            qlist_for_each(ent, &s->pending_recvs_queue)
            {
861
               if(index == m->fwd.found_match)
862 863 864 865 866 867
               {
                 qlist_add(&qi->ql, ent);
                 break;
               }
               index++; 
            }
868
        }
869 870
        if(qi->op_type == CODES_WK_IRECV)
            update_completed_queue_rc(s, bf, m, lp);
871
    }
872
	else if(m->fwd.found_match < 0)
873 874 875 876 877 878 879 880 881 882
	{
	    struct qlist_head * ent = qlist_pop_back(&s->arrival_queue); 
        mpi_msgs_queue * qi = qlist_entry(ent, mpi_msgs_queue, ql);
        free(qi);
    }
}

/* Once a send arrives, the pending receives queue is checked to find out if
 * there is a receive that has already been posted for it. If no matching
 * receive has been posted, the arrived send is appended to the arrival
 * queue; otherwise the index of the matched receive is recorded in
 * m->fwd.found_match and the temporary record of the send is freed.
 * Before matching, s->recv_time is saved in m->rc.saved_recv_time, the
 * received-byte counter is updated, and an MPI_SEND_ARRIVED_CB event
 * carrying the message's transit time is sent to the sender. */
static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    m->rc.saved_recv_time = s->recv_time;
    s->num_bytes_recvd += m->fwd.num_bytes;

    // send a callback to the sender to increment times
    tw_event *e_callback =
        tw_event_new(rank_to_lpid(m->fwd.src_rank),
                codes_local_latency(lp), lp);
    nw_message *m_callback = tw_event_data(e_callback);
    m_callback->msg_type = MPI_SEND_ARRIVED_CB;
    m_callback->fwd.msg_send_time = tw_now(lp) - m->fwd.sim_start_time;
    tw_event_send(e_callback);

    /* Now reconstruct the queue item */
    mpi_msgs_queue * arrived_op = malloc(sizeof(*arrived_op));
    arrived_op->req_init_time = m->fwd.sim_start_time;
    arrived_op->op_type = m->op_type;
    arrived_op->source_rank = m->fwd.src_rank;
    arrived_op->dest_rank = m->fwd.dest_rank;
    arrived_op->num_bytes = m->fwd.num_bytes;
    arrived_op->tag = m->fwd.tag;
    /* carry the sender's request id so no field of the queued item is left
     * uninitialized */
    arrived_op->req_id = m->fwd.req_id;

    if(s->nw_id == TRACK_LP)
        printf("\n Send op arrived source rank %d num bytes %d ", arrived_op->source_rank,
                arrived_op->num_bytes);

    int found_matching_recv = rm_matching_rcv(s, bf, m, lp, arrived_op);

    if(found_matching_recv < 0)
    {
        m->fwd.found_match = -1;
        qlist_add_tail(&arrived_op->ql, &s->arrival_queue);
    }
    else
    {
        m->fwd.found_match = found_matching_recv;
        free(arrived_op);
    }
}
/* Callback on the message sender: saves the current s->send_time in
 * m->rc.saved_send_time and adds the transit time carried in
 * m->fwd.msg_send_time to it. `bf` and `lp` are not used. */
static void update_message_time(
        nw_state * s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp)
{
    m->rc.saved_send_time = s->send_time;
    s->send_time += m->fwd.msg_send_time;
}

/*
 * Reverse of update_message_time: puts back the send time that was stored
 * in m->rc.saved_send_time by the forward handler.
 * bf and lp are unused.
 */
static void update_message_time_rc(
        nw_state * s,
        tw_bf * bf,
        nw_message * m,
        tw_lp * lp)
{
    (void)bf;
    (void)lp;

    s->send_time = m->rc.saved_send_time;
}

/*
 * Initializes the network node LP: zeroes its state, determines its relative
 * id, loads the trace for that id through codes_workload_load, sets up the
 * LP's queues and rc stacks, and issues the first event.
 *
 * LPs whose relative id is not below num_net_traces return early and do not
 * generate any workload related events.
 *
 * s  - LP state to initialize
 * lp - the LP being initialized
 */
void nw_test_init(nw_state* s, tw_lp* lp)
{
   /* initialize the LP's and load the data */
   char * params = NULL;
   dumpi_trace_params params_d;

   /* do not hand uninitialized fields to the workload loader */
   memset(&params_d, 0, sizeof(params_d));

   memset(s, 0, sizeof(*s));
   s->nw_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);

   if(!num_net_traces)
	num_net_traces = num_net_lps;

   assert(num_net_traces <= num_net_lps);

   if (strcmp(workload_type, "dumpi") == 0){
       /* workload_file may be longer than the destination buffer, so copy
        * with an explicit bound and refuse a truncated path */
       int len = snprintf(params_d.file_name, sizeof(params_d.file_name),
               "%s", workload_file);
       assert(len >= 0 && (size_t)len < sizeof(params_d.file_name));
       (void)len;
       params_d.num_net_traces = num_net_traces;

       params = (char*)&params_d;
   }
   /* NOTE(review): for any other workload type params stays NULL and is
    * passed as-is to codes_workload_load below - confirm this is intended */

  /* In this case, the LP will not generate any workload related events*/
   /* compare against the global count: params_d.num_net_traces is only set
    * for the dumpi workload type */
   if(s->nw_id >= (tw_lpid)num_net_traces)
	    return;

   /* Initialize the RC stack */
   rc_stack_create(&s->processed_ops);
   rc_stack_create(&s->matched_reqs);

   assert(s->processed_ops != NULL);
   assert(s->matched_reqs != NULL);

   wrkld_id = codes_workload_load("dumpi-trace-workload", params, 0, (int)s->nw_id);

   INIT_QLIST_HEAD(&s->arrival_queue);
   INIT_QLIST_HEAD(&s->pending_recvs_queue);
   INIT_QLIST_HEAD(&s->completed_reqs);

   /* clock starts when the first event is processed */
   s->start_time = tw_now(lp);
   codes_issue_next_event(lp);
   s->num_bytes_sent = 0;
   s->num_bytes_recvd = 0;
   s->compute_time = 0;
   s->elapsed_time = 0;

   return;
}

/*
 * Forward event handler of the network LP. Clears the event bitfield, runs
 * rc_stack_gc on both of the LP's rc stacks and then dispatches on the
 * message type:
 *   MPI_SEND_ARRIVED    -> update_arrival_queue
 *   MPI_SEND_ARRIVED_CB -> update_message_time
 *   MPI_SEND_POSTED     -> next event for a blocking send, completed-queue
 *                          update for a non-blocking send
 *   MPI_OP_GET_NEXT     -> get_next_mpi_operation
 * Any other message type is ignored.
 */
void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    *(int *)bf = (int)0;

    rc_stack_gc(lp, s->matched_reqs);
    rc_stack_gc(lp, s->processed_ops);

    if(m->msg_type == MPI_SEND_ARRIVED)
    {
        update_arrival_queue(s, bf, m, lp);
    }
    else if(m->msg_type == MPI_SEND_ARRIVED_CB)
    {
        update_message_time(s, bf, m, lp);
    }
    else if(m->msg_type == MPI_SEND_POSTED)
    {
        switch(m->op_type)
        {
            case CODES_WK_SEND:
                codes_issue_next_event(lp);
                break;

            case CODES_WK_ISEND:
                update_completed_queue(s, bf, m, lp, m->fwd.req_id);
                break;

            default:
                /* other operation types need no action here */
                break;
        }
    }
    else if(m->msg_type == MPI_OP_GET_NEXT)
    {
        get_next_mpi_operation(s, bf, m, lp);
    }
}

/*
 * Reverse counterpart of get_next_mpi_operation.
 *
 * Calls codes_workload_get_next_rc2 for this LP's workload and then, based
 * on the operation type recorded in m->op_type by the forward handler,
 * decrements the per-type counter that the forward handler incremented and
 * invokes the matching *_rc routine.
 *
 * s  - LP state being rolled back
 * bf - event bitfield, forwarded to the recv / wait-all reverse routines
 * m  - the message being reversed (op_type and rc.* fields are read)
 * lp - the LP
 */
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    codes_workload_get_next_rc2(wrkld_id, 0, (int)s->nw_id);

    /* the forward handler does nothing further for the end marker */
    if(m->op_type == CODES_WK_END)
    {
        return;
    }
    switch(m->op_type)
    {
        case CODES_WK_SEND:
        case CODES_WK_ISEND:
        {
            model_net_event_rc(net_id, lp, m->rc.saved_num_bytes);
            if(m->op_type == CODES_WK_ISEND)
                codes_issue_next_event_rc(lp);
            s->num_sends--;
            /* Undo the byte accounting on BOTH the per-LP and the global
             * counter. The per-LP counter used to be incremented here,
             * which double-counted the bytes on every rollback. */
            s->num_bytes_sent -= m->rc.saved_num_bytes;
            num_bytes_sent -= m->rc.saved_num_bytes;
        }
        break;

        case CODES_WK_IRECV:
        case CODES_WK_RECV:
        {
            codes_exec_mpi_recv_rc(s, bf, m, lp);
            s->num_recvs--;
        }
        break;

        case CODES_WK_DELAY:
        {
            s->num_delays--;
            /* roll back one random number draw; presumably the one consumed
             * on the forward path - confirm against the forward routines */
            tw_rand_reverse_unif(lp->rng);
            if(!disable_delay)
                s->compute_time = m->rc.saved_delay;
        }
        break;

        case CODES_WK_BCAST:
        case CODES_WK_ALLGATHER:
        case CODES_WK_ALLGATHERV:
        case CODES_WK_ALLTOALL:
        case CODES_WK_ALLTOALLV:
        case CODES_WK_REDUCE:
        case CODES_WK_ALLREDUCE:
        case CODES_WK_COL:
        {
            s->num_cols--;
            codes_issue_next_event_rc(lp);
        }
        break;

        case CODES_WK_WAITSOME:
        case CODES_WK_WAITANY:
        {
            s->num_waitsome--;
            codes_issue_next_event_rc(lp);
        }
        break;

        case CODES_WK_WAIT:
        {
            s->num_wait--;
            codes_exec_mpi_wait_rc(s, lp);
        }
        break;

        case CODES_WK_WAITALL:
        {
            s->num_waitall--;
            codes_exec_mpi_wait_all_rc(s, bf, m, lp);
        }
        break;

        default:
            printf("\n Invalid op type %d ", m->op_type);
    }
}

/*
 * Fetches the next operation of this LP's workload and executes it.
 *
 * The operation type is recorded in m->op_type (get_next_mpi_operation_rc
 * reads it back). For every handled type the matching per-LP counter is
 * incremented and either the operation-specific routine is called or, for
 * collectives and wait-some/wait-any, the next event is issued directly.
 * The end-of-workload marker stops the chain: nothing further is issued.
 */
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    struct codes_workload_op next_op;

    codes_workload_get_next(wrkld_id, 0, (int)s->nw_id, &next_op);
    m->op_type = next_op.op_type;

    /* end of the trace: no more events for this LP */
    if(next_op.op_type == CODES_WK_END)
        return;

    switch(next_op.op_type)
    {
        /* point-to-point operations */
        case CODES_WK_SEND:
        case CODES_WK_ISEND:
            s->num_sends++;
            codes_exec_mpi_send(s, bf, m, lp, &next_op);
            break;

        case CODES_WK_RECV:
        case CODES_WK_IRECV:
            s->num_recvs++;
            codes_exec_mpi_recv(s, bf, m, lp, &next_op);
            break;

        /* compute delay, skipped entirely when delays are disabled */
        case CODES_WK_DELAY:
            s->num_delays++;
            if(!disable_delay)
                codes_exec_comp_delay(s, m, lp, &next_op);
            else
                codes_issue_next_event(lp);
            break;

        /* wait operations */
        case CODES_WK_WAIT:
            s->num_wait++;
            codes_exec_mpi_wait(s, lp, &next_op);
            break;

        case CODES_WK_WAITALL:
            s->num_waitall++;
            codes_exec_mpi_wait_all(s, bf, m, lp, &next_op);
            break;

        case CODES_WK_WAITSOME:
        case CODES_WK_WAITANY:
            s->num_waitsome++;
            codes_issue_next_event(lp);
            break;

        /* collectives are only counted, then the next event is issued */
        case CODES_WK_BCAST:
        case CODES_WK_ALLGATHER:
        case CODES_WK_ALLGATHERV:
        case CODES_WK_ALLTOALL:
        case CODES_WK_ALLTOALLV:
        case CODES_WK_REDUCE:
        case CODES_WK_ALLREDUCE:
        case CODES_WK_COL:
            s->num_cols++;
            codes_issue_next_event(lp);
            break;

        default:
            printf("\n Invalid op type %d ", next_op.op_type);
    }
}

/*
 * Finalizes a network LP.
 *
 * The LP with relative id 0 first writes a format header into its output
 * buffer. Every LP whose id is below num_net_traces then computes its
 * elapsed time, appends one line of statistics to the buffer, hands the
 * buffer to lp_io_write under the "mpi-replay-stats" identifier, folds its
 * timings into the global max and avg accumulators and destroys its rc stacks.
 * LPs with an id at or above num_net_traces do nothing beyond the header
 * step.
 */
void nw_test_finalize(nw_state* s, tw_lp* lp)
{
    int written = 0;

    if(s->nw_id == 0)
        written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time>");

    /* LPs without a trace have nothing to report */
    if(s->nw_id >= (tw_lpid)num_net_traces)
        return;

    s->elapsed_time = tw_now(lp) - s->start_time;

    /* time not spent in compute delays */
    double comm_time = s->elapsed_time - s->compute_time;

    /* the queue lengths are only reported in the debug build below */
    int count_irecv = qlist_count(&s->pending_recvs_queue);
    int count_isend = qlist_count(&s->arrival_queue);
    (void)count_irecv;
    (void)count_isend;
# if DBG_MPI_SIM == 1
    printf("\n LP %llu unmatched irecvs %d unmatched sends %d Total sends %ld receives %ld collectives %ld delays %ld wait alls %ld waits %ld send time %lf wait %lf",
        lp->gid, count_irecv, count_isend, s->num_sends, s->num_recvs, s->num_cols, s->num_delays, s->num_waitall, s->num_wait, s->send_time, s->wait_time);
# endif

    written += sprintf(s->output_buf + written, "\n %llu %llu %ld %ld %ld %ld %lf %lf %lf",
            lp->gid, s->nw_id, s->num_sends, s->num_recvs,
            s->num_bytes_sent, s->num_bytes_recvd,
            s->send_time, comm_time, s->compute_time);
    lp_io_write(lp->gid, "mpi-replay-stats", written, s->output_buf);

    /* total run time */
    if(s->elapsed_time > max_time)
        max_time = s->elapsed_time;
    avg_time += s->elapsed_time;

    /* communication time */
    if(comm_time > max_comm_time)
        max_comm_time = comm_time;
    avg_comm_time += comm_time;

    /* wait time: only the sum is accumulated, max_wait_time is not updated here */
    avg_wait_time += s->wait_time;

    /* send time */
    if(s->send_time > max_send_time)
        max_send_time = s->send_time;
    avg_send_time += s->send_time;

    /* receive time */
    if(s->recv_time > max_recv_time)
        max_recv_time = s->recv_time;
    avg_recv_time += s->recv_time;

    rc_stack_destroy(s->matched_reqs);
    rc_stack_destroy(s->processed_ops);
}

void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
	switch(m->msg_type)
	{
		case MPI_SEND_ARRIVED:
			update_arrival_queue_rc(s, bf, m, lp);
		break;

		case MPI_SEND_ARRIVED_CB:
			update_message_time_rc(s, bf, m, lp);
		break;

1238 1239 1240 1241 1242 1243 1244 1245 1246 1247
        case MPI_SEND_POSTED:
        {
         if(m->op_type == CODES_WK_SEND