codes-dumpi-trace-nw-wrkld.c 35.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (C) 2014 University of Chicago
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include <ross.h>
#include <assert.h>
#include "dumpi/libundumpi/bindings.h"
#include "dumpi/libundumpi/libundumpi.h"
#include "codes/codes-workload.h"
#include "codes/quickhash.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
17
#include "codes/codes-jobmap.h"
18
#include "codes/model-net.h"
19

20 21
#if ENABLE_CORTEX
#include <cortex/cortex.h>
22
#include <cortex/datatype.h>
23 24
#include <cortex/cortex-mpich.h>
#ifdef ENABLE_CORTEX_PYTHON
25
#include <cortex/cortex-python.h>
26
#endif
27
#define PROFILE_TYPE cortex_dumpi_profile*
28
//#define UNDUMPI_OPEN cortex_undumpi_open
29 30 31 32
#define DUMPI_START_STREAM_READ cortex_dumpi_start_stream_read
#define UNDUMPI_CLOSE cortex_undumpi_close
#else
#define PROFILE_TYPE dumpi_profile*
33
//#define UNDUMPI_OPEN undumpi_open
34 35 36 37
#define DUMPI_START_STREAM_READ dumpi_start_stream_read
#define UNDUMPI_CLOSE undumpi_close
#endif

38
#define MAX_LENGTH_FILE 512
39 40
#define MAX_OPERATIONS 32768
#define DUMPI_IGNORE_DELAY 100
41
#define RANK_HASH_TABLE_SIZE 400
42

Matthieu Dorier's avatar
Matthieu Dorier committed
43 44 45
/* This variable is defined in src/network-workloads/model-net-mpi-replay.c */
extern struct codes_jobmap_ctx *jobmap_ctx; 

46 47 48
static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;

49
static unsigned int max_threshold = INT_MAX;
50 51 52
/* context of the MPI workload */
typedef struct rank_mpi_context
{
53
    PROFILE_TYPE profile;
54
    int my_app_id;
55 56
    // whether we've seen an init op (needed for timing correctness)
    int is_init;
57
    int num_reqs;
58
    unsigned int num_ops;
59 60
    int64_t my_rank;
    double last_op_time;
61
    double init_time;
62 63
    void* dumpi_mpi_array;	
    struct qhash_head hash_link;
64 65
    
    struct rc_stack * completed_ctx;
66 67
} rank_mpi_context;

68 69 70 71 72 73
typedef struct rank_mpi_compare
{
    int app;
    int rank;
} rank_mpi_compare;

74 75 76 77 78 79 80 81
/* Holds all the data about MPI operations from the log */
typedef struct dumpi_op_data_array
{
	struct codes_workload_op* op_array;
        int64_t op_arr_ndx;
        int64_t op_arr_cnt;
} dumpi_op_data_array;

82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
/* timing utilities */

#ifdef __GNUC__
__attribute__((unused))
#endif
/* Difference between two dumpi clock stamps (end - start); when the
 * nanosecond component goes negative, borrow one second to normalize. */
static dumpi_clock timediff(
        dumpi_clock end,
        dumpi_clock start)
{
    dumpi_clock d;
    d.sec  = end.sec  - start.sec;
    d.nsec = end.nsec - start.nsec;
    if (d.nsec < 0) {
        d.sec  -= 1;
        d.nsec += 1000000000;
    }
    return d;
}

102
/*static inline double time_to_ms_lf(dumpi_clock t){
103 104 105 106
        return (double) t.sec * 1e3 + (double) t.nsec / 1e6;
}
static inline double time_to_us_lf(dumpi_clock t){
        return (double) t.sec * 1e6 + (double) t.nsec / 1e3;
107
}*/
108 109 110
/* Convert a dumpi timestamp to nanoseconds, as a double. */
static inline double time_to_ns_lf(dumpi_clock t){
        double ns = (double) t.nsec;
        return ns + (double) t.sec * 1e9;
}
111
/*static inline double time_to_s_lf(dumpi_clock t){
112
        return (double) t.sec + (double) t.nsec / 1e9;
113
}*/
114

115
/* load the trace */
116
static int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank);
117 118

/* dumpi implementation of get next operation in the workload */
119
static void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op);
120 121

/* get number of bytes from the workload data type and count */
122
static uint64_t get_num_bytes(rank_mpi_context* my_ctx, dumpi_datatype dt);
123 124

/* computes the delay between MPI operations */
125
static void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx);
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187

/* initializes the data structures */
static void* dumpi_init_op_data();

/* removes next operations from the dynamic array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
                                      double last_op_time);

/* resets the counters for the dynamic array once the workload is completely loaded*/
static void dumpi_finalize_mpi_op_data(void *mpi_op_array);

/* insert next operation */
static void dumpi_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op);

/* Allocate and initialize the dynamic array that buffers workload ops.
 * Returns an opaque handle owned by the caller's rank context. */
static void* dumpi_init_op_data()
{
    dumpi_op_data_array* data = malloc(sizeof(*data));
    assert(data);
    data->op_array = malloc(MAX_OPERATIONS * sizeof(*data->op_array));
    assert(data->op_array);
    data->op_arr_ndx = 0;
    data->op_arr_cnt = MAX_OPERATIONS;

    return (void*)data;
}

/* Append an MPI operation to the dynamic array, growing the backing
 * store by MAX_OPERATIONS entries when it is full.
 * Fix: the original grew the array with malloc + memcpy + free; realloc
 * expresses the same operation and can extend the block in place. */
static void dumpi_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op)
{
	dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;

	/* check if array is full */
	if (array->op_arr_ndx == array->op_arr_cnt)
	{
		int64_t new_cnt = array->op_arr_cnt + MAX_OPERATIONS;
		/* realloc into a temporary so the original pointer is not
		 * clobbered before the result is validated */
		struct codes_workload_op *tmp =
			realloc(array->op_array, new_cnt * sizeof(*tmp));
		assert(tmp);
		array->op_array = tmp;
		array->op_arr_cnt = new_cnt;
	}

	/* add the MPI operation to the op array */
	array->op_array[array->op_arr_ndx] = *mpi_op;
	array->op_arr_ndx++;
}

/* resets the counters after file is fully loaded */
static void dumpi_finalize_mpi_op_data(void *mpi_op_array)
{
	struct dumpi_op_data_array* array = (struct dumpi_op_data_array*)mpi_op_array;

	array->op_arr_cnt = array->op_arr_ndx;	
	array->op_arr_ndx = 0;
}

188 189 190 191 192 193 194
/* Undo the most recent dumpi_remove_next_op by stepping the cursor back
 * one slot (used during reverse computation). */
static void dumpi_roll_back_prev_op(void * mpi_op_array)
{
    dumpi_op_data_array *data = (dumpi_op_data_array*)mpi_op_array;
    assert(data->op_arr_ndx > 0);
    data->op_arr_ndx--;
}
195 196 197 198
/* removes the next operation from the array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
                                      double last_op_time)
{
199 200
    (void)last_op_time;

201 202 203 204 205 206 207 208 209 210
	dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
	//printf("\n op array index %d array count %d ", array->op_arr_ndx, array->op_arr_cnt);
	if (array->op_arr_ndx == array->op_arr_cnt)
	 {
		mpi_op->op_type = CODES_WK_END;
	 }
	else
	{
		struct codes_workload_op *tmp = &(array->op_array[array->op_arr_ndx]);
		*mpi_op = *tmp;
211
        array->op_arr_ndx++;
212
	}
213
	/*if(mpi_op->op_type == CODES_WK_END)
214 215 216
	{
		free(array->op_array);
		free(array);
217
	}*/
218 219
}

220 221 222 223 224 225 226 227 228 229
/* On the first timed event seen for this rank, record the trace start time
 * and seed last_op_time so later delay computations are relative to it.
 * Subsequent calls are no-ops. */
static inline void check_set_init_time(const dumpi_time *t, rank_mpi_context * my_ctx)
{
    if (my_ctx->is_init)
        return;

    my_ctx->is_init = 1;
    my_ctx->init_time = time_to_ns_lf(t->start);
    my_ctx->last_op_time = time_to_ns_lf(t->stop) - my_ctx->init_time;
}

230 231 232
/* Emit a CODES_WK_DELAY op covering compute time spent outside MPI calls;
 * gaps of DUMPI_IGNORE_DELAY ns or less are ignored. Always advances
 * last_op_time to the end of the current operation. */
void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx)
{
    double begin = time_to_ns_lf(time->start) - my_ctx->init_time;
    double end = time_to_ns_lf(time->stop) - my_ctx->init_time;
    double gap = begin - my_ctx->last_op_time;

    if(gap > DUMPI_IGNORE_DELAY)
    {
        struct codes_workload_op delay_op;

        delay_op.op_type = CODES_WK_DELAY;
        delay_op.start_time = my_ctx->last_op_time;
        delay_op.end_time = begin;
        delay_op.u.delay.seconds = gap / 1e9; /* ns -> s */
        dumpi_insert_next_op(my_ctx->dumpi_mpi_array, &delay_op);
    }

    my_ctx->last_op_time = end;
}

248 249 250 251 252 253 254 255
static int handleDUMPIInit(
        const dumpi_init *prm,
        uint16_t thread,
        const dumpi_time *cpu,
        const dumpi_time *wall,
        const dumpi_perfinfo *perf,
        void *uarg)
{
256 257 258 259 260 261
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)wall;
    (void)perf;

262
    rank_mpi_context *myctx = (rank_mpi_context*)uarg;
263
    check_set_init_time(wall, myctx);
264 265 266
    return 0;
}

267 268
int handleDUMPIError(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
269 270 271 272 273 274 275
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)wall;
    (void)perf;
    (void)uarg;

276 277 278
    tw_error(TW_LOC, "\n MPI operation not supported by the MPI-Sim Layer ");
}

279
int handleDUMPIIgnore(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
280
{
281 282 283 284 285 286 287
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)wall;
    (void)perf;
	
    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
288

289
    check_set_init_time(wall, myctx);
290
	update_compute_time(wall, myctx);
291 292 293 294

	return 0;
}

295 296 297 298 299
/* Stamp op with start/stop times normalized to the trace origin, account
 * for the compute gap preceding it, then append it to the rank's op array. */
static void update_times_and_insert(
        struct codes_workload_op *op,
        const dumpi_time *t,
        rank_mpi_context *ctx)
{
    check_set_init_time(t, ctx);

    double t0 = time_to_ns_lf(t->start);
    double t1 = time_to_ns_lf(t->stop);
    op->start_time = t0 - ctx->init_time;
    op->end_time = t1 - ctx->init_time;

    update_compute_time(t, ctx);
    dumpi_insert_next_op(ctx->dumpi_mpi_array, op);
}


308 309 310 311
int handleDUMPIWait(const dumpi_wait *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
312 313 314 315 316
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)perf;
        
317 318 319
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

320
        wrkld_per_rank.op_type = CODES_WK_WAIT;
321 322
        wrkld_per_rank.u.wait.req_id = prm->request;

323 324
        update_times_and_insert(&wrkld_per_rank, wall, myctx);

325 326 327 328 329 330 331
        return 0;
}

int handleDUMPIWaitsome(const dumpi_waitsome *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
332 333 334 335 336
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
        
337 338 339 340
        int i;
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

341
        wrkld_per_rank.op_type = CODES_WK_WAITSOME;
342
        wrkld_per_rank.u.waits.count = prm->count;
343
        wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));
344 345

        for( i = 0; i < prm->count; i++ )
346
                wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];
347

348
        update_times_and_insert(&wrkld_per_rank, wall, myctx);
349 350 351 352 353 354 355
        return 0;
}

int handleDUMPIWaitany(const dumpi_waitany *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
356 357 358 359 360 361
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
        
362 363 364 365
        int i;
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

366
        wrkld_per_rank.op_type = CODES_WK_WAITANY;
367
        wrkld_per_rank.u.waits.count = prm->count;
368
        wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));
369 370

        for( i = 0; i < prm->count; i++ )
371
                wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];
372

373
        update_times_and_insert(&wrkld_per_rank, wall, myctx);
374 375 376 377 378 379 380
        return 0;
}

int handleDUMPIWaitall(const dumpi_waitall *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
381 382 383 384 385
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
386
        int i;
387
        
388 389 390
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

391
        wrkld_per_rank.op_type = CODES_WK_WAITALL;
392 393

        wrkld_per_rank.u.waits.count = prm->count;
394
        wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));
395 396 397
        for( i = 0; i < prm->count; i++ )
                wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];

398
        update_times_and_insert(&wrkld_per_rank, wall, myctx);
399 400 401
        return 0;
}

402 403
int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
404 405 406 407 408 409 410 411
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;
412

413 414 415 416 417 418
        wrkld_per_rank.op_type = CODES_WK_ISEND;
        wrkld_per_rank.u.send.tag = prm->tag;
        wrkld_per_rank.u.send.count = prm->count;
        wrkld_per_rank.u.send.data_type = prm->datatype;
        wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
        assert(wrkld_per_rank.u.send.num_bytes >= 0);
419
    	wrkld_per_rank.u.send.req_id = prm->request;
420 421
        wrkld_per_rank.u.send.dest_rank = prm->dest;
        wrkld_per_rank.u.send.source_rank = myctx->my_rank;
422 423

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
424 425
	
        return 0;
426 427 428 429
}

int handleDUMPIIRecv(const dumpi_irecv *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
430 431 432 433 434 435 436 437
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        //printf("\n irecv source %d count %d data type %d", prm->source, prm->count, prm->datatype);
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
438 439 440
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_IRECV;
441 442 443
	    wrkld_per_rank.u.recv.data_type = prm->datatype;
	    wrkld_per_rank.u.recv.count = prm->count;
	    wrkld_per_rank.u.recv.tag = prm->tag;
444
        wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
445
	    
446
        assert(wrkld_per_rank.u.recv.num_bytes >= 0);
447 448
        wrkld_per_rank.u.recv.source_rank = prm->source;
        wrkld_per_rank.u.recv.dest_rank = -1;
449
	    wrkld_per_rank.u.recv.req_id = prm->request;
450 451

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
452 453 454 455 456 457 458
        return 0;
}

int handleDUMPISend(const dumpi_send *prm, uint16_t thread,
                      const dumpi_time *cpu, const dumpi_time *wall,
                      const dumpi_perfinfo *perf, void *uarg)
{
459 460 461 462 463 464 465
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	    
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
466 467 468
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_SEND;
469
	    wrkld_per_rank.u.send.tag = prm->tag;
470 471
        wrkld_per_rank.u.send.count = prm->count;
        wrkld_per_rank.u.send.data_type = prm->datatype;
472
        wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
473
	    assert(wrkld_per_rank.u.send.num_bytes >= 0);
474 475
        wrkld_per_rank.u.send.dest_rank = prm->dest;
        wrkld_per_rank.u.send.source_rank = myctx->my_rank;
476
         wrkld_per_rank.u.send.req_id = -1;
477

Misbah Mubarak's avatar
Misbah Mubarak committed
478 479
        
         update_times_and_insert(&wrkld_per_rank, wall, myctx);
480 481 482 483 484 485 486
        return 0;
}

int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread,
                      const dumpi_time *cpu, const dumpi_time *wall,
                      const dumpi_perfinfo *perf, void *uarg)
{
487 488 489 490 491 492
     (void)prm;
     (void)thread;
     (void)cpu;
     (void)wall;
     (void)perf;

493 494 495 496
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_RECV;
497 498 499
    wrkld_per_rank.u.recv.tag = prm->tag;
    wrkld_per_rank.u.recv.count = prm->count;
    wrkld_per_rank.u.recv.data_type = prm->datatype;
500
    wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
501
	assert(wrkld_per_rank.u.recv.num_bytes >= 0);
502 503
    wrkld_per_rank.u.recv.source_rank = prm->source;
    wrkld_per_rank.u.recv.dest_rank = -1;
504

505
	//printf("\n recv source %d count %d data type %d bytes %lld ", prm->source, prm->count, prm->datatype, wrkld_per_rank.u.recv.num_bytes);
506 507
    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
508 509 510

}

511 512 513 514
int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread,
			const dumpi_time *cpu, const dumpi_time *wall,
			const dumpi_perfinfo *perf, void *uarg)
{
515 516 517 518 519 520 521
     (void)prm;
     (void)thread;
     (void)cpu;
     (void)wall;
     (void)perf;
	
     rank_mpi_context* myctx = (rank_mpi_context*)uarg;
522

523
     /* Issue a non-blocking send */
524 525
	{
		struct codes_workload_op wrkld_per_rank;
526
		wrkld_per_rank.op_type = CODES_WK_ISEND;
527 528 529 530
		wrkld_per_rank.u.send.tag = prm->sendtag;
		wrkld_per_rank.u.send.count = prm->sendcount;
		wrkld_per_rank.u.send.data_type = prm->sendtype;
		wrkld_per_rank.u.send.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
531

Misbah Mubarak's avatar
Misbah Mubarak committed
532 533
		
        assert(wrkld_per_rank.u.send.num_bytes >= 0);
534 535
		wrkld_per_rank.u.send.dest_rank = prm->dest;
		wrkld_per_rank.u.send.source_rank = myctx->my_rank;
536
		wrkld_per_rank.u.send.req_id = myctx->num_reqs;
537
		update_times_and_insert(&wrkld_per_rank, wall, myctx);
538

539 540
	}

541
    /* issue a blocking receive */
542 543 544 545 546 547 548
	{
		struct codes_workload_op wrkld_per_rank;
		wrkld_per_rank.op_type = CODES_WK_RECV;
		wrkld_per_rank.u.recv.tag = prm->recvtag;
		wrkld_per_rank.u.recv.count = prm->recvcount;
		wrkld_per_rank.u.recv.data_type = prm->recvtype;
		wrkld_per_rank.u.recv.num_bytes = prm->recvcount * get_num_bytes(myctx,prm->recvtype);
Misbah Mubarak's avatar
Misbah Mubarak committed
549 550

        assert(wrkld_per_rank.u.recv.num_bytes >= 0);
551 552 553 554
		wrkld_per_rank.u.recv.source_rank = prm->source;
		wrkld_per_rank.u.recv.dest_rank = -1;
		update_times_and_insert(&wrkld_per_rank, wall, myctx);
	}
555 556 557 558 559 560 561 562 563 564 565 566 567
    
    /* Issue a wait operation */
    {
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_WAIT;
        wrkld_per_rank.u.wait.req_id = myctx->num_reqs;

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
    
        myctx->num_reqs++;
    }

568 569 570
	return 0;
}

571 572 573 574
int handleDUMPIBcast(const dumpi_bcast *prm, uint16_t thread,
                       const dumpi_time *cpu, const dumpi_time *wall,
                       const dumpi_perfinfo *perf, void *uarg)
{
575 576 577 578 579 580 581
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
        struct codes_workload_op wrkld_per_rank;
582 583

        wrkld_per_rank.op_type = CODES_WK_BCAST;
584
        wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
585
	    assert(wrkld_per_rank.u.collective.num_bytes >= 0);
586 587

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
588 589 590 591 592 593 594
        return 0;
}

int handleDUMPIAllgather(const dumpi_allgather *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
595 596 597 598 599
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)wall;
    (void)perf;
600 601 602
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

603 604
    wrkld_per_rank.op_type = CODES_WK_ALLGATHER;
    wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
605
	assert(wrkld_per_rank.u.collective.num_bytes > 0);
606

607 608
    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
609 610 611 612 613 614
}

int handleDUMPIAllgatherv(const dumpi_allgatherv *prm, uint16_t thread,
                            const dumpi_time *cpu, const dumpi_time *wall,
                            const dumpi_perfinfo *perf, void *uarg)
{
615 616 617 618 619 620 621
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
622 623

        wrkld_per_rank.op_type = CODES_WK_ALLGATHERV;
624
        wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
625
	    assert(wrkld_per_rank.u.collective.num_bytes > 0);
626 627

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
628 629 630 631 632 633 634
        return 0;
}

int handleDUMPIAlltoall(const dumpi_alltoall *prm, uint16_t thread,
                          const dumpi_time *cpu, const dumpi_time *wall,
                          const dumpi_perfinfo *perf, void *uarg)
{
635 636 637 638 639 640 641
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
642 643

        wrkld_per_rank.op_type = CODES_WK_ALLTOALL;
644
        wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
645
	    assert(wrkld_per_rank.u.collective.num_bytes > 0);
646 647

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
648 649 650 651 652 653 654
        return 0;
}

int handleDUMPIAlltoallv(const dumpi_alltoallv *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
655 656 657 658 659 660 661 662
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
663 664

        wrkld_per_rank.op_type = CODES_WK_ALLTOALLV;
665
        wrkld_per_rank.u.collective.num_bytes = prm->sendcounts[0] * get_num_bytes(myctx,prm->sendtype);
666
	    assert(wrkld_per_rank.u.collective.num_bytes > 0);
667 668

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
669 670 671 672 673 674 675
        return 0;
}

int handleDUMPIReduce(const dumpi_reduce *prm, uint16_t thread,
                        const dumpi_time *cpu, const dumpi_time *wall,
                        const dumpi_perfinfo *perf, void *uarg)
{
676 677 678 679 680 681 682 683
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
684 685

        wrkld_per_rank.op_type = CODES_WK_REDUCE;
686
        wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
687
	    assert(wrkld_per_rank.u.collective.num_bytes > 0);
688 689

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
690 691 692 693 694 695 696
        return 0;
}

int handleDUMPIAllreduce(const dumpi_allreduce *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
697 698 699 700 701 702 703 704
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
705 706

        wrkld_per_rank.op_type = CODES_WK_ALLREDUCE;
707
        wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
708
	    assert(wrkld_per_rank.u.collective.num_bytes > 0);
709 710

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
711 712 713 714 715
        return 0;
}

int handleDUMPIFinalize(const dumpi_finalize *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
716 717 718 719 720 721 722 723
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
724 725

        wrkld_per_rank.op_type = CODES_WK_END;
726 727

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
728 729 730
        return 0;
}

731 732
int handleDUMPIReqFree(const dumpi_request_free *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
733 734 735 736 737 738 739 740
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
    
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;
741

742 743
        wrkld_per_rank.op_type = CODES_WK_REQ_FREE;
        wrkld_per_rank.u.free.req_id = prm->request;
744

745 746
        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
747 748
}

749 750
static int hash_rank_compare(void *key, struct qhash_head *link)
{
751
    rank_mpi_compare *in = key;
752 753 754
    rank_mpi_context *tmp;

    tmp = qhash_entry(link, rank_mpi_context, hash_link);
755
    if (tmp->my_rank == in->rank && tmp->my_app_id == in->app)
756 757 758 759
        return 1;
    return 0;
}

760
int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
761 762 763
{
	libundumpi_callbacks callbacks;
	libundumpi_cbpair callarr[DUMPI_END_OF_STREAM];
764 765 766 767
#ifdef ENABLE_CORTEX
	libundumpi_cbpair transarr[DUMPI_END_OF_STREAM];
#endif
	PROFILE_TYPE profile;
768
	dumpi_trace_params* dumpi_params = (dumpi_trace_params*)params;
769
	char file_name[MAX_LENGTH_FILE];
770 771 772 773 774 775

	if(rank >= dumpi_params->num_net_traces)
		return -1;

	if(!rank_tbl)
    	{
776
            rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SIZE);
777 778 779 780 781 782 783 784
            if(!rank_tbl)
                  return -1;
    	}
	
	rank_mpi_context *my_ctx;
	my_ctx = malloc(sizeof(rank_mpi_context));
	assert(my_ctx);
	my_ctx->my_rank = rank;
785
    my_ctx->my_app_id = app_id;
786
	my_ctx->last_op_time = 0.0;
787 788
    my_ctx->is_init = 0;
    my_ctx->num_reqs = 0;
789
	my_ctx->dumpi_mpi_array = dumpi_init_op_data();
790
    my_ctx->num_ops = 0;
791 792 793 794 795 796 797 798 799

	if(rank < 10)
            sprintf(file_name, "%s000%d.bin", dumpi_params->file_name, rank);
         else if(rank >=10 && rank < 100)
            sprintf(file_name, "%s00%d.bin", dumpi_params->file_name, rank);
           else if(rank >=100 && rank < 1000)
             sprintf(file_name, "%s0%d.bin", dumpi_params->file_name, rank);
             else
              sprintf(file_name, "%s%d.bin", dumpi_params->file_name, rank);
800
#ifdef ENABLE_CORTEX
801 802 803 804 805
	if(strcmp(dumpi_params->file_name,"none") == 0) {
		profile = cortex_undumpi_open(NULL, app_id, dumpi_params->num_net_traces, rank);
	} else {
		profile = cortex_undumpi_open(file_name, app_id, dumpi_params->num_net_traces, rank);
	}
806
	
Matthieu Dorier's avatar
Matthieu Dorier committed
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821
	{ int i;
	for(i=0; i < dumpi_params->num_net_traces; i++) {
		struct codes_jobmap_id id = {
			.job = app_id,
			.rank = i
		};
		uint32_t cn_id;
		if(jobmap_ctx) {
			cn_id = codes_jobmap_to_global_id(id, jobmap_ctx);
		} else {
			cn_id = i;
		}
		cortex_placement_set(profile, i, cn_id);
	}
	}
822
	
823
	cortex_topology_set(profile,&model_net_topology);
824
#else
825
	profile =  undumpi_open(file_name);
826
#endif
827
        my_ctx->profile = profile;
828 829 830 831 832 833 834
        if(NULL == profile) {
                printf("Error: unable to open DUMPI trace: %s", file_name);
                exit(-1);
        }
	
	memset(&callbacks, 0, sizeof(libundumpi_callbacks));
        memset(&callarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
835 836 837
#ifdef ENABLE_CORTEX
	memset(&transarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
#endif
838 839

	/* handle MPI function calls */	        
840
        callbacks.on_init = handleDUMPIInit;
841 842 843 844 845 846
	callbacks.on_send = (dumpi_send_call)handleDUMPISend;
        callbacks.on_recv = (dumpi_recv_call)handleDUMPIRecv;
        callbacks.on_isend = (dumpi_isend_call)handleDUMPIISend;
        callbacks.on_irecv = (dumpi_irecv_call)handleDUMPIIRecv;
        callbacks.on_allreduce = (dumpi_allreduce_call)handleDUMPIAllreduce;
	callbacks.on_bcast = (dumpi_bcast_call)handleDUMPIBcast;
847 848 849 850 851 852 853 854 855
	callbacks.on_get_count = (dumpi_get_count_call)handleDUMPIIgnore;
	callbacks.on_bsend = (dumpi_bsend_call)handleDUMPIIgnore;
	callbacks.on_ssend = (dumpi_ssend_call)handleDUMPIIgnore;
	callbacks.on_rsend = (dumpi_rsend_call)handleDUMPIIgnore;
	callbacks.on_buffer_attach = (dumpi_buffer_attach_call)handleDUMPIIgnore;
	callbacks.on_buffer_detach = (dumpi_buffer_detach_call)handleDUMPIIgnore;
	callbacks.on_ibsend = (dumpi_ibsend_call)handleDUMPIIgnore;
	callbacks.on_issend = (dumpi_issend_call)handleDUMPIIgnore;
	callbacks.on_irsend = (dumpi_irsend_call)handleDUMPIIgnore;
856
	callbacks.on_wait = (dumpi_wait_call)handleDUMPIWait;
857
	callbacks.on_test = (dumpi_test_call)handleDUMPIIgnore;
858
	callbacks.on_request_free = (dumpi_request_free_call)handleDUMPIReqFree;
859
	callbacks.on_waitany = (dumpi_waitany_call)handleDUMPIWaitany;
860
	callbacks.on_testany = (dumpi_testany_call)handleDUMPIIgnore;
861
	callbacks.on_waitall = (dumpi_waitall_call)handleDUMPIWaitall;
862
	callbacks.on_testall = (dumpi_testall_call)handleDUMPIIgnore;
863
	callbacks.on_waitsome = (dumpi_waitsome_call)handleDUMPIWaitsome;
864 865 866 867 868 869 870 871 872 873 874 875
	callbacks.on_testsome = (dumpi_testsome_call)handleDUMPIIgnore;
	callbacks.on_iprobe = (dumpi_iprobe_call)handleDUMPIIgnore;
	callbacks.on_probe = (dumpi_probe_call)handleDUMPIIgnore;
	callbacks.on_cancel = (dumpi_cancel_call)handleDUMPIIgnore;
	callbacks.on_test_cancelled = (dumpi_test_cancelled_call)handleDUMPIIgnore;
	callbacks.on_send_init = (dumpi_send_init_call)handleDUMPIIgnore;
	callbacks.on_bsend_init = (dumpi_bsend_init_call)handleDUMPIIgnore;
	callbacks.on_ssend_init = (dumpi_ssend_init_call)handleDUMPIIgnore;
	callbacks.on_rsend_init = (dumpi_rsend_init_call)handleDUMPIIgnore;
	callbacks.on_recv_init = (dumpi_recv_init_call)handleDUMPIIgnore;
	callbacks.on_start = (dumpi_start_call)handleDUMPIIgnore;
	callbacks.on_startall = (dumpi_startall_call)handleDUMPIIgnore;
876
	callbacks.on_sendrecv = (dumpi_sendrecv_call)handleDUMPISendrecv;
877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895
	callbacks.on_sendrecv_replace = (dumpi_sendrecv_replace_call)handleDUMPIIgnore;
	callbacks.on_type_contiguous = (dumpi_type_contiguous_call)handleDUMPIIgnore;
	callbacks.on_barrier = (dumpi_barrier_call)handleDUMPIIgnore;
        callbacks.on_gather = (dumpi_gather_call)handleDUMPIIgnore;
        callbacks.on_gatherv = (dumpi_gatherv_call)handleDUMPIIgnore;
        callbacks.on_scatter = (dumpi_scatter_call)handleDUMPIIgnore;
        callbacks.on_scatterv = (dumpi_scatterv_call)handleDUMPIIgnore;
        callbacks.on_allgather = (dumpi_allgather_call)handleDUMPIIgnore;
        callbacks.on_allgatherv = (dumpi_allgatherv_call)handleDUMPIIgnore;
        callbacks.on_alltoall = (dumpi_alltoall_call)handleDUMPIIgnore;
        callbacks.on_alltoallv = (dumpi_alltoallv_call)handleDUMPIIgnore;
        callbacks.on_alltoallw = (dumpi_alltoallw_call)handleDUMPIIgnore;
        callbacks.on_reduce = (dumpi_reduce_call)handleDUMPIIgnore;
        callbacks.on_reduce_scatter = (dumpi_reduce_scatter_call)handleDUMPIIgnore;
        callbacks.on_group_size = (dumpi_group_size_call)handleDUMPIIgnore;
        callbacks.on_group_rank = (dumpi_group_rank_call)handleDUMPIIgnore;
        callbacks.on_comm_size = (dumpi_comm_size_call)handleDUMPIIgnore;
        callbacks.on_comm_rank = (dumpi_comm_rank_call)handleDUMPIIgnore;
        callbacks.on_comm_get_attr = (dumpi_comm_get_attr_call)handleDUMPIIgnore;
896 897
        callbacks.on_comm_dup = (dumpi_comm_dup_call)handleDUMPIError;
        callbacks.on_comm_create = (dumpi_comm_create_call)handleDUMPIError;
898
        callbacks.on_wtime = (dumpi_wtime_call)handleDUMPIIgnore;
899 900 901 902
        callbacks.on_finalize = (dumpi_finalize_call)handleDUMPIFinalize;

        libundumpi_populate_callbacks(&callbacks, callarr);

903
#ifdef ENABLE_CORTEX
#ifdef ENABLE_CORTEX_PYTHON
	if(dumpi_params->cortex_script[0] != 0) {
		libundumpi_populate_callbacks(CORTEX_PYTHON_TRANSLATION, transarr);
	} else {
		libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, transarr);
	}
#else
	libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, transarr);
#endif
#endif
        DUMPI_START_STREAM_READ(profile);
        //dumpi_header* trace_header = undumpi_read_header(profile);
        //dumpi_free_header(trace_header);

#ifdef ENABLE_CORTEX_PYTHON
	if(dumpi_params->cortex_script[0] != 0) {
		if(dumpi_params->cortex_class[0] != 0) {
			cortex_python_set_module(dumpi_params->cortex_script, dumpi_params->cortex_class);
		} else {
			cortex_python_set_module(dumpi_params->cortex_script, NULL);
		}
		if(dumpi_params->cortex_gen[0] != 0) {
			cortex_python_call_generator(profile, dumpi_params->cortex_gen);
		}
	}
#endif

        int finalize_reached = 0;
        int active = 1;
        int num_calls = 0;
        while(active && !finalize_reached)
        {
           num_calls++;
           my_ctx->num_ops++;
#ifdef ENABLE_CORTEX
           if(my_ctx->num_ops < max_threshold)
	        active = cortex_undumpi_read_single_call(profile, callarr, transarr, (void*)my_ctx, &finalize_reached);
           else
           {
                struct codes_workload_op op;
                op.op_type = CODES_WK_END;

                op.start_time = my_ctx->last_op_time;
                op.end_time = my_ctx->last_op_time + 1;
                dumpi_insert_next_op(my_ctx->dumpi_mpi_array, &op);
                break;
           }
#else
           active = undumpi_read_single_call(profile, callarr, (void*)my_ctx, &finalize_reached);
#endif
        }
	UNDUMPI_CLOSE(profile);
	dumpi_finalize_mpi_op_data(my_ctx->dumpi_mpi_array);
	/* add this rank context to hash table */	
        rank_mpi_compare cmp;
        cmp.app = my_ctx->my_app_id;
        cmp.rank = my_ctx->my_rank;
	qhash_add(rank_tbl, &cmp, &(my_ctx->hash_link));
	rank_tbl_pop++;

	return 0;
}
/* Data types are for 64-bit archs. Source:
 * https://www.tutorialspoint.com/cprogramming/c_data_types.htm 
 * */
static uint64_t get_num_bytes(rank_mpi_context* myctx, dumpi_datatype dt)
{
    (void)myctx;

#ifdef ENABLE_CORTEX
   return cortex_datatype_get_size(myctx->profile,dt);
#endif
   switch(dt)
   {
	case DUMPI_DATATYPE_ERROR:
	case DUMPI_DATATYPE_NULL:
		tw_error(TW_LOC, "\n data type error");
	break;

	case DUMPI_CHAR:
	case DUMPI_UNSIGNED_CHAR:
	case DUMPI_SIGNED_CHAR:
	case DUMPI_BYTE:
		return 1; /* 1 byte for char */
	break;

	case DUMPI_WCHAR:
		return 4; /* 4 bytes for a 64-bit version */
	break;

	case DUMPI_SHORT:
	case DUMPI_SHORT_INT:
	case DUMPI_UNSIGNED_SHORT:
		return 2;
	break;

	 case DUMPI_INT:
		return 4;
	 break;