/*
 * Copyright (C) 2014 University of Chicago
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include <ross.h>
#include <assert.h>
#include "dumpi/libundumpi/bindings.h"
#include "dumpi/libundumpi/libundumpi.h"
#include "codes/codes-workload.h"
#include "codes/quickhash.h"

#ifdef ENABLE_CORTEX
#include <cortex/cortex.h>
#include <cortex/datatype.h>
#include <cortex/cortex-mpich.h>
#ifdef ENABLE_CORTEX_PYTHON
#include <cortex/cortex-python.h>
#endif
#define PROFILE_TYPE cortex_dumpi_profile*
#define UNDUMPI_OPEN cortex_undumpi_open
#define DUMPI_START_STREAM_READ cortex_dumpi_start_stream_read
#define UNDUMPI_CLOSE cortex_undumpi_close
#else
#define PROFILE_TYPE dumpi_profile*
#define UNDUMPI_OPEN undumpi_open
#define DUMPI_START_STREAM_READ dumpi_start_stream_read
#define UNDUMPI_CLOSE undumpi_close
#endif
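
/* When CORTEX is enabled, the libundumpi entry points are wrapped so that
 * traced collectives can be translated (e.g. into point-to-point patterns);
 * the macros above keep the rest of this file agnostic to which backend is
 * compiled in. */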

#define MAX_LENGTH 512
#define MAX_OPERATIONS 32768
#define DUMPI_IGNORE_DELAY 100 /* gaps shorter than this (in ns) do not emit a delay op */
#define RANK_HASH_TABLE_SIZE 400

static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;

/* context of the MPI workload */
typedef struct rank_mpi_context
{
    PROFILE_TYPE profile;
    int my_app_id;
    // whether we've seen an init op (needed for timing correctness)
    int is_init;
    int64_t my_rank;
    double last_op_time;
    double init_time;
    void* dumpi_mpi_array;
    struct qhash_head hash_link;

    struct rc_stack * completed_ctx;
} rank_mpi_context;

typedef struct rank_mpi_compare
{
    int app;
    int rank;
} rank_mpi_compare;

/* Holds all the data about MPI operations from the log */
typedef struct dumpi_op_data_array
{
	struct codes_workload_op* op_array;
	int64_t op_arr_ndx;
	int64_t op_arr_cnt;
} dumpi_op_data_array;
/* timing utilities */

#ifdef __GNUC__
__attribute__((unused))
#endif
static dumpi_clock timediff(
        dumpi_clock end,
        dumpi_clock start)
{
    dumpi_clock temp;
    if ((end.nsec-start.nsec)<0) {
        temp.sec = end.sec-start.sec-1;
        temp.nsec = 1000000000+end.nsec-start.nsec;
    } else {
        temp.sec = end.sec-start.sec;
        temp.nsec = end.nsec-start.nsec;
    }
    return temp;
}

static inline double time_to_ms_lf(dumpi_clock t){
        return (double) t.sec * 1e3 + (double) t.nsec / 1e6;
}
static inline double time_to_us_lf(dumpi_clock t){
        return (double) t.sec * 1e6 + (double) t.nsec / 1e3;
}
static inline double time_to_ns_lf(dumpi_clock t){
        return (double) t.sec * 1e9 + (double) t.nsec;
}
static inline double time_to_s_lf(dumpi_clock t){
        return (double) t.sec + (double) t.nsec / 1e9;
}
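
/* Example: a dumpi_clock of {sec = 2, nsec = 500000000} converts to 2500.0 ms,
 * 2.5e6 us, 2.5e9 ns, or 2.5 s through the helpers above. */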

/* load the trace */
static int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank);

/* dumpi implementation of get next operation in the workload */
static void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op);

/* get number of bytes from the workload data type and count */
static int64_t get_num_bytes(rank_mpi_context* my_ctx, dumpi_datatype dt);

/* computes the delay between MPI operations */
static void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx);

/* initializes the data structures */
static void* dumpi_init_op_data();

/* removes next operations from the dynamic array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
                                      double last_op_time);

/* resets the counters for the dynamic array once the workload is completely loaded*/
static void dumpi_finalize_mpi_op_data(void *mpi_op_array);

/* insert next operation */
static void dumpi_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op);

/* initialize the array data structure */
static void* dumpi_init_op_data()
{
	dumpi_op_data_array* tmp;

	tmp = malloc(sizeof(dumpi_op_data_array));
	assert(tmp);
	tmp->op_array = malloc(MAX_OPERATIONS * sizeof(struct codes_workload_op));
	assert(tmp->op_array);
	tmp->op_arr_ndx = 0;
	tmp->op_arr_cnt = MAX_OPERATIONS;

	return (void *)tmp;
}

/* inserts next operation in the array */
static void dumpi_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op)
{
	dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
	struct codes_workload_op *tmp;

	/* if the array is full, grow it by another MAX_OPERATIONS entries */
	if (array->op_arr_ndx == array->op_arr_cnt)
	{
		tmp = malloc((array->op_arr_cnt + MAX_OPERATIONS) * sizeof(struct codes_workload_op));
		assert(tmp);
		memcpy(tmp, array->op_array, array->op_arr_cnt * sizeof(struct codes_workload_op));
		free(array->op_array);
		array->op_array = tmp;
		array->op_arr_cnt += MAX_OPERATIONS;
	}

	/* add the MPI operation to the op array */
	array->op_array[array->op_arr_ndx] = *mpi_op;
	//printf("\n insert time %f end time %f ", array->op_array[array->op_arr_ndx].start_time, array->op_array[array->op_arr_ndx].end_time);
	array->op_arr_ndx++;
	return;
}

/* resets the counters after file is fully loaded */
static void dumpi_finalize_mpi_op_data(void *mpi_op_array)
{
	struct dumpi_op_data_array* array = (struct dumpi_op_data_array*)mpi_op_array;

	array->op_arr_cnt = array->op_arr_ndx;	
	array->op_arr_ndx = 0;
}

/* rolls back to previous index */
static void dumpi_roll_back_prev_op(void * mpi_op_array)
{
    dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
    array->op_arr_ndx--;
    assert(array->op_arr_ndx >= 0);
}
/* removes the next operation from the array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
                                      double last_op_time)
{
	dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
	//printf("\n op array index %d array count %d ", array->op_arr_ndx, array->op_arr_cnt);
	if (array->op_arr_ndx == array->op_arr_cnt)
	{
		mpi_op->op_type = CODES_WK_END;
	}
	else
	{
		struct codes_workload_op *tmp = &(array->op_array[array->op_arr_ndx]);
		*mpi_op = *tmp;
		array->op_arr_ndx++;
	}
	/*if(mpi_op->op_type == CODES_WK_END)
	{
		free(array->op_array);
		free(array);
	}*/
}

/* check for initialization and normalize reported time */
static inline void check_set_init_time(const dumpi_time *t, rank_mpi_context * my_ctx)
{
    if (!my_ctx->is_init) {
        my_ctx->is_init = 1;
        my_ctx->init_time = time_to_ns_lf(t->start);
        my_ctx->last_op_time = time_to_ns_lf(t->stop) - my_ctx->init_time;
    }
}

/* introduce delay between operations: delay is the compute time NOT spent in MPI operations */
void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx)
{
    double start = time_to_ns_lf(time->start) - my_ctx->init_time;
    double stop = time_to_ns_lf(time->stop) - my_ctx->init_time;

    if((start - my_ctx->last_op_time) > DUMPI_IGNORE_DELAY)
    {
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_DELAY;
        wrkld_per_rank.start_time = my_ctx->last_op_time;
        wrkld_per_rank.end_time = start;
        wrkld_per_rank.u.delay.seconds = (start - my_ctx->last_op_time) / 1e9;
        dumpi_insert_next_op(my_ctx->dumpi_mpi_array, &wrkld_per_rank);
    }
    my_ctx->last_op_time = stop;
}
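
/* Example: with init-relative times in ns, if the previous operation ended at
 * last_op_time = 1000 and the next one starts at 5000, the 4000 ns gap exceeds
 * DUMPI_IGNORE_DELAY, so a CODES_WK_DELAY op of 4000 / 1e9 = 4e-6 s of compute
 * time is inserted ahead of the next MPI operation. */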

static int handleDUMPIInit(
        const dumpi_init *prm,
        uint16_t thread,
        const dumpi_time *cpu,
        const dumpi_time *wall,
        const dumpi_perfinfo *perf,
        void *uarg)
{
    rank_mpi_context *myctx = (rank_mpi_context*)uarg;
    check_set_init_time(wall, myctx);
    return 0;
}

int handleDUMPIError(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
    tw_error(TW_LOC, "\n MPI operation not supported by the MPI-Sim Layer ");
    return 0; /* not reached: tw_error() aborts the simulation */
}

int handleDUMPIIgnore(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;

	check_set_init_time(wall, myctx);
	update_compute_time(wall, myctx);

	return 0;
}

static void update_times_and_insert(
        struct codes_workload_op *op,
        const dumpi_time *t,
        rank_mpi_context *ctx)
{
    check_set_init_time(t, ctx);
    op->start_time = time_to_ns_lf(t->start) - ctx->init_time;
    op->end_time = time_to_ns_lf(t->stop) - ctx->init_time;
    update_compute_time(t, ctx);
    dumpi_insert_next_op(ctx->dumpi_mpi_array, op);
}


int handleDUMPIWait(const dumpi_wait *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_WAIT;
        wrkld_per_rank.u.wait.req_id = prm->request;

        update_times_and_insert(&wrkld_per_rank, wall, myctx);

        return 0;
}

int handleDUMPIWaitsome(const dumpi_waitsome *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
        int i;
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_WAITSOME;
        wrkld_per_rank.u.waits.count = prm->count;
        wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));

        for( i = 0; i < prm->count; i++ )
                wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIWaitany(const dumpi_waitany *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
        int i;
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_WAITANY;
        wrkld_per_rank.u.waits.count = prm->count;
        wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));

        for( i = 0; i < prm->count; i++ )
                wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIWaitall(const dumpi_waitall *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
        int i;
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_WAITALL;
        wrkld_per_rank.u.waits.count = prm->count;
        wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));

        for( i = 0; i < prm->count; i++ )
                wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
}

int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)userarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_ISEND;
	wrkld_per_rank.u.send.tag = prm->tag;
	wrkld_per_rank.u.send.count = prm->count;
	wrkld_per_rank.u.send.data_type = prm->datatype;
	wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
	assert(wrkld_per_rank.u.send.num_bytes >= 0);
	wrkld_per_rank.u.send.req_id = prm->request;
	wrkld_per_rank.u.send.dest_rank = prm->dest;
	wrkld_per_rank.u.send.source_rank = myctx->my_rank;

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPIIRecv(const dumpi_irecv *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
	//printf("\n irecv source %d count %d data type %d", prm->source, prm->count, prm->datatype);
	rank_mpi_context* myctx = (rank_mpi_context*)userarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_IRECV;
	wrkld_per_rank.u.recv.data_type = prm->datatype;
	wrkld_per_rank.u.recv.count = prm->count;
	wrkld_per_rank.u.recv.tag = prm->tag;
	wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
	assert(wrkld_per_rank.u.recv.num_bytes >= 0);
	wrkld_per_rank.u.recv.source_rank = prm->source;
	wrkld_per_rank.u.recv.dest_rank = -1;
	wrkld_per_rank.u.recv.req_id = prm->request;

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPISend(const dumpi_send *prm, uint16_t thread,
                      const dumpi_time *cpu, const dumpi_time *wall,
                      const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_SEND;
	wrkld_per_rank.u.send.tag = prm->tag;
	wrkld_per_rank.u.send.count = prm->count;
	wrkld_per_rank.u.send.data_type = prm->datatype;
	wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
	assert(wrkld_per_rank.u.send.num_bytes >= 0);
	wrkld_per_rank.u.send.dest_rank = prm->dest;
	wrkld_per_rank.u.send.source_rank = myctx->my_rank;
	wrkld_per_rank.u.send.req_id = -1;

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread,
                      const dumpi_time *cpu, const dumpi_time *wall,
                      const dumpi_perfinfo *perf, void *uarg)
{
	//printf("\n recv source %d count %d data type %d", prm->source, prm->count, prm->datatype);
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_RECV;
	wrkld_per_rank.u.recv.tag = prm->tag;
	wrkld_per_rank.u.recv.count = prm->count;
	wrkld_per_rank.u.recv.data_type = prm->datatype;
	wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
	assert(wrkld_per_rank.u.recv.num_bytes >= 0);
	wrkld_per_rank.u.recv.source_rank = prm->source;
	wrkld_per_rank.u.recv.dest_rank = -1;

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

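/* MPI_Sendrecv has no single CODES op; it is decomposed below into a blocking
 * send followed by a blocking recv, each time-stamped with the same wall-clock
 * interval. */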
int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread,
			const dumpi_time *cpu, const dumpi_time *wall,
			const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;

	{
		struct codes_workload_op wrkld_per_rank;
		wrkld_per_rank.op_type = CODES_WK_SEND;
		wrkld_per_rank.u.send.tag = prm->sendtag;
		wrkld_per_rank.u.send.count = prm->sendcount;
		wrkld_per_rank.u.send.data_type = prm->sendtype;
		wrkld_per_rank.u.send.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
		assert(wrkld_per_rank.u.send.num_bytes >= 0);
		wrkld_per_rank.u.send.dest_rank = prm->dest;
		wrkld_per_rank.u.send.source_rank = myctx->my_rank;
		wrkld_per_rank.u.send.req_id = -1;
		update_times_and_insert(&wrkld_per_rank, wall, myctx);
	}

	{
		struct codes_workload_op wrkld_per_rank;
		wrkld_per_rank.op_type = CODES_WK_RECV;
		wrkld_per_rank.u.recv.tag = prm->recvtag;
		wrkld_per_rank.u.recv.count = prm->recvcount;
		wrkld_per_rank.u.recv.data_type = prm->recvtype;
		wrkld_per_rank.u.recv.num_bytes = prm->recvcount * get_num_bytes(myctx,prm->recvtype);
		assert(wrkld_per_rank.u.recv.num_bytes >= 0);
		wrkld_per_rank.u.recv.source_rank = prm->source;
		wrkld_per_rank.u.recv.dest_rank = -1;
		update_times_and_insert(&wrkld_per_rank, wall, myctx);
	}
	return 0;
}

int handleDUMPIBcast(const dumpi_bcast *prm, uint16_t thread,
                       const dumpi_time *cpu, const dumpi_time *wall,
                       const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_BCAST;
	wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
	assert(wrkld_per_rank.u.collective.num_bytes >= 0);

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPIAllgather(const dumpi_allgather *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_ALLGATHER;
	wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
	assert(wrkld_per_rank.u.collective.num_bytes > 0);

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPIAllgatherv(const dumpi_allgatherv *prm, uint16_t thread,
                            const dumpi_time *cpu, const dumpi_time *wall,
                            const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_ALLGATHERV;
	wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
	assert(wrkld_per_rank.u.collective.num_bytes > 0);

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPIAlltoall(const dumpi_alltoall *prm, uint16_t thread,
                          const dumpi_time *cpu, const dumpi_time *wall,
                          const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_ALLTOALL;
	wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
	assert(wrkld_per_rank.u.collective.num_bytes > 0);

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPIAlltoallv(const dumpi_alltoallv *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_ALLTOALLV;
	/* approximation: the first send count stands in for all destinations */
	wrkld_per_rank.u.collective.num_bytes = prm->sendcounts[0] * get_num_bytes(myctx,prm->sendtype);
	assert(wrkld_per_rank.u.collective.num_bytes > 0);

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPIReduce(const dumpi_reduce *prm, uint16_t thread,
                        const dumpi_time *cpu, const dumpi_time *wall,
                        const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_REDUCE;
	wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
	assert(wrkld_per_rank.u.collective.num_bytes > 0);

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPIAllreduce(const dumpi_allreduce *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_ALLREDUCE;
	wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
	assert(wrkld_per_rank.u.collective.num_bytes > 0);

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPIFinalize(const dumpi_finalize *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_END;

	update_times_and_insert(&wrkld_per_rank, wall, myctx);
	return 0;
}

int handleDUMPIReqFree(const dumpi_request_free *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
    rank_mpi_context* myctx = (rank_mpi_context*)userarg;
    struct codes_workload_op wrkld_per_rank;

    wrkld_per_rank.op_type = CODES_WK_REQ_FREE;
    wrkld_per_rank.u.free.req_id = prm->request;

    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
}

static int hash_rank_compare(void *key, struct qhash_head *link)
{
    rank_mpi_compare *in = key;
    rank_mpi_context *tmp;

    tmp = qhash_entry(link, rank_mpi_context, hash_link);
    if (tmp->my_rank == in->rank && tmp->my_app_id == in->app)
        return 1;
    return 0;
}

int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
{
	libundumpi_callbacks callbacks;
	libundumpi_cbpair callarr[DUMPI_END_OF_STREAM];
#ifdef ENABLE_CORTEX
	libundumpi_cbpair transarr[DUMPI_END_OF_STREAM];
#endif
	PROFILE_TYPE profile;
	dumpi_trace_params* dumpi_params = (dumpi_trace_params*)params;
	char file_name[MAX_LENGTH];

	if(rank >= dumpi_params->num_net_traces)
		return -1;

	if(!rank_tbl)
	{
		rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SIZE);
		if(!rank_tbl)
			return -1;
	}
	
	rank_mpi_context *my_ctx;
	my_ctx = malloc(sizeof(rank_mpi_context));
	assert(my_ctx);
	my_ctx->my_rank = rank;
	my_ctx->my_app_id = app_id;
	my_ctx->last_op_time = 0.0;
	my_ctx->is_init = 0;
	my_ctx->dumpi_mpi_array = dumpi_init_op_data();

	/* trace file names are zero-padded to four digits, e.g. <prefix>0007.bin */
	sprintf(file_name, "%s%04d.bin", dumpi_params->file_name, rank);
#ifdef ENABLE_CORTEX
	profile = cortex_undumpi_open(file_name, app_id, dumpi_params->num_net_traces, rank);
#else
	profile = undumpi_open(file_name);
#endif
	my_ctx->profile = profile;
	if(NULL == profile) {
		printf("Error: unable to open DUMPI trace: %s\n", file_name);
		exit(-1);
	}

	memset(&callbacks, 0, sizeof(libundumpi_callbacks));
	memset(&callarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
#ifdef ENABLE_CORTEX
	memset(&transarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
#endif

	/* handle MPI function calls */
	callbacks.on_init = handleDUMPIInit;
	callbacks.on_send = (dumpi_send_call)handleDUMPISend;
	callbacks.on_recv = (dumpi_recv_call)handleDUMPIRecv;
	callbacks.on_isend = (dumpi_isend_call)handleDUMPIISend;
	callbacks.on_irecv = (dumpi_irecv_call)handleDUMPIIRecv;
	callbacks.on_allreduce = (dumpi_allreduce_call)handleDUMPIAllreduce;
	callbacks.on_bcast = (dumpi_bcast_call)handleDUMPIBcast;
	callbacks.on_get_count = (dumpi_get_count_call)handleDUMPIIgnore;
	callbacks.on_bsend = (dumpi_bsend_call)handleDUMPIIgnore;
	callbacks.on_ssend = (dumpi_ssend_call)handleDUMPIIgnore;
	callbacks.on_rsend = (dumpi_rsend_call)handleDUMPIIgnore;
	callbacks.on_buffer_attach = (dumpi_buffer_attach_call)handleDUMPIIgnore;
	callbacks.on_buffer_detach = (dumpi_buffer_detach_call)handleDUMPIIgnore;
	callbacks.on_ibsend = (dumpi_ibsend_call)handleDUMPIIgnore;
	callbacks.on_issend = (dumpi_issend_call)handleDUMPIIgnore;
	callbacks.on_irsend = (dumpi_irsend_call)handleDUMPIIgnore;
	callbacks.on_wait = (dumpi_wait_call)handleDUMPIWait;
	callbacks.on_test = (dumpi_test_call)handleDUMPIIgnore;
	callbacks.on_request_free = (dumpi_request_free_call)handleDUMPIReqFree;
	callbacks.on_waitany = (dumpi_waitany_call)handleDUMPIWaitany;
	callbacks.on_testany = (dumpi_testany_call)handleDUMPIIgnore;
	callbacks.on_waitall = (dumpi_waitall_call)handleDUMPIWaitall;
	callbacks.on_testall = (dumpi_testall_call)handleDUMPIIgnore;
	callbacks.on_waitsome = (dumpi_waitsome_call)handleDUMPIWaitsome;
	callbacks.on_testsome = (dumpi_testsome_call)handleDUMPIIgnore;
	callbacks.on_iprobe = (dumpi_iprobe_call)handleDUMPIIgnore;
	callbacks.on_probe = (dumpi_probe_call)handleDUMPIIgnore;
	callbacks.on_cancel = (dumpi_cancel_call)handleDUMPIIgnore;
	callbacks.on_test_cancelled = (dumpi_test_cancelled_call)handleDUMPIIgnore;
	callbacks.on_send_init = (dumpi_send_init_call)handleDUMPIIgnore;
	callbacks.on_bsend_init = (dumpi_bsend_init_call)handleDUMPIIgnore;
	callbacks.on_ssend_init = (dumpi_ssend_init_call)handleDUMPIIgnore;
	callbacks.on_rsend_init = (dumpi_rsend_init_call)handleDUMPIIgnore;
	callbacks.on_recv_init = (dumpi_recv_init_call)handleDUMPIIgnore;
	callbacks.on_start = (dumpi_start_call)handleDUMPIIgnore;
	callbacks.on_startall = (dumpi_startall_call)handleDUMPIIgnore;
	callbacks.on_sendrecv = (dumpi_sendrecv_call)handleDUMPIIgnore;
	callbacks.on_sendrecv_replace = (dumpi_sendrecv_replace_call)handleDUMPIIgnore;
	callbacks.on_type_contiguous = (dumpi_type_contiguous_call)handleDUMPIIgnore;
	callbacks.on_barrier = (dumpi_barrier_call)handleDUMPIIgnore;
	callbacks.on_gather = (dumpi_gather_call)handleDUMPIIgnore;
	callbacks.on_gatherv = (dumpi_gatherv_call)handleDUMPIIgnore;
	callbacks.on_scatter = (dumpi_scatter_call)handleDUMPIIgnore;
	callbacks.on_scatterv = (dumpi_scatterv_call)handleDUMPIIgnore;
	callbacks.on_allgather = (dumpi_allgather_call)handleDUMPIIgnore;
	callbacks.on_allgatherv = (dumpi_allgatherv_call)handleDUMPIIgnore;
	callbacks.on_alltoall = (dumpi_alltoall_call)handleDUMPIIgnore;
	callbacks.on_alltoallv = (dumpi_alltoallv_call)handleDUMPIIgnore;
	callbacks.on_alltoallw = (dumpi_alltoallw_call)handleDUMPIIgnore;
	callbacks.on_reduce = (dumpi_reduce_call)handleDUMPIIgnore;
	callbacks.on_reduce_scatter = (dumpi_reduce_scatter_call)handleDUMPIIgnore;
	callbacks.on_group_size = (dumpi_group_size_call)handleDUMPIIgnore;
	callbacks.on_group_rank = (dumpi_group_rank_call)handleDUMPIIgnore;
	callbacks.on_comm_size = (dumpi_comm_size_call)handleDUMPIIgnore;
	callbacks.on_comm_rank = (dumpi_comm_rank_call)handleDUMPIIgnore;
	callbacks.on_comm_get_attr = (dumpi_comm_get_attr_call)handleDUMPIIgnore;
	callbacks.on_comm_dup = (dumpi_comm_dup_call)handleDUMPIError;
	callbacks.on_comm_create = (dumpi_comm_create_call)handleDUMPIError;
	callbacks.on_wtime = (dumpi_wtime_call)handleDUMPIIgnore;
	callbacks.on_finalize = (dumpi_finalize_call)handleDUMPIFinalize;

	libundumpi_populate_callbacks(&callbacks, callarr);

#ifdef ENABLE_CORTEX
#ifdef ENABLE_CORTEX_PYTHON
	libundumpi_populate_callbacks(CORTEX_PYTHON_TRANSLATION, transarr);
#else
	libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, transarr);
#endif
#endif
	DUMPI_START_STREAM_READ(profile);
	//dumpi_header* trace_header = undumpi_read_header(profile);
	//dumpi_free_header(trace_header);

#ifdef ENABLE_CORTEX_PYTHON
	cortex_python_set_module(dumpi_params->cortex_script, dumpi_params->cortex_class);
#endif

	int finalize_reached = 0;
	int active = 1;
	int num_calls = 0;
	while(active && !finalize_reached)
	{
		num_calls++;
#ifdef ENABLE_CORTEX
		active = cortex_undumpi_read_single_call(profile, callarr, transarr, (void*)my_ctx, &finalize_reached);
#else
		active = undumpi_read_single_call(profile, callarr, (void*)my_ctx, &finalize_reached);
#endif
	}
	UNDUMPI_CLOSE(profile);
	dumpi_finalize_mpi_op_data(my_ctx->dumpi_mpi_array);

	/* add this rank context to the hash table */
	rank_mpi_compare cmp;
	cmp.app = my_ctx->my_app_id;
	cmp.rank = my_ctx->my_rank;
	qhash_add(rank_tbl, &cmp, &(my_ctx->hash_link));
	rank_tbl_pop++;

	return 0;
}
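
/* Example (sketch): loading rank 3 of an 8-rank trace set. The trace prefix
 * "df_AMG_n27_dumpi-" is hypothetical, and the dumpi_trace_params layout
 * (a fixed-size file_name buffer plus num_net_traces) is assumed from its
 * use above; the authoritative definition lives in codes/codes-workload.h:
 *
 *   dumpi_trace_params p;
 *   strcpy(p.file_name, "df_AMG_n27_dumpi-");
 *   p.num_net_traces = 8;
 *   dumpi_trace_nw_workload_load((const char*)&p, 0, 3);
 *   // the loader then opens df_AMG_n27_dumpi-0003.bin for this rank
 */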

static int64_t get_num_bytes(rank_mpi_context* myctx, dumpi_datatype dt)
{
#ifdef ENABLE_CORTEX
   return cortex_datatype_get_size(myctx->profile,dt);
#endif
   switch(dt)
   {
	case DUMPI_DATATYPE_ERROR:
	case DUMPI_DATATYPE_NULL:
		tw_error(TW_LOC, "\n data type error");
	break;

	case DUMPI_CHAR:
	case DUMPI_UNSIGNED_CHAR:
	case DUMPI_SIGNED_CHAR:
	case DUMPI_BYTE:
		return 1; /* 1 byte for char */
	break;

	case DUMPI_WCHAR:
		return 4; /* wchar_t is 4 bytes on typical Unix/glibc targets */
	break;

	case DUMPI_SHORT:
	case DUMPI_SHORT_INT:
	case DUMPI_UNSIGNED_SHORT:
		return 2;
	break;

	 case DUMPI_INT:
	 case DUMPI_UNSIGNED:
	 case DUMPI_FLOAT:
	 case DUMPI_FLOAT_INT:
		return 4;
	 break;

	case DUMPI_DOUBLE:
	case DUMPI_LONG:
	case DUMPI_LONG_INT:
	case DUMPI_UNSIGNED_LONG:
	case DUMPI_LONG_LONG_INT:
	case DUMPI_UNSIGNED_LONG_LONG:
	case DUMPI_LONG_LONG:
	case DUMPI_DOUBLE_INT:
		return 8;
	break;

	case DUMPI_LONG_DOUBLE:
	case DUMPI_LONG_DOUBLE_INT:
		return 16;
	break;
	
	default:
	  {
		tw_error(TW_LOC, "\n undefined data type");
		return 0;
	  }
   }
}
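
/* Example: an MPI_Send of count = 256 elements of DUMPI_INT produces a
 * CODES_WK_SEND op with num_bytes = 256 * 4 = 1024 (without CORTEX; with
 * CORTEX the size comes from cortex_datatype_get_size() instead). */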

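/* reverse handler for get_next: on a ROSS rollback, undo the last
 * dumpi_remove_next_op() by moving the per-rank operation cursor back one slot */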
void dumpi_trace_nw_workload_get_next_rc2(int app_id, int rank)
{
    rank_mpi_context* temp_data; 
    struct qhash_head *hash_link = NULL;  
    rank_mpi_compare cmp;  
    cmp.rank = rank;
    cmp.app = app_id;

    hash_link = qhash_search(rank_tbl, &cmp);

    assert(hash_link);
    temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link); 
    assert(temp_data);

    dumpi_roll_back_prev_op(temp_data->dumpi_mpi_array);
}
void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op)
{
   rank_mpi_context* temp_data;
   struct qhash_head *hash_link = NULL;
   rank_mpi_compare cmp;
   cmp.rank = rank;
   cmp.app = app_id;
   hash_link = qhash_search(rank_tbl, &cmp);
   if(!hash_link)
   {
      printf("\n not found for rank id %d , %d", rank, app_id);
      op->op_type = CODES_WK_END;
      return;
   }
   temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link);
   assert(temp_data);

   struct codes_workload_op mpi_op;
   dumpi_remove_next_op(temp_data->dumpi_mpi_array, &mpi_op, temp_data->last_op_time);
   *op = mpi_op;
   /*if( mpi_op.op_type == CODES_WK_END)
   {
	qhash_del(hash_link);
        free(temp_data);

        rank_tbl_pop--;
        if (!rank_tbl_pop)
        {
            qhash_finalize(rank_tbl);
            rank_tbl = NULL;
        }
   }*/
   return;
}

/* implements the codes workload method */
struct codes_workload_method dumpi_trace_workload_method =
{
    .method_name = "dumpi-trace-workload",
    .codes_workload_read_config = NULL,
    .codes_workload_load = dumpi_trace_nw_workload_load,
    .codes_workload_get_next = dumpi_trace_nw_workload_get_next,
    .codes_workload_get_next_rc2 = dumpi_trace_nw_workload_get_next_rc2,
};
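
/* Usage sketch (hedged): simulation LPs normally reach this method through the
 * generic codes-workload API rather than the static functions above. The call
 * signatures below are assumed from codes/codes-workload.h; the string
 * "dumpi-trace-workload" matches .method_name:
 *
 *   int wid = codes_workload_load("dumpi-trace-workload", params, app_id, rank);
 *   struct codes_workload_op op;
 *   do {
 *       codes_workload_get_next(wid, app_id, rank, &op);
 *       // map op.op_type (CODES_WK_SEND, CODES_WK_DELAY, ...) onto events
 *   } while(op.op_type != CODES_WK_END);
 */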

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 *  indent-tabs-mode: nil
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */