codes-dumpi-trace-nw-wrkld.c 38.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (C) 2014 University of Chicago
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <assert.h>
#include <mpi.h>
#include <ross.h>
#include "dumpi/libundumpi/bindings.h"
#include "dumpi/libundumpi/libundumpi.h"
#include "codes/codes-workload.h"
#include "codes/quickhash.h"
#include "codes/codes-jobmap.h"
#include "codes/jenkins-hash.h"
#include "codes/model-net.h"

21 22
#if ENABLE_CORTEX
#include <cortex/cortex.h>
23
#include <cortex/datatype.h>
24 25
#include <cortex/cortex-mpich.h>
#ifdef ENABLE_CORTEX_PYTHON
26
#include <cortex/cortex-python.h>
27
#endif
28
#define PROFILE_TYPE cortex_dumpi_profile*
29
//#define UNDUMPI_OPEN cortex_undumpi_open
30 31 32 33
#define DUMPI_START_STREAM_READ cortex_dumpi_start_stream_read
#define UNDUMPI_CLOSE cortex_undumpi_close
#else
#define PROFILE_TYPE dumpi_profile*
34
//#define UNDUMPI_OPEN undumpi_open
35 36 37 38
#define DUMPI_START_STREAM_READ dumpi_start_stream_read
#define UNDUMPI_CLOSE undumpi_close
#endif

39

40
#define MAX_LENGTH_FILE 512
41 42 43
#define MAX_OPERATIONS 32768
#define DUMPI_IGNORE_DELAY 100

44 45 46
#define INITIAL_OP_QUEUE_SIZE    64
#define INITIAL_OP_RC_STACK_SIZE 64

47 48 49
/* This variable is defined in src/network-workloads/model-net-mpi-replay.c */
extern struct codes_jobmap_ctx *jobmap_ctx; 

50 51 52
static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;

53
static unsigned int max_threshold = INT_MAX;
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72

/* Per-rank operation buffers: a FIFO of loaded-but-unconsumed ops plus a
 * LIFO of already-consumed ops kept so operations can be rolled back
 * (reverse computation). */
typedef struct dumpi_op_data_array
{
    /* operations loaded, not yet consumed by simulator */
    struct codes_workload_op* next_ops_queue;      // queue implemented with a circular buffer
    size_t                    next_ops_queue_size; // size (allocated) of the array
    uint64_t                  next_ops_queue_count; // number of elements currently in the queue
    uint64_t                  next_ops_queue_first; // pointer to first element (next to consume)
    uint64_t                  next_ops_queue_last;  // pointer to the first free space in the array
    /* operations consumed, which may be reversed */
    struct codes_workload_op* prev_ops_stack;      // stack implemented with an array
    size_t                    prev_ops_stack_size; // size (allocated) of the array
    uint64_t                  prev_ops_stack_top;  // index of the top of the stack (first free place)
    /* sequence id */
    uint64_t                  sequence_id;       // to attribute a sequence number of operations
    int                       finalize_reached;  // reached MPI_Finalize call
    int                       active;            // dumpi stream is active
} dumpi_op_data_array;

73 74 75
/* context of the MPI workload: one instance per simulated MPI rank,
 * kept in the global rank_tbl hash table keyed by (app, rank) */
typedef struct rank_mpi_context
{
    PROFILE_TYPE profile;                            /* open (cortex_)dumpi trace handle */
    libundumpi_cbpair callarr[DUMPI_END_OF_STREAM];  /* per-MPI-call callback table */
#ifdef ENABLE_CORTEX
    libundumpi_cbpair transarr[DUMPI_END_OF_STREAM]; /* cortex translation callback table */
#endif
    int my_app_id;
    // whether we've seen an init op (needed for timing correctness)
    int is_init;
    unsigned int num_reqs;      /* synthetic request-id counter (used by sendrecv modeling) */
    unsigned int num_ops;
    int64_t my_rank;
    double last_op_time;        /* normalized end time of the last recorded op (ns) */
    double init_time;           /* wall-clock baseline subtracted from all times (ns) */
    dumpi_op_data_array dumpi_mpi_array;  /* op queue + rollback stack for this rank */
    struct qhash_head hash_link;          /* link into rank_tbl */
    
    struct rc_stack * completed_ctx;
} rank_mpi_context;

95 96 97 98 99 100
/* composite hash-lookup key: a rank is unique only within its application */
typedef struct rank_mpi_compare
{
    int app;   /* application (job) id */
    int rank;  /* rank within that application */
} rank_mpi_compare;

101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
/* timing utilities */

#ifdef __GNUC__
__attribute__((unused))
#endif
static dumpi_clock timediff(
        dumpi_clock end,
        dumpi_clock start)
{
    /* Compute end - start, borrowing one second when the nanosecond
     * component would underflow, so nsec stays in [0, 1e9). */
    dumpi_clock diff;
    diff.sec  = end.sec  - start.sec;
    diff.nsec = end.nsec - start.nsec;
    if (diff.nsec < 0) {
        diff.sec  -= 1;
        diff.nsec += 1000000000;
    }
    return diff;
}

121
/*static inline double time_to_ms_lf(dumpi_clock t){
122 123 124 125
        return (double) t.sec * 1e3 + (double) t.nsec / 1e6;
}
static inline double time_to_us_lf(dumpi_clock t){
        return (double) t.sec * 1e6 + (double) t.nsec / 1e3;
126
}*/
127 128 129
/* convert a dumpi timestamp to nanoseconds as a double */
static inline double time_to_ns_lf(dumpi_clock t){
    return 1e9 * (double) t.sec + (double) t.nsec;
}
130
/*static int32_t get_unique_req_id(int32_t request_id)
131 132 133 134
{
    uint32_t pc = 0, pb = 0;
    bj_hashlittle2(&request_id, sizeof(int32_t), &pc, &pb);
    return pc;
135
}*/
136
/*static inline double time_to_s_lf(dumpi_clock t){
137
        return (double) t.sec + (double) t.nsec / 1e9;
138
}*/
139

140
/* load the trace */
141
static int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank);
142 143

/* dumpi implementation of get next operation in the workload */
144
static void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op);
145 146

/* get number of bytes from the workload data type and count */
147
static uint64_t get_num_bytes(rank_mpi_context* my_ctx, dumpi_datatype dt);
148 149

/* computes the delay between MPI operations */
150
static void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx);
151

152
static void dumpi_init_op_data(struct rank_mpi_context* ctx);
153
/* removes next operations from the dynamic array */
154 155
static void dumpi_remove_next_op(struct rank_mpi_context* ctx, struct codes_workload_op *mpi_op,
                                              double last_op_time);
156
/* resets the counters for the dynamic array once the workload is completely loaded*/
157
static void dumpi_finalize_mpi_op_data(struct rank_mpi_context* ctx);
158
/* insert next operation */
159
static void dumpi_insert_next_op(struct rank_mpi_context* ctx, struct codes_workload_op *mpi_op);
160 161

/* initialize the array data structure */
162
static void dumpi_init_op_data(struct rank_mpi_context* ctx)
163
{
164 165 166 167 168 169 170 171 172 173 174 175
    dumpi_op_data_array* t  = &(ctx->dumpi_mpi_array);
    t->next_ops_queue       = calloc(INITIAL_OP_QUEUE_SIZE, sizeof(struct codes_workload_op));
    t->next_ops_queue_size  = INITIAL_OP_QUEUE_SIZE;
    t->next_ops_queue_count = 0;
    t->next_ops_queue_first = 0;
    t->next_ops_queue_last  = 0;
    t->prev_ops_stack       = calloc(INITIAL_OP_RC_STACK_SIZE, sizeof(struct codes_workload_op));
    t->prev_ops_stack_size  = INITIAL_OP_RC_STACK_SIZE;
    t->prev_ops_stack_top   = 0;
    t->sequence_id          = 0;
    t->finalize_reached     = 0;
    t->active               = 1;
176 177
}

178 179
/* inserts next operation in the queue */
static void dumpi_insert_next_op(struct rank_mpi_context* ctx, struct codes_workload_op *mpi_op)
180
{
181 182 183 184 185 186 187 188 189 190 191
    dumpi_op_data_array* t = &(ctx->dumpi_mpi_array);
    // check if we have some space in the queue
    if(t->next_ops_queue_size == t->next_ops_queue_count) {
        t->next_ops_queue = realloc(t->next_ops_queue, t->next_ops_queue_size*2);
        assert(t->next_ops_queue);
        t->next_ops_queue_size *= 2;
    }
    t->next_ops_queue[t->next_ops_queue_last] = *mpi_op;
    t->next_ops_queue_last += 1;
    t->next_ops_queue_last %= t->next_ops_queue_size;
    t->next_ops_queue_count += 1;
192 193
}
/* resets the counters after file is fully loaded */
194
static void dumpi_finalize_mpi_op_data(struct rank_mpi_context* ctx)
195
{
196 197 198
    dumpi_op_data_array* t = &(ctx->dumpi_mpi_array);
    free(t->next_ops_queue);
    free(t->prev_ops_stack);
199
}
200
/* rolls back to previous index */
201
static void dumpi_roll_back_prev_op(struct rank_mpi_context* ctx)
202
{
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
    dumpi_op_data_array* t = &(ctx->dumpi_mpi_array);
    // if there is something on the stack of previous operations,
    // put it back in the queue (from the other end of the queue)
    assert(t->prev_ops_stack_top);
    // check if the queue size needs to be increased
    if(t->next_ops_queue_size == t->next_ops_queue_count) {
        t->next_ops_queue = realloc(t->next_ops_queue, t->next_ops_queue_size*2);
        assert(t->next_ops_queue);
        t->next_ops_queue_size *= 2;
    }
    // move the cursor of the first element in the queue
    if(t->next_ops_queue_first != 0)
        t->next_ops_queue_first -= 1;
    else
        t->next_ops_queue_first = t->next_ops_queue_size - 1;
    // add the element in the queue
    t->next_ops_queue[t->next_ops_queue_last]
        = t->prev_ops_stack[t->prev_ops_stack_top-1];
    t->next_ops_queue_count += 1;
    // remove the element from the stack
    t->prev_ops_stack_top -= 1;
    t->sequence_id -= 1;
225
}
226 227
/* get the next operation from the array */
static void dumpi_remove_next_op(struct rank_mpi_context* ctx, struct codes_workload_op *mpi_op,
228 229
                                      double last_op_time)
{
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
    dumpi_op_data_array* t = &(ctx->dumpi_mpi_array);
retry:
    if(t->next_ops_queue_count == 0) {
        mpi_op->op_type = CODES_WK_END;

        // no more operation in the queue, try to load from the file
        if(t->active && !t->finalize_reached)
        {
#ifdef ENABLE_CORTEX
            t->active = cortex_undumpi_read_single_call(ctx->profile, 
                   ctx->callarr, ctx->transarr, (void*)ctx, &(t->finalize_reached));
#else
            t->active = undumpi_read_single_call(ctx->profile, 
                   ctx->callarr, (void*)ctx, &(t->finalize_reached));
#endif
            goto retry;
        }
    } else {
        *mpi_op = t->next_ops_queue[t->next_ops_queue_first];
        t->next_ops_queue_first += 1;
        if(t->next_ops_queue_first == t->next_ops_queue_size)
            t->next_ops_queue_first = 0;
        t->next_ops_queue_count -= 1;
    }
    mpi_op->sequence_id = t->sequence_id;
    t->sequence_id += 1;
    // put the event in the stack of previous events
    if(t->prev_ops_stack_top == t->prev_ops_stack_size) {
        t->prev_ops_stack = realloc(t->prev_ops_stack, 2*(t->prev_ops_stack_size));
        t->prev_ops_stack_size *= 2;
    }
    t->prev_ops_stack[t->prev_ops_stack_top] = *mpi_op;
    t->prev_ops_stack_top += 1;
263 264
}

265 266 267 268 269 270 271 272 273 274
/* check for initialization and normalize reported time: the first call
 * establishes this rank's time origin (init_time) */
static inline void check_set_init_time(const dumpi_time *t, rank_mpi_context * my_ctx)
{
    if (my_ctx->is_init)
        return;
    my_ctx->is_init = 1;
    my_ctx->init_time = time_to_ns_lf(t->start);
    my_ctx->last_op_time = time_to_ns_lf(t->stop) - my_ctx->init_time;
}

275 276 277
/* introduce delay between operations: delay is the compute time NOT spent
 * in MPI operations */
void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx)
{
    double begin = time_to_ns_lf(time->start) - my_ctx->init_time;
    double end   = time_to_ns_lf(time->stop)  - my_ctx->init_time;
    double gap   = begin - my_ctx->last_op_time;

    /* only emit a delay op when the idle gap is large enough to matter */
    if(gap > DUMPI_IGNORE_DELAY)
    {
        struct codes_workload_op delay_op;

        delay_op.op_type = CODES_WK_DELAY;
        delay_op.start_time = my_ctx->last_op_time;
        delay_op.end_time = begin;
        delay_op.u.delay.seconds = gap / 1e9; /* ns -> s */
        dumpi_insert_next_op(my_ctx, &delay_op);
    }
    my_ctx->last_op_time = end;
}

293 294 295 296 297 298 299 300
static int handleDUMPIInit(
        const dumpi_init *prm,
        uint16_t thread,
        const dumpi_time *cpu,
        const dumpi_time *wall,
        const dumpi_perfinfo *perf,
        void *uarg)
{
301 302 303 304 305 306
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)wall;
    (void)perf;

307
    rank_mpi_context *myctx = (rank_mpi_context*)uarg;
308
    check_set_init_time(wall, myctx);
309 310 311
    return 0;
}

312 313
int handleDUMPIError(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
314 315 316 317 318 319 320
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)wall;
    (void)perf;
    (void)uarg;

321 322 323
    tw_error(TW_LOC, "\n MPI operation not supported by the MPI-Sim Layer ");
}

324
int handleDUMPIIgnore(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
325
{
326 327 328 329 330 331 332
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)wall;
    (void)perf;
	
    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
333

334
    check_set_init_time(wall, myctx);
335
	update_compute_time(wall, myctx);
336 337 338 339

	return 0;
}

340 341 342 343 344
/* stamp an operation with normalized start/stop times, account for the
 * compute gap since the previous op, and enqueue it */
static void update_times_and_insert(
        struct codes_workload_op *op,
        const dumpi_time *t,
        rank_mpi_context *ctx)
{
    check_set_init_time(t, ctx);
    op->start_time = time_to_ns_lf(t->start) - ctx->init_time;
    op->end_time   = time_to_ns_lf(t->stop)  - ctx->init_time;
    update_compute_time(t, ctx);
    dumpi_insert_next_op(ctx, op);
}


353 354 355 356
int handleDUMPIWait(const dumpi_wait *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
357 358 359 360 361
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)perf;
        
362 363 364
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

365
        wrkld_per_rank.op_type = CODES_WK_WAIT;
366 367
        wrkld_per_rank.u.wait.req_id = prm->request;

368 369
        update_times_and_insert(&wrkld_per_rank, wall, myctx);

370 371 372 373 374 375 376
        return 0;
}

int handleDUMPIWaitsome(const dumpi_waitsome *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
377 378 379 380 381
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
        
382 383 384 385
        int i;
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

386
        wrkld_per_rank.op_type = CODES_WK_WAITSOME;
387
        wrkld_per_rank.u.waits.count = prm->count;
388
        wrkld_per_rank.u.waits.req_ids = (unsigned int*)malloc(prm->count * sizeof(unsigned int));
389 390

        for( i = 0; i < prm->count; i++ )
391
                wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];
392

393
        update_times_and_insert(&wrkld_per_rank, wall, myctx);
394 395 396 397 398 399 400
        return 0;
}

int handleDUMPIWaitany(const dumpi_waitany *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
401 402 403 404 405 406
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
        
407 408 409 410
        int i;
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

411
        wrkld_per_rank.op_type = CODES_WK_WAITANY;
412
        wrkld_per_rank.u.waits.count = prm->count;
413
        wrkld_per_rank.u.waits.req_ids = (unsigned int*)malloc(prm->count * sizeof(unsigned int));
414 415

        for( i = 0; i < prm->count; i++ )
416
                wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];
417

418
        update_times_and_insert(&wrkld_per_rank, wall, myctx);
419 420 421 422 423 424 425
        return 0;
}

int handleDUMPIWaitall(const dumpi_waitall *prm, uint16_t thread,
                    const dumpi_time *cpu, const dumpi_time *wall,
                    const dumpi_perfinfo *perf, void *userarg)
{
426 427 428 429 430
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
431
        int i;
432
        
433 434 435
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;

436
        wrkld_per_rank.op_type = CODES_WK_WAITALL;
437 438

        wrkld_per_rank.u.waits.count = prm->count;
439
        wrkld_per_rank.u.waits.req_ids = (unsigned int*)malloc(prm->count * sizeof(unsigned int));
440 441 442
        for( i = 0; i < prm->count; i++ )
                wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];

443
        update_times_and_insert(&wrkld_per_rank, wall, myctx);
444 445 446
        return 0;
}

447 448
int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
449 450 451 452 453 454 455 456
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;
457

458 459 460 461 462
        wrkld_per_rank.op_type = CODES_WK_ISEND;
        wrkld_per_rank.u.send.tag = prm->tag;
        wrkld_per_rank.u.send.count = prm->count;
        wrkld_per_rank.u.send.data_type = prm->datatype;
        wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
463
        
464
        assert(wrkld_per_rank.u.send.num_bytes >= 0);
465
    	wrkld_per_rank.u.send.req_id = prm->request;
466 467
        wrkld_per_rank.u.send.dest_rank = prm->dest;
        wrkld_per_rank.u.send.source_rank = myctx->my_rank;
468 469

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
470 471
	
        return 0;
472 473 474 475
}

int handleDUMPIIRecv(const dumpi_irecv *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
476 477 478 479 480 481 482 483
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        //printf("\n irecv source %d count %d data type %d", prm->source, prm->count, prm->datatype);
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
484 485 486
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_IRECV;
487 488 489
	    wrkld_per_rank.u.recv.data_type = prm->datatype;
	    wrkld_per_rank.u.recv.count = prm->count;
	    wrkld_per_rank.u.recv.tag = prm->tag;
490
        wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
491
	    
492
        assert(wrkld_per_rank.u.recv.num_bytes >= 0);
493 494
        wrkld_per_rank.u.recv.source_rank = prm->source;
        wrkld_per_rank.u.recv.dest_rank = -1;
495
	    wrkld_per_rank.u.recv.req_id = prm->request;
496 497

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
498 499 500 501 502 503 504
        return 0;
}

int handleDUMPISend(const dumpi_send *prm, uint16_t thread,
                      const dumpi_time *cpu, const dumpi_time *wall,
                      const dumpi_perfinfo *perf, void *uarg)
{
505 506 507 508 509 510 511
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	    
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
512 513 514
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_SEND;
515
	    wrkld_per_rank.u.send.tag = prm->tag;
516 517
        wrkld_per_rank.u.send.count = prm->count;
        wrkld_per_rank.u.send.data_type = prm->datatype;
518
        wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
519
	    assert(wrkld_per_rank.u.send.num_bytes >= 0);
520 521
        wrkld_per_rank.u.send.dest_rank = prm->dest;
        wrkld_per_rank.u.send.source_rank = myctx->my_rank;
522
         wrkld_per_rank.u.send.req_id = -1;
523

524 525
        
         update_times_and_insert(&wrkld_per_rank, wall, myctx);
526 527 528 529 530 531 532
        return 0;
}

int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread,
                      const dumpi_time *cpu, const dumpi_time *wall,
                      const dumpi_perfinfo *perf, void *uarg)
{
533 534 535 536 537 538
     (void)prm;
     (void)thread;
     (void)cpu;
     (void)wall;
     (void)perf;

539 540 541 542
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

	wrkld_per_rank.op_type = CODES_WK_RECV;
543 544 545
    wrkld_per_rank.u.recv.tag = prm->tag;
    wrkld_per_rank.u.recv.count = prm->count;
    wrkld_per_rank.u.recv.data_type = prm->datatype;
546
    wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
547
	assert(wrkld_per_rank.u.recv.num_bytes >= 0);
548
	wrkld_per_rank.u.recv.req_id = -1;
549 550
    wrkld_per_rank.u.recv.source_rank = prm->source;
    wrkld_per_rank.u.recv.dest_rank = -1;
551

552
	//printf("\n recv source %d count %d data type %d bytes %lld ", prm->source, prm->count, prm->datatype, wrkld_per_rank.u.recv.num_bytes);
553 554
    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
555 556 557

}

558 559 560 561
int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread,
			const dumpi_time *cpu, const dumpi_time *wall,
			const dumpi_perfinfo *perf, void *uarg)
{
562 563 564 565 566 567 568
     (void)prm;
     (void)thread;
     (void)cpu;
     (void)wall;
     (void)perf;
	
     rank_mpi_context* myctx = (rank_mpi_context*)uarg;
569

570
     /* Issue a non-blocking send */
571 572
	{
		struct codes_workload_op wrkld_per_rank;
573
		wrkld_per_rank.op_type = CODES_WK_ISEND;
574 575 576 577
		wrkld_per_rank.u.send.tag = prm->sendtag;
		wrkld_per_rank.u.send.count = prm->sendcount;
		wrkld_per_rank.u.send.data_type = prm->sendtype;
		wrkld_per_rank.u.send.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
578

579 580
		
        assert(wrkld_per_rank.u.send.num_bytes >= 0);
581 582
		wrkld_per_rank.u.send.dest_rank = prm->dest;
		wrkld_per_rank.u.send.source_rank = myctx->my_rank;
583
		wrkld_per_rank.u.send.req_id = myctx->num_reqs;
584
		update_times_and_insert(&wrkld_per_rank, wall, myctx);
585

586
	}
587
    /* issue a blocking receive */
588 589 590 591 592 593 594
	{
		struct codes_workload_op wrkld_per_rank;
		wrkld_per_rank.op_type = CODES_WK_RECV;
		wrkld_per_rank.u.recv.tag = prm->recvtag;
		wrkld_per_rank.u.recv.count = prm->recvcount;
		wrkld_per_rank.u.recv.data_type = prm->recvtype;
		wrkld_per_rank.u.recv.num_bytes = prm->recvcount * get_num_bytes(myctx,prm->recvtype);
595 596

        assert(wrkld_per_rank.u.recv.num_bytes >= 0);
597 598
		wrkld_per_rank.u.recv.source_rank = prm->source;
		wrkld_per_rank.u.recv.dest_rank = -1;
599
	    wrkld_per_rank.u.recv.req_id = -1;
600 601
		update_times_and_insert(&wrkld_per_rank, wall, myctx);
	}
602 603 604 605 606 607 608 609 610 611 612 613 614
    
    /* Issue a wait operation */
    {
        struct codes_workload_op wrkld_per_rank;

        wrkld_per_rank.op_type = CODES_WK_WAIT;
        wrkld_per_rank.u.wait.req_id = myctx->num_reqs;

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
    
        myctx->num_reqs++;
    }

615

616 617 618
	return 0;
}

619 620 621 622
int handleDUMPIBcast(const dumpi_bcast *prm, uint16_t thread,
                       const dumpi_time *cpu, const dumpi_time *wall,
                       const dumpi_perfinfo *perf, void *uarg)
{
623 624 625 626 627 628 629
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
        struct codes_workload_op wrkld_per_rank;
630 631

        wrkld_per_rank.op_type = CODES_WK_BCAST;
632
        wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
633
	    assert(wrkld_per_rank.u.collective.num_bytes >= 0);
634 635

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
636 637 638 639 640 641 642
        return 0;
}

int handleDUMPIAllgather(const dumpi_allgather *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
643 644 645 646 647
    (void)prm;
    (void)thread;
    (void)cpu;
    (void)wall;
    (void)perf;
648 649 650
	rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	struct codes_workload_op wrkld_per_rank;

651 652
    wrkld_per_rank.op_type = CODES_WK_ALLGATHER;
    wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
653
	assert(wrkld_per_rank.u.collective.num_bytes > 0);
654

655 656
    update_times_and_insert(&wrkld_per_rank, wall, myctx);
    return 0;
657 658 659 660 661 662
}

int handleDUMPIAllgatherv(const dumpi_allgatherv *prm, uint16_t thread,
                            const dumpi_time *cpu, const dumpi_time *wall,
                            const dumpi_perfinfo *perf, void *uarg)
{
663 664 665 666 667 668 669
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
670 671

        wrkld_per_rank.op_type = CODES_WK_ALLGATHERV;
672
        wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
673
	    assert(wrkld_per_rank.u.collective.num_bytes > 0);
674 675

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
676 677 678 679 680 681 682
        return 0;
}

int handleDUMPIAlltoall(const dumpi_alltoall *prm, uint16_t thread,
                          const dumpi_time *cpu, const dumpi_time *wall,
                          const dumpi_perfinfo *perf, void *uarg)
{
683 684 685 686 687 688 689
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	    rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
690 691

        wrkld_per_rank.op_type = CODES_WK_ALLTOALL;
692
        wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
693
	    assert(wrkld_per_rank.u.collective.num_bytes > 0);
694 695

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
696 697 698 699 700 701 702
        return 0;
}

int handleDUMPIAlltoallv(const dumpi_alltoallv *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
703 704 705 706 707 708 709 710
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
711 712

        wrkld_per_rank.op_type = CODES_WK_ALLTOALLV;
713
        wrkld_per_rank.u.collective.num_bytes = prm->sendcounts[0] * get_num_bytes(myctx,prm->sendtype);
714
	    assert(wrkld_per_rank.u.collective.num_bytes > 0);
715 716

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
717 718 719 720 721 722 723
        return 0;
}

int handleDUMPIReduce(const dumpi_reduce *prm, uint16_t thread,
                        const dumpi_time *cpu, const dumpi_time *wall,
                        const dumpi_perfinfo *perf, void *uarg)
{
724 725 726 727 728 729 730 731
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
732 733

        wrkld_per_rank.op_type = CODES_WK_REDUCE;
734
        wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
735
	    assert(wrkld_per_rank.u.collective.num_bytes > 0);
736 737

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
738 739 740 741 742 743 744
        return 0;
}

int handleDUMPIAllreduce(const dumpi_allreduce *prm, uint16_t thread,
                           const dumpi_time *cpu, const dumpi_time *wall,
                           const dumpi_perfinfo *perf, void *uarg)
{
745 746 747 748 749 750 751 752
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
753 754

        wrkld_per_rank.op_type = CODES_WK_ALLREDUCE;
755
        wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
756
	    assert(wrkld_per_rank.u.collective.num_bytes > 0);
757 758

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
759 760 761 762 763
        return 0;
}

int handleDUMPIFinalize(const dumpi_finalize *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
764 765 766 767 768 769 770 771
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
	
        rank_mpi_context* myctx = (rank_mpi_context*)uarg;
	    struct codes_workload_op wrkld_per_rank;
772 773

        wrkld_per_rank.op_type = CODES_WK_END;
774 775

        update_times_and_insert(&wrkld_per_rank, wall, myctx);
776 777 778
        return 0;
}

779 780
int handleDUMPIReqFree(const dumpi_request_free *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
781 782 783 784 785 786 787 788
        (void)prm;
        (void)thread;
        (void)cpu;
        (void)wall;
        (void)perf;
    
        rank_mpi_context* myctx = (rank_mpi_context*)userarg;
        struct codes_workload_op wrkld_per_rank;
789

790 791
        wrkld_per_rank.op_type = CODES_WK_REQ_FREE;
        wrkld_per_rank.u.free.req_id = prm->request;
792

793 794
        update_times_and_insert(&wrkld_per_rank, wall, myctx);
        return 0;
795 796
}

797 798
static int hash_rank_compare(void *key, struct qhash_head *link)
{
799
    rank_mpi_compare *in = key;
800 801 802
    rank_mpi_context *tmp;

    tmp = qhash_entry(link, rank_mpi_context, hash_link);
803
    if (tmp->my_rank == in->rank && tmp->my_app_id == in->app)
804 805 806 807
        return 1;
    return 0;
}

808
int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
809 810
{
	libundumpi_callbacks callbacks;
811 812

    PROFILE_TYPE profile;
813
	dumpi_trace_params* dumpi_params = (dumpi_trace_params*)params;
814
	char file_name[MAX_LENGTH_FILE];
815 816 817 818

	if(rank >= dumpi_params->num_net_traces)
		return -1;

819
    int hash_size = (dumpi_params->num_net_traces / dumpi_params->nprocs) + 1;
820 821
	if(!rank_tbl)
    	{
822
            rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, hash_size);
823 824 825 826 827 828 829 830
            if(!rank_tbl)
                  return -1;
    	}
	
	rank_mpi_context *my_ctx;
	my_ctx = malloc(sizeof(rank_mpi_context));
	assert(my_ctx);
	my_ctx->my_rank = rank;
831
    my_ctx->my_app_id = app_id;
832
	my_ctx->last_op_time = 0.0;
833 834
    my_ctx->is_init = 0;
    my_ctx->num_reqs = 0;
835
    dumpi_init_op_data(my_ctx);
836
    my_ctx->num_ops = 0;
837 838 839 840 841 842 843 844 845

	if(rank < 10)
            sprintf(file_name, "%s000%d.bin", dumpi_params->file_name, rank);
         else if(rank >=10 && rank < 100)
            sprintf(file_name, "%s00%d.bin", dumpi_params->file_name, rank);
           else if(rank >=100 && rank < 1000)
             sprintf(file_name, "%s0%d.bin", dumpi_params->file_name, rank);
             else
              sprintf(file_name, "%s%d.bin", dumpi_params->file_name, rank);
846
#ifdef ENABLE_CORTEX
847 848 849 850 851
	if(strcmp(dumpi_params->file_name,"none") == 0) {
		profile = cortex_undumpi_open(NULL, app_id, dumpi_params->num_net_traces, rank);
	} else {
		profile = cortex_undumpi_open(file_name, app_id, dumpi_params->num_net_traces, rank);
	}
852
	
853 854 855 856 857 858 859 860 861 862 863 864 865 866 867
	{ int i;
	for(i=0; i < dumpi_params->num_net_traces; i++) {
		struct codes_jobmap_id id = {
			.job = app_id,
			.rank = i
		};
		uint32_t cn_id;
		if(jobmap_ctx) {
			cn_id = codes_jobmap_to_global_id(id, jobmap_ctx);
		} else {
			cn_id = i;
		}
		cortex_placement_set(profile, i, cn_id);
	}
	}
868
	
869
	cortex_topology_set(profile,&model_net_topology);
870
#else
871
	profile =  undumpi_open(file_name);
872
#endif
873
        my_ctx->profile = profile;
874 875 876 877 878 879
        if(NULL == profile) {
                printf("Error: unable to open DUMPI trace: %s", file_name);
                exit(-1);
        }
	
	memset(&callbacks, 0, sizeof(libundumpi_callbacks));
880
        memset(&my_ctx->callarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
881 882 883
#ifdef ENABLE_CORTEX
	memset(&transarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
#endif
884 885

	/* handle MPI function calls */	        
886
        callbacks.on_init = handleDUMPIInit;
887 888 889 890 891 892
	callbacks.on_send = (dumpi_send_call)handleDUMPISend;
        callbacks.on_recv = (dumpi_recv_call)handleDUMPIRecv;
        callbacks.on_isend = (dumpi_isend_call)handleDUMPIISend;
        callbacks.on_irecv = (dumpi_irecv_call)handleDUMPIIRecv;
        callbacks.on_allreduce = (dumpi_allreduce_call)handleDUMPIAllreduce;
	callbacks.on_bcast = (dumpi_bcast_call)handleDUMPIBcast;
893 894 895 896 897 898 899 900 901
	callbacks.on_get_count = (dumpi_get_count_call)handleDUMPIIgnore;
	callbacks.on_bsend = (dumpi_bsend_call)handleDUMPIIgnore;
	callbacks.on_ssend = (dumpi_ssend_call)handleDUMPIIgnore;
	callbacks.on_rsend = (dumpi_rsend_call)handleDUMPIIgnore;
	callbacks.on_buffer_attach = (dumpi_buffer_attach_call)handleDUMPIIgnore;
	callbacks.on_buffer_detach = (dumpi_buffer_detach_call)handleDUMPIIgnore;
	callbacks.on_ibsend = (dumpi_ibsend_call)handleDUMPIIgnore;
	callbacks.on_issend = (dumpi_issend_call)handleDUMPIIgnore;
	callbacks.on_irsend = (dumpi_irsend_call)handleDUMPIIgnore;
902
	callbacks.on_wait = (dumpi_wait_call)handleDUMPIWait;
903
	callbacks.on_test = (dumpi_test_call)handleDUMPIIgnore;
904
	callbacks.on_request_free = (dumpi_request_free_call)handleDUMPIReqFree;
905
	callbacks.on_waitany = (dumpi_waitany_call)handleDUMPIWaitany;
906
	callbacks.on_testany = (dumpi_testany_call)handleDUMPIIgnore;
907
	callbacks.on_waitall = (dumpi_waitall_call)handleDUMPIWaitall;
908
	callbacks.on_testall = (dumpi_testall_call)handleDUMPIIgnore;
909
	callbacks.on_waitsome = (dumpi_waitsome_call)handleDUMPIWaitsome;
910 911 912 913 914 915 916 917 918 919 920 921
	callbacks.on_testsome = (dumpi_testsome_call)handleDUMPIIgnore;
	callbacks.on_iprobe = (dumpi_iprobe_call)handleDUMPIIgnore;
	callbacks.on_probe = (dumpi_probe_call)handleDUMPIIgnore;
	callbacks.on_cancel = (dumpi_cancel_call)handleDUMPIIgnore;
	callbacks.on_test_cancelled = (dumpi_test_cancelled_call)handleDUMPIIgnore;
	callbacks.on_send_init = (dumpi_send_init_call)handleDUMPIIgnore;
	callbacks.on_bsend_init = (dumpi_bsend_init_call)handleDUMPIIgnore;
	callbacks.on_ssend_init = (dumpi_ssend_init_call)handleDUMPIIgnore;
	callbacks.on_rsend_init = (dumpi_rsend_init_call)handleDUMPIIgnore;
	callbacks.on_recv_init = (dumpi_recv_init_call)handleDUMPIIgnore;
	callbacks.on_start = (dumpi_start_call)handleDUMPIIgnore;
	callbacks.on_startall = (dumpi_startall_call)handleDUMPIIgnore;
922
	callbacks.on_sendrecv = (dumpi_sendrecv_call)handleDUMPISendrecv;
923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941
	callbacks.on_sendrecv_replace = (dumpi_sendrecv_replace_call)handleDUMPIIgnore;
	callbacks.on_type_contiguous = (dumpi_type_contiguous_call)handleDUMPIIgnore;
	callbacks.on_barrier = (dumpi_barrier_call)handleDUMPIIgnore;
        callbacks.on_gather = (dumpi_gather_call)handleDUMPIIgnore;
        callbacks.on_gatherv = (dumpi_gatherv_call)handleDUMPIIgnore;
        callbacks.on_scatter = (dumpi_scatter_call)handleDUMPIIgnore;
        callbacks.on_scatterv = (dumpi_scatterv_call)handleDUMPIIgnore;
        callbacks.on_allgather = (dumpi_allgather_call)handleDUMPIIgnore;
        callbacks.on_allgatherv = (dumpi_allgatherv_call)handleDUMPIIgnore;
        callbacks.on_alltoall = (dumpi_alltoall_call)handleDUMPIIgnore;
        callbacks.on_alltoallv = (dumpi_alltoallv_call)handleDUMPIIgnore;
        callbacks.on_alltoallw = (dumpi_alltoallw_call)handleDUMPIIgnore;
        callbacks.on_reduce = (dumpi_reduce_call)handleDUMPIIgnore;
        callbacks.on_reduce_scatter = (dumpi_reduce_scatter_call)handleDUMPIIgnore;
        callbacks.on_group_size = (dumpi_group_size_call)handleDUMPIIgnore;
        callbacks.on_group_rank = (dumpi_group_rank_call)handleDUMPIIgnore;
        callbacks.on_comm_size = (dumpi_comm_size_call)handleDUMPIIgnore;
        callbacks.on_comm_rank = (dumpi_comm_rank_call)handleDUMPIIgnore;
        callbacks.on_comm_get_attr = (dumpi_comm_get_attr_call)handleDUMPIIgnore;
942 943
        callbacks.on_comm_dup = (dumpi_comm_dup_call)handleDUMPIError;
        callbacks.on_comm_create = (dumpi_comm_create_call)handleDUMPIError;
944
        callbacks.on_wtime = (dumpi_wtime_call)handleDUMPIIgnore;
945 946
        callbacks.on_finalize = (dumpi_finalize_call)handleDUMPIFinalize;

947
        libundumpi_populate_callbacks(&callbacks, my_ctx->callarr);
948

949
#ifdef ENABLE_CORTEX
950
#ifdef ENABLE_CORTEX_PYTHON
951
	if(dumpi_params->cortex_script[0] != 0) {
952
		libundumpi_populate_callbacks(CORTEX_PYTHON_TRANSLATION, my_ctx->transarr);
953
	} else {
954
		libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, my_ctx->transarr);
955
	}
956
#else
957
	libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, my_ctx->transarr);
958
#endif
959 960
#endif
        DUMPI_START_STREAM_READ(profile);
961 962
        //dumpi_header* trace_header = undumpi_read_header(profile);
        //dumpi_free_header(trace_header);
963

964
#ifdef ENABLE_CORTEX_PYTHON
965 966 967 968 969 970 971 972 973 974
	if(dumpi_params->cortex_script[0] != 0) {
		if(dumpi_params->cortex_class[0] != 0) {
			cortex_python_set_module(dumpi_params->cortex_script, dumpi_params->cortex_class);
		} else {
			cortex_python_set_module(dumpi_params->cortex_script, NULL);
		}
		if(dumpi_params->cortex_gen[0] != 0) {
			cortex_python_call_generator(profile, dumpi_params->cortex_gen);
		}
	}
975 976
#endif

977
#if 0
978 979 980 981 982 983
        int finalize_reached = 0;
        int active = 1;
        int num_calls = 0;
        while(active && !finalize_reached)
        {
           num_calls++;
984
           my_ctx->num_ops++;
985
#ifdef ENABLE_CORTEX
986 987 988 989 990 991 992 993 994 995 996 997
           if(my_ctx->num_ops < max_threshold)
	        active = cortex_undumpi_read_single_call(profile, callarr, transarr, (void*)my_ctx, &finalize_reached);
           else
           {
                struct codes_workload_op op;
                op.op_type = CODES_WK_END;

                op.start_time = my_ctx->last_op_time;
                op.end_time = my_ctx->last_op_time + 1;
                dumpi_insert_next_op(my_ctx->dumpi_mpi_array, &op);
                break;
           }
998
#else
999
           active = undumpi_read_single_call(profile, callarr, (void*)my_ctx, &finalize_reached);
1000
#endif
1001
        }
1002 1003 1004 1005 1006 1007 1008 1009

        // now that DUMPI events are read on the fly, we shouldn't close the provider here
#endif

#if 0
    UNDUMPI_CLOSE(profile);
	dumpi_finalize_mpi_op_data(my_ctx);
#endif
1010
	/* add this rank context to hash table */	
1011 1012 1013 1014
        rank_mpi_compare cmp;
        cmp.app = my_ctx->my_app_id;
        cmp.rank = my_ctx->my_rank;
	qhash_add(rank_tbl, &cmp, &(my_ctx->hash_link));
1015 1016 1017 1018
	rank_tbl_pop++;

	return 0;
}
/* Data types are for 64-bit archs. Source:
 * https://www.tutorialspoint.com/cprogramming/c_data_types.htm 
 * */
/* Returns the size in bytes of a DUMPI datatype.  Sizes assume a 64-bit
 * architecture (see the source note above).  When cortex is enabled the
 * size is obtained from the trace profile instead of the static table.
 * Calls tw_error() on an unknown or invalid datatype. */
static uint64_t get_num_bytes(rank_mpi_context* myctx, dumpi_datatype dt)
{
    (void)myctx;

#ifdef ENABLE_CORTEX
   return cortex_datatype_get_size(myctx->profile,dt);
#endif
   switch(dt)
   {
	case DUMPI_DATATYPE_ERROR:
	case DUMPI_DATATYPE_NULL:
		tw_error(TW_LOC, "\n data type error");
	break;

	/* 1-byte types */
	case DUMPI_CHAR:
	case DUMPI_UNSIGNED_CHAR:
	case DUMPI_SIGNED_CHAR:
	case DUMPI_BYTE:
		return 1;

	/* 2-byte types */
	case DUMPI_SHORT:
	case DUMPI_SHORT_INT:
	case DUMPI_UNSIGNED_SHORT:
		return 2;

	/* 4-byte types (wchar_t is 4 bytes on 64-bit Linux) */
	case DUMPI_WCHAR:
	case DUMPI_INT:
	case DUMPI_UNSIGNED:
	case DUMPI_FLOAT:
	case DUMPI_FLOAT_INT:
		return 4;

	/* 8-byte types */
	case DUMPI_DOUBLE:
	case DUMPI_LONG:
	case DUMPI_LONG_INT:
	case DUMPI_UNSIGNED_LONG:
	case DUMPI_LONG_LONG_INT:
	case DUMPI_UNSIGNED_LONG_LONG:
	case DUMPI_LONG_LONG:
	case DUMPI_DOUBLE_INT:
		return 8;

	/* extended-precision types */
	case DUMPI_LONG_DOUBLE_INT:
	case DUMPI_LONG_DOUBLE:
		return 10;

	default:
		tw_error(TW_LOC, "\n undefined data type");
		return 0;
   }
   /* only reachable if tw_error() returns after a datatype error; the
    * original fell off the end of a non-void function here (UB) */
   return 0;
}

void dumpi_trace_nw_workload_get_next_rc2(int app_id, int rank)
{
1113
    rank_mpi_context* ctx; 
1114 1115 1116 1117 1118 1119 1120 1121
    struct qhash_head *hash_link = NULL;  
    rank_mpi_compare cmp;  
    cmp.rank = rank;
    cmp.app = app_id;

    hash_link = qhash_search(rank_tbl, &cmp);

    assert(hash_link);
1122 1123
    ctx = qhash_entry(hash_link, rank_mpi_context, hash_link); 
    assert(ctx);
1124

1125
    dumpi_roll_back_prev_op(ctx);
1126
}
void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op)
1128
{
1129
   rank_mpi_context* ctx;
1130
   struct qhash_head *hash_link = NULL;
1131 1132 1133 1134
   rank_mpi_compare cmp;
   cmp.rank = rank;
   cmp.app = app_id;
   hash_link = qhash_search(rank_tbl, &cmp);
1135 1136
   if(!hash_link)
   {
yangxuserene's avatar
yangxuserene committed
1137
      printf("\n not found for rank id %d , %d", rank, app_id);
1138 1139 1140
      op->op_type = CODES_WK_END;
      return;
   }
1141 1142
  ctx = qhash_entry(hash_link, rank_mpi_context, hash_link);
  assert(ctx);
1143

1144
  struct codes_workload_op mpi_op;
1145
  dumpi_remove_next_op(ctx, &mpi_op, ctx->last_op_time);
1146
  *op = mpi_op;
1147 1148 1149 1150 1151 1152 1153
  return;
}

/* implements the codes workload method */
struct codes_workload_method dumpi_trace_workload_method =
{
    .method_name = "dumpi-trace-workload",
    /* no config-file parsing for this workload */
    .codes_workload_read_config = NULL,
    /* opens the trace and registers the per-rank context */
    .codes_workload_load = dumpi_trace_nw_workload_load,
    /* returns the next MPI op for a rank */
    .codes_workload_get_next = dumpi_trace_nw_workload_get_next,
    /* reverse-computation: rolls back the last fetched op */
    .codes_workload_get_next_rc2 = dumpi_trace_nw_workload_get_next_rc2
};

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 *  indent-tabs-mode: nil
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */