model-net-mpi-wrklds.c 42.7 KB
Newer Older
1
2
3
4
5
6
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */
#include <ross.h>
7
#include <inttypes.h>
8

9
#include "codes/codes-workload.h"
10
11
12
13
14
#include "codes/codes.h"
#include "codes/configuration.h"
#include "codes/codes_mapping.h"
#include "codes/model-net.h"

15
#define TRACE -1
16
17
18
19
20
21
22
23
24

char workload_type[128];
char workload_file[8192];
char offset_file[8192];
static int wrkld_id;
static int num_net_traces = 0;

typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
25
typedef int16_t dumpi_req_id;
26
27
28
29
30
31

static int net_id = 0;
static float noise = 5.0;
static int num_net_lps, num_nw_lps;
long long num_bytes_sent=0;
long long num_bytes_recvd=0;
32
33
double max_time = 0,  max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;
34
35
36
37
38

/* global variables for codes mapping */
static char lp_group_name[MAX_NAME_LENGTH], lp_type_name[MAX_NAME_LENGTH], annotation[MAX_NAME_LENGTH];
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;

39
40
41
/* Event types exchanged between network workload LPs.
 * MPI_OP_GET_NEXT fetches the next MPI operation once the previous one has completed.
 * MPI_SEND_ARRIVED is issued when an MPI message arrives at its destination (the
 * message is transported by model-net and an event is invoked when it arrives).
 * MPI_SEND_POSTED is issued when an MPI message has left the source LP (the message
 * is transported via model-net). */
enum MPI_NW_EVENTS
{
	MPI_OP_GET_NEXT=1,
	MPI_SEND_ARRIVED,
	MPI_SEND_POSTED,
};

49
/* Singly linked list node holding a pointer to a pending MPI operation that
 * still has to be matched with its respective send/receive. */
struct mpi_msgs_queue
{
	/* the queued operation (pointer only, not a copy) */
	struct codes_workload_op* mpi_op;
	/* next node, NULL at the tail */
	struct mpi_msgs_queue* next;
};

56
/* stores request IDs of completed MPI operations (Isends or Irecvs) */
57
58
59
60
61
62
struct completed_requests
{
	dumpi_req_id req_id;
	struct completed_requests* next;
};

63
/* for wait operations, store the pending operation and number of completed waits so far. */
64
65
struct pending_waits
{
66
	struct codes_workload_op* mpi_op;
67
68
69
70
	int num_completed;
	tw_stime start_time;
};

71
/* Maintains the head and tail of a queue, as well as the number of elements
 * currently queued. The queues are pending_recvs_queue (holds unmatched MPI
 * recv operations) and arrival_queue (holds unmatched MPI send messages). */
struct mpi_queue_ptrs
{
	int num_elems;
	struct mpi_msgs_queue* queue_head;
	struct mpi_msgs_queue* queue_tail;
};

/* State of the network workload LP. It contains the pointers to the
 * send/receive lists plus per-LP counters and timers. */
struct nw_state
{
	long num_events_per_lp;
	tw_lpid nw_id;
	short wrkld_end;

	/* count of sends, receives, collectives and delays */
	unsigned long num_sends;
	unsigned long num_recvs;
	unsigned long num_cols;
	unsigned long num_delays;

	/* count of wait, waitall and waitsome operations */
	unsigned long num_wait;
	unsigned long num_waitall;
	unsigned long num_waitsome;

	/* time spent by the LP in executing the app trace */
	double start_time;
	double elapsed_time;

	/* time spent in compute operations */
	double compute_time;

	/* time spent in message send/isend */
	double send_time;

	/* time spent in message receive */
	double recv_time;

	/* time spent in wait operation */
	double wait_time;

	/* FIFO for isend messages arrived on destination */
	struct mpi_queue_ptrs* arrival_queue;

	/* FIFO for irecv messages posted but not yet matched with send operations */
	struct mpi_queue_ptrs* pending_recvs_queue;

	/* the wait operation currently blocked on this LP, or NULL if none */
	struct pending_waits* pending_waits;

	/* list of completed send/receive requests */
	struct completed_requests* completed_reqs;
};

124
125
126
127
/* data for handling reverse computation.
* saved_matched_req holds the request ID of matched receives/sends for wait operations.
* ptr_match_op holds the matched MPI operation which are removed from the queues when a send is matched with the receive in forward event handler. 
* network event being sent. op is the MPI operation issued by the network workloads API. rv_data holds the data for reverse computation (TODO: Fill this data structure only when the simulation runs in optimistic mode). */
128
129
struct nw_message
{
130
131
132
   int msg_type;
   /* for reverse computation */
   struct codes_workload_op * op;
133

134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
   struct
   {
     /* forward event handler */
     struct
     {
        int op_type;
        tw_lpid src_rank;
        tw_lpid dest_rank;
        int num_bytes;
        int data_type;
        double sim_start_time;
        int16_t req_id;   
        int tag;
     } msg_info;

     /* required for reverse computation*/
     struct 
      {
152
153
154
	int found_match;
	short matched_op;
	dumpi_req_id saved_matched_req;
155
	struct codes_workload_op* ptr_match_op;
156
157
158
159
160
	struct pending_waits* saved_pending_wait;

	double saved_send_time;
	double saved_recv_time;
	double saved_wait_time;
161
162
      } rc;
  } u;
163
164
};

165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/* executes MPI wait operation */
static void codes_exec_mpi_wait(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp);

/* reverse of mpi wait function. */
static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp);

/* executes MPI isend and send operations */
static void codes_exec_mpi_send(nw_state* s, nw_message* m, tw_lp* lp);

/* execute MPI irecv operation */
static void codes_exec_mpi_recv(nw_state* s, nw_message* m, tw_lp* lp);

/* reverse of mpi recv function. */
static void codes_exec_mpi_recv_rc(nw_state* s, nw_message* m, tw_lp* lp);

/* execute the computational delay */
static void codes_exec_comp_delay(nw_state* s, nw_message* m, tw_lp* lp);

/* execute collective operation, currently only skips these operations. */
static void codes_exec_mpi_col(nw_state* s, nw_message* m, tw_lp* lp);

/* gets the next MPI operation from the network-workloads API. */
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);

/* reverse handler of get next mpi operation. */
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp);

/* Schedules an MPI_OP_GET_NEXT event on the calling LP. */
static void codes_issue_next_event(tw_lp* lp);

///////////////////// HELPER FUNCTIONS FOR MPI MESSAGE QUEUE HANDLING ///////////////
/* upon arrival of local completion message, inserts operation in completed send queue */
static void update_send_completion_queue(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);

/* reverse of the above function */
static void update_send_completion_queue_rc(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);

/* upon arrival of an isend operation, updates the arrival queue of the network */
static void update_arrival_queue(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);

/* reverse of the above function */
static void update_arrival_queue_rc(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);

/* insert MPI operation in the waiting queue */
static void mpi_pending_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op);

/* remove a completed request ID from the completed-requests list so the ID can be reused. */
static void remove_req_id(struct completed_requests** requests, int16_t req_id);

/* remove MPI operation from the waiting queue. */
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue_ptrs* mpi_queue, nw_message * m);

/* remove the tail of the MPI operation from waiting queue */
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue);

/* insert completed MPI requests in the queue. */
static void mpi_completed_queue_insert_op(struct completed_requests** mpi_completed_queue, dumpi_req_id req_id);

/* notifies the wait operations (if any) about the completed receives and sends requests. */
static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_req_id req_id);

/* reverse of notify waits function. */
static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_req_id completed_req);

/* conversion from seconds to nanoseconds */
static tw_stime s_to_ns(tw_stime ns);
231

232

233
234
235
236
237
238
239
240
241
242
243
244
/* Allocates an empty queue (no elements, NULL head and tail).
 * Returns the newly allocated queue; the caller owns it.
 * Aborts via assert if the allocation fails, like the other allocations
 * in this file. */
static struct mpi_queue_ptrs* queue_init()
{
	struct mpi_queue_ptrs* mpi_queue = malloc(sizeof(struct mpi_queue_ptrs));
	assert(mpi_queue);

	mpi_queue->num_elems = 0;
	mpi_queue->queue_head = NULL;
	mpi_queue->queue_tail = NULL;

	return mpi_queue;
}

245
/* helper function: counts number of elements in the queue */
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
static int numQueue(struct mpi_queue_ptrs* mpi_queue)
{
	struct mpi_msgs_queue* tmp = malloc(sizeof(struct mpi_msgs_queue)); 
	assert(tmp);

	tmp = mpi_queue->queue_head;
	int count = 0;

	while(tmp)
	{
		++count;
		tmp = tmp->next;
	}
	return count;
	free(tmp);
}

/* Prints the elements of a send/recv queue (debugging aid).
 *   lpid      - LP id printed with every element
 *   mpi_queue - queue to dump
 *   msg       - label printed in the banner line
 * The cursor is a plain pointer into the list; nothing is allocated. */
static void printQueue(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, char* msg)
{
	printf("\n ************ Printing the queue %s *************** ", msg);
	struct mpi_msgs_queue* tmp = mpi_queue->queue_head;

	while(tmp)
	{
		/* lpid is cast so the argument matches the %ld conversion */
		if(tmp->mpi_op->op_type == CODES_WK_SEND || tmp->mpi_op->op_type == CODES_WK_ISEND)
			printf("\n lpid %ld send operation data type %d count %d tag %d source %d", 
				    (long)lpid, tmp->mpi_op->u.send.data_type, tmp->mpi_op->u.send.count,
				     tmp->mpi_op->u.send.tag, tmp->mpi_op->u.send.source_rank);
		else if(tmp->mpi_op->op_type == CODES_WK_IRECV || tmp->mpi_op->op_type == CODES_WK_RECV)
			printf("\n lpid %ld recv operation data type %d count %d tag %d source %d", 
				   (long)lpid, tmp->mpi_op->u.recv.data_type, tmp->mpi_op->u.recv.count,
				    tmp->mpi_op->u.recv.tag, tmp->mpi_op->u.recv.source_rank );
		else
			printf("\n Invalid data type in the queue %d ", tmp->mpi_op->op_type);
		tmp = tmp->next;
	}
}

/* re-insert element in the queue at the index --- maintained for reverse computation */
290
static void mpi_queue_update(struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op, int pos)
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
{
	struct mpi_msgs_queue* elem = malloc(sizeof(struct mpi_msgs_queue));
	assert(elem);
	elem->mpi_op = mpi_op;
	
	/* inserting at the head */
	if(pos == 0)
	{
	   if(!mpi_queue->queue_tail)
		mpi_queue->queue_tail = elem;
	   elem->next = mpi_queue->queue_head;
	   mpi_queue->queue_head = elem;
	   mpi_queue->num_elems++;
	   return;
	}

	int index = 0;
	struct mpi_msgs_queue* tmp = mpi_queue->queue_head;
	while(index < pos - 1)
	{
		tmp = tmp->next;
		++index;
	}

	if(!tmp)
		printf("\n Invalid index! %d pos %d size %d ", index, pos, numQueue(mpi_queue));
	if(tmp == mpi_queue->queue_tail)
	    mpi_queue->queue_tail = elem;

	elem->next = tmp->next;
	tmp->next = elem;
	mpi_queue->num_elems++;

	return;
}

327
/* prints the elements of a queue (for debugging purposes). */
328
329
330
331
static void printCompletedQueue(nw_state* s, tw_lp* lp)
{
	   if(TRACE == lp->gid)
	   {
332
	   	printf("\n %lf contents of completed operations queue ", tw_now(lp));
333
334
335
336
337
338
339
340
341
	   	struct completed_requests* current = s->completed_reqs;
	   	while(current)
	    	{
			printf(" %d ",current->req_id);
			current = current->next;
	   	}
	   }
}

342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
/* Reverse handler of notify_waits.
 *   m->u.rc.matched_op == 1: the forward handler counted one more completed
 *       request on the pending waitall; undo that count.
 *   m->u.rc.matched_op == 2: the forward handler completed the waitall;
 *       restore the wait time, put the request IDs back on the completed
 *       list, re-install the saved pending wait and roll back the random
 *       number consumed when the next event was issued.
 * NOTE(review): notify_waits leaves matched_op at 0 when a single
 * CODES_WK_WAIT completes, so that path is not undone here -- confirm whether
 * this is handled elsewhere. */
static void notify_waits_rc(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_req_id completed_req)
{
	int i;

	if(lp->gid == TRACE)
		printf("\n %lf reverse -- notify waits req id %d ", tw_now(lp), completed_req);
	printCompletedQueue(s, lp);

	if(m->u.rc.matched_op == 1)
		s->pending_waits->num_completed--;

	/* matched_op == 2 means the waitall was satisfied by this request */
	if(m->u.rc.matched_op == 2)
	{
		if(lp->gid == TRACE)
		{
			printf("\n %lf matched req id %d ", tw_now(lp), completed_req);
			printCompletedQueue(s, lp);
		}

		struct pending_waits* wait_elem = m->u.rc.saved_pending_wait;
		s->wait_time = m->u.rc.saved_wait_time;

		int count = wait_elem->mpi_op->u.waits.count;

		/* the forward handler removed every request ID of the waitall */
		for( i = 0; i < count; i++ )
			mpi_completed_queue_insert_op(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);

		wait_elem->num_completed--;
		s->pending_waits = wait_elem;
		tw_rand_reverse_unif(lp->rng);
	}
}

/* Notifies the pending wait operation (if any) that a send/receive request
 * has completed.
 *
 * If the pending operation is a single wait on exactly this request, the LP
 * moves on to the next operation from the log. If it is a waitall, the
 * completion is counted and the LP moves on only once all of its requests
 * have completed.
 *
 * Reverse-computation data is recorded in m->u.rc (matched_op,
 * saved_wait_time, saved_pending_wait). Always returns 0. */
static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_req_id completed_req)
{
	int i;
	struct pending_waits* wait_elem = s->pending_waits;
	m->u.rc.matched_op = 0;

	if(lp->gid == TRACE)
		printf("\n %lf notify waits req id %d ", tw_now(lp), completed_req);

	/* nothing is waiting on this LP */
	if(!wait_elem)
		return 0;

	int op_type = wait_elem->mpi_op->op_type;

	if(op_type == CODES_WK_WAIT)
	{
		if(wait_elem->mpi_op->u.wait.req_id == completed_req)
		{
			m->u.rc.saved_wait_time = s->wait_time;
			s->wait_time += (tw_now(lp) - wait_elem->start_time);
			remove_req_id(&s->completed_reqs, completed_req);

			/* keep the record reachable from the message for reverse computation */
			m->u.rc.saved_pending_wait = wait_elem;
			s->pending_waits = NULL;
			codes_issue_next_event(lp);
			return 0;
		}
	}
	else if(op_type == CODES_WK_WAITALL)
	{
		int required_count = wait_elem->mpi_op->u.waits.count;

		for(i = 0; i < required_count; i++)
		{
			if(wait_elem->mpi_op->u.waits.req_ids[i] == completed_req)
			{
				if(lp->gid == TRACE)
					printCompletedQueue(s, lp);
				m->u.rc.matched_op = 1;
				wait_elem->num_completed++;
			}
		}

		if(wait_elem->num_completed == required_count)
		{
			if(lp->gid == TRACE)
			{
				printf("\n %lf req %d completed %d", tw_now(lp), completed_req, wait_elem->num_completed);
				printCompletedQueue(s, lp);
			}
			m->u.rc.matched_op = 2;
			m->u.rc.saved_wait_time = s->wait_time;
			s->wait_time += (tw_now(lp) - wait_elem->start_time);
			m->u.rc.saved_pending_wait = wait_elem;
			s->pending_waits = NULL;
			for(i = 0; i < required_count; i++)
				remove_req_id(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);
			codes_issue_next_event(lp); //wait completed
		}
	}
	return 0;
}

451
452
453
454
455
456
457
458
459
460
/* Reverse handler of the MPI wait operation.
 * If a pending wait is installed, the forward handler queued it: release the
 * record it allocated and clear the slot (the waitall reverse handler frees
 * its record the same way). Otherwise the forward handler found the request
 * already completed: put the request ID back on the completed list and roll
 * back the random number consumed when the next event was issued. */
static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
{
	if(s->pending_waits)
	{
		free(s->pending_waits);
		s->pending_waits = NULL;
		return;
	}

	mpi_completed_queue_insert_op(&s->completed_reqs, m->op->u.wait.req_id);
	tw_rand_reverse_unif(lp->rng);
}
465
466

/* execute MPI wait operation */
467
static void codes_exec_mpi_wait(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
468
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
469
470
471
472
473
474
475
476
477
478
479
480
481
    /* check in the completed receives queue if the request ID has already been completed.*/
    assert(!s->pending_waits);
    dumpi_req_id req_id = m->op->u.wait.req_id;

    struct completed_requests* current = s->completed_reqs;
    while(current) {
        if(current->req_id == req_id) {
            remove_req_id(&s->completed_reqs, req_id);
            m->u.rc.saved_wait_time = s->wait_time;
            codes_issue_next_event(lp);
            return;
        }
        current = current->next;
482
483
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
484
    /* If not, add the wait operation in the pending 'waits' list. */
485
    struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
Jonathan Jenkins's avatar
Jonathan Jenkins committed
486
487
488
    wait_op->mpi_op = m->op;
    wait_op->num_completed = 0;
    wait_op->start_time = tw_now(lp);
489
    s->pending_waits = wait_op;
490
491
}

492
/* Reverse handler of the MPI waitall operation.
 * When the forward handler found every request already completed
 * (m->u.rc.found_match set), the request IDs are put back on the completed
 * list and the random number consumed by the issued event is rolled back.
 * Otherwise the pending-wait record the forward handler allocated is freed. */
static void codes_exec_mpi_wait_all_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
{
	if(lp->gid == TRACE)
	{
		printf("\n %lf codes exec mpi waitall reverse %d ", tw_now(lp), m->u.rc.found_match);
		printCompletedQueue(s, lp);
	}

	if(m->u.rc.found_match)
	{
		int i;
		int count = m->op->u.waits.count;

		/* IDs are read straight from the operation; no scratch array is needed */
		for( i = 0; i < count; i++)
			mpi_completed_queue_insert_op(&s->completed_reqs, (dumpi_req_id)m->op->u.waits.req_ids[i]);

		tw_rand_reverse_unif(lp->rng);
	}
	else
	{
		free(s->pending_waits);
		s->pending_waits = NULL;
		if(lp->gid == TRACE)
			printf("\n %lf Nullifying codes waitall ", tw_now(lp));
	}
}
/* Executes an MPI waitall operation.
 * Counts how many entries of the completed list match a request ID of the
 * waitall. If the count equals the number of requests, the IDs are removed
 * from the completed list and the next operation is scheduled
 * (m->u.rc.found_match = 1). Otherwise a pending-wait record carrying the
 * count so far is installed in s->pending_waits (m->u.rc.found_match = 0). */
static void codes_exec_mpi_wait_all(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
{
	int count = m->op->u.waits.count;
	int i, num_completed = 0;
	struct completed_requests* current = s->completed_reqs;

	if(lp->gid == TRACE)
	{
		printf(" \n (%lf) MPI waitall posted %d count", tw_now(lp), m->op->u.waits.count);
		for(i = 0; i < count; i++)
			printf(" %d ", (int)m->op->u.waits.req_ids[i]);
		printCompletedQueue(s, lp);
	}

	/* check number of completed requests in the completion queue */
	while(current)
	{
		for(i = 0; i < count; i++)
		{
			/* compare as dumpi_req_id, the type stored in the completed list */
			if((dumpi_req_id)m->op->u.waits.req_ids[i] == current->req_id)
				num_completed++;
		}
		current = current->next;
	}

	if(TRACE== lp->gid)
		printf("\n %lf Num completed %d count %d ", tw_now(lp), num_completed, count);

	m->u.rc.found_match = 0;
	if(count == num_completed)
	{
		m->u.rc.found_match = 1;
		for( i = 0; i < count; i++)
			remove_req_id(&s->completed_reqs, (dumpi_req_id)m->op->u.waits.req_ids[i]);

		codes_issue_next_event(lp);
	}
	else
	{
		/* If not, add the wait operation in the pending 'waits' list. */
		struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
		assert(wait_op);
		wait_op->mpi_op = m->op;
		wait_op->num_completed = num_completed;
		wait_op->start_time = tw_now(lp);
		s->pending_waits = wait_op;
	}
}

/* Request IDs are reused, so an ID is deleted from the completed list once
 * its matching is done. Removes the first node whose req_id equals req_id.
 *   mpi_completed_queue - address of the list head (updated when the head is removed)
 *   req_id              - request ID to remove
 * Calls tw_error if the list is empty; does nothing if the list is
 * non-empty but does not contain the ID. */
static void remove_req_id(struct completed_requests** mpi_completed_queue, dumpi_req_id req_id)
{
	struct completed_requests** link = mpi_completed_queue;

	if(!*link)
		tw_error(TW_LOC, "\n REQ ID DOES NOT EXIST");

	/* walk the links so that head and interior removal share one code path */
	while(*link)
	{
		struct completed_requests* node = *link;
		if(node->req_id == req_id)
		{
			*link = node->next;
			free(node);
			return;
		}
		link = &node->next;
	}
}

/* inserts mpi operation in the completed requests queue */
603
static void mpi_completed_queue_insert_op(struct completed_requests** mpi_completed_queue, dumpi_req_id req_id)
604
605
606
607
608
609
610
611
{
	struct completed_requests* reqs = malloc(sizeof(struct completed_requests));
	assert(reqs);

	reqs->req_id = req_id;

	if(!(*mpi_completed_queue))	
	{
612
			reqs->next = NULL;
613
			*mpi_completed_queue = reqs;
614
			return;
615
616
617
	}
	reqs->next = *mpi_completed_queue;
	*mpi_completed_queue = reqs;
618
	return;
619
620
}

621
/* insert MPI send or receive operation in the queues starting from tail. Unmatched sends go to arrival queue and unmatched receives go to pending receives queues. */
622
static void mpi_pending_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op)
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
{
	/* insert mpi operation */
	struct mpi_msgs_queue* elem = malloc(sizeof(struct mpi_msgs_queue));
	assert(elem);

	elem->mpi_op = mpi_op;
     	elem->next = NULL;

	if(!mpi_queue->queue_head)
	  mpi_queue->queue_head = elem;

	if(mpi_queue->queue_tail)
	    mpi_queue->queue_tail->next = elem;
	
        mpi_queue->queue_tail = elem;
	mpi_queue->num_elems++;

	return;
}

/* match the send/recv operations */
644
static int match_receive(nw_state* s, tw_lp* lp, tw_lpid lpid, struct codes_workload_op* op1, struct codes_workload_op* op2)
645
{
646
647
648
649
650
651
652
653
654
        assert(op1->op_type == CODES_WK_IRECV || op1->op_type == CODES_WK_RECV);
        assert(op2->op_type == CODES_WK_SEND || op2->op_type == CODES_WK_ISEND);

        if((op1->u.recv.num_bytes >= op2->u.send.num_bytes) &&
                   ((op1->u.recv.tag == op2->u.send.tag) || op1->u.recv.tag == -1) &&
                   ((op1->u.recv.source_rank == op2->u.send.source_rank) || op1->u.recv.source_rank == -1))
                   {
                        if(lp->gid == TRACE)
                           printf("\n op1 rank %d bytes %d ", op1->u.recv.source_rank, op1->u.recv.num_bytes);
655
                        s->recv_time += tw_now(lp) - op1->sim_start_time;
656
657
658
659
                        mpi_completed_queue_insert_op(&s->completed_reqs, op1->u.recv.req_id);
                        return 1;
                   }
        return -1;
660
661
662
}

/* used for reverse computation. removes the tail of the queue */
663
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue)
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
{
	assert(mpi_queue->queue_tail);
	if(mpi_queue->queue_tail == NULL)
	{
		printf("\n Error! tail not updated ");	
		return 0;
	}
	struct mpi_msgs_queue* tmp = mpi_queue->queue_head;

	if(mpi_queue->queue_head == mpi_queue->queue_tail)
	{
		mpi_queue->queue_head = NULL;
		mpi_queue->queue_tail = NULL;
		free(tmp);
		mpi_queue->num_elems--;
		 return 1;
	}

	struct mpi_msgs_queue* elem = mpi_queue->queue_tail;

	while(tmp->next != mpi_queue->queue_tail)
		tmp = tmp->next;

	mpi_queue->queue_tail = tmp;
	mpi_queue->queue_tail->next = NULL;
	mpi_queue->num_elems--;

	free(elem);
	return 1;
}

/* search for a matching mpi operation and remove it from the list. 
 * Record the index in the list from where the element got deleted. 
 * Index is used for inserting the element once again in the queue for reverse computation. */
698
static int mpi_queue_remove_matching_op(nw_state* s, tw_lp* lp, struct mpi_queue_ptrs* mpi_queue, nw_message * m)
699
{
700
701
       struct codes_workload_op * mpi_op = m->op;
 
702
703
704
705
706
707
708
709
	if(mpi_queue->queue_head == NULL)
		return -1;

	/* remove mpi operation */
	struct mpi_msgs_queue* tmp = mpi_queue->queue_head;
	int indx = 0;

	/* if head of the list has the required mpi op to be deleted */
710
	int rcv_val = 0;
711
	if(mpi_op->op_type == CODES_WK_SEND || mpi_op->op_type == CODES_WK_ISEND)
712
	  {
713
		rcv_val = match_receive(s, lp, lp->gid, tmp->mpi_op, mpi_op);
714
		m->u.rc.saved_matched_req = tmp->mpi_op->u.recv.req_id;  
715
	 }
716
	else if(mpi_op->op_type == CODES_WK_RECV || mpi_op->op_type == CODES_WK_IRECV)
717
	  {
718
		rcv_val = match_receive(s, lp, lp->gid, mpi_op, tmp->mpi_op);
719
	  	m->u.rc.saved_matched_req = mpi_op->u.recv.req_id;
720
721
	  }
	if(rcv_val >= 0)
722
	{
723
724
		/* TODO: fix RC */
		/*memcpy(&m->u.rc.ptr_match_op, &tmp->mpi_op, sizeof(struct codes_workload_op));*/
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
		if(mpi_queue->queue_head == mpi_queue->queue_tail)
		   {
			mpi_queue->queue_tail = NULL;
			mpi_queue->queue_head = NULL;
			 free(tmp);
		   }
		 else
		   {
			mpi_queue->queue_head = tmp->next;
			free(tmp);	
		   }
		mpi_queue->num_elems--;
		return indx;
	}

	/* record the index where matching operation has been found */
	struct mpi_msgs_queue* elem;

	while(tmp->next)	
	{
	   indx++;
	   elem = tmp->next;
747
	   
748
	    if(mpi_op->op_type == CODES_WK_SEND || mpi_op->op_type == CODES_WK_ISEND)
749
	     {
750
		rcv_val = match_receive(s, lp, lp->gid, elem->mpi_op, mpi_op);
751
	     	m->u.rc.saved_matched_req = elem->mpi_op->u.recv.req_id; 
752
	     }
753
	    else if(mpi_op->op_type == CODES_WK_RECV || mpi_op->op_type == CODES_WK_IRECV)
754
	     {
755
		rcv_val = match_receive(s, lp, lp->gid, mpi_op, elem->mpi_op);
756
		m->u.rc.saved_matched_req = mpi_op->u.recv.req_id;
757
758
	     }
   	     if(rcv_val >= 0)
759
		{
760
761
		    /* TODO: fix RC */
		    /*memcpy(&m->u.rc.ptr_match_op, &elem->mpi_op, sizeof(struct codes_workload_op));*/
762
763
		    if(elem == mpi_queue->queue_tail)
			mpi_queue->queue_tail = tmp;
764
		    
765
766
767
768
		    tmp->next = elem->next;

		    free(elem);
		    mpi_queue->num_elems--;
769
		
770
771
772
		    return indx;
		}
	   tmp = tmp->next;
773
        }
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
	return -1;
}
/* Schedules an MPI_OP_GET_NEXT event on the calling LP itself. The event is
 * delayed by the lookahead plus 0.1 plus an exponentially distributed random
 * amount drawn with `noise` as the parameter. */
static void codes_issue_next_event(tw_lp* lp)
{
	tw_stime delay = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
	tw_event* next_ev = tw_event_new(lp->gid, delay, lp);
	nw_message* payload = tw_event_data(next_ev);

	payload->msg_type = MPI_OP_GET_NEXT;
	tw_event_send(next_ev);
}

/* Simulates the delay between MPI operations: adds the delay (converted from
 * seconds to nanoseconds) to the LP's compute time and schedules the next
 * MPI_OP_GET_NEXT event after that delay plus the lookahead, 0.1 and an
 * exponentially distributed random amount. */
static void codes_exec_comp_delay(nw_state* s, nw_message* m, tw_lp* lp)
{
	struct codes_workload_op* mpi_op = m->op;
	tw_event* e;
	tw_stime ts;
	nw_message* msg;

	/* convert once and reuse for both the statistic and the timestamp */
	tw_stime delay_ns = s_to_ns(mpi_op->u.delay.seconds);

	s->compute_time += delay_ns;
	ts = delay_ns + g_tw_lookahead + 0.1;
	ts += tw_rand_exponential(lp->rng, noise);

	e = tw_event_new( lp->gid, ts , lp );
	msg = tw_event_data(e);
	msg->msg_type = MPI_OP_GET_NEXT;

	tw_event_send(e); 
}

/* reverse computation operation for MPI irecv */
812
static void codes_exec_mpi_recv_rc(nw_state* s, nw_message* m, tw_lp* lp)
813
{
814
815
816
	num_bytes_recvd -= m->op->u.recv.num_bytes;
	s->recv_time = m->u.rc.saved_recv_time;
	if(m->u.rc.found_match >= 0)
817
	  {
818
819
820
		s->recv_time = m->u.rc.saved_recv_time;
		mpi_queue_update(s->arrival_queue, m->u.rc.ptr_match_op, m->u.rc.found_match);
		remove_req_id(&s->completed_reqs, m->op->u.recv.req_id);
821
		tw_rand_reverse_unif(lp->rng);
822
	  }
823
	else if(m->u.rc.found_match < 0)
824
	    {
825
826
		mpi_queue_remove_tail(lp->gid, s->pending_recvs_queue);
		if(m->op->op_type == CODES_WK_IRECV)
827
			tw_rand_reverse_unif(lp->rng);
828
829
830
831
	    }
}

/* Execute MPI Irecv operation (non-blocking receive) */ 
832
static void codes_exec_mpi_recv(nw_state* s, nw_message* m, tw_lp* lp)
833
834
835
836
837
{
/* Once an irecv is posted, list of completed sends is checked to find a matching isend.
   If no matching isend is found, the receive operation is queued in the pending queue of
   receive operations. */

838
839
	m->u.rc.saved_recv_time = s->recv_time;
	struct codes_workload_op* mpi_op = m->op;
840
	mpi_op->sim_start_time = tw_now(lp);
841
	num_bytes_recvd += mpi_op->u.recv.num_bytes;
842
843

	if(lp->gid == TRACE)
844
		printf("\n %lf codes exec mpi recv req id %d", tw_now(lp), (int)mpi_op->u.recv.req_id);
845
846
	
	dumpi_req_id req_id;
847
	int found_matching_sends = mpi_queue_remove_matching_op(s, lp, s->arrival_queue, m);
848
849
850
	
	/* save the req id inserted in the completed queue for reverse computation. */
	//m->matched_recv = req_id;
851
852
853

	if(found_matching_sends < 0)
	  {
854
		m->u.rc.found_match = -1;
855
856
857
		mpi_pending_queue_insert_op(s->pending_recvs_queue, mpi_op);
	
	       /* for mpi irecvs, this is a non-blocking receive so just post it and move on with the trace read. */
858
		if(mpi_op->op_type == CODES_WK_IRECV)
859
860
861
862
863
864
		   {
			codes_issue_next_event(lp);	
			return;
		   }
		else
			printf("\n CODES MPI RECV OPERATION!!! ");
865
	  }
866
	else
867
868
	  {
		/*if(lp->gid == TRACE)
869
870
871
872
			printf("\n Matched after removing: arrival queue num_elems %d ", s->arrival_queue->num_elems);*/
		/* update completed requests list */
		//int count_after = numQueue(s->arrival_queue);
		//assert(count_before == (count_after+1));
873
	   	m->u.rc.found_match = found_matching_sends;
874
		codes_issue_next_event(lp); 
875
876
877
878
	 }
}

/* executes MPI send and isend operations */
879
static void codes_exec_mpi_send(nw_state* s, nw_message * m, tw_lp* lp)
880
{
881
        struct codes_workload_op * mpi_op = m->op; 
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
	/* model-net event */
	tw_lpid dest_rank;

	codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, 
	    lp_type_name, &mapping_type_id, annotation, &mapping_rep_id, &mapping_offset);

	if(net_id == DRAGONFLY) /* special handling for the dragonfly case */
	{
		int num_routers, lps_per_rep, factor;
		num_routers = codes_mapping_get_lp_count("MODELNET_GRP", 1,
                  "dragonfly_router", NULL, 1);
	 	lps_per_rep = (2 * num_nw_lps) + num_routers;	
		factor = mpi_op->u.send.dest_rank / num_nw_lps;
		dest_rank = (lps_per_rep * factor) + (mpi_op->u.send.dest_rank % num_nw_lps);	
	}
	else
	{
		/* other cases like torus/simplenet/loggp etc. */
		codes_mapping_get_lp_id(lp_group_name, lp_type_name, NULL, 1,  
	    	  mpi_op->u.send.dest_rank, mapping_offset, &dest_rank);
	}

	num_bytes_sent += mpi_op->u.send.num_bytes;

	nw_message* local_m = malloc(sizeof(nw_message));
	nw_message* remote_m = malloc(sizeof(nw_message));
	assert(local_m && remote_m);

910
911
912
913
914
915
916
917
918
919
        local_m->u.msg_info.sim_start_time = tw_now(lp);
        local_m->u.msg_info.dest_rank = mpi_op->u.send.dest_rank;
	local_m->u.msg_info.src_rank = mpi_op->u.send.source_rank;
        local_m->u.msg_info.op_type = mpi_op->op_type; 
        local_m->msg_type = MPI_SEND_POSTED;
        local_m->u.msg_info.tag = mpi_op->u.send.tag;
        local_m->u.msg_info.num_bytes = mpi_op->u.send.num_bytes;
        local_m->u.msg_info.req_id = mpi_op->u.send.req_id;

        memcpy(remote_m, local_m, sizeof(nw_message));
920
	remote_m->msg_type = MPI_SEND_ARRIVED;
921

922
923
	model_net_event(net_id, "test", dest_rank, mpi_op->u.send.num_bytes, 0.0, 
	    sizeof(nw_message), (const void*)remote_m, sizeof(nw_message), (const void*)local_m, lp);
924

925
926
927
	/*if(TRACE == lp->gid)	
		printf("\n !!! %lf send req id %d dest %d nw_message %d ", tw_now(lp), (int)mpi_op->u.send.req_id, (int)dest_rank, sizeof(nw_message));
	*/
928
	/* isend executed, now get next MPI operation from the queue */ 
929
	if(mpi_op->op_type == CODES_WK_ISEND)
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
	   codes_issue_next_event(lp);
}

/* MPI collective operations: nothing is simulated for them, the LP simply
 * schedules the fetch of its next operation. */
static void codes_exec_mpi_col(nw_state* s, nw_message* m, tw_lp* lp)
{
	/* neither the LP state nor the message is consulted */
	(void)s;
	(void)m;

	codes_issue_next_event(lp);
}

/* Convert a duration expressed in seconds into nanoseconds.
 * Note: despite its name, the parameter `ns` carries the input value in
 * SECONDS; the return value is that duration in nanoseconds. */
static tw_stime s_to_ns(tw_stime ns)
{
    const tw_stime ns_per_second = 1000.0 * 1000.0 * 1000.0;

    return ns * ns_per_second;
}


/* Reverse handler for update_send_completion_queue() (MPI_SEND_POSTED events).
 *
 * s  - state of the network LP being rolled back
 * bf - event bitfield, forwarded to notify_waits_rc()
 * m  - the message that was processed by the forward handler
 * lp - the LP being rolled back
 *
 * The forward handler does one of two things depending on the op type carried
 * in the message, and each is undone here:
 *   - CODES_WK_SEND:  it called codes_issue_next_event(lp); one draw of the
 *                     LP's random stream is reversed (presumably that is the
 *                     draw made inside codes_issue_next_event -- confirm).
 *   - CODES_WK_ISEND: it recorded the request id in s->completed_reqs and then
 *                     notified waiters; both steps are undone in the opposite
 *                     order. */
static void update_send_completion_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
	if(m->u.msg_info.op_type == CODES_WK_SEND)
	{
		tw_rand_reverse_unif(lp->rng);
	}

	if(m->u.msg_info.op_type == CODES_WK_ISEND)
	{
		/* undo the wait notification first, then drop the completed request id */
		notify_waits_rc(s, bf, lp, m, m->u.msg_info.req_id);
		remove_req_id(&s->completed_reqs, m->u.msg_info.req_id);
	}
}

/* Forward handler for MPI_SEND_POSTED events: the send described by m has left
 * this LP.
 *
 * s  - state of the network LP
 * bf - event bitfield, forwarded to notify_waits()
 * m  - message carrying the send description in m->u.msg_info
 * lp - the LP processing the event
 *
 * For a non-blocking send (CODES_WK_ISEND) the request id is added to the list
 * of completed requests and any waiters on it are notified.
 * For a blocking send (CODES_WK_SEND) the LP is allowed to proceed through
 * codes_issue_next_event().
 * The reverse handler is update_send_completion_queue_rc(). */
static void update_send_completion_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
	/* trace output is emitted for the traced LP regardless of the op type */
	if(TRACE == lp->gid)
	{
		printf("\n %lf isend operation completed req id %d ", tw_now(lp), m->u.msg_info.req_id);
	}

	if(m->u.msg_info.op_type == CODES_WK_ISEND)
	{
		mpi_completed_queue_insert_op(&s->completed_reqs, m->u.msg_info.req_id);
		notify_waits(s, bf, lp, m, m->u.msg_info.req_id);
	}

	/* blocking send operation */
	if(m->u.msg_info.op_type == CODES_WK_SEND)
	{
		codes_issue_next_event(lp);
	}
}

/* Reverse handler for update_arrival_queue() (MPI_SEND_ARRIVED events).
 *
 * s  - state of the network LP being rolled back
 * bf - event bitfield, forwarded to notify_waits_rc()
 * m  - the message processed by the forward handler; its m->u.rc fields hold
 *      the values saved during forward execution
 * lp - the LP being rolled back
 *
 * Restores the send/recv time counters, then undoes whichever branch the
 * forward handler took:
 *   - found_match >= 0: a pending receive had been matched; the wait
 *     notification is reversed, the pending-receives queue is updated with
 *     the saved matched op and position, and the matched request id is
 *     removed from the completed list.
 *   - found_match <  0: the arrival had been appended to the arrival queue;
 *     the tail of that queue is removed again. */
static void update_arrival_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
	s->send_time = m->u.rc.saved_send_time;
	s->recv_time = m->u.rc.saved_recv_time;

	if(m->u.rc.found_match >= 0)
	{
		// TODO: Modify for recvs
		if(lp->gid == TRACE)
		{
			printf("\n %lf reverse-- update arrival queue req ID %d", tw_now(lp), (int) m->u.rc.saved_matched_req);
		}

		notify_waits_rc(s, bf, lp, m, m->u.rc.saved_matched_req);
		mpi_queue_update(s->pending_recvs_queue, m->u.rc.ptr_match_op, m->u.rc.found_match);
		remove_req_id(&s->completed_reqs, m->u.rc.saved_matched_req);
	}
	else
	{
		/* no receive was matched in the forward pass */
		mpi_queue_remove_tail(lp->gid, s->arrival_queue);
	}
}

/* Forward handler for MPI_SEND_ARRIVED events: a send has arrived at this LP.
 *
 * s  - state of the receiving network LP
 * bf - event bitfield, forwarded to notify_waits()
 * m  - message carrying the send description in m->u.msg_info; its m->u.rc
 *      fields and m->op are written here for use by the reverse handler
 * lp - the LP processing the event
 *
 * The send is rebuilt as a codes_workload_op and looked up in the queue of
 * pending receives. If a matching receive has already been posted, waiters on
 * the matched request are notified; otherwise the arrived operation is stored
 * in the arrival queue.
 * The reverse handler is update_arrival_queue_rc(). */
static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
	/* save the timing counters so that the reverse handler can restore them */
	m->u.rc.saved_send_time = s->send_time;
	m->u.rc.saved_recv_time = s->recv_time;

	s->send_time += tw_now(lp) - m->u.msg_info.sim_start_time;

	/* Now reconstruct the mpi op. The allocation is zeroed so that fields not
	 * assigned below hold a defined value. */
	struct codes_workload_op * arrived_op = calloc(1, sizeof(*arrived_op));
	assert(arrived_op);

	arrived_op->sim_start_time = m->u.msg_info.sim_start_time;
	arrived_op->op_type = m->u.msg_info.op_type;
	arrived_op->u.send.source_rank = m->u.msg_info.src_rank;
	arrived_op->u.send.dest_rank = m->u.msg_info.dest_rank;
	arrived_op->u.send.num_bytes = m->u.msg_info.num_bytes;
	arrived_op->u.send.tag = m->u.msg_info.tag;
	arrived_op->u.send.req_id = m->u.msg_info.req_id;
	m->op = arrived_op;

	/* a negative result means no posted receive matches this arrival */
	int found_matching_recv = mpi_queue_remove_matching_op(s, lp, s->pending_recvs_queue, m);

	if(TRACE == lp->gid)
	{
		printf("\n %lf update arrival queue req id %d %d", tw_now(lp), (int)arrived_op->u.send.req_id, (int)m->op->u.send.source_rank);
	}

	if(found_matching_recv < 0)
	{
		m->u.rc.found_match = -1;
		mpi_pending_queue_insert_op(s->arrival_queue, m->op);
	}
	else
	{
		m->u.rc.found_match = found_matching_recv;
		/* NOTE(review): saved_matched_req is not assigned in this function;
		 * presumably mpi_queue_remove_matching_op() fills it in -- confirm. */
		notify_waits(s, bf, lp, m, m->u.rc.saved_matched_req);
	}
}

/* Copies the NUL-terminated string src into dst, which has room for dst_size
 * bytes. Returns 0 on success and -1 when src does not fit (dst then holds a
 * truncated, NUL-terminated prefix). */
static int nw_copy_param_string(char* dst, size_t dst_size, const char* src)
{
   int written = snprintf(dst, dst_size, "%s", src);

   if(written < 0 || (size_t)written >= dst_size)
	return -1;

   return 0;
}

/* Initializes a network node LP: resets its state, fills in the workload
 * parameters from the global workload_type / workload_file / offset_file
 * settings, loads the workload and issues the first event.
 *
 * s  - LP state to initialize (fully zeroed first)
 * lp - the LP being initialized
 *
 * LPs whose id is not below the number of traces return early and generate no
 * workload events. Configuration problems (missing scalatrace offset file,
 * over-long file names, unknown workload type) are reported through
 * tw_error(). */
void nw_test_init(nw_state* s, tw_lp* lp)
{
   /* initialize the LP's and load the data */
   char * params = NULL;
   scala_trace_params params_sc;
   dumpi_trace_params params_d;

   /* start from defined contents; only one of the two is filled in below */
   memset(&params_sc, 0, sizeof(params_sc));
   memset(&params_d, 0, sizeof(params_d));

   codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, lp_type_name,
	&mapping_type_id, annotation, &mapping_rep_id, &mapping_offset);

   memset(s, 0, sizeof(*s));
   s->nw_id = (mapping_rep_id * num_nw_lps) + mapping_offset;
   s->completed_reqs = NULL;
   s->pending_waits = NULL;

   /* default: one trace per network LP */
   if(!num_net_traces)
	num_net_traces = num_net_lps;

   if (strcmp(workload_type, "scalatrace") == 0){
       /* the requirement is on the configured offset file, so test the global
        * setting rather than the not-yet-filled parameter struct */
       if (offset_file[0] == '\0'){
           tw_error(TW_LOC, "required argument for scalatrace offset_file");
           return;
       }
       if (nw_copy_param_string(params_sc.offset_file_name,
                sizeof(params_sc.offset_file_name), offset_file) != 0 ||
           nw_copy_param_string(params_sc.nw_wrkld_file_name,
                sizeof(params_sc.nw_wrkld_file_name), workload_file) != 0){
           tw_error(TW_LOC, "scalatrace offset/workload file name is too long");
           return;
       }
       params = (char*)&params_sc;
   }
   else if (strcmp(workload_type, "dumpi") == 0){
       if (nw_copy_param_string(params_d.file_name,
                sizeof(params_d.file_name), workload_file) != 0){
           tw_error(TW_LOC, "dumpi workload file name is too long");
           return;
       }
       params_d.num_net_traces = num_net_traces;

       params = (char*)&params_d;
   }
   else {
       /* without this branch params would be passed on uninitialized */
       tw_error(TW_LOC, "unknown workload type (expected scalatrace or dumpi)");
       return;
   }

   /* In this case, the LP will not generate any workload related events*/
   if(s->nw_id >= num_net_traces)
     {
	return;
     }

   /* NOTE(review): the workload name is hard-coded to the dumpi generator even
    * when workload_type is "scalatrace" -- confirm this is intended. */
   wrkld_id = codes_workload_load("dumpi-trace-workload", params, 0, (int)s->nw_id);

   s->arrival_queue = queue_init();
   s->pending_recvs_queue = queue_init();

   /* clock starts when the first event is processed */
   s->start_time = tw_now(lp);
   codes_issue_next_event(lp);

   return;
}

/* Forward event handler of the network LP: dispatches on the message type.
 *
 * s  - state of the network LP
 * bf - event bitfield; cleared here before the event is processed
 * m  - the incoming message
 * lp - the LP processing the event
 *
 *   MPI_SEND_POSTED  -> update_send_completion_queue()
 *   MPI_SEND_ARRIVED -> update_arrival_queue()
 *   MPI_OP_GET_NEXT  -> get_next_mpi_operation()
 * Any other message type is reported and otherwise ignored. */
void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
	/* clear the whole bitfield object without writing through a pointer of a
	 * different type */
	memset(bf, 0, sizeof(*bf));

	switch(m->msg_type)
	{
		case MPI_SEND_POSTED:
			update_send_completion_queue(s, bf, m, lp);
		break;

		case MPI_SEND_ARRIVED:
			update_arrival_queue(s, bf, m, lp);
		break;

		case MPI_OP_GET_NEXT:
			get_next_mpi_operation(s, bf, m, lp);
		break;

		default:
			printf("\n Incorrect event type %d ", (int)m->msg_type);
		break;
	}
}

/* Reverse handler for get_next_mpi_operation() (MPI_OP_GET_NEXT events).
 *
 * s  - state of the network LP being rolled back
 * bf - event bitfield, forwarded to the wait reverse handlers
 * m  - the message processed by the forward handler; m->op is the operation
 *      that was fetched from the workload
 * lp - the LP being rolled back
 *
 * The operation is first handed back through codes_workload_get_next_rc().
 * Then, per operation type, the per-LP counters are decremented and the
 * matching reverse routine (model-net reverse, random-number reverse, or the
 * recv/wait reverse handlers) is invoked. Nothing further is undone for
 * CODES_WK_END.
 *
 * NOTE(review): m->op is heap-allocated by the forward handler and is not
 * released here -- confirm who owns it after a rollback. */
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
	codes_workload_get_next_rc(wrkld_id, 0, (int)s->nw_id, m->op);

	if(m->op->op_type == CODES_WK_END)
	{
		return;
	}

	switch(m->op->op_type)
	{
		case CODES_WK_SEND:
		case CODES_WK_ISEND:
		{
			if(lp->gid == TRACE)
			{
				printf("\n %lf reverse send req %d ", tw_now(lp), (int)m->op->u.send.req_id);
			}
			model_net_event_rc(net_id, lp, m->op->u.send.num_bytes);
			/* only the non-blocking send issued a next event in the forward pass */
			if(m->op->op_type == CODES_WK_ISEND)
			{
				tw_rand_reverse_unif(lp->rng);
			}
			s->num_sends--;
			num_bytes_sent -= m->op->u.send.num_bytes;
		}
		break;

		case CODES_WK_IRECV:
		case CODES_WK_RECV:
		{
			codes_exec_mpi_recv_rc(s, m, lp);
			s->num_recvs--;
		}
		break;

		case CODES_WK_DELAY:
		{
			tw_rand_reverse_unif(lp->rng);
			s->num_delays--;
			s->compute_time -= s_to_ns(m->op->u.delay.seconds);
		}
		break;

		case CODES_WK_BCAST:
		case CODES_WK_ALLGATHER:
		case CODES_WK_ALLGATHERV:
		case CODES_WK_ALLTOALL:
		case CODES_WK_ALLTOALLV:
		case CODES_WK_REDUCE:
		case CODES_WK_ALLREDUCE:
		case CODES_WK_COL:
		{
			s->num_cols--;
			tw_rand_reverse_unif(lp->rng);
		}
		break;

		case CODES_WK_WAIT:
		{
			s->num_wait--;
			codes_exec_mpi_wait_rc(s, bf, m, lp);
		}
		break;

		case CODES_WK_WAITALL:
		{
			s->num_waitall--;
			codes_exec_mpi_wait_all_rc(s, bf, m, lp);
		}
		break;

		case CODES_WK_WAITSOME:
		case CODES_WK_WAITANY:
		{
			s->num_waitsome--;
			tw_rand_reverse_unif(lp->rng);
		}
		break;

		default:
			printf("\n Invalid op type %d ", m->op->op_type);
	}
}

static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
1189
		struct codes_workload_op mpi_op;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
1190
    		codes_workload_get_next(wrkld_id, 0, (int)s->nw_id, &mpi_op);
1191
1192
		m->op = malloc(sizeof(struct codes_workload_op));
                memcpy(m->op, &mpi_op, sizeof(struct codes_workload_op));
1193