/* codes-recorder-io-wrkld.c */
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

/* Recorder workload generator that plugs into the general CODES workload
 * generator API. This generator consumes a set of input files of Recorder I/O
 * traces and passes these traces to the underlying simulator.
 */
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <assert.h>
#include <sys/stat.h>
#include <dirent.h>

#include "ross.h"
#include "codes/codes-workload.h"
#include "codes-workload-method.h"
#include "codes/quickhash.h"

enum {
    /* maximum number of ops kept per rank; longer traces are truncated */
    RECORDER_MAX_TRACE_READ_COUNT = 1024,
    /* bucket count passed to qhash_init for the rank -> context table */
    RANK_HASH_TABLE_SIZE = 397
};

27
struct recorder_io_op
28 29
{
    double start_time;
30
    struct codes_workload_op codes_op;
31 32 33 34 35 36
};

/* structure for storing all context needed to retrieve traces for this rank */
struct rank_traces_context
{
    int rank;
37
    struct qhash_head hash_link;
38

39
    struct recorder_io_op trace_ops[1024]; /* TODO: this should be extendable */
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
    int trace_list_ndx;
    int trace_list_max;
};


/* Entry points of the CODES workload API backed by Recorder traces. */
static int recorder_io_workload_load(const char *params, int rank);
static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op);

/* qhash comparison callback used to look up a rank's trace context. */
static int hash_rank_compare(void *key, struct qhash_head *link);

/* Method name and function pointers registered with the CODES workload API. */
struct codes_workload_method recorder_io_workload_method =
{
    .codes_workload_get_next = recorder_io_workload_get_next,
    .codes_workload_load = recorder_io_workload_load,
    .method_name = "recorder_io_workload",
};

/* rank -> struct rank_traces_context table; created lazily by the first
 * successful load and finalized (reset to NULL) once rank_tbl_pop drops
 * back to zero.  Static storage, so it starts out NULL. */
static struct qhash_table *rank_tbl;
/* number of rank contexts currently stored in rank_tbl (starts at 0) */
static int rank_tbl_pop;

/* load the workload generator for this rank, given input params */
static int recorder_io_workload_load(const char *params, int rank)
{
66
    recorder_params *r_params = (recorder_params *) params;
67 68 69 70

    int64_t nprocs = 0;
    struct rank_traces_context *new = NULL;

71
    char *trace_dir = r_params->trace_dir_path;
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
    if(!trace_dir)
        return -1;

    /* allocate a new trace context for this rank */
    new = malloc(sizeof(*new));
    if(!new)
        return -1;

    new->rank = rank;
    new->trace_list_ndx = 0;
    new->trace_list_max = 0;

    DIR *dirp;
    struct dirent *entry;
    dirp = opendir(trace_dir);
    while((entry = readdir(dirp)) != NULL) {
        if(entry->d_type == DT_REG)
            nprocs++;
    }
91
    closedir(dirp);
92

93
    char trace_file_name[1024] = {'\0'};
94 95 96 97 98 99 100
    sprintf(trace_file_name, "%s/log.%d", trace_dir, rank);

    FILE *trace_file = fopen(trace_file_name, "r");
    if(trace_file == NULL)
        return -1;

    double start_time;
101
    char function_name[128] = {'\0'};
102 103

    /* Read the first chunk of data (of size RECORDER_MAX_TRACE_READ_COUNT) */
104
    char *line = NULL;
105 106
    size_t len;
    ssize_t ret_value;
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
    while((ret_value = getline(&line, &len, trace_file)) != -1) {
        struct recorder_io_op r_op;
        char *token = strtok(line, ", ");
        start_time = atof(token);
        token = strtok(NULL, ", ");
        strcpy(function_name, token);

        r_op.start_time = start_time;
        if(!strcmp(function_name, "open") || !strcmp(function_name, "open64")) {
            r_op.codes_op.op_type = CODES_WK_OPEN;

            token = strtok(NULL, ", (");
            token = strtok(NULL, ", )");
            r_op.codes_op.u.open.create_flag = atoi(token);

            token = strtok(NULL, ", )");
123
            token = strtok(NULL, ", ");
124 125 126 127
            r_op.codes_op.u.open.file_id = atoi(token);
        }
        else if(!strcmp(function_name, "close")) {
            r_op.codes_op.op_type = CODES_WK_CLOSE;
128

129 130 131 132 133
            token = strtok(NULL, ", ()");
            r_op.codes_op.u.close.file_id = atoi(token);
        }
        else if(!strcmp(function_name, "read") || !strcmp(function_name, "read64")) {
            r_op.codes_op.op_type = CODES_WK_READ;
134

135 136
            token = strtok(NULL, ", (");
            r_op.codes_op.u.read.file_id = atoi(token);
137

138 139
            // Throw out the buffer
            token = strtok(NULL, ", ");
140

141 142
            token = strtok(NULL, ", )");
            r_op.codes_op.u.read.size = atol(token);
143

144 145 146 147 148
            token = strtok(NULL, ", )");
            r_op.codes_op.u.read.offset = atol(token);
        }
        else if(!strcmp(function_name, "write") || !strcmp(function_name, "write64")) {
            r_op.codes_op.op_type = CODES_WK_WRITE;
149

150 151
            token = strtok(NULL, ", (");
            r_op.codes_op.u.write.file_id = atoi(token);
152

153 154
            // Throw out the buffer
            token = strtok(NULL, ", ");
155

156 157
            token = strtok(NULL, ", )");
            r_op.codes_op.u.write.size = atol(token);
158

159 160 161 162 163
            token = strtok(NULL, ", )");
            r_op.codes_op.u.write.offset = atol(token);
        }
        else if(!strcmp(function_name, "MPI_Barrier")) {
            r_op.codes_op.op_type = CODES_WK_BARRIER;
164

165 166 167 168 169
            r_op.codes_op.u.barrier.count = nprocs;
            r_op.codes_op.u.barrier.root = 0;
        }
        else{
            continue;
170 171
        }

172 173
        new->trace_ops[new->trace_list_ndx++] = r_op;
        if (new->trace_list_ndx == 1024) break;
174
    }
175

176 177 178 179 180 181
    fclose(trace_file);

    /* reset ndx to 0 and set max to event count */
    /* now we can read all events by counting through array from 0 - max */
    new->trace_list_max = new->trace_list_ndx;
    new->trace_list_ndx = 0;
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210

    /* initialize the hash table of rank contexts, if it has not been initialized */
    if (!rank_tbl) {
        rank_tbl = qhash_init(hash_rank_compare, quickhash_32bit_hash, RANK_HASH_TABLE_SIZE);

        if (!rank_tbl) {
            free(new);
            return -1;
        }
    }

    /* add this rank context to the hash table */
    qhash_add(rank_tbl, &(new->rank), &(new->hash_link));
    rank_tbl_pop++;

    return 0;
}

/* pull the next trace (independent or collective) for this rank from its trace context */
static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op)
{
    struct qhash_head *hash_link = NULL;
    struct rank_traces_context *tmp = NULL;

    /* Find event context for this rank in the rank hash table */
    hash_link = qhash_search(rank_tbl, &rank);

    /* terminate the workload if there is no valid rank context */
    if(!hash_link) {
211

212 213 214 215 216 217 218
        op->op_type = CODES_WK_END;
        return;
    }

    tmp = qhash_entry(hash_link, struct rank_traces_context, hash_link);
    assert(tmp->rank == rank);

219
    if(tmp->trace_list_ndx == tmp->trace_list_max) {
220 221 222 223
        /* no more events -- just end the workload */
        op->op_type = CODES_WK_END;
        qhash_del(hash_link);
        free(tmp);
224

225 226
        rank_tbl_pop--;
        if(!rank_tbl_pop)
227
        {
228
            qhash_finalize(rank_tbl);
229 230
            rank_tbl = NULL;
        }
231 232 233 234
    }
    else {
        /* return the next event */
        /* TODO: Do I need to check for the delay like in Darshan? */
235
        *op = tmp->trace_ops[tmp->trace_list_ndx++].codes_op;
236 237 238 239 240 241 242 243 244 245
    }

    return;
}

static int hash_rank_compare(void *key, struct qhash_head *link)
{
    int *in_rank = (int *)key;
    struct rank_traces_context *tmp;

246
    tmp = qhash_entry(link, struct rank_traces_context, hash_link);
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
    if (tmp->rank == *in_rank)
        return 1;

    return 0;
}


/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */