/* codes-recorder-io-wrkld.c */
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

/* Recorder workload generator that plugs into the general CODES workload
 * generator API. This generator consumes a set of input files of Recorder I/O
 * traces and passes these traces to the underlying simulator.
 */
#include <assert.h>
#include <dirent.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>

#include "ross.h"
#include "codes/codes-workload.h"
#include "codes/quickhash.h"
#include "codes/jenkins-hash.h"

/* Gaps (in seconds) between consecutive trace ops at or below this value are
 * not turned into explicit CODES_WK_DELAY ops by
 * recorder_io_workload_get_next(); the next op is issued immediately. */
#define RECORDER_NEGLIGIBLE_DELAY .00001

/* Table size passed to qhash_init() for the per-rank trace-context table. */
#define RANK_HASH_TABLE_SIZE 397

27
struct recorder_io_op
28 29
{
    double start_time;
30
    double end_time;
31
    struct codes_workload_op codes_op;
32 33
};

34 35 36 37 38 39 40
struct file_entry
{
    uint64_t file_id;
    int fd;
    struct qhash_head hash_link;
};

41 42 43 44
/* structure for storing all context needed to retrieve traces for this rank */
struct rank_traces_context
{
    int rank;
45
    double last_op_time;
46

47
    struct recorder_io_op trace_ops[2048]; /* TODO: this should be extendable */
48 49
    int trace_list_ndx;
    int trace_list_max;
50

51
    /* hash table link to next rank (if any) */
52
    struct qhash_head hash_link;
53 54 55
};

/* CODES workload API functions for workloads generated from recorder traces */
static int recorder_io_workload_load(const char *params, int app_id, int rank);
static void recorder_io_workload_get_next(int app_id, int rank, struct codes_workload_op *op);

/* helper functions for recorder workload CODES API */
static int hash_rank_compare(void *key, struct qhash_head *link);
static int hash_file_compare(void *key, struct qhash_head *link);

/* workload method name and function pointers for the CODES workload API */
struct codes_workload_method recorder_io_workload_method =
{
    .method_name = "recorder_io_workload",
67
    .codes_workload_read_config = NULL,
68 69 70 71 72 73 74 75
    .codes_workload_load = recorder_io_workload_load,
    .codes_workload_get_next = recorder_io_workload_get_next,
};

/* Trace contexts of all loaded ranks, keyed by rank. Created by the first
 * successful load; finalized (and reset to NULL) once every loaded rank has
 * reached the end of its trace. Static storage starts out NULL. */
static struct qhash_table *rank_tbl;

/* Number of rank contexts currently held in rank_tbl (starts at zero). */
static int rank_tbl_pop;

/* load the workload generator for this rank, given input params */
76
static int recorder_io_workload_load(const char *params, int app_id, int rank)
77
{
78
    recorder_params *r_params = (recorder_params *) params;
79
    struct rank_traces_context *newv = NULL;
80 81 82
    struct qhash_table *file_id_tbl = NULL;
    struct qhash_head *link = NULL;
    struct file_entry *file;
83

84 85
    APP_ID_UNSUPPORTED(app_id, "recorder")

86
    int64_t nprocs = r_params->nprocs;
87
    char *trace_dir = r_params->trace_dir_path;
88 89 90 91
    if(!trace_dir)
        return -1;

    /* allocate a new trace context for this rank */
92 93
    newv = (struct rank_traces_context*)malloc(sizeof(*newv));
    if(!newv)
94 95
        return -1;

96 97 98
    newv->rank = rank;
    newv->trace_list_ndx = 0;
    newv->trace_list_max = 0;
99

100
#if 0
101 102 103 104 105 106 107
    DIR *dirp;
    struct dirent *entry;
    dirp = opendir(trace_dir);
    while((entry = readdir(dirp)) != NULL) {
        if(entry->d_type == DT_REG)
            nprocs++;
    }
108
    closedir(dirp);
109
#endif
110

111
    char trace_file_name[1024] = {'\0'};
112 113 114 115 116 117
    sprintf(trace_file_name, "%s/log.%d", trace_dir, rank);

    FILE *trace_file = fopen(trace_file_name, "r");
    if(trace_file == NULL)
        return -1;

118
    char *line = NULL;
119 120
    size_t len;
    ssize_t ret_value;
121
    char function_name[128] = {'\0'};
122 123
    double wkld_start_time = 0.0;
    double io_start_time = 0.0;
124 125
    while((ret_value = getline(&line, &len, trace_file)) != -1) {
        struct recorder_io_op r_op;
126
        char *token = strtok(line, ", \n");
127
        int fd;
128

129
        if (strcmp(token, "BARRIER") && strcmp(token, "0"))
130
        {
131 132 133 134
            if (wkld_start_time == 0.0)
                wkld_start_time = atof(token);

            r_op.start_time = atof(token) - wkld_start_time;
135 136
            token = strtok(NULL, ", ");
        }
137 138 139
        strcpy(function_name, token);

        if(!strcmp(function_name, "open") || !strcmp(function_name, "open64")) {
140
            char *filename = NULL;
141
            char *open_flags = NULL;
142

143
            filename = strtok(NULL, ", (");
144 145 146 147 148 149 150 151 152 153
            open_flags = strtok(NULL, ", )");

            if (!(atoi(open_flags) & O_CREAT))
            {
                r_op.codes_op.op_type = CODES_WK_BARRIER;
                r_op.end_time = r_op.start_time;

                r_op.codes_op.u.barrier.count = nprocs;
                r_op.codes_op.u.barrier.root = 0;

154 155
                newv->trace_ops[newv->trace_list_ndx++] = r_op;
                if (newv->trace_list_ndx == 2048) break;
156
            }
157 158

            token = strtok(NULL, ", )");
159
            token = strtok(NULL, ", ");
160
            fd = atoi(token);
161 162

            token = strtok(NULL, ", \n");
163
            r_op.end_time = r_op.start_time + atof(token);
164 165 166 167 168 169

            if (!file_id_tbl)
            {
                file_id_tbl = qhash_init(hash_file_compare, quickhash_32bit_hash, 11);
                if (!file_id_tbl)
                {
170
                    free(newv);
171 172 173 174
                    return -1;
                }
            }

175
            file = (struct file_entry*)malloc(sizeof(struct file_entry));
176 177
            if (!file)
            {
178
                free(newv);
179 180 181
                return -1;
            }
            
182
            uint32_t h1 = 0x00000000, h2 = 0xFFFFFFFF;
183 184 185
            bj_hashlittle2(filename, strlen(filename), &h1, &h2);
            file->file_id = h1 + (((uint64_t)h2)<<32);
            file->fd = fd;
186
            r_op.codes_op.op_type = CODES_WK_OPEN;
187
            r_op.codes_op.u.open.file_id = file->file_id;
188
            r_op.codes_op.u.open.create_flag = atoi(open_flags) & O_CREAT;
189 190

            qhash_add(file_id_tbl, &fd, &(file->hash_link));
191 192 193
        }
        else if(!strcmp(function_name, "close")) {
            r_op.codes_op.op_type = CODES_WK_CLOSE;
194

195
            token = strtok(NULL, ", ()");
196
            fd = atoi(token);
197 198 199

            token = strtok(NULL, ", ");
            token = strtok(NULL, ", \n");
200
            r_op.end_time = r_op.start_time + atof(token);
201 202 203 204

            link = qhash_search(file_id_tbl, &fd);
            if (!link)
            {
205
                free(newv);
206 207 208 209 210 211 212 213
                return -1;
            }

            file = qhash_entry(link, struct file_entry, hash_link);
            r_op.codes_op.u.close.file_id = file->file_id;

            qhash_del(link);
            free(file);
214 215 216
        }
        else if(!strcmp(function_name, "read") || !strcmp(function_name, "read64")) {
            r_op.codes_op.op_type = CODES_WK_READ;
217

218
            token = strtok(NULL, ", (");
219
            fd = atoi(token);
220

221 222
            // Throw out the buffer
            token = strtok(NULL, ", ");
223

224 225
            token = strtok(NULL, ", )");
            r_op.codes_op.u.read.size = atol(token);
226

227 228
            token = strtok(NULL, ", )");
            r_op.codes_op.u.read.offset = atol(token);
229 230 231

            token = strtok(NULL, ", ");
            token = strtok(NULL, ", \n");
232 233 234 235 236 237 238 239 240

            if (io_start_time == 0.0)
            {
                r_op.end_time = r_op.start_time + atof(token);
            }
            else
            {
                r_op.start_time = r_op.end_time = io_start_time;
            }
241 242 243 244

            link = qhash_search(file_id_tbl, &fd);
            if (!link)
            {
245
                free(newv);
246 247 248 249 250
                return -1;
            }

            file = qhash_entry(link, struct file_entry, hash_link);
            r_op.codes_op.u.read.file_id = file->file_id;
251 252 253
        }
        else if(!strcmp(function_name, "write") || !strcmp(function_name, "write64")) {
            r_op.codes_op.op_type = CODES_WK_WRITE;
254

255
            token = strtok(NULL, ", (");
256
            fd = atoi(token);
257

258 259
            // Throw out the buffer
            token = strtok(NULL, ", ");
260

261 262
            token = strtok(NULL, ", )");
            r_op.codes_op.u.write.size = atol(token);
263

264 265
            token = strtok(NULL, ", )");
            r_op.codes_op.u.write.offset = atol(token);
266 267 268

            token = strtok(NULL, ", ");
            token = strtok(NULL, ", \n");
269 270 271 272 273 274 275 276 277

            if (io_start_time == 0.0)
            {
                r_op.end_time = r_op.start_time + atof(token);
            }
            else
            {
                r_op.start_time = r_op.end_time = io_start_time;
            }
278 279 280 281

            link = qhash_search(file_id_tbl, &fd);
            if (!link)
            {
282
                free(newv);
283 284 285 286 287
                return -1;
            }

            file = qhash_entry(link, struct file_entry, hash_link);
            r_op.codes_op.u.write.file_id = file->file_id;
288
        }
289 290 291
        else if(!strcmp(function_name, "BARRIER")) {
            r_op.start_time = r_op.end_time = io_start_time;
            
292 293 294
            r_op.codes_op.op_type = CODES_WK_BARRIER;
            r_op.codes_op.u.barrier.count = nprocs;
            r_op.codes_op.u.barrier.root = 0;
295 296 297
        }
        else if(!strcmp(function_name, "0")) {
            token = strtok (NULL, ", \n");
298
            newv->trace_ops[newv->trace_list_ndx-1].end_time += atof(token);
299 300 301

            io_start_time = 0.0;
            continue;
302 303
        }
        else{
304 305 306 307
            if (!strcmp(function_name, "MPI_File_write_at_all") ||
                !strcmp(function_name, "MPI_File_read_at_all")) {
                io_start_time = r_op.start_time;
            }
308
            continue;
309 310
        }

311 312
        newv->trace_ops[newv->trace_list_ndx++] = r_op;
        if (newv->trace_list_ndx == 2048) break;
313
    }
314

315
    fclose(trace_file);
316
    qhash_finalize(file_id_tbl);
317 318 319

    /* reset ndx to 0 and set max to event count */
    /* now we can read all events by counting through array from 0 - max */
320 321 322
    newv->trace_list_max = newv->trace_list_ndx;
    newv->trace_list_ndx = 0;
    newv->last_op_time = 0.0;
323 324 325 326 327 328

    /* initialize the hash table of rank contexts, if it has not been initialized */
    if (!rank_tbl) {
        rank_tbl = qhash_init(hash_rank_compare, quickhash_32bit_hash, RANK_HASH_TABLE_SIZE);

        if (!rank_tbl) {
329
            free(newv);
330 331 332 333 334
            return -1;
        }
    }

    /* add this rank context to the hash table */
335
    qhash_add(rank_tbl, &(newv->rank), &(newv->hash_link));
336 337 338 339 340 341
    rank_tbl_pop++;

    return 0;
}

/* pull the next trace (independent or collective) for this rank from its trace context */
342
static void recorder_io_workload_get_next(int app_id, int rank, struct codes_workload_op *op)
343
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
344
    (void)app_id; // no need for this here
345 346 347 348 349 350 351 352
    struct qhash_head *hash_link = NULL;
    struct rank_traces_context *tmp = NULL;

    /* Find event context for this rank in the rank hash table */
    hash_link = qhash_search(rank_tbl, &rank);

    /* terminate the workload if there is no valid rank context */
    if(!hash_link) {
353

354 355 356 357 358 359 360
        op->op_type = CODES_WK_END;
        return;
    }

    tmp = qhash_entry(hash_link, struct rank_traces_context, hash_link);
    assert(tmp->rank == rank);

361
    if(tmp->trace_list_ndx == tmp->trace_list_max) {
362 363 364 365
        /* no more events -- just end the workload */
        op->op_type = CODES_WK_END;
        qhash_del(hash_link);
        free(tmp);
366

367 368
        rank_tbl_pop--;
        if(!rank_tbl_pop)
369
        {
370
            qhash_finalize(rank_tbl);
371 372
            rank_tbl = NULL;
        }
373 374
    }
    else {
375 376 377 378 379 380 381 382 383 384 385 386 387 388
        struct recorder_io_op *next_r_op = &(tmp->trace_ops[tmp->trace_list_ndx]);

        if ((next_r_op->start_time - tmp->last_op_time) <= RECORDER_NEGLIGIBLE_DELAY) {
            *op = next_r_op->codes_op;

            tmp->trace_list_ndx++;
            tmp->last_op_time = next_r_op->end_time;
        }
        else {
            op->op_type = CODES_WK_DELAY;
            op->u.delay.seconds = next_r_op->start_time - tmp->last_op_time;

            tmp->last_op_time = next_r_op->start_time;
        }
389 390 391 392 393 394 395 396 397 398
    }

    return;
}

static int hash_rank_compare(void *key, struct qhash_head *link)
{
    int *in_rank = (int *)key;
    struct rank_traces_context *tmp;

399
    tmp = qhash_entry(link, struct rank_traces_context, hash_link);
400 401 402 403 404 405
    if (tmp->rank == *in_rank)
        return 1;

    return 0;
}

406 407 408 409 410 411 412 413 414 415 416
static int hash_file_compare(void *key, struct qhash_head *link)
{
    int *in_file = (int *)key;
    struct file_entry *tmp;

    tmp = qhash_entry(link, struct file_entry, hash_link);
    if (tmp->fd == *in_file)
        return 1;

    return 0;
}
417 418 419 420 421 422 423 424 425

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */