Commit 51ea02b0 authored by Shane Snyder's avatar Shane Snyder

Add delay capability to recorder workloads

parent f5e5c65e
......@@ -20,13 +20,14 @@
#include "codes-workload-method.h"
#include "codes/quickhash.h"
#define RECORDER_MAX_TRACE_READ_COUNT 1024
#define RECORDER_NEGLIGIBLE_DELAY .00001
#define RANK_HASH_TABLE_SIZE 397
struct recorder_io_op
{
double start_time;
double end_time;
struct codes_workload_op codes_op;
};
......@@ -34,11 +35,13 @@ struct recorder_io_op
struct rank_traces_context
{
int rank;
struct qhash_head hash_link;
double last_op_time;
struct recorder_io_op trace_ops[1024]; /* TODO: this should be extendable */
struct recorder_io_op trace_ops[2048]; /* TODO: this should be extendable */
int trace_list_ndx;
int trace_list_max;
struct qhash_head hash_link;
};
......@@ -65,7 +68,7 @@ static int recorder_io_workload_load(const char *params, int rank)
{
recorder_params *r_params = (recorder_params *) params;
int64_t nprocs = 0;
int64_t nprocs = 16384;
struct rank_traces_context *new = NULL;
char *trace_dir = r_params->trace_dir_path;
......@@ -81,6 +84,7 @@ static int recorder_io_workload_load(const char *params, int rank)
new->trace_list_ndx = 0;
new->trace_list_max = 0;
#if 0
DIR *dirp;
struct dirent *entry;
dirp = opendir(trace_dir);
......@@ -89,6 +93,7 @@ static int recorder_io_workload_load(const char *params, int rank)
nprocs++;
}
closedir(dirp);
#endif
char trace_file_name[1024] = {'\0'};
sprintf(trace_file_name, "%s/log.%d", trace_dir, rank);
......@@ -97,21 +102,17 @@ static int recorder_io_workload_load(const char *params, int rank)
if(trace_file == NULL)
return -1;
double start_time;
char function_name[128] = {'\0'};
/* Read the first chunk of data (of size RECORDER_MAX_TRACE_READ_COUNT) */
char *line = NULL;
size_t len;
ssize_t ret_value;
char function_name[128] = {'\0'};
while((ret_value = getline(&line, &len, trace_file)) != -1) {
struct recorder_io_op r_op;
char *token = strtok(line, ", ");
start_time = atof(token);
r_op.start_time = atof(token);
token = strtok(NULL, ", ");
strcpy(function_name, token);
r_op.start_time = start_time;
if(!strcmp(function_name, "open") || !strcmp(function_name, "open64")) {
r_op.codes_op.op_type = CODES_WK_OPEN;
......@@ -122,12 +123,17 @@ static int recorder_io_workload_load(const char *params, int rank)
token = strtok(NULL, ", )");
token = strtok(NULL, ", ");
r_op.codes_op.u.open.file_id = atoi(token);
token = strtok(NULL, ", \n");
}
else if(!strcmp(function_name, "close")) {
r_op.codes_op.op_type = CODES_WK_CLOSE;
token = strtok(NULL, ", ()");
r_op.codes_op.u.close.file_id = atoi(token);
token = strtok(NULL, ", ");
token = strtok(NULL, ", \n");
}
else if(!strcmp(function_name, "read") || !strcmp(function_name, "read64")) {
r_op.codes_op.op_type = CODES_WK_READ;
......@@ -143,6 +149,9 @@ static int recorder_io_workload_load(const char *params, int rank)
token = strtok(NULL, ", )");
r_op.codes_op.u.read.offset = atol(token);
token = strtok(NULL, ", ");
token = strtok(NULL, ", \n");
}
else if(!strcmp(function_name, "write") || !strcmp(function_name, "write64")) {
r_op.codes_op.op_type = CODES_WK_WRITE;
......@@ -158,19 +167,29 @@ static int recorder_io_workload_load(const char *params, int rank)
token = strtok(NULL, ", )");
r_op.codes_op.u.write.offset = atol(token);
token = strtok(NULL, ", ");
token = strtok(NULL, ", \n");
}
else if(!strcmp(function_name, "MPI_Barrier")) {
char *tmp;
r_op.codes_op.op_type = CODES_WK_BARRIER;
r_op.codes_op.u.barrier.count = nprocs;
r_op.codes_op.u.barrier.root = 0;
token = strtok(NULL, ", ()");
token = strtok(NULL, ", ");
tmp = strtok(NULL, ", \n");
if (tmp) token = tmp;
}
else{
continue;
}
r_op.end_time = r_op.start_time + atof(token);
new->trace_ops[new->trace_list_ndx++] = r_op;
if (new->trace_list_ndx == 1024) break;
if (new->trace_list_ndx == 2048) break;
}
fclose(trace_file);
......@@ -179,6 +198,9 @@ static int recorder_io_workload_load(const char *params, int rank)
/* now we can read all events by counting through array from 0 - max */
new->trace_list_max = new->trace_list_ndx;
new->trace_list_ndx = 0;
if (new->trace_list_max > 0) {
new->last_op_time = new->trace_ops[0].start_time;
}
/* initialize the hash table of rank contexts, if it has not been initialized */
if (!rank_tbl) {
......@@ -230,9 +252,20 @@ static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op
}
}
else {
/* return the next event */
/* TODO: Do I need to check for the delay like in Darshan? */
*op = tmp->trace_ops[tmp->trace_list_ndx++].codes_op;
struct recorder_io_op *next_r_op = &(tmp->trace_ops[tmp->trace_list_ndx]);
if ((next_r_op->start_time - tmp->last_op_time) <= RECORDER_NEGLIGIBLE_DELAY) {
*op = next_r_op->codes_op;
tmp->trace_list_ndx++;
tmp->last_op_time = next_r_op->end_time;
}
else {
op->op_type = CODES_WK_DELAY;
op->u.delay.seconds = next_r_op->start_time - tmp->last_op_time;
tmp->last_op_time = next_r_op->start_time;
}
}
return;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment