/*
 * Copyright (C) 2014 University of Chicago
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include <ross.h>
#include <assert.h>
#include "dumpi/libundumpi/bindings.h"
#include "dumpi/libundumpi/libundumpi.h"
#include "codes/codes-workload.h"
#include "codes/quickhash.h"
#include "codes/codes-jobmap.h"
#include "codes/model-net.h"

#ifdef ENABLE_CORTEX
#include <cortex/cortex.h>
#include <cortex/datatype.h>
#include <cortex/cortex-mpich.h>
#ifdef ENABLE_CORTEX_PYTHON
#include <cortex/cortex-python.h>
#endif
#define PROFILE_TYPE cortex_dumpi_profile*
//#define UNDUMPI_OPEN cortex_undumpi_open
#define DUMPI_START_STREAM_READ cortex_dumpi_start_stream_read
#define UNDUMPI_CLOSE cortex_undumpi_close
#else
#define PROFILE_TYPE dumpi_profile*
//#define UNDUMPI_OPEN undumpi_open
#define DUMPI_START_STREAM_READ dumpi_start_stream_read
#define UNDUMPI_CLOSE undumpi_close
#endif

#define MAX_LENGTH_FILE 512
#define MAX_OPERATIONS 32768
#define DUMPI_IGNORE_DELAY 100
#define RANK_HASH_TABLE_SIZE 400

/* This variable is defined in src/network-workloads/model-net-mpi-replay.c */
extern struct codes_jobmap_ctx *jobmap_ctx;

static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;

/* context of the MPI workload */
typedef struct rank_mpi_context
{
    PROFILE_TYPE profile;
    int my_app_id;
    // whether we've seen an init op (needed for timing correctness)
    int is_init;
    int64_t my_rank;
    double last_op_time;
    double init_time;
    void* dumpi_mpi_array;
    struct qhash_head hash_link;
    struct rc_stack * completed_ctx;
} rank_mpi_context;

typedef struct rank_mpi_compare
{
    int app;
    int rank;
} rank_mpi_compare;

/* Holds all the data about MPI operations from the log */
typedef struct dumpi_op_data_array
{
    struct codes_workload_op* op_array;
    int64_t op_arr_ndx;
    int64_t op_arr_cnt;
} dumpi_op_data_array;

/* timing utilities */

#ifdef __GNUC__
__attribute__((unused))
#endif
static dumpi_clock timediff(
        dumpi_clock end,
        dumpi_clock start)
{
    dumpi_clock temp;
    if ((end.nsec-start.nsec)<0) {
        temp.sec = end.sec-start.sec-1;
        temp.nsec = 1000000000+end.nsec-start.nsec;
    } else {
        temp.sec = end.sec-start.sec;
        temp.nsec = end.nsec-start.nsec;
    }
    return temp;
}
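
/* Worked example: end = {sec = 5, nsec = 2e8}, start = {sec = 3, nsec = 9e8}
 * takes the borrow branch and yields {sec = 1, nsec = 3e8}, i.e. 1.3 s. */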

/*static inline double time_to_ms_lf(dumpi_clock t){
        return (double) t.sec * 1e3 + (double) t.nsec / 1e6;
}
static inline double time_to_us_lf(dumpi_clock t){
        return (double) t.sec * 1e6 + (double) t.nsec / 1e3;
}*/
static inline double time_to_ns_lf(dumpi_clock t){
        return (double) t.sec * 1e9 + (double) t.nsec;
}
/*static inline double time_to_s_lf(dumpi_clock t){
        return (double) t.sec + (double) t.nsec / 1e9;
}*/

/* load the trace */
static int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank);

/* dumpi implementation of get next operation in the workload */
static void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op);

/* get number of bytes from the workload data type and count */
static int64_t get_num_bytes(rank_mpi_context* my_ctx, dumpi_datatype dt);

/* computes the delay between MPI operations */
static void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx);

/* initializes the data structures */
static void* dumpi_init_op_data(void);

/* removes the next operation from the dynamic array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
                                      double last_op_time);

/* resets the counters for the dynamic array once the workload is completely loaded */
static void dumpi_finalize_mpi_op_data(void *mpi_op_array);

/* insert next operation */
static void dumpi_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op);

/* initialize the array data structure */
static void* dumpi_init_op_data(void)
{
    dumpi_op_data_array* tmp;

    tmp = malloc(sizeof(dumpi_op_data_array));
    assert(tmp);
    tmp->op_array = malloc(MAX_OPERATIONS * sizeof(struct codes_workload_op));
    assert(tmp->op_array);
    tmp->op_arr_ndx = 0;
    tmp->op_arr_cnt = MAX_OPERATIONS;

    return (void *)tmp;
}

/* inserts next operation in the array */
static void dumpi_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op)
{
    dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
    struct codes_workload_op *tmp;

    /* check if the array is full; if so, grow it by MAX_OPERATIONS entries */
    if (array->op_arr_ndx == array->op_arr_cnt)
    {
        tmp = malloc((array->op_arr_cnt + MAX_OPERATIONS) * sizeof(struct codes_workload_op));
        assert(tmp);
        memcpy(tmp, array->op_array, array->op_arr_cnt * sizeof(struct codes_workload_op));
        free(array->op_array);
        array->op_array = tmp;
        array->op_arr_cnt += MAX_OPERATIONS;
    }

    /* add the MPI operation to the op array */
    array->op_array[array->op_arr_ndx] = *mpi_op;
    //printf("\n insert time %f end time %f ", array->op_array[array->op_arr_ndx].start_time, array->op_array[array->op_arr_ndx].end_time);
    array->op_arr_ndx++;
    return;
}
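
/* Note on growth: the array grows by a fixed MAX_OPERATIONS (32768) slots per
 * reallocation rather than doubling, so loading n ops can cost on the order of
 * n^2 / 32768 element copies in the worst case; acceptable here since each
 * trace is loaded exactly once. */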

/* resets the counters after the file is fully loaded: the array switches from
 * append mode (op_arr_ndx = next free slot) to read mode (op_arr_ndx = next
 * op to hand out, op_arr_cnt = total ops loaded) */
static void dumpi_finalize_mpi_op_data(void *mpi_op_array)
{
    struct dumpi_op_data_array* array = (struct dumpi_op_data_array*)mpi_op_array;

    array->op_arr_cnt = array->op_arr_ndx;
    array->op_arr_ndx = 0;
}

/* rolls back to the previous index */
static void dumpi_roll_back_prev_op(void * mpi_op_array)
{
    dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
    array->op_arr_ndx--;
    assert(array->op_arr_ndx >= 0);
}

/* removes the next operation from the array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
                                      double last_op_time)
{
    (void)last_op_time;

    dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
    //printf("\n op array index %d array count %d ", array->op_arr_ndx, array->op_arr_cnt);
    if (array->op_arr_ndx == array->op_arr_cnt)
    {
        mpi_op->op_type = CODES_WK_END;
    }
    else
    {
        struct codes_workload_op *tmp = &(array->op_array[array->op_arr_ndx]);
        *mpi_op = *tmp;
        array->op_arr_ndx++;
    }
    /*if(mpi_op->op_type == CODES_WK_END)
    {
        free(array->op_array);
        free(array);
    }*/
}

/* check for initialization and normalize reported time */
static inline void check_set_init_time(const dumpi_time *t, rank_mpi_context * my_ctx)
{
    if (!my_ctx->is_init) {
        my_ctx->is_init = 1;
        my_ctx->init_time = time_to_ns_lf(t->start);
        my_ctx->last_op_time = time_to_ns_lf(t->stop) - my_ctx->init_time;
    }
}
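
/* Example: if the first recorded op starts at wall time 2.5e9 ns, init_time
 * becomes 2.5e9 and every later timestamp is reported relative to it, so the
 * generated workload always begins near t = 0. */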

/* introduce delay between operations: the delay is the compute time NOT spent
 * in MPI operations */
static void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx)
{
    double start = time_to_ns_lf(time->start) - my_ctx->init_time;
    double stop = time_to_ns_lf(time->stop) - my_ctx->init_time;

    if((start - my_ctx->last_op_time) > DUMPI_IGNORE_DELAY)
    {
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_DELAY;
        wrkld_per_rank.start_time = my_ctx->last_op_time;
        wrkld_per_rank.end_time = start;
        wrkld_per_rank.u.delay.seconds = (start - my_ctx->last_op_time) / 1e9;
        dumpi_insert_next_op(my_ctx->dumpi_mpi_array, &wrkld_per_rank);
    }
    my_ctx->last_op_time = stop;
}
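
/* Example: with last_op_time = 1000 ns and the next op starting at 51000 ns,
 * the 50000 ns gap exceeds DUMPI_IGNORE_DELAY (100 ns), so a CODES_WK_DELAY
 * op covering [1000, 51000) ns with u.delay.seconds = 5e-5 is emitted. */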

static int handleDUMPIInit(
        const dumpi_init *prm,
        uint16_t thread,
        const dumpi_time *cpu,
        const dumpi_time *wall,
        const dumpi_perfinfo *perf,
        void *uarg)
{
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context *myctx = (rank_mpi_context*)uarg;
    check_set_init_time(wall, myctx);
    return 0;
}

int handleDUMPIError(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)wall;
    (void)perf;
    (void)uarg;

    tw_error(TW_LOC, "\n MPI operation not supported by the MPI-Sim Layer ");
    return 0; /* not reached: tw_error() terminates the simulation */
}

int handleDUMPIIgnore(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;

    check_set_init_time(wall, myctx);
    update_compute_time(wall, myctx);

    return 0;
}

static void update_times_and_insert(
        struct codes_workload_op *op,
        const dumpi_time *t,
        rank_mpi_context *ctx)
{
    check_set_init_time(t, ctx);
    op->start_time = time_to_ns_lf(t->start) - ctx->init_time;
    op->end_time = time_to_ns_lf(t->stop) - ctx->init_time;
    update_compute_time(t, ctx);
    dumpi_insert_next_op(ctx->dumpi_mpi_array, op);
}
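
/* Every handleDUMPI* handler below follows the same pattern: translate one
 * DUMPI record into a codes_workload_op, then hand it to
 * update_times_and_insert() so timestamps are normalized against init and any
 * preceding compute gap is materialized as a CODES_WK_DELAY op. */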


int handleDUMPIWait(const dumpi_wait *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)userarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_WAIT;
    wrkld_per_rank.u.wait.req_id = prm->request;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);

    return 0;
}

int handleDUMPIWaitsome(const dumpi_waitsome *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    int i;
    rank_mpi_context* myctx = (rank_mpi_context*)userarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_WAITSOME;
    wrkld_per_rank.u.waits.count = prm->count;
    wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));
    assert(wrkld_per_rank.u.waits.req_ids);

    for( i = 0; i < prm->count; i++ )
        wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIWaitany(const dumpi_waitany *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    int i;
    rank_mpi_context* myctx = (rank_mpi_context*)userarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_WAITANY;
    wrkld_per_rank.u.waits.count = prm->count;
    wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));
    assert(wrkld_per_rank.u.waits.req_ids);

    for( i = 0; i < prm->count; i++ )
        wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIWaitall(const dumpi_waitall *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    int i;
    rank_mpi_context* myctx = (rank_mpi_context*)userarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_WAITALL;
    wrkld_per_rank.u.waits.count = prm->count;
    wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));
    assert(wrkld_per_rank.u.waits.req_ids);

    for( i = 0; i < prm->count; i++ )
        wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)userarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_ISEND;
    wrkld_per_rank.u.send.tag = prm->tag;
    wrkld_per_rank.u.send.count = prm->count;
    wrkld_per_rank.u.send.data_type = prm->datatype;
    wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
    printf("\n count %d num bytes %lld ", prm->count, (long long)get_num_bytes(myctx,prm->datatype));
    assert(wrkld_per_rank.u.send.num_bytes >= 0);
    wrkld_per_rank.u.send.req_id = prm->request;
    wrkld_per_rank.u.send.dest_rank = prm->dest;
    wrkld_per_rank.u.send.source_rank = myctx->my_rank;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);

    return 0;
}

int handleDUMPIIRecv(const dumpi_irecv *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    //printf("\n irecv source %d count %d data type %d", prm->source, prm->count, prm->datatype);
    rank_mpi_context* myctx = (rank_mpi_context*)userarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_IRECV;
    wrkld_per_rank.u.recv.data_type = prm->datatype;
    wrkld_per_rank.u.recv.count = prm->count;
    wrkld_per_rank.u.recv.tag = prm->tag;
    wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);

    //assert(wrkld_per_rank.u.recv.num_bytes >= 0);
    wrkld_per_rank.u.recv.source_rank = prm->source;
    wrkld_per_rank.u.recv.dest_rank = -1;
    wrkld_per_rank.u.recv.req_id = prm->request;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPISend(const dumpi_send *prm, uint16_t thread,
                      const dumpi_time *cpu, const dumpi_time *wall,
                      const dumpi_perfinfo *perf, void *uarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_SEND;
    wrkld_per_rank.u.send.tag = prm->tag;
    wrkld_per_rank.u.send.count = prm->count;
    wrkld_per_rank.u.send.data_type = prm->datatype;
    wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
    assert(wrkld_per_rank.u.send.num_bytes >= 0);
    wrkld_per_rank.u.send.dest_rank = prm->dest;
    wrkld_per_rank.u.send.source_rank = myctx->my_rank;
    wrkld_per_rank.u.send.req_id = -1; /* blocking send: no request handle */

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread,
                      const dumpi_time *cpu, const dumpi_time *wall,
                      const dumpi_perfinfo *perf, void *uarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    //printf("\n recv source %d count %d data type %d", prm->source, prm->count, prm->datatype);
    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_RECV;
    wrkld_per_rank.u.recv.tag = prm->tag;
    wrkld_per_rank.u.recv.count = prm->count;
    wrkld_per_rank.u.recv.data_type = prm->datatype;
    wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
    assert(wrkld_per_rank.u.recv.num_bytes >= 0);
    wrkld_per_rank.u.recv.source_rank = prm->source;
    wrkld_per_rank.u.recv.dest_rank = -1;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread,
                        const dumpi_time *cpu, const dumpi_time *wall,
                        const dumpi_perfinfo *perf, void *uarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;

    /* a sendrecv record expands into a send op followed by a recv op */
    {
        struct codes_workload_op wrkld_per_rank;
        wrkld_per_rank.op_type = CODES_WK_SEND;
        wrkld_per_rank.u.send.tag = prm->sendtag;
        wrkld_per_rank.u.send.count = prm->sendcount;
        wrkld_per_rank.u.send.data_type = prm->sendtype;
        wrkld_per_rank.u.send.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
        assert(wrkld_per_rank.u.send.num_bytes >= 0);
        wrkld_per_rank.u.send.dest_rank = prm->dest;
        wrkld_per_rank.u.send.source_rank = myctx->my_rank;
        wrkld_per_rank.u.send.req_id = -1;
        update_times_and_insert(&wrkld_per_rank, wall, myctx);
    }

    {
        struct codes_workload_op wrkld_per_rank;
        wrkld_per_rank.op_type = CODES_WK_RECV;
        wrkld_per_rank.u.recv.tag = prm->recvtag;
        wrkld_per_rank.u.recv.count = prm->recvcount;
        wrkld_per_rank.u.recv.data_type = prm->recvtype;
        wrkld_per_rank.u.recv.num_bytes = prm->recvcount * get_num_bytes(myctx,prm->recvtype);
        assert(wrkld_per_rank.u.recv.num_bytes >= 0);
        wrkld_per_rank.u.recv.source_rank = prm->source;
        wrkld_per_rank.u.recv.dest_rank = -1;
        update_times_and_insert(&wrkld_per_rank, wall, myctx);
    }
    return 0;
}

int handleDUMPIBcast(const dumpi_bcast *prm, uint16_t thread,
                       const dumpi_time *cpu, const dumpi_time *wall,
                       const dumpi_perfinfo *perf, void *uarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_BCAST;
    wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
    assert(wrkld_per_rank.u.collective.num_bytes >= 0);

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIAllgather(const dumpi_allgather *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_ALLGATHER;
    wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
    assert(wrkld_per_rank.u.collective.num_bytes > 0);

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIAllgatherv(const dumpi_allgatherv *prm, uint16_t thread,
                            const dumpi_time *cpu, const dumpi_time *wall,
                            const dumpi_perfinfo *perf, void *uarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_ALLGATHERV;
    wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
    assert(wrkld_per_rank.u.collective.num_bytes > 0);

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIAlltoall(const dumpi_alltoall *prm, uint16_t thread,
                          const dumpi_time *cpu, const dumpi_time *wall,
                          const dumpi_perfinfo *perf, void *uarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_ALLTOALL;
    wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
    assert(wrkld_per_rank.u.collective.num_bytes > 0);

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIAlltoallv(const dumpi_alltoallv *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_ALLTOALLV;
    /* NOTE: approximates the exchange using sendcounts[0] for every peer */
    wrkld_per_rank.u.collective.num_bytes = prm->sendcounts[0] * get_num_bytes(myctx,prm->sendtype);
    assert(wrkld_per_rank.u.collective.num_bytes > 0);

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIReduce(const dumpi_reduce *prm, uint16_t thread,
                        const dumpi_time *cpu, const dumpi_time *wall,
                        const dumpi_perfinfo *perf, void *uarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_REDUCE;
    wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
    assert(wrkld_per_rank.u.collective.num_bytes > 0);

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIAllreduce(const dumpi_allreduce *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_ALLREDUCE;
    wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
    assert(wrkld_per_rank.u.collective.num_bytes > 0);

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIFinalize(const dumpi_finalize *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_END;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIReqFree(const dumpi_request_free *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
    (void)thread;
    (void)cpu;
    (void)perf;

    rank_mpi_context* myctx = (rank_mpi_context*)userarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_REQ_FREE;
    wrkld_per_rank.u.free.req_id = prm->request;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

static int hash_rank_compare(void *key, struct qhash_head *link)
{
    rank_mpi_compare *in = key;
    rank_mpi_context *tmp;

    tmp = qhash_entry(link, rank_mpi_context, hash_link);
    if (tmp->my_rank == in->rank && tmp->my_app_id == in->app)
        return 1;
    return 0;
}

static int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
{
    libundumpi_callbacks callbacks;
    libundumpi_cbpair callarr[DUMPI_END_OF_STREAM];
#ifdef ENABLE_CORTEX
    libundumpi_cbpair transarr[DUMPI_END_OF_STREAM];
#endif
    PROFILE_TYPE profile;
    dumpi_trace_params* dumpi_params = (dumpi_trace_params*)params;
    char file_name[MAX_LENGTH_FILE];

    if(rank >= dumpi_params->num_net_traces)
        return -1;

    if(!rank_tbl)
    {
        rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SIZE);
        if(!rank_tbl)
            return -1;
    }

    rank_mpi_context *my_ctx;
    my_ctx = malloc(sizeof(rank_mpi_context));
    assert(my_ctx);
    my_ctx->my_rank = rank;
    my_ctx->my_app_id = app_id;
    my_ctx->last_op_time = 0.0;
    my_ctx->is_init = 0;
    my_ctx->dumpi_mpi_array = dumpi_init_op_data();

    /* DUMPI traces are stored per rank, with the rank zero-padded to four digits */
    if(rank < 10)
        snprintf(file_name, MAX_LENGTH_FILE, "%s000%d.bin", dumpi_params->file_name, rank);
    else if(rank >= 10 && rank < 100)
        snprintf(file_name, MAX_LENGTH_FILE, "%s00%d.bin", dumpi_params->file_name, rank);
    else if(rank >= 100 && rank < 1000)
        snprintf(file_name, MAX_LENGTH_FILE, "%s0%d.bin", dumpi_params->file_name, rank);
    else
        snprintf(file_name, MAX_LENGTH_FILE, "%s%d.bin", dumpi_params->file_name, rank);
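
    /* Example (assuming a trace prefix such as "dumpi-2014.03.03.14.55.50-"):
     * rank 3 opens "dumpi-2014.03.03.14.55.50-0003.bin" and rank 42 opens
     * "dumpi-2014.03.03.14.55.50-0042.bin", per the padding branches above. */
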
#ifdef ENABLE_CORTEX
    if(strcmp(dumpi_params->file_name,"none") == 0) {
        profile = cortex_undumpi_open(NULL, app_id, dumpi_params->num_net_traces, rank);
    } else {
        profile = cortex_undumpi_open(file_name, app_id, dumpi_params->num_net_traces, rank);
    }

    {
        int i;
        for(i = 0; i < dumpi_params->num_net_traces; i++) {
            struct codes_jobmap_id id = {
                .job = app_id,
                .rank = i
            };
            uint32_t cn_id;
            if(jobmap_ctx) {
                cn_id = codes_jobmap_to_global_id(id, jobmap_ctx);
            } else {
                cn_id = i;
            }
            cortex_placement_set(profile, i, cn_id);
        }
    }

    cortex_topology_set(profile, &model_net_topology);
#else
    profile = undumpi_open(file_name);
#endif
    my_ctx->profile = profile;
    if(NULL == profile) {
        printf("Error: unable to open DUMPI trace: %s\n", file_name);
        exit(-1);
    }

    memset(&callbacks, 0, sizeof(libundumpi_callbacks));
    memset(&callarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
#ifdef ENABLE_CORTEX
    memset(&transarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
#endif

    /* handle MPI function calls */
    callbacks.on_init = handleDUMPIInit;
    callbacks.on_send = (dumpi_send_call)handleDUMPISend;
    callbacks.on_recv = (dumpi_recv_call)handleDUMPIRecv;
    callbacks.on_isend = (dumpi_isend_call)handleDUMPIISend;
    callbacks.on_irecv = (dumpi_irecv_call)handleDUMPIIRecv;
    callbacks.on_allreduce = (dumpi_allreduce_call)handleDUMPIAllreduce;
    callbacks.on_bcast = (dumpi_bcast_call)handleDUMPIBcast;
    callbacks.on_get_count = (dumpi_get_count_call)handleDUMPIIgnore;
    callbacks.on_bsend = (dumpi_bsend_call)handleDUMPIIgnore;
    callbacks.on_ssend = (dumpi_ssend_call)handleDUMPIIgnore;
    callbacks.on_rsend = (dumpi_rsend_call)handleDUMPIIgnore;
    callbacks.on_buffer_attach = (dumpi_buffer_attach_call)handleDUMPIIgnore;
    callbacks.on_buffer_detach = (dumpi_buffer_detach_call)handleDUMPIIgnore;
    callbacks.on_ibsend = (dumpi_ibsend_call)handleDUMPIIgnore;
    callbacks.on_issend = (dumpi_issend_call)handleDUMPIIgnore;
    callbacks.on_irsend = (dumpi_irsend_call)handleDUMPIIgnore;
    callbacks.on_wait = (dumpi_wait_call)handleDUMPIWait;
    callbacks.on_test = (dumpi_test_call)handleDUMPIIgnore;
    callbacks.on_request_free = (dumpi_request_free_call)handleDUMPIReqFree;
    callbacks.on_waitany = (dumpi_waitany_call)handleDUMPIWaitany;
    callbacks.on_testany = (dumpi_testany_call)handleDUMPIIgnore;
    callbacks.on_waitall = (dumpi_waitall_call)handleDUMPIWaitall;
    callbacks.on_testall = (dumpi_testall_call)handleDUMPIIgnore;
    callbacks.on_waitsome = (dumpi_waitsome_call)handleDUMPIWaitsome;
    callbacks.on_testsome = (dumpi_testsome_call)handleDUMPIIgnore;
    callbacks.on_iprobe = (dumpi_iprobe_call)handleDUMPIIgnore;
    callbacks.on_probe = (dumpi_probe_call)handleDUMPIIgnore;
    callbacks.on_cancel = (dumpi_cancel_call)handleDUMPIIgnore;
    callbacks.on_test_cancelled = (dumpi_test_cancelled_call)handleDUMPIIgnore;
    callbacks.on_send_init = (dumpi_send_init_call)handleDUMPIIgnore;
    callbacks.on_bsend_init = (dumpi_bsend_init_call)handleDUMPIIgnore;
    callbacks.on_ssend_init = (dumpi_ssend_init_call)handleDUMPIIgnore;
    callbacks.on_rsend_init = (dumpi_rsend_init_call)handleDUMPIIgnore;
    callbacks.on_recv_init = (dumpi_recv_init_call)handleDUMPIIgnore;
    callbacks.on_start = (dumpi_start_call)handleDUMPIIgnore;
    callbacks.on_startall = (dumpi_startall_call)handleDUMPIIgnore;
    callbacks.on_sendrecv = (dumpi_sendrecv_call)handleDUMPISendrecv;
    callbacks.on_sendrecv_replace = (dumpi_sendrecv_replace_call)handleDUMPIIgnore;
    callbacks.on_type_contiguous = (dumpi_type_contiguous_call)handleDUMPIIgnore;
    callbacks.on_barrier = (dumpi_barrier_call)handleDUMPIIgnore;
    callbacks.on_gather = (dumpi_gather_call)handleDUMPIIgnore;
    callbacks.on_gatherv = (dumpi_gatherv_call)handleDUMPIIgnore;
    callbacks.on_scatter = (dumpi_scatter_call)handleDUMPIIgnore;
    callbacks.on_scatterv = (dumpi_scatterv_call)handleDUMPIIgnore;
    /* note: the dedicated handlers defined above for allgather(v), alltoall(v)
     * and reduce are not wired in here; those collectives only account for
     * compute time via handleDUMPIIgnore */
    callbacks.on_allgather = (dumpi_allgather_call)handleDUMPIIgnore;
    callbacks.on_allgatherv = (dumpi_allgatherv_call)handleDUMPIIgnore;
    callbacks.on_alltoall = (dumpi_alltoall_call)handleDUMPIIgnore;
    callbacks.on_alltoallv = (dumpi_alltoallv_call)handleDUMPIIgnore;
    callbacks.on_alltoallw = (dumpi_alltoallw_call)handleDUMPIIgnore;
    callbacks.on_reduce = (dumpi_reduce_call)handleDUMPIIgnore;
    callbacks.on_reduce_scatter = (dumpi_reduce_scatter_call)handleDUMPIIgnore;
    callbacks.on_group_size = (dumpi_group_size_call)handleDUMPIIgnore;
    callbacks.on_group_rank = (dumpi_group_rank_call)handleDUMPIIgnore;
    callbacks.on_comm_size = (dumpi_comm_size_call)handleDUMPIIgnore;
    callbacks.on_comm_rank = (dumpi_comm_rank_call)handleDUMPIIgnore;
    callbacks.on_comm_get_attr = (dumpi_comm_get_attr_call)handleDUMPIIgnore;
    callbacks.on_comm_dup = (dumpi_comm_dup_call)handleDUMPIError;
    callbacks.on_comm_create = (dumpi_comm_create_call)handleDUMPIError;
    callbacks.on_wtime = (dumpi_wtime_call)handleDUMPIIgnore;
    callbacks.on_finalize = (dumpi_finalize_call)handleDUMPIFinalize;

    libundumpi_populate_callbacks(&callbacks, callarr);

#ifdef ENABLE_CORTEX
#ifdef ENABLE_CORTEX_PYTHON
    if(dumpi_params->cortex_script[0] != 0) {
        libundumpi_populate_callbacks(CORTEX_PYTHON_TRANSLATION, transarr);
    } else {
        libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, transarr);
    }
#else
    libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, transarr);
#endif
#endif
    DUMPI_START_STREAM_READ(profile);
    //dumpi_header* trace_header = undumpi_read_header(profile);
    //dumpi_free_header(trace_header);

#ifdef ENABLE_CORTEX_PYTHON
    if(dumpi_params->cortex_script[0] != 0) {
        if(dumpi_params->cortex_class[0] != 0) {
            cortex_python_set_module(dumpi_params->cortex_script, dumpi_params->cortex_class);
        } else {
            cortex_python_set_module(dumpi_params->cortex_script, NULL);
        }
        if(dumpi_params->cortex_gen[0] != 0) {
            cortex_python_call_generator(profile, dumpi_params->cortex_gen);
        }
    }
#endif

    int finalize_reached = 0;
    int active = 1;
    int num_calls = 0;
    while(active && !finalize_reached)
    {
        num_calls++;
#ifdef ENABLE_CORTEX
        active = cortex_undumpi_read_single_call(profile, callarr, transarr, (void*)my_ctx, &finalize_reached);
#else
        active = undumpi_read_single_call(profile, callarr, (void*)my_ctx, &finalize_reached);
#endif
    }
    (void)num_calls; /* kept for debugging; not otherwise used */
    UNDUMPI_CLOSE(profile);
    dumpi_finalize_mpi_op_data(my_ctx->dumpi_mpi_array);

    /* add this rank context to the hash table; the stack-allocated key is
     * only used to choose the bucket, and lookups compare against fields
     * stored in the context itself (see hash_rank_compare) */
    rank_mpi_compare cmp;
    cmp.app = my_ctx->my_app_id;
    cmp.rank = my_ctx->my_rank;
    qhash_add(rank_tbl, &cmp, &(my_ctx->hash_link));
    rank_tbl_pop++;

    return 0;
}
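
/* Usage sketch (hypothetical driver, not part of this file): once a rank is
 * loaded, its ops are drained in order through the workload API declared
 * above, e.g.
 *
 *     struct codes_workload_op op;
 *     do {
 *         dumpi_trace_nw_workload_get_next(app_id, rank, &op);
 *         // ... replay op ...
 *     } while(op.op_type != CODES_WK_END);
 */
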
/* Data type sizes assume a 64-bit architecture. Source:
 * https://www.tutorialspoint.com/cprogramming/c_data_types.htm
 */
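/* Example: an isend of count = 256 with DUMPI_INT yields
 * num_bytes = 256 * 4 = 1024 in the generated op (non-Cortex path). */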
static int64_t get_num_bytes(rank_mpi_context* myctx, dumpi_datatype dt)
{
#ifdef ENABLE_CORTEX
    return cortex_datatype_get_size(myctx->profile, dt);
#else
    (void)myctx;
#endif
    switch(dt)
    {
        case DUMPI_DATATYPE_ERROR:
        case DUMPI_DATATYPE_NULL:
            tw_error(TW_LOC, "\n data type error");
            break;

        case DUMPI_CHAR:
        case DUMPI_UNSIGNED_CHAR:
        case DUMPI_SIGNED_CHAR:
        case DUMPI_BYTE:
            return 1; /* 1 byte for char */

        case DUMPI_WCHAR:
            return 4; /* 4-byte wchar_t on the assumed 64-bit platform */

        case DUMPI_SHORT:
        case DUMPI_SHORT_INT:
        case DUMPI_UNSIGNED_SHORT:
            return 2;

        case DUMPI_INT:
        case DUMPI_UNSIGNED:
        case DUMPI_FLOAT:
        case DUMPI_FLOAT_INT:
            return 4;

        case DUMPI_DOUBLE:
        case DUMPI_LONG:
        case DUMPI_LONG_INT:
        case DUMPI_UNSIGNED_LONG:
        case DUMPI_LONG_LONG_INT:
        case DUMPI_UNSIGNED_LONG_LONG:
        case DUMPI_LONG_LONG:
        case DUMPI_DOUBLE_INT:
            return 8;

        case DUMPI_LONG_DOUBLE_INT:
        case DUMPI_LONG_DOUBLE:
            return 10;

        default:
        {
            tw_error(TW_LOC, "\n undefined data type");