/*
 * Copyright (C) 2014 University of Chicago
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include <ross.h>
#include <assert.h>
#include "dumpi/libundumpi/bindings.h"
#include "dumpi/libundumpi/libundumpi.h"
#include "codes/codes-workload.h"
#include "codes/quickhash.h"

#ifdef ENABLE_CORTEX
#include <cortex/cortex.h>
#include <cortex/datatype.h>
#include <cortex/cortex-mpich.h>
#ifdef ENABLE_CORTEX_PYTHON
#include <cortex/cortex-python.h>
#endif
#define PROFILE_TYPE cortex_dumpi_profile*
//#define UNDUMPI_OPEN cortex_undumpi_open
#define DUMPI_START_STREAM_READ cortex_dumpi_start_stream_read
#define UNDUMPI_CLOSE cortex_undumpi_close
#else
#define PROFILE_TYPE dumpi_profile*
//#define UNDUMPI_OPEN undumpi_open
#define DUMPI_START_STREAM_READ dumpi_start_stream_read
#define UNDUMPI_CLOSE undumpi_close
#endif

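/* MAX_LENGTH: size of the per-rank trace file path buffer.
 * MAX_OPERATIONS: initial op-array capacity, grown in MAX_OPERATIONS-sized
 *   increments while the trace is loaded.
 * DUMPI_IGNORE_DELAY: minimum gap (in ns) between consecutive MPI calls that
 *   gets recorded as an explicit compute delay (see update_compute_time()).
 * RANK_HASH_TABLE_SIZE: bucket count for the rank-context hash table. */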
#define MAX_LENGTH 512
#define MAX_OPERATIONS 32768
#define DUMPI_IGNORE_DELAY 100
#define RANK_HASH_TABLE_SIZE 400

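/* table of per-rank workload contexts, keyed by (app id, rank);
 * see hash_rank_compare() below */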
static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;

/* context of the MPI workload */
typedef struct rank_mpi_context
{
    PROFILE_TYPE profile;
    int my_app_id;
    // whether we've seen an init op (needed for timing correctness)
    int is_init;
    int64_t my_rank;
    double last_op_time;
    double init_time;
    void* dumpi_mpi_array;
    struct qhash_head hash_link;

    struct rc_stack * completed_ctx;
} rank_mpi_context;

typedef struct rank_mpi_compare
{
    int app;
    int rank;
} rank_mpi_compare;

/* Holds all the data about MPI operations from the log */
typedef struct dumpi_op_data_array
{
    struct codes_workload_op* op_array;
    int64_t op_arr_ndx;
    int64_t op_arr_cnt;
} dumpi_op_data_array;
/* timing utilities */

#ifdef __GNUC__
__attribute__((unused))
#endif
static dumpi_clock timediff(
        dumpi_clock end,
        dumpi_clock start)
{
    dumpi_clock temp;
    if ((end.nsec-start.nsec)<0) {
        temp.sec = end.sec-start.sec-1;
        temp.nsec = 1000000000+end.nsec-start.nsec;
    } else {
        temp.sec = end.sec-start.sec;
        temp.nsec = end.nsec-start.nsec;
    }
    return temp;
}

static inline double time_to_ms_lf(dumpi_clock t){
        return (double) t.sec * 1e3 + (double) t.nsec / 1e6;
}
static inline double time_to_us_lf(dumpi_clock t){
        return (double) t.sec * 1e6 + (double) t.nsec / 1e3;
}
static inline double time_to_ns_lf(dumpi_clock t){
        return (double) t.sec * 1e9 + (double) t.nsec;
}
static inline double time_to_s_lf(dumpi_clock t){
        return (double) t.sec + (double) t.nsec / 1e9;
}

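/* sanity-check sketch of the conversions above: for
 * t = {.sec = 1, .nsec = 500000000},
 *   time_to_s_lf(t)  == 1.5
 *   time_to_ms_lf(t) == 1500.0
 *   time_to_us_lf(t) == 1.5e6
 *   time_to_ns_lf(t) == 1.5e9
 */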
/* load the trace */
static int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank);

/* dumpi implementation of get next operation in the workload */
static void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op);

/* get number of bytes from the workload data type and count */
static int64_t get_num_bytes(rank_mpi_context* my_ctx, dumpi_datatype dt);

/* computes the delay between MPI operations */
static void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx);

/* initializes the data structures */
static void* dumpi_init_op_data(void);

/* removes next operations from the dynamic array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
                                      double last_op_time);

/* resets the counters for the dynamic array once the workload is completely loaded*/
static void dumpi_finalize_mpi_op_data(void *mpi_op_array);

/* insert next operation */
static void dumpi_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op);

/* initialize the array data structure */
static void* dumpi_init_op_data(void)
{
	dumpi_op_data_array* tmp;
	
	tmp = malloc(sizeof(dumpi_op_data_array));
	assert(tmp);
	tmp->op_array = malloc(MAX_OPERATIONS * sizeof(struct codes_workload_op));
	assert(tmp->op_array);
	tmp->op_arr_ndx = 0;
	tmp->op_arr_cnt = MAX_OPERATIONS;

	return (void *)tmp;	
}

/* inserts next operation in the array */
static void dumpi_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op)
{
	dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
	struct codes_workload_op *tmp;

	/*check if array is full.*/
	if (array->op_arr_ndx == array->op_arr_cnt)
	{
		tmp = malloc((array->op_arr_cnt + MAX_OPERATIONS) * sizeof(struct codes_workload_op));
		assert(tmp);
		memcpy(tmp, array->op_array, array->op_arr_cnt * sizeof(struct codes_workload_op));
		free(array->op_array);
		array->op_array = tmp;
		array->op_arr_cnt += MAX_OPERATIONS;
	}

	/* add the MPI operation to the op array */
	array->op_array[array->op_arr_ndx] = *mpi_op;
	//printf("\n insert time %f end time %f ", array->op_array[array->op_arr_ndx].start_time, array->op_array[array->op_arr_ndx].end_time);
	array->op_arr_ndx++;
	return;
}

/* resets the counters after file is fully loaded */
static void dumpi_finalize_mpi_op_data(void *mpi_op_array)
{
	struct dumpi_op_data_array* array = (struct dumpi_op_data_array*)mpi_op_array;

	array->op_arr_cnt = array->op_arr_ndx;	
	array->op_arr_ndx = 0;
}

/* rolls back to previous index */
static void dumpi_roll_back_prev_op(void * mpi_op_array)
{
    dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
    array->op_arr_ndx--;
    assert(array->op_arr_ndx >= 0);
}
/* removes the next operation from the array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
                                      double last_op_time)
{
	dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
	//printf("\n op array index %d array count %d ", array->op_arr_ndx, array->op_arr_cnt);
	if (array->op_arr_ndx == array->op_arr_cnt)
	 {
		mpi_op->op_type = CODES_WK_END;
	 }
	else
	{
		struct codes_workload_op *tmp = &(array->op_array[array->op_arr_ndx]);
		*mpi_op = *tmp;
		array->op_arr_ndx++;
	}
	/*if(mpi_op->op_type == CODES_WK_END)
	{
		free(array->op_array);
		free(array);
	}*/
}

/* record the first operation's start as this rank's init time; all
 * subsequent timestamps are normalized to it */
static inline void check_set_init_time(const dumpi_time *t, rank_mpi_context * my_ctx)
{
    if (!my_ctx->is_init) {
        my_ctx->is_init = 1;
        my_ctx->init_time = time_to_ns_lf(t->start);
        my_ctx->last_op_time = time_to_ns_lf(t->stop) - my_ctx->init_time;
    }
}

/* introduce delay between operations: the delay is compute time spent outside
 * MPI. E.g. if the previous call returned at t = 1000 ns and this one starts
 * at t = 1500 ns, the 500 ns gap exceeds DUMPI_IGNORE_DELAY and is recorded
 * as a CODES_WK_DELAY of 5e-7 s. */
void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx)
{
    double start = time_to_ns_lf(time->start) - my_ctx->init_time;
    double stop = time_to_ns_lf(time->stop) - my_ctx->init_time;
    if((start - my_ctx->last_op_time) > DUMPI_IGNORE_DELAY)
    {
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_DELAY;
        wrkld_per_rank.start_time = my_ctx->last_op_time;
        wrkld_per_rank.end_time = start;
        wrkld_per_rank.u.delay.seconds = (start - my_ctx->last_op_time) / 1e9;
        dumpi_insert_next_op(my_ctx->dumpi_mpi_array, &wrkld_per_rank);
    }
    my_ctx->last_op_time = stop;
}

static int handleDUMPIInit(
        const dumpi_init *prm,
        uint16_t thread,
        const dumpi_time *cpu,
        const dumpi_time *wall,
        const dumpi_perfinfo *perf,
        void *uarg)
{
    rank_mpi_context *myctx = (rank_mpi_context*)uarg;
    check_set_init_time(wall, myctx);
    return 0;
}

int handleDUMPIError(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
    tw_error(TW_LOC, "\n MPI operation not supported by the MPI-Sim Layer ");
    return 0; /* unreachable: tw_error aborts */
}

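/* ignored MPI calls still advance the simulated clock via update_compute_time()
 * but emit no workload operation */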
int handleDUMPIIgnore(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
    rank_mpi_context* myctx = (rank_mpi_context*)uarg;

    check_set_init_time(wall, myctx);
    update_compute_time(wall, myctx);

    return 0;
}

static void update_times_and_insert(
        struct codes_workload_op *op,
        const dumpi_time *t,
        rank_mpi_context *ctx)
{
    check_set_init_time(t, ctx);
    op->start_time = time_to_ns_lf(t->start) - ctx->init_time;
    op->end_time = time_to_ns_lf(t->stop) - ctx->init_time;
    update_compute_time(t, ctx);
    dumpi_insert_next_op(ctx->dumpi_mpi_array, op);
}
}


int handleDUMPIWait(const dumpi_wait *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_WAIT;
        wrkld_per_rank.u.wait.req_id = prm->request;

        update_times_and_insert(&wrkld_per_rank, wall, myctx);

        return 0;
}

int handleDUMPIWaitsome(const dumpi_waitsome *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
        int i;
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_WAITSOME;
        wrkld_per_rank.u.waits.count = prm->count;
        wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));

        for( i = 0; i < prm->count; i++ )
                wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIWaitany(const dumpi_waitany *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
        int i;
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_WAITANY;
        wrkld_per_rank.u.waits.count = prm->count;
        wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));

        for( i = 0; i < prm->count; i++ )
                wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIWaitall(const dumpi_waitall *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
        int i;
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_WAITALL;

        wrkld_per_rank.u.waits.count = prm->count;
        wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));
        for( i = 0; i < prm->count; i++ )
                wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
    rank_mpi_context* myctx = (rank_mpi_context*)userarg;

    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_ISEND;
    wrkld_per_rank.u.send.tag = prm->tag;
    wrkld_per_rank.u.send.count = prm->count;
    wrkld_per_rank.u.send.data_type = prm->datatype;
    wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
    assert(wrkld_per_rank.u.send.num_bytes >= 0);
    wrkld_per_rank.u.send.req_id = prm->request;
    wrkld_per_rank.u.send.dest_rank = prm->dest;
    wrkld_per_rank.u.send.source_rank = myctx->my_rank;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIIRecv(const dumpi_irecv *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
    //printf("\n irecv source %d count %d data type %d", prm->source, prm->count, prm->datatype);
    rank_mpi_context* myctx = (rank_mpi_context*)userarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_IRECV;
    wrkld_per_rank.u.recv.data_type = prm->datatype;
    wrkld_per_rank.u.recv.count = prm->count;
    wrkld_per_rank.u.recv.tag = prm->tag;
    wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
    assert(wrkld_per_rank.u.recv.num_bytes >= 0);
    wrkld_per_rank.u.recv.source_rank = prm->source;
    wrkld_per_rank.u.recv.dest_rank = -1;
    wrkld_per_rank.u.recv.req_id = prm->request;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPISend(const dumpi_send *prm, uint16_t thread,
                      const dumpi_time *cpu, const dumpi_time *wall,
                      const dumpi_perfinfo *perf, void *uarg)
{
    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_SEND;
    wrkld_per_rank.u.send.tag = prm->tag;
    wrkld_per_rank.u.send.count = prm->count;
    wrkld_per_rank.u.send.data_type = prm->datatype;
    wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
    assert(wrkld_per_rank.u.send.num_bytes >= 0);
    wrkld_per_rank.u.send.dest_rank = prm->dest;
    wrkld_per_rank.u.send.source_rank = myctx->my_rank;
    wrkld_per_rank.u.send.req_id = -1;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread,
                      const dumpi_time *cpu, const dumpi_time *wall,
                      const dumpi_perfinfo *perf, void *uarg)
{
    //printf("\n recv source %d count %d data type %d", prm->source, prm->count, prm->datatype);
    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_RECV;
    wrkld_per_rank.u.recv.tag = prm->tag;
    wrkld_per_rank.u.recv.count = prm->count;
    wrkld_per_rank.u.recv.data_type = prm->datatype;
    wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
    assert(wrkld_per_rank.u.recv.num_bytes >= 0);
    wrkld_per_rank.u.recv.source_rank = prm->source;
    wrkld_per_rank.u.recv.dest_rank = -1;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread,
			const dumpi_time *cpu, const dumpi_time *wall,
			const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;

	{
		struct codes_workload_op wrkld_per_rank;
		wrkld_per_rank.op_type = CODES_WK_SEND;
		wrkld_per_rank.u.send.tag = prm->sendtag;
		wrkld_per_rank.u.send.count = prm->sendcount;
		wrkld_per_rank.u.send.data_type = prm->sendtype;
		wrkld_per_rank.u.send.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
		assert(wrkld_per_rank.u.send.num_bytes >= 0);
		wrkld_per_rank.u.send.dest_rank = prm->dest;
		wrkld_per_rank.u.send.source_rank = myctx->my_rank;
		wrkld_per_rank.u.send.req_id = -1;
		update_times_and_insert(&wrkld_per_rank, wall, myctx);
	}

	{
		struct codes_workload_op wrkld_per_rank;
		wrkld_per_rank.op_type = CODES_WK_RECV;
		wrkld_per_rank.u.recv.tag = prm->recvtag;
		wrkld_per_rank.u.recv.count = prm->recvcount;
		wrkld_per_rank.u.recv.data_type = prm->recvtype;
		wrkld_per_rank.u.recv.num_bytes = prm->recvcount * get_num_bytes(myctx,prm->recvtype);
		assert(wrkld_per_rank.u.recv.num_bytes >= 0);
		wrkld_per_rank.u.recv.source_rank = prm->source;
		wrkld_per_rank.u.recv.dest_rank = -1;
		update_times_and_insert(&wrkld_per_rank, wall, myctx);
	}
	return 0;
}

int handleDUMPIBcast(const dumpi_bcast *prm, uint16_t thread,
                       const dumpi_time *cpu, const dumpi_time *wall,
                       const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_BCAST;
        wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
        assert(wrkld_per_rank.u.collective.num_bytes >= 0);

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIAllgather(const dumpi_allgather *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_ALLGATHER;
        wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
        assert(wrkld_per_rank.u.collective.num_bytes > 0);

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIAllgatherv(const dumpi_allgatherv *prm, uint16_t thread,
                            const dumpi_time *cpu, const dumpi_time *wall,
                            const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_ALLGATHERV;
        wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
        assert(wrkld_per_rank.u.collective.num_bytes > 0);

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIAlltoall(const dumpi_alltoall *prm, uint16_t thread,
                          const dumpi_time *cpu, const dumpi_time *wall,
                          const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_ALLTOALL;
        wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
        assert(wrkld_per_rank.u.collective.num_bytes > 0);

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIAlltoallv(const dumpi_alltoallv *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_ALLTOALLV;
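        /* note: only sendcounts[0] is used below, i.e. per-destination send
         * counts are assumed to be uniform */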
        wrkld_per_rank.u.collective.num_bytes = prm->sendcounts[0] * get_num_bytes(myctx,prm->sendtype);
        assert(wrkld_per_rank.u.collective.num_bytes > 0);

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIReduce(const dumpi_reduce *prm, uint16_t thread,
                        const dumpi_time *cpu, const dumpi_time *wall,
                        const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_REDUCE;
        wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
        assert(wrkld_per_rank.u.collective.num_bytes > 0);

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIAllreduce(const dumpi_allreduce *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_ALLREDUCE;
        wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
        assert(wrkld_per_rank.u.collective.num_bytes > 0);

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;

}

int handleDUMPIFinalize(const dumpi_finalize *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_END;

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIReqFree(const dumpi_request_free *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
    rank_mpi_context* myctx = (rank_mpi_context*)userarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_REQ_FREE;
    wrkld_per_rank.u.free.req_id = prm->request;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

static int hash_rank_compare(void *key, struct qhash_head *link)
{
    rank_mpi_compare *in = key;
    rank_mpi_context *tmp;

    tmp = qhash_entry(link, rank_mpi_context, hash_link);
    if (tmp->my_rank == in->rank && tmp->my_app_id == in->app)
        return 1;
    return 0;
}

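/* Loads the DUMPI trace for one rank: opens the per-rank .bin file, registers
 * the per-call handlers below, replays the entire stream into the in-memory
 * op array, and finally stores the rank context in the hash table keyed by
 * (app id, rank). Operations are later served back one at a time by
 * dumpi_trace_nw_workload_get_next(). */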
int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
{
	libundumpi_callbacks callbacks;
	libundumpi_cbpair callarr[DUMPI_END_OF_STREAM];
#ifdef ENABLE_CORTEX
	libundumpi_cbpair transarr[DUMPI_END_OF_STREAM];
#endif
	PROFILE_TYPE profile;
	dumpi_trace_params* dumpi_params = (dumpi_trace_params*)params;
	char file_name[MAX_LENGTH];

	if(rank >= dumpi_params->num_net_traces)
		return -1;

	if(!rank_tbl)
	{
	    rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SIZE);
	    if(!rank_tbl)
	        return -1;
	}
	
	rank_mpi_context *my_ctx;
	my_ctx = malloc(sizeof(rank_mpi_context));
	assert(my_ctx);
	my_ctx->my_rank = rank;
	my_ctx->my_app_id = app_id;
	my_ctx->last_op_time = 0.0;
	my_ctx->is_init = 0;
	my_ctx->dumpi_mpi_array = dumpi_init_op_data();

	/* trace files carry a rank suffix zero-padded to at least four digits */
	snprintf(file_name, MAX_LENGTH, "%s%04d.bin", dumpi_params->file_name, rank);
#ifdef ENABLE_CORTEX
	if(strcmp(dumpi_params->file_name,"none") == 0) {
		profile = cortex_undumpi_open(NULL, app_id, dumpi_params->num_net_traces, rank);
	} else {
		profile = cortex_undumpi_open(file_name, app_id, dumpi_params->num_net_traces, rank);
	}
#else
	profile = undumpi_open(file_name);
#endif
	my_ctx->profile = profile;
	if(NULL == profile) {
		printf("Error: unable to open DUMPI trace: %s\n", file_name);
		exit(-1);
	}

	memset(&callbacks, 0, sizeof(libundumpi_callbacks));
	memset(&callarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
#ifdef ENABLE_CORTEX
	memset(&transarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
#endif

	/* handle MPI function calls */
	callbacks.on_init = handleDUMPIInit;
	callbacks.on_send = (dumpi_send_call)handleDUMPISend;
	callbacks.on_recv = (dumpi_recv_call)handleDUMPIRecv;
	callbacks.on_isend = (dumpi_isend_call)handleDUMPIISend;
	callbacks.on_irecv = (dumpi_irecv_call)handleDUMPIIRecv;
	callbacks.on_allreduce = (dumpi_allreduce_call)handleDUMPIAllreduce;
	callbacks.on_bcast = (dumpi_bcast_call)handleDUMPIBcast;
	callbacks.on_get_count = (dumpi_get_count_call)handleDUMPIIgnore;
	callbacks.on_bsend = (dumpi_bsend_call)handleDUMPIIgnore;
	callbacks.on_ssend = (dumpi_ssend_call)handleDUMPIIgnore;
	callbacks.on_rsend = (dumpi_rsend_call)handleDUMPIIgnore;
	callbacks.on_buffer_attach = (dumpi_buffer_attach_call)handleDUMPIIgnore;
	callbacks.on_buffer_detach = (dumpi_buffer_detach_call)handleDUMPIIgnore;
	callbacks.on_ibsend = (dumpi_ibsend_call)handleDUMPIIgnore;
	callbacks.on_issend = (dumpi_issend_call)handleDUMPIIgnore;
	callbacks.on_irsend = (dumpi_irsend_call)handleDUMPIIgnore;
	callbacks.on_wait = (dumpi_wait_call)handleDUMPIWait;
	callbacks.on_test = (dumpi_test_call)handleDUMPIIgnore;
	callbacks.on_request_free = (dumpi_request_free_call)handleDUMPIReqFree;
	callbacks.on_waitany = (dumpi_waitany_call)handleDUMPIWaitany;
	callbacks.on_testany = (dumpi_testany_call)handleDUMPIIgnore;
	callbacks.on_waitall = (dumpi_waitall_call)handleDUMPIWaitall;
	callbacks.on_testall = (dumpi_testall_call)handleDUMPIIgnore;
	callbacks.on_waitsome = (dumpi_waitsome_call)handleDUMPIWaitsome;
	callbacks.on_testsome = (dumpi_testsome_call)handleDUMPIIgnore;
	callbacks.on_iprobe = (dumpi_iprobe_call)handleDUMPIIgnore;
	callbacks.on_probe = (dumpi_probe_call)handleDUMPIIgnore;
	callbacks.on_cancel = (dumpi_cancel_call)handleDUMPIIgnore;
	callbacks.on_test_cancelled = (dumpi_test_cancelled_call)handleDUMPIIgnore;
	callbacks.on_send_init = (dumpi_send_init_call)handleDUMPIIgnore;
	callbacks.on_bsend_init = (dumpi_bsend_init_call)handleDUMPIIgnore;
	callbacks.on_ssend_init = (dumpi_ssend_init_call)handleDUMPIIgnore;
	callbacks.on_rsend_init = (dumpi_rsend_init_call)handleDUMPIIgnore;
	callbacks.on_recv_init = (dumpi_recv_init_call)handleDUMPIIgnore;
	callbacks.on_start = (dumpi_start_call)handleDUMPIIgnore;
	callbacks.on_startall = (dumpi_startall_call)handleDUMPIIgnore;
	callbacks.on_sendrecv = (dumpi_sendrecv_call)handleDUMPISendrecv;
	callbacks.on_sendrecv_replace = (dumpi_sendrecv_replace_call)handleDUMPIIgnore;
	callbacks.on_type_contiguous = (dumpi_type_contiguous_call)handleDUMPIIgnore;
	callbacks.on_barrier = (dumpi_barrier_call)handleDUMPIIgnore;
	callbacks.on_gather = (dumpi_gather_call)handleDUMPIIgnore;
	callbacks.on_gatherv = (dumpi_gatherv_call)handleDUMPIIgnore;
	callbacks.on_scatter = (dumpi_scatter_call)handleDUMPIIgnore;
	callbacks.on_scatterv = (dumpi_scatterv_call)handleDUMPIIgnore;
	callbacks.on_allgather = (dumpi_allgather_call)handleDUMPIIgnore;
	callbacks.on_allgatherv = (dumpi_allgatherv_call)handleDUMPIIgnore;
	callbacks.on_alltoall = (dumpi_alltoall_call)handleDUMPIIgnore;
	callbacks.on_alltoallv = (dumpi_alltoallv_call)handleDUMPIIgnore;
	callbacks.on_alltoallw = (dumpi_alltoallw_call)handleDUMPIIgnore;
	callbacks.on_reduce = (dumpi_reduce_call)handleDUMPIIgnore;
	callbacks.on_reduce_scatter = (dumpi_reduce_scatter_call)handleDUMPIIgnore;
	callbacks.on_group_size = (dumpi_group_size_call)handleDUMPIIgnore;
	callbacks.on_group_rank = (dumpi_group_rank_call)handleDUMPIIgnore;
	callbacks.on_comm_size = (dumpi_comm_size_call)handleDUMPIIgnore;
	callbacks.on_comm_rank = (dumpi_comm_rank_call)handleDUMPIIgnore;
	callbacks.on_comm_get_attr = (dumpi_comm_get_attr_call)handleDUMPIIgnore;
	callbacks.on_comm_dup = (dumpi_comm_dup_call)handleDUMPIError;
	callbacks.on_comm_create = (dumpi_comm_create_call)handleDUMPIError;
	callbacks.on_wtime = (dumpi_wtime_call)handleDUMPIIgnore;
	callbacks.on_finalize = (dumpi_finalize_call)handleDUMPIFinalize;

	libundumpi_populate_callbacks(&callbacks, callarr);

#ifdef ENABLE_CORTEX
#ifdef ENABLE_CORTEX_PYTHON
	if(dumpi_params->cortex_script[0] != 0) {
		libundumpi_populate_callbacks(CORTEX_PYTHON_TRANSLATION, transarr);
	} else {
		libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, transarr);
	}
#else
	libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, transarr);
#endif
#endif
	DUMPI_START_STREAM_READ(profile);
	//dumpi_header* trace_header = undumpi_read_header(profile);
	//dumpi_free_header(trace_header);

#ifdef ENABLE_CORTEX_PYTHON
	if(dumpi_params->cortex_script[0] != 0) {
		if(dumpi_params->cortex_class[0] != 0) {
			cortex_python_set_module(dumpi_params->cortex_script, dumpi_params->cortex_class);
		} else {
			cortex_python_set_module(dumpi_params->cortex_script, NULL);
		}
		if(dumpi_params->cortex_gen[0] != 0) {
			cortex_python_call_generator(profile, dumpi_params->cortex_gen);
		}
	}
#endif

	int finalize_reached = 0;
	int active = 1;
	int num_calls = 0;
	while(active && !finalize_reached)
	{
	   num_calls++;
#ifdef ENABLE_CORTEX
	   active = cortex_undumpi_read_single_call(profile, callarr, transarr, (void*)my_ctx, &finalize_reached);
#else
	   active = undumpi_read_single_call(profile, callarr, (void*)my_ctx, &finalize_reached);
#endif
	}
	UNDUMPI_CLOSE(profile);
	dumpi_finalize_mpi_op_data(my_ctx->dumpi_mpi_array);

	/* add this rank context to the hash table */
	rank_mpi_compare cmp;
	cmp.app = my_ctx->my_app_id;
	cmp.rank = my_ctx->my_rank;
	qhash_add(rank_tbl, &cmp, &(my_ctx->hash_link));
	rank_tbl_pop++;

	return 0;
}

static int64_t get_num_bytes(rank_mpi_context* myctx, dumpi_datatype dt)
{
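   /* with Cortex enabled, datatype sizes are resolved from the trace profile
    * itself; the switch below is the non-Cortex fallback */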
#ifdef ENABLE_CORTEX
   return cortex_datatype_get_size(myctx->profile,dt);
#endif
   switch(dt)
   {
	case DUMPI_DATATYPE_ERROR:
	case DUMPI_DATATYPE_NULL:
		tw_error(TW_LOC, "\n data type error");
	break;

	case DUMPI_CHAR:
	case DUMPI_UNSIGNED_CHAR:
	case DUMPI_SIGNED_CHAR:
	case DUMPI_BYTE:
		return 1; /* 1 byte for char */
	break;

	case DUMPI_WCHAR:
		return 4; /* wchar_t is 4 bytes on Linux/glibc */
	break;

	case DUMPI_SHORT:
	case DUMPI_SHORT_INT:
	case DUMPI_UNSIGNED_SHORT:
		return 2;
	break;

	 case DUMPI_INT:
	 case DUMPI_UNSIGNED:
	 case DUMPI_FLOAT:
	 case DUMPI_FLOAT_INT:
		return 4;
	 break;

	case DUMPI_DOUBLE:
	case DUMPI_LONG:
	case DUMPI_LONG_INT:
	case DUMPI_UNSIGNED_LONG:
	case DUMPI_LONG_LONG_INT:
	case DUMPI_UNSIGNED_LONG_LONG:
	case DUMPI_LONG_LONG:
	case DUMPI_DOUBLE_INT:
		return 8;
	break;

	case DUMPI_LONG_DOUBLE:
	case DUMPI_LONG_DOUBLE_INT:
		return 16;
	break;
	
	default:
	  {
		tw_error(TW_LOC, "\n undefined data type");
		return 0;	
	  }	
   } 
}

void dumpi_trace_nw_workload_get_next_rc2(int app_id, int rank)
{
    rank_mpi_context* temp_data; 
    struct qhash_head *hash_link = NULL;  
    rank_mpi_compare cmp;  
    cmp.rank = rank;
    cmp.app = app_id;

    hash_link = qhash_search(rank_tbl, &cmp);

    assert(hash_link);
    temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link); 
    assert(temp_data);

    dumpi_roll_back_prev_op(temp_data->dumpi_mpi_array);
}
void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op)
{
   rank_mpi_context* temp_data;
   struct qhash_head *hash_link = NULL;
   rank_mpi_compare cmp;
   cmp.rank = rank;
   cmp.app = app_id;
   hash_link = qhash_search(rank_tbl, &cmp);
   if(!hash_link)
   {
      printf("\n context not found for rank %d app %d", rank, app_id);
      op->op_type = CODES_WK_END;
      return;
   }
  temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link);
  assert(temp_data);

  struct codes_workload_op mpi_op;
  dumpi_remove_next_op(temp_data->dumpi_mpi_array, &mpi_op, temp_data->last_op_time);
  *op = mpi_op;
  /*if( mpi_op.op_type == CODES_WK_END)
  {
	qhash_del(hash_link);
        free(temp_data);

        rank_tbl_pop--;
        if (!rank_tbl_pop)
        {
            qhash_finalize(rank_tbl);
            rank_tbl = NULL;
        }
  }*/
  return;
}

/* implements the codes workload method */
struct codes_workload_method dumpi_trace_workload_method =
{
    .method_name = "dumpi-trace-workload",
    .codes_workload_read_config = NULL,
    .codes_workload_load = dumpi_trace_nw_workload_load,
    .codes_workload_get_next = dumpi_trace_nw_workload_get_next,
    .codes_workload_get_next_rc2 = dumpi_trace_nw_workload_get_next_rc2,
};
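
/* Usage sketch: a simulator selects this workload through the codes-workload
 * API by the method name above, roughly as follows. The exact
 * codes_workload_load()/codes_workload_get_next() signatures are assumed from
 * codes/codes-workload.h and may differ across CODES versions:
 *
 *   dumpi_trace_params p;  // file_name = trace prefix, num_net_traces = #ranks
 *   int wid = codes_workload_load("dumpi-trace-workload", (char*)&p, app_id, rank);
 *   struct codes_workload_op op;
 *   do {
 *       codes_workload_get_next(wid, app_id, rank, &op);
 *   } while(op.op_type != CODES_WK_END);
 */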

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 *  indent-tabs-mode: nil
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */