/* codes-workload-mpi-replay.c */
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

/* SUMMARY:
 *
 *  MPI replay tool for replaying workloads from the codes workload API.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
#include <assert.h>
#include <inttypes.h>
#include <math.h>
#include <getopt.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mpi.h>

#include "codes/codes-workload.h"
#include "codes/quickhash.h"
#include "codes/configuration.h"

/* Set to nonzero (e.g. build with -DDEBUG_PROFILING=1) to accumulate the
 * per-rank time spent in each operation type and print the totals at the end
 * of main(). Guarded so the build system can override the default. */
#ifndef DEBUG_PROFILING
#define DEBUG_PROFILING 0
#endif

29
/* hash table entry for looking up file descriptor of a workload file id */
30
struct file_info
31 32 33 34 35 36 37 38 39 40 41 42
{
    struct qlist_head hash_link;
    uint64_t file_hash;
    int file_descriptor;
};

/* forward declarations for functions defined later in this file */
int replay_workload_op(struct codes_workload_op replay_op, int rank, long long int op_number);
int hash_file_compare(void *key, struct qlist_head *link);

/* command line options, filled in by parse_args() */
static int opt_verbose = 0;         /* -v: log every replayed operation */
static int opt_noop = 0;            /* --noop: skip the I/O, sleeps and barriers */
static double opt_delay_pct = 1.0;  /* --delay: multiplier applied to DELAY durations */

/* maps workload file ids to the descriptors of files opened during replay */
static struct qhash_table *fd_table = NULL;

/* per-rank event log; only opened when verbose output is requested */
static FILE *log_stream = NULL;

#if DEBUG_PROFILING
/* accumulated seconds spent in each operation type (profiling builds only) */
static double total_delay_time = 0.0;
static double total_barrier_time = 0.0;
static double total_open_time = 0.0;
static double total_close_time = 0.0;
static double total_read_time = 0.0;
static double total_write_time = 0.0;
#endif
/*
 * Print command-line help to stderr and terminate the process with exit
 * status 1. Never returns.
 *
 * exename: program name to show in the usage line (normally argv[0]).
 */
void usage(char *exename)
{
    fprintf(stderr, "Usage: %s [OPTIONS] --conf <conf_file_path>\n       "
            "--test-dir <workload_test_dir>\n\n", exename);
    fprintf(stderr, "\t<conf_file_path> : (absolute) path to a valid workload configuration file\n");
    fprintf(stderr, "\t<workload_test_dir> : the directory to replay the workload I/O in\n");
    fprintf(stderr, "\n\t[OPTIONS] includes:\n");
    fprintf(stderr, "\t\t--noop : do not perform i/o\n");
    /* --delay and --help are accepted by parse_args() but were undocumented */
    fprintf(stderr, "\t\t--delay <factor> : multiply every replayed delay by <factor> (default 1.0)\n");
    fprintf(stderr, "\t\t    -v : verbose (output i/o details)\n");
    fprintf(stderr, "\t\t--help : print this message\n");

    exit(1);
}
void parse_args(int argc, char **argv, char **conf_path, char **test_dir)
75 76 77 78
{
    int index;
    static struct option long_opts[] =
    {
79
        {"conf", 1, NULL, 'c'},
80
        {"test-dir", 1, NULL, 'd'},
81
        {"noop", 0, NULL, 'n'},
82
        {"delay", 1, NULL, 'p'},
83 84 85 86
        {"help", 0, NULL, 0},
        {0, 0, 0, 0}
    };

87
    *conf_path = NULL;
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
    *test_dir = NULL;
    while (1)
    {
        int c = getopt_long(argc, argv, "v", long_opts, &index);

        if (c == -1)
            break;

        switch (c)
        {
            case 'v':
                opt_verbose = 1;
                break;
            case 'n':
                opt_noop = 1;
                break;
104 105
            case 'c':
                *conf_path = optarg;
106 107 108 109
                break;
            case 'd':
                *test_dir = optarg;
                break;
110 111 112
            case 'p':
                opt_delay_pct = atof(optarg);
                break;
113 114 115 116 117 118 119 120
            case 0:
            case '?':
            default:
                usage(argv[0]);
                break;
        }
    }

121
    if (optind < argc || !(*conf_path) || !(*test_dir))
122 123 124 125 126 127 128
    {
        usage(argv[0]);
    }

    return;
}

129 130 131 132 133 134 135 136 137 138 139 140 141
int load_workload(char *conf_path, int rank)
{
    config_lpgroups_t paramconf;
    char workload_type[MAX_NAME_LENGTH_WKLD];

    /* load the config file across all ranks */
    configuration_load(conf_path, MPI_COMM_WORLD, &config);

    /* get the PARAMS group out of the config file */
    configuration_get_lpgroups(&config, "PARAMS", &paramconf);

    /* get the workload type out of PARAMS */
    configuration_get_value(&config, "PARAMS", "workload_type",
142
                            NULL, workload_type, MAX_NAME_LENGTH_WKLD);
143 144 145 146 147 148 149 150 151

    /* set up the workload parameters and load into the workload API */
    if (strcmp(workload_type, "darshan_io_workload") == 0)
    {
        struct darshan_params d_params;
        char aggregator_count[10];

        /* get the darshan params from the config file */
        configuration_get_value(&config, "PARAMS", "log_file_path",
152 153
                                NULL, d_params.log_file_path, MAX_NAME_LENGTH_WKLD);
        configuration_get_value(&config, "PARAMS", "aggregator_count", NULL, aggregator_count, 10);
154
        d_params.aggregator_cnt = atol(aggregator_count);
155 156 157 158 159 160 161 162 163 164

        return codes_workload_load(workload_type, (char *)&d_params, rank);
    }
    else if (strcmp(workload_type, "bgp_io_workload") == 0)
    {
        struct bgp_params b_params;
        char rank_count[10];

        /* get the bgp i/o params from the config file */
        configuration_get_value(&config, "PARAMS", "io_kernel_meta_path",
165
                                NULL, b_params.io_kernel_meta_path, MAX_NAME_LENGTH_WKLD);
166
        configuration_get_value(&config, "PARAMS", "bgp_config_file",
167 168
                                NULL, b_params.bgp_config_file, MAX_NAME_LENGTH_WKLD);
        configuration_get_value(&config, "PARAMS", "rank_count", NULL, rank_count, 10);
169 170 171
        strcpy(b_params.io_kernel_path, "");
        strcpy(b_params.io_kernel_def_path, "");
        b_params.num_cns = atoi(rank_count);
Shane Snyder's avatar
Shane Snyder committed
172
        b_params.use_relpath = 1;
173 174 175

        return codes_workload_load(workload_type, (char *)&b_params, rank);
    }
176 177
    else if (strcmp(workload_type, "recorder_io_workload") == 0) {
        struct recorder_params r_params;
178
        char nprocs[10];
179

180
        /* get the recorder params from the config file */
181
        configuration_get_value(&config, "PARAMS", "trace_dir_path",
182 183
                                NULL, r_params.trace_dir_path, MAX_NAME_LENGTH_WKLD);
        configuration_get_value(&config, "PARAMS", "nprocs", NULL, nprocs, 10);
184
        r_params.nprocs = atol(nprocs);
185 186 187 188

        return codes_workload_load(workload_type, (char *)&r_params, rank);

	}
189 190 191 192 193 194 195
    else
    {
        fprintf(stderr, "Error: Invalid workload type specified (%s)\n", workload_type);
        return -1;
    }
}

/* Shared I/O buffer used as the data source/sink for every replayed read and
 * write; allocated and filled with a fixed pattern in main() before replay. */
char *buf = NULL;

int main(int argc, char *argv[])
{
200
    char *conf_path;
201
    char *replay_test_path;
202 203
    char *log_dir = "log";
    char my_log_path[MAX_NAME_LENGTH_WKLD];
204 205 206 207 208
    int nprocs;
    int myrank;
    int workload_id;
    struct codes_workload_op next_op;
    long long int replay_op_number = 1;
209
    double load_start, load_end;
210
    int ret = 0;
211
    int i;
212 213

    /* parse command line args */
214
    parse_args(argc, argv, &conf_path, &replay_test_path);
215 216 217 218 219 220

    /* initialize MPI */
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

221 222 223
    /* start workload load timer */
    load_start = MPI_Wtime();

224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
    /* change the working directory to be the test directory */
    ret = chdir(replay_test_path);
    if (ret < 0)
    {
        fprintf(stderr, "Unable to change to testing directory (%s)\n", strerror(errno));
        goto error_exit;
    }

    /* set the path for logging this rank's events, if verbose is turned on */
    if (opt_verbose)
    {
        mkdir(log_dir, 0755);
        snprintf(my_log_path, MAX_NAME_LENGTH_WKLD, "%s/rank-%d.log", log_dir, myrank);
        log_stream = fopen(my_log_path, "w");
        if (log_stream == NULL)
        {
            fprintf(stderr, "Unable to open log file %s\n", my_log_path);
            goto error_exit;
        }
    }

245 246
    /* initialize workload generator from config file */
    workload_id = load_workload(conf_path, myrank);
247 248 249 250 251 252 253 254 255 256 257 258 259
    if (workload_id < 0)
    {
        goto error_exit;
    }

    /* initialize hash table for storing file descriptors */
    fd_table = qhash_init(hash_file_compare, quickhash_64bit_hash, 29);
    if (!fd_table)
    {
        fprintf(stderr, "File descriptor hash table memory error\n");
        goto error_exit;
    }

260 261 262 263 264 265 266
    buf = malloc(16*1024*1024);
    assert(buf);
    for(i=0; i<16*1024*1024; i++)
    {
        buf[i] = '1';
    }

267 268 269
    /* synchronize before replay */
    MPI_Barrier(MPI_COMM_WORLD);

270 271 272
    /* loading is finished */
    load_end = MPI_Wtime();

Shane Snyder's avatar
Shane Snyder committed
273 274
    if (myrank == 0) printf("Note: Workload took %.2lf seconds to load.\n", load_end - load_start);

275 276 277 278 279 280 281 282
    /* replay loop */
    while (1)
    {
        /* get the next replay operation from the workload generator */
        codes_workload_get_next(workload_id, myrank, &next_op);

        if (next_op.op_type != CODES_WK_END)
        {
283 284

            if (next_op.op_type == CODES_WK_DELAY)
285
                next_op.u.delay.seconds *= opt_delay_pct;
286

287 288 289 290 291 292 293 294 295 296 297 298 299 300
            /* replay the next workload operation */
            ret = replay_workload_op(next_op, myrank, replay_op_number++);
            if (ret < 0)
            {
                break;
            }
        }
        else
        {
            /* workload replay for this rank is complete */
            break;
        }
    }

301
    if (log_stream)
302 303
        fclose(log_stream);

304
    /* destroy and finalize the file descriptor hash table */
305
    qhash_destroy_and_finalize(fd_table, struct file_info, hash_link, free);
306

307 308 309 310 311 312
#if DEBUG_PROFILING
    printf("Rank %d:\td=%.4lf, b=%.4lf, o=%.4lf, c=%.4lf, r=%.4lf, w=%.4lf\n",
           myrank, total_delay_time, total_barrier_time, total_open_time,
           total_close_time, total_read_time, total_write_time);
#endif

313 314
error_exit:
    MPI_Finalize();
315

316 317 318 319 320
    return ret;
}

int replay_workload_op(struct codes_workload_op replay_op, int rank, long long int op_number)
{
Shane Snyder's avatar
Shane Snyder committed
321
    struct timespec delay;
322
    int open_flags = O_RDWR;
323
    char file_name[250];
324
    int fildes;
325
    struct file_info *tmp_list = NULL;
326 327
    struct qlist_head *hash_link = NULL;
    int ret;
328 329

#if DEBUG_PROFILING
330
    double start, end;
331 332
    start = MPI_Wtime();
#endif
333 334 335 336 337

    switch (replay_op.op_type)
    {
        case CODES_WK_DELAY:
            if (opt_verbose)
338
                fprintf(log_stream, "[Rank %d] Operation %lld : DELAY %lf seconds\n",
339 340 341 342 343
                       rank, op_number, replay_op.u.delay.seconds);

            if (!opt_noop)
            {
                /* satisfy delay using second delay then microsecond delay */
Shane Snyder's avatar
Shane Snyder committed
344 345 346 347
                delay.tv_sec = (long long)replay_op.u.delay.seconds;
                delay.tv_nsec = (unsigned int)((replay_op.u.delay.seconds - delay.tv_sec) *
                                               1000 * 1000 * 1000);
                ret = nanosleep(&delay, NULL);
348 349 350 351 352 353 354 355
                if (ret)
                {
                    /* error in sleep */
                    errno = EINTR;
                    fprintf(stderr, "Rank %d failure on operation %lld [DELAY: %s]\n",
                            rank, op_number, strerror(errno));
                    return -1;
                }
356 357 358 359
#if DEBUG_PROFILING
                end = MPI_Wtime();
                total_delay_time += (end - start);
#endif
360 361 362 363
            }
            return 0;
        case CODES_WK_BARRIER:
            if (opt_verbose)
364
                fprintf(log_stream, "[Rank %d] Operation %lld : BARRIER\n", rank, op_number);
365 366 367 368 369 370 371 372 373 374 375 376

            if (!opt_noop)
            {
                /* implement barrier using MPI global barrier on all ranks */
                ret = MPI_Barrier(MPI_COMM_WORLD);
                if (ret != MPI_SUCCESS)
                {
                    /* error in MPI_Barrier */
                    fprintf(stderr, "Rank %d failure on operation %lld [BARRIER: %s]\n",
                            rank, op_number, "Invalid communicator");
                    return -1;
                }
377 378 379 380 381

#if DEBUG_PROFILING
                end = MPI_Wtime();
                total_barrier_time += (end - start);
#endif
382 383 384 385
            }
            return 0;
        case CODES_WK_OPEN:
            if (opt_verbose)
386
                fprintf(log_stream, "[Rank %d] Operation %lld: %s file %"PRIu64"\n", rank, op_number,
387 388 389 390 391 392 393 394 395
                       (replay_op.u.open.create_flag) ? "CREATE" : "OPEN", replay_op.u.open.file_id);

            if (!opt_noop)
            {
                /* set the create flag, if necessary */
                if (replay_op.u.open.create_flag)
                    open_flags |= O_CREAT;

                /* write the file hash to string to be used as the actual file name */
Shane Snyder's avatar
Shane Snyder committed
396
                snprintf(file_name, sizeof(file_name), "%"PRIu64, replay_op.u.open.file_id);
397 398 399 400 401 402 403 404 405 406 407 408

                /* perform the open operation */
                fildes = open(file_name, open_flags, 0666);
                if (fildes < 0)
                {
                    fprintf(stderr, "Rank %d failure on operation %lld [%s: %s]\n",
                            rank, op_number, (replay_op.u.open.create_flag) ? "CREATE" : "OPEN",
                            strerror(errno));
                    return -1;
                }

                /* save the file descriptor for this file in a hash table to be retrieved later */
409
                tmp_list = malloc(sizeof(struct file_info));
410 411 412 413 414 415 416 417 418
                if (!tmp_list)
                {
                    fprintf(stderr, "No memory available for file hash entry\n");
                    return -1;
                }

                tmp_list->file_hash = replay_op.u.open.file_id;
                tmp_list->file_descriptor = fildes;
                qhash_add(fd_table, &(replay_op.u.open.file_id), &(tmp_list->hash_link));
419 420 421 422 423

#if DEBUG_PROFILING
                end = MPI_Wtime();
                total_open_time += (end - start);
#endif
424 425 426 427
            }
            return 0;
        case CODES_WK_CLOSE:
            if (opt_verbose)
428 429
                fprintf(log_stream, "[Rank %d] Operation %lld : CLOSE file %"PRIu64"\n",
                        rank, op_number, replay_op.u.close.file_id);
430 431 432 433 434 435

            if (!opt_noop)
            {
                /* search for the corresponding file descriptor in the hash table */
                hash_link = qhash_search_and_remove(fd_table, &(replay_op.u.close.file_id));
                assert(hash_link);
436
                tmp_list = qhash_entry(hash_link, struct file_info, hash_link);
437 438
                fildes = tmp_list->file_descriptor;
                free(tmp_list);
439

440 441 442 443 444 445 446 447
                /* perform the close operation */
                ret = close(fildes);
                if (ret < 0)
                {
                    fprintf(stderr, "Rank %d failure on operation %lld [CLOSE: %s]\n",
                            rank, op_number, strerror(errno));
                    return -1;
                }
448 449 450 451 452

#if DEBUG_PROFILING
                end = MPI_Wtime();
                total_close_time += (end - start);
#endif
453 454 455 456
            }
            return 0;
        case CODES_WK_WRITE:
            if (opt_verbose)
457
                fprintf(log_stream, "[Rank %d] Operation %lld : WRITE file %"PRIu64" (sz = %"PRId64
458
                       ", off = %"PRId64")\n",
459
                       rank, op_number, replay_op.u.write.file_id, replay_op.u.write.size,
460 461 462 463 464 465 466
                       replay_op.u.write.offset);

            if (!opt_noop)
            {
                /* search for the corresponding file descriptor in the hash table */
                hash_link = qhash_search(fd_table, &(replay_op.u.write.file_id));
                assert(hash_link);
467
                tmp_list = qhash_entry(hash_link, struct file_info, hash_link);
468 469
                fildes = tmp_list->file_descriptor;

470
#if 0
471 472 473 474 475 476 477
                /* perform the write operation */
                buf = malloc(replay_op.u.write.size);
                if (!buf)
                {
                    fprintf(stderr, "No memory available for write buffer\n");
                    return -1;
                }
478
#endif
479
                ret = pwrite(fildes, buf, replay_op.u.write.size, replay_op.u.write.offset);
480
#if 0
481
                free(buf);
482
#endif
483 484 485 486 487 488
                if (ret < 0)
                {
                    fprintf(stderr, "Rank %d failure on operation %lld [WRITE: %s]\n",
                            rank, op_number, strerror(errno));
                    return -1;
                }
489 490 491 492 493

#if DEBUG_PROFILING
                end = MPI_Wtime();
                total_write_time += (end - start);
#endif
494 495 496 497
            }
            return 0;
        case CODES_WK_READ:
            if (opt_verbose)
498 499
                fprintf(log_stream, "[Rank %d] Operation %lld : READ file %"PRIu64" (sz = %"PRId64
                        ", off = %"PRId64")\n", rank, op_number, replay_op.u.read.file_id,
500
                       replay_op.u.read.size, replay_op.u.read.offset);
501

502 503 504 505 506
            if (!opt_noop)
            {
                /* search for the corresponding file descriptor in the hash table */
                hash_link = qhash_search(fd_table, &(replay_op.u.read.file_id));
                assert(hash_link);
507
                tmp_list = qhash_entry(hash_link, struct file_info, hash_link);
508 509
                fildes = tmp_list->file_descriptor;

510
#if 0
511 512 513 514 515 516 517
                /* perform the write operation */
                buf = malloc(replay_op.u.read.size);
                if (!buf)
                {
                    fprintf(stderr, "No memory available for write buffer\n");
                    return -1;
                }
518
#endif
519
                ret = pread(fildes, buf, replay_op.u.read.size, replay_op.u.read.offset);
520
#if 0
521
                free(buf);
522
#endif
523 524 525 526 527 528
                if (ret < 0)
                {
                    fprintf(stderr, "Rank %d failure on operation %lld [READ: %s]\n",
                            rank, op_number, strerror(errno));
                    return -1;
                }
529 530 531 532 533

#if DEBUG_PROFILING
                end = MPI_Wtime();
                total_read_time += (end - start);
#endif
534 535 536 537
            }
            return 0;
        default:
            fprintf(stderr, "** Rank %d: INVALID OPERATION (op count = %lld) **\n", rank, op_number);
538
            return 0;
539 540 541 542 543 544 545
    }

}

int hash_file_compare(void *key, struct qlist_head *link)
{
    uint64_t *in_file_hash = (uint64_t *)key;
546
    struct file_info *tmp_file;
547

548
    tmp_file = qlist_entry(link, struct file_info, hash_link);
549 550 551 552 553
    if (tmp_file->file_hash == *in_file_hash)
        return 1;

    return 0;
}
554 555 556 557 558 559 560 561 562 563

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */