codes-recorder-io-wrkld.c 8.17 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

/* Recorder workload generator that plugs into the general CODES workload
 * generator API. This generator consumes a set of input files of Recorder I/O
 * traces and passes these traces to the underlying simulator.
 */
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <sys/stat.h>
#include <dirent.h>

#include "ross.h"
#include "codes/codes-workload.h"
#include "codes-workload-method.h"
#include "codes/quickhash.h"

/* Gaps (in seconds) between the previous operation and the next one that are
 * at or below this threshold are not reported as explicit delay operations.
 */
#define RECORDER_NEGLIGIBLE_DELAY .00001

/* table size handed to qhash_init() for the table of per-rank trace contexts */
#define RANK_HASH_TABLE_SIZE 397

struct recorder_io_op
28 29
{
    double start_time;
30
    double end_time;
31
    struct codes_workload_op codes_op;
32 33 34 35 36 37
};

/* structure for storing all context needed to retrieve traces for this rank */
struct rank_traces_context
{
    int rank;
38
    double last_op_time;
39

40
    struct recorder_io_op trace_ops[2048]; /* TODO: this should be extendable */
41 42
    int trace_list_ndx;
    int trace_list_max;
43 44

    struct qhash_head hash_link;
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
};


/* CODES workload API functions for workloads generated from recorder traces */
/* Reads the trace file for `rank` and registers a trace context for it.
 * `params` is interpreted as a pointer to a recorder_params structure.
 * Returns 0 on success and -1 on failure.
 */
static int recorder_io_workload_load(const char *params, int rank);
/* Writes the next operation for `rank` into `op`: a trace operation, a delay,
 * or CODES_WK_END when the rank has no context or no operations left.
 */
static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op);

/* helper functions for recorder workload CODES API */
/* Returns 1 when the context behind `link` has the rank pointed to by `key`,
 * 0 otherwise.
 */
static int hash_rank_compare(void *key, struct qhash_head *link);

/* workload method name and function pointers for the CODES workload API */
/* This is the only symbol in this file with external linkage; the load and
 * get_next callbacks it points to are file-local.
 */
struct codes_workload_method recorder_io_workload_method =
{
    .method_name = "recorder_io_workload",
    .codes_workload_load = recorder_io_workload_load,
    .codes_workload_get_next = recorder_io_workload_get_next,
};

/* Hash table of per-rank trace contexts, keyed by rank. Created on the first
 * successful load and finalized (and reset to NULL) by get_next once the
 * last context has been removed.
 */
static struct qhash_table *rank_tbl = NULL;
/* number of contexts currently stored in rank_tbl */
static int rank_tbl_pop = 0;

/* load the workload generator for this rank, given input params */
static int recorder_io_workload_load(const char *params, int rank)
{
69
    recorder_params *r_params = (recorder_params *) params;
70

71
    int64_t nprocs = 16384;
72 73
    struct rank_traces_context *new = NULL;

74
    char *trace_dir = r_params->trace_dir_path;
75 76 77 78 79 80 81 82 83 84 85 86
    if(!trace_dir)
        return -1;

    /* allocate a new trace context for this rank */
    new = malloc(sizeof(*new));
    if(!new)
        return -1;

    new->rank = rank;
    new->trace_list_ndx = 0;
    new->trace_list_max = 0;

87
#if 0
88 89 90 91 92 93 94
    DIR *dirp;
    struct dirent *entry;
    dirp = opendir(trace_dir);
    while((entry = readdir(dirp)) != NULL) {
        if(entry->d_type == DT_REG)
            nprocs++;
    }
95
    closedir(dirp);
96
#endif
97

98
    char trace_file_name[1024] = {'\0'};
99 100 101 102 103 104
    sprintf(trace_file_name, "%s/log.%d", trace_dir, rank);

    FILE *trace_file = fopen(trace_file_name, "r");
    if(trace_file == NULL)
        return -1;

105
    char *line = NULL;
106 107
    size_t len;
    ssize_t ret_value;
108
    char function_name[128] = {'\0'};
109 110 111
    while((ret_value = getline(&line, &len, trace_file)) != -1) {
        struct recorder_io_op r_op;
        char *token = strtok(line, ", ");
112
        r_op.start_time = atof(token);
113 114 115 116 117 118 119 120 121 122 123
        token = strtok(NULL, ", ");
        strcpy(function_name, token);

        if(!strcmp(function_name, "open") || !strcmp(function_name, "open64")) {
            r_op.codes_op.op_type = CODES_WK_OPEN;

            token = strtok(NULL, ", (");
            token = strtok(NULL, ", )");
            r_op.codes_op.u.open.create_flag = atoi(token);

            token = strtok(NULL, ", )");
124
            token = strtok(NULL, ", ");
125
            r_op.codes_op.u.open.file_id = atoi(token);
126 127

            token = strtok(NULL, ", \n");
128 129 130
        }
        else if(!strcmp(function_name, "close")) {
            r_op.codes_op.op_type = CODES_WK_CLOSE;
131

132 133
            token = strtok(NULL, ", ()");
            r_op.codes_op.u.close.file_id = atoi(token);
134 135 136

            token = strtok(NULL, ", ");
            token = strtok(NULL, ", \n");
137 138 139
        }
        else if(!strcmp(function_name, "read") || !strcmp(function_name, "read64")) {
            r_op.codes_op.op_type = CODES_WK_READ;
140

141 142
            token = strtok(NULL, ", (");
            r_op.codes_op.u.read.file_id = atoi(token);
143

144 145
            // Throw out the buffer
            token = strtok(NULL, ", ");
146

147 148
            token = strtok(NULL, ", )");
            r_op.codes_op.u.read.size = atol(token);
149

150 151
            token = strtok(NULL, ", )");
            r_op.codes_op.u.read.offset = atol(token);
152 153 154

            token = strtok(NULL, ", ");
            token = strtok(NULL, ", \n");
155 156 157
        }
        else if(!strcmp(function_name, "write") || !strcmp(function_name, "write64")) {
            r_op.codes_op.op_type = CODES_WK_WRITE;
158

159 160
            token = strtok(NULL, ", (");
            r_op.codes_op.u.write.file_id = atoi(token);
161

162 163
            // Throw out the buffer
            token = strtok(NULL, ", ");
164

165 166
            token = strtok(NULL, ", )");
            r_op.codes_op.u.write.size = atol(token);
167

168 169
            token = strtok(NULL, ", )");
            r_op.codes_op.u.write.offset = atol(token);
170 171 172

            token = strtok(NULL, ", ");
            token = strtok(NULL, ", \n");
173 174
        }
        else if(!strcmp(function_name, "MPI_Barrier")) {
175
            char *tmp;
176
            r_op.codes_op.op_type = CODES_WK_BARRIER;
177

178 179
            r_op.codes_op.u.barrier.count = nprocs;
            r_op.codes_op.u.barrier.root = 0;
180 181 182 183 184

            token = strtok(NULL, ", ()");
            token = strtok(NULL, ", ");
            tmp = strtok(NULL, ", \n");
            if (tmp) token = tmp;
185 186 187
        }
        else{
            continue;
188
        }
189
        r_op.end_time = r_op.start_time + atof(token);
190

191
        new->trace_ops[new->trace_list_ndx++] = r_op;
192
        if (new->trace_list_ndx == 2048) break;
193
    }
194

195 196 197 198 199 200
    fclose(trace_file);

    /* reset ndx to 0 and set max to event count */
    /* now we can read all events by counting through array from 0 - max */
    new->trace_list_max = new->trace_list_ndx;
    new->trace_list_ndx = 0;
201 202 203
    if (new->trace_list_max > 0) {
        new->last_op_time = new->trace_ops[0].start_time;
    }
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232

    /* initialize the hash table of rank contexts, if it has not been initialized */
    if (!rank_tbl) {
        rank_tbl = qhash_init(hash_rank_compare, quickhash_32bit_hash, RANK_HASH_TABLE_SIZE);

        if (!rank_tbl) {
            free(new);
            return -1;
        }
    }

    /* add this rank context to the hash table */
    qhash_add(rank_tbl, &(new->rank), &(new->hash_link));
    rank_tbl_pop++;

    return 0;
}

/* pull the next trace (independent or collective) for this rank from its trace context */
static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op)
{
    struct qhash_head *hash_link = NULL;
    struct rank_traces_context *tmp = NULL;

    /* Find event context for this rank in the rank hash table */
    hash_link = qhash_search(rank_tbl, &rank);

    /* terminate the workload if there is no valid rank context */
    if(!hash_link) {
233

234 235 236 237 238 239 240
        op->op_type = CODES_WK_END;
        return;
    }

    tmp = qhash_entry(hash_link, struct rank_traces_context, hash_link);
    assert(tmp->rank == rank);

241
    if(tmp->trace_list_ndx == tmp->trace_list_max) {
242 243 244 245
        /* no more events -- just end the workload */
        op->op_type = CODES_WK_END;
        qhash_del(hash_link);
        free(tmp);
246

247 248
        rank_tbl_pop--;
        if(!rank_tbl_pop)
249
        {
250
            qhash_finalize(rank_tbl);
251 252
            rank_tbl = NULL;
        }
253 254
    }
    else {
255 256 257 258 259 260 261 262 263 264 265 266 267 268
        struct recorder_io_op *next_r_op = &(tmp->trace_ops[tmp->trace_list_ndx]);

        if ((next_r_op->start_time - tmp->last_op_time) <= RECORDER_NEGLIGIBLE_DELAY) {
            *op = next_r_op->codes_op;

            tmp->trace_list_ndx++;
            tmp->last_op_time = next_r_op->end_time;
        }
        else {
            op->op_type = CODES_WK_DELAY;
            op->u.delay.seconds = next_r_op->start_time - tmp->last_op_time;

            tmp->last_op_time = next_r_op->start_time;
        }
269 270 271 272 273 274 275 276 277 278
    }

    return;
}

static int hash_rank_compare(void *key, struct qhash_head *link)
{
    int *in_rank = (int *)key;
    struct rank_traces_context *tmp;

279
    tmp = qhash_entry(link, struct rank_traces_context, hash_link);
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
    if (tmp->rank == *in_rank)
        return 1;

    return 0;
}


/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */