/* codes-workload-mpi-replay.c */
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

/* SUMMARY:
 *
 *  MPI replay tool that replays I/O workloads obtained from the CODES
 *  workload API.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <assert.h>
#include <inttypes.h>
#include <math.h>
#include <getopt.h>
#include <fcntl.h>
#include <sys/stat.h>

#include <mpi.h>

#include "codes/codes-workload.h"
#include "codes/quickhash.h"
#include "codes/configuration.h"

27
#define WORKLOAD_PRINT 1
28

29
/* hash table entry for looking up file descriptor of a workload file id */
30
struct file_info
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
{
    struct qlist_head hash_link;
    uint64_t file_hash;
    int file_descriptor;
};

int replay_workload_op(struct codes_workload_op replay_op, int rank, long long int op_number);
int hash_file_compare(void *key, struct qlist_head *link);

/* command line options */
static int opt_verbose = 0;
static int opt_noop = 0;

/* hash table for storing file descriptors of opened files */
static struct qhash_table *fd_table = NULL;

47 48 49
/* file stream to log rank events to, if verbose turned on */
static FILE *log_stream = NULL;

/* Print the command-line synopsis and option help to stderr, then
 * terminate the process with exit status 1.  Never returns. */
void usage(char *exename)
{
    static const char *const help_lines[] =
    {
        "\t<conf_file_path> : (absolute) path to a valid workload configuration file\n",
        "\t<workload_test_dir> : the directory to replay the workload I/O in\n",
        "\n\t[OPTIONS] includes:\n",
        "\t\t--noop : do not perform i/o\n",
        "\t\t    -v : verbose (output i/o details)\n",
    };
    size_t line;

    fprintf(stderr, "Usage: %s [OPTIONS] --conf <conf_file_path>\n       "
            "--test-dir <workload_test_dir>\n\n", exename);

    /* the remaining lines contain no conversions, so emit them verbatim */
    for (line = 0; line < sizeof(help_lines) / sizeof(help_lines[0]); line++)
        fputs(help_lines[line], stderr);

    exit(1);
}

63
void parse_args(int argc, char **argv, char **conf_path, char **test_dir)
64 65 66 67
{
    int index;
    static struct option long_opts[] =
    {
68
        {"conf", 1, NULL, 'c'},
69
        {"test-dir", 1, NULL, 'd'},
70
        {"noop", 0, NULL, 'n'},
71 72 73 74
        {"help", 0, NULL, 0},
        {0, 0, 0, 0}
    };

75
    *conf_path = NULL;
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
    *test_dir = NULL;
    while (1)
    {
        int c = getopt_long(argc, argv, "v", long_opts, &index);

        if (c == -1)
            break;

        switch (c)
        {
            case 'v':
                opt_verbose = 1;
                break;
            case 'n':
                opt_noop = 1;
                break;
92 93
            case 'c':
                *conf_path = optarg;
94 95 96 97 98 99 100 101 102 103 104 105
                break;
            case 'd':
                *test_dir = optarg;
                break;
            case 0:
            case '?':
            default:
                usage(argv[0]);
                break;
        }
    }

106
    if (optind < argc || !(*conf_path) || !(*test_dir))
107 108 109 110 111 112 113
    {
        usage(argv[0]);
    }

    return;
}

114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
int load_workload(char *conf_path, int rank)
{
    config_lpgroups_t paramconf;
    char workload_type[MAX_NAME_LENGTH_WKLD];

    /* load the config file across all ranks */
    configuration_load(conf_path, MPI_COMM_WORLD, &config);

    /* get the PARAMS group out of the config file */
    configuration_get_lpgroups(&config, "PARAMS", &paramconf);

    /* get the workload type out of PARAMS */
    configuration_get_value(&config, "PARAMS", "workload_type",
                            workload_type, MAX_NAME_LENGTH_WKLD);

    /* set up the workload parameters and load into the workload API */
    if (strcmp(workload_type, "darshan_io_workload") == 0)
    {
        struct darshan_params d_params;
        char aggregator_count[10];

        /* get the darshan params from the config file */
        configuration_get_value(&config, "PARAMS", "log_file_path",
                                d_params.log_file_path, MAX_NAME_LENGTH_WKLD);
        configuration_get_value(&config, "PARAMS", "aggregator_count", aggregator_count, 10);
        d_params.aggregator_cnt = atoi(aggregator_count);

141
#if WORKLOAD_PRINT
142
        d_params.stream = NULL;
143
#else
144
        d_params.stream = log_stream;
145 146 147
        opt_verbose = 0;
#endif

148 149 150 151 152 153 154 155 156 157
        return codes_workload_load(workload_type, (char *)&d_params, rank);
    }
    else if (strcmp(workload_type, "bgp_io_workload") == 0)
    {
        struct bgp_params b_params;
        char rank_count[10];

        /* get the bgp i/o params from the config file */
        configuration_get_value(&config, "PARAMS", "io_kernel_meta_path",
                                b_params.io_kernel_meta_path, MAX_NAME_LENGTH_WKLD);
158
        configuration_get_value(&config, "PARAMS", "bgp_config_file",
159 160 161 162 163 164 165 166
                                b_params.bgp_config_file, MAX_NAME_LENGTH_WKLD);
        configuration_get_value(&config, "PARAMS", "rank_count", rank_count, 10);
        strcpy(b_params.io_kernel_path, "");
        strcpy(b_params.io_kernel_def_path, "");
        b_params.num_cns = atoi(rank_count);

        return codes_workload_load(workload_type, (char *)&b_params, rank);
    }
167 168 169 170 171 172 173 174 175 176 177
    else if (strcmp(workload_type, "recorder_io_workload") == 0) {
        struct recorder_params r_params;

        /* get the darshan params from the config file */
        configuration_get_value(&config, "PARAMS", "trace_dir_path",
                                r_params.trace_dir_path, MAX_NAME_LENGTH_WKLD);
        r_params.stream = NULL;

        return codes_workload_load(workload_type, (char *)&r_params, rank);

	}
178 179 180 181 182 183 184
    else
    {
        fprintf(stderr, "Error: Invalid workload type specified (%s)\n", workload_type);
        return -1;
    }
}

185 186
int main(int argc, char *argv[])
{
187
    char *conf_path;
188
    char *replay_test_path;
189 190
    char *log_dir = "log";
    char my_log_path[MAX_NAME_LENGTH_WKLD];
191 192 193 194 195 196 197 198
    int nprocs;
    int myrank;
    int workload_id;
    struct codes_workload_op next_op;
    long long int replay_op_number = 1;
    int ret = 0;

    /* parse command line args */
199
    parse_args(argc, argv, &conf_path, &replay_test_path);
200 201 202 203 204 205

    /* initialize MPI */
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
    /* change the working directory to be the test directory */
    ret = chdir(replay_test_path);
    if (ret < 0)
    {
        fprintf(stderr, "Unable to change to testing directory (%s)\n", strerror(errno));
        goto error_exit;
    }

    /* set the path for logging this rank's events, if verbose is turned on */
    if (opt_verbose)
    {
        mkdir(log_dir, 0755);
        snprintf(my_log_path, MAX_NAME_LENGTH_WKLD, "%s/rank-%d.log", log_dir, myrank);
        log_stream = fopen(my_log_path, "w");
        if (log_stream == NULL)
        {
            fprintf(stderr, "Unable to open log file %s\n", my_log_path);
            goto error_exit;
        }
    }

227 228
    /* initialize workload generator from config file */
    workload_id = load_workload(conf_path, myrank);
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
    if (workload_id < 0)
    {
        goto error_exit;
    }

    /* initialize hash table for storing file descriptors */
    fd_table = qhash_init(hash_file_compare, quickhash_64bit_hash, 29);
    if (!fd_table)
    {
        fprintf(stderr, "File descriptor hash table memory error\n");
        goto error_exit;
    }

    /* replay loop */
    while (1)
    {
        /* get the next replay operation from the workload generator */
        codes_workload_get_next(workload_id, myrank, &next_op);

        if (next_op.op_type != CODES_WK_END)
        {
            /* replay the next workload operation */
            ret = replay_workload_op(next_op, myrank, replay_op_number++);
            if (ret < 0)
            {
                break;
            }
        }
        else
        {
            /* workload replay for this rank is complete */
            break;
        }
    }

264
    if (log_stream)
265 266
        fclose(log_stream);

267
    /* destroy and finalize the file descriptor hash table */
268
    qhash_destroy_and_finalize(fd_table, struct file_info, hash_link, free);
269 270 271

error_exit:
    MPI_Finalize();
272

273 274 275 276 277 278 279 280 281 282
    return ret;
}

int replay_workload_op(struct codes_workload_op replay_op, int rank, long long int op_number)
{
    unsigned int secs;
    unsigned int usecs;
    int open_flags = O_RDWR;
    char file_name[50];
    int fildes;
283
    struct file_info *tmp_list = NULL;
284 285 286 287 288 289 290
    struct qlist_head *hash_link = NULL;
    char *buf = NULL;
    int ret;

    switch (replay_op.op_type)
    {
        case CODES_WK_DELAY:
291
#if WORKLOAD_PRINT
292
            if (opt_verbose)
293
                fprintf(log_stream, "[Rank %d] Operation %lld : DELAY %lf seconds\n",
294
                       rank, op_number, replay_op.u.delay.seconds);
295
#endif
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321

            if (!opt_noop)
            {
                /* satisfy delay using second delay then microsecond delay */
                secs = floor(replay_op.u.delay.seconds);
                usecs = round((replay_op.u.delay.seconds - secs) * 1000 * 1000);
                ret = sleep(secs);
                if (ret)
                {
                    /* error in sleep */
                    errno = EINTR;
                    fprintf(stderr, "Rank %d failure on operation %lld [DELAY: %s]\n",
                            rank, op_number, strerror(errno));
                    return -1;
                }
                ret = usleep(usecs);
                if (ret < 0)
                {
                    /* error in usleep */
                    fprintf(stderr, "Rank %d failure on operation %lld [DELAY: %s]\n",
                            rank, op_number, strerror(errno));
                    return -1;
                }
            }
            return 0;
        case CODES_WK_BARRIER:
322
#if WORKLOAD_PRINT
323
            if (opt_verbose)
324
                fprintf(log_stream, "[Rank %d] Operation %lld : BARRIER\n", rank, op_number);
325
#endif
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340

            if (!opt_noop)
            {
                /* implement barrier using MPI global barrier on all ranks */
                ret = MPI_Barrier(MPI_COMM_WORLD);
                if (ret != MPI_SUCCESS)
                {
                    /* error in MPI_Barrier */
                    fprintf(stderr, "Rank %d failure on operation %lld [BARRIER: %s]\n",
                            rank, op_number, "Invalid communicator");
                    return -1;
                }
            }
            return 0;
        case CODES_WK_OPEN:
341
#if WORKLOAD_PRINT
342
            if (opt_verbose)
343
                fprintf(log_stream, "[Rank %d] Operation %lld: %s file %"PRIu64"\n", rank, op_number,
344
                       (replay_op.u.open.create_flag) ? "CREATE" : "OPEN", replay_op.u.open.file_id);
345
#endif
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366

            if (!opt_noop)
            {
                /* set the create flag, if necessary */
                if (replay_op.u.open.create_flag)
                    open_flags |= O_CREAT;

                /* write the file hash to string to be used as the actual file name */
                snprintf(file_name, sizeof(file_name), "%"PRIu64, replay_op.u.open.file_id);

                /* perform the open operation */
                fildes = open(file_name, open_flags, 0666);
                if (fildes < 0)
                {
                    fprintf(stderr, "Rank %d failure on operation %lld [%s: %s]\n",
                            rank, op_number, (replay_op.u.open.create_flag) ? "CREATE" : "OPEN",
                            strerror(errno));
                    return -1;
                }

                /* save the file descriptor for this file in a hash table to be retrieved later */
367
                tmp_list = malloc(sizeof(struct file_info));
368 369 370 371 372 373 374 375 376 377 378 379
                if (!tmp_list)
                {
                    fprintf(stderr, "No memory available for file hash entry\n");
                    return -1;
                }

                tmp_list->file_hash = replay_op.u.open.file_id;
                tmp_list->file_descriptor = fildes;
                qhash_add(fd_table, &(replay_op.u.open.file_id), &(tmp_list->hash_link));
            }
            return 0;
        case CODES_WK_CLOSE:
380
#if WORKLOAD_PRINT
381
            if (opt_verbose)
382 383
                fprintf(log_stream, "[Rank %d] Operation %lld : CLOSE file %"PRIu64"\n",
                        rank, op_number, replay_op.u.close.file_id);
384
#endif
385 386 387 388 389 390

            if (!opt_noop)
            {
                /* search for the corresponding file descriptor in the hash table */
                hash_link = qhash_search_and_remove(fd_table, &(replay_op.u.close.file_id));
                assert(hash_link);
391
                tmp_list = qhash_entry(hash_link, struct file_info, hash_link);
392 393
                fildes = tmp_list->file_descriptor;
                free(tmp_list);
394

395 396 397 398 399 400 401 402 403 404 405
                /* perform the close operation */
                ret = close(fildes);
                if (ret < 0)
                {
                    fprintf(stderr, "Rank %d failure on operation %lld [CLOSE: %s]\n",
                            rank, op_number, strerror(errno));
                    return -1;
                }
            }
            return 0;
        case CODES_WK_WRITE:
406
#if WORKLOAD_PRINT
407
            if (opt_verbose)
408
                fprintf(log_stream, "[Rank %d] Operation %lld : WRITE file %"PRIu64" (sz = %"PRId64
409
                       ", off = %"PRId64")\n",
410
                       rank, op_number, replay_op.u.write.file_id, replay_op.u.write.size,
411
                       replay_op.u.write.offset);
412
#endif
413 414 415 416 417 418

            if (!opt_noop)
            {
                /* search for the corresponding file descriptor in the hash table */
                hash_link = qhash_search(fd_table, &(replay_op.u.write.file_id));
                assert(hash_link);
419
                tmp_list = qhash_entry(hash_link, struct file_info, hash_link);
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
                fildes = tmp_list->file_descriptor;

                /* perform the write operation */
                buf = malloc(replay_op.u.write.size);
                if (!buf)
                {
                    fprintf(stderr, "No memory available for write buffer\n");
                    return -1;
                }
                ret = pwrite(fildes, buf, replay_op.u.write.size, replay_op.u.write.offset);
                free(buf);
                if (ret < 0)
                {
                    fprintf(stderr, "Rank %d failure on operation %lld [WRITE: %s]\n",
                            rank, op_number, strerror(errno));
                    return -1;
                }
            }
            return 0;
        case CODES_WK_READ:
440
#if WORKLOAD_PRINT
441
            if (opt_verbose)
442 443
                fprintf(log_stream, "[Rank %d] Operation %lld : READ file %"PRIu64" (sz = %"PRId64
                        ", off = %"PRId64")\n", rank, op_number, replay_op.u.read.file_id,
444
                       replay_op.u.read.size, replay_op.u.read.offset);
445
#endif
446 447 448 449 450
            if (!opt_noop)
            {
                /* search for the corresponding file descriptor in the hash table */
                hash_link = qhash_search(fd_table, &(replay_op.u.read.file_id));
                assert(hash_link);
451
                tmp_list = qhash_entry(hash_link, struct file_info, hash_link);
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
                fildes = tmp_list->file_descriptor;

                /* perform the write operation */
                buf = malloc(replay_op.u.read.size);
                if (!buf)
                {
                    fprintf(stderr, "No memory available for write buffer\n");
                    return -1;
                }
                ret = pread(fildes, buf, replay_op.u.read.size, replay_op.u.read.offset);
                free(buf);
                if (ret < 0)
                {
                    fprintf(stderr, "Rank %d failure on operation %lld [READ: %s]\n",
                            rank, op_number, strerror(errno));
                    return -1;
                }
            }
            return 0;
        default:
            fprintf(stderr, "** Rank %d: INVALID OPERATION (op count = %lld) **\n", rank, op_number);
473
            return 0;
474 475 476 477 478 479 480
    }

}

int hash_file_compare(void *key, struct qlist_head *link)
{
    uint64_t *in_file_hash = (uint64_t *)key;
481
    struct file_info *tmp_file;
482

483
    tmp_file = qlist_entry(link, struct file_info, hash_link);
484 485 486 487 488
    if (tmp_file->file_hash == *in_file_hash)
        return 1;

    return 0;
}