Commit f5e5c65e authored by Shane Snyder's avatar Shane Snyder
Browse files

Update recorder workload generator

Update recorder workload generator to obtain workload operation
parameters from new Recorder I/O traces (courtesy of Babak
Behzad). I/O offsets are now tracked explicitly in the recorder
traces, and workload file_ids are set to the original execution file
descriptors.
parent 8dd0baa4
...@@ -24,72 +24,21 @@ ...@@ -24,72 +24,21 @@
#define RANK_HASH_TABLE_SIZE 397 #define RANK_HASH_TABLE_SIZE 397
typedef enum struct recorder_io_op
{ {
POSIX_OPEN = 0,
POSIX_CLOSE,
POSIX_READ,
POSIX_WRITE,
BARRIER,
} recorder_trace_type;
struct recorder_trace
{
int64_t rank;
recorder_trace_type type;
double start_time; double start_time;
union struct codes_workload_op codes_op;
{
struct
{
uint64_t file;
char file_path[2048];
int create_flag;
} open;
struct
{
uint64_t file;
char file_path[2048];
} close;
struct
{
uint64_t file;
char file_path[2048];
off_t offset;
size_t size;
} read;
struct
{
uint64_t file;
char file_path[2048];
off_t offset;
size_t size;
} write;
struct
{
int64_t proc_count;
int64_t root;
} barrier;
} trace_params;
}; };
/* structure for storing all context needed to retrieve traces for this rank */ /* structure for storing all context needed to retrieve traces for this rank */
struct rank_traces_context struct rank_traces_context
{ {
int rank; int rank;
struct qhash_head hash_link;
FILE *trace_file; struct recorder_io_op trace_ops[1024]; /* TODO: this should be extendable */
int64_t trace_cnt;
struct recorder_trace traces[RECORDER_MAX_TRACE_READ_COUNT];
int last_trace_in_memory_index;
long last_line_read;
int trace_list_ndx; int trace_list_ndx;
int trace_list_max; int trace_list_max;
struct qhash_head hash_link;
}; };
...@@ -98,7 +47,6 @@ static int recorder_io_workload_load(const char *params, int rank); ...@@ -98,7 +47,6 @@ static int recorder_io_workload_load(const char *params, int rank);
static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op); static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op);
/* helper functions for recorder workload CODES API */ /* helper functions for recorder workload CODES API */
static struct codes_workload_op recorder_trace_to_codes_workload_op(struct recorder_trace trace);
static int hash_rank_compare(void *key, struct qhash_head *link); static int hash_rank_compare(void *key, struct qhash_head *link);
/* workload method name and function pointers for the CODES workload API */ /* workload method name and function pointers for the CODES workload API */
...@@ -142,127 +90,95 @@ static int recorder_io_workload_load(const char *params, int rank) ...@@ -142,127 +90,95 @@ static int recorder_io_workload_load(const char *params, int rank)
} }
closedir(dirp); closedir(dirp);
char *trace_file_name = (char*) malloc(sizeof(char) * 1024); char trace_file_name[1024] = {'\0'};
sprintf(trace_file_name, "%s/log.%d", trace_dir, rank); sprintf(trace_file_name, "%s/log.%d", trace_dir, rank);
FILE *trace_file = fopen(trace_file_name, "r"); FILE *trace_file = fopen(trace_file_name, "r");
if(trace_file == NULL) if(trace_file == NULL)
return -1; return -1;
printf("rank %d out of %ld procs: Opened %s\n", rank, nprocs, trace_file_name);
double start_time; double start_time;
char *function_name = (char*) malloc(sizeof(char) * 128); char function_name[128] = {'\0'};
/* Read the first chunk of data (of size RECORDER_MAX_TRACE_READ_COUNT) */ /* Read the first chunk of data (of size RECORDER_MAX_TRACE_READ_COUNT) */
char *line = NULL; char *line = NULL;
size_t len; size_t len;
ssize_t ret_value; ssize_t ret_value;
int i; while((ret_value = getline(&line, &len, trace_file)) != -1) {
for(i = 0; i < RECORDER_MAX_TRACE_READ_COUNT; i++) { struct recorder_io_op r_op;
ret_value = getline(&line, &len, trace_file); char *token = strtok(line, ", ");
if(ret_value == -1) { start_time = atof(token);
new->trace_list_max = i; token = strtok(NULL, ", ");
break; strcpy(function_name, token);
}
else { r_op.start_time = start_time;
char *token = strtok(line, ", "); if(!strcmp(function_name, "open") || !strcmp(function_name, "open64")) {
r_op.codes_op.op_type = CODES_WK_OPEN;
start_time = atof(token);
token = strtok(NULL, ", (");
token = strtok(NULL, ", )");
r_op.codes_op.u.open.create_flag = atoi(token);
token = strtok(NULL, ", )");
token = strtok(NULL, ", "); token = strtok(NULL, ", ");
strcpy(function_name, token); r_op.codes_op.u.open.file_id = atoi(token);
}
//printf("function_name=%s:\n", function_name); else if(!strcmp(function_name, "close")) {
r_op.codes_op.op_type = CODES_WK_CLOSE;
struct recorder_trace rt;
if(!strcmp(function_name, "open") || !strcmp(function_name, "open64")) {
rt.rank = rank;
rt.type = POSIX_OPEN;
rt.start_time = start_time;
char *file_path = (char*) malloc(sizeof(char) * 2048);
token = strtok(NULL, ", (");
strcpy(file_path, token);
int create_flag;
token = strtok(NULL, ", )");
create_flag = atoi(token);
strcpy(rt.trace_params.open.file_path, file_path);
rt.trace_params.open.create_flag = create_flag;
}
else if(!strcmp(function_name, "close")) {
rt.rank = rank;
rt.type = POSIX_CLOSE;
rt.start_time = start_time;
char *file_path = (char*) malloc(sizeof(char) * 2048);
token = strtok(NULL, ", ()");
strcpy(file_path, token);
strcpy(rt.trace_params.close.file_path, file_path);
// printf("\t rt = [%ld %d %f close (%s)\n", rt.rank, rt.type, rt.start_time, rt.trace_params.open.file_path);
}
else if(!strcmp(function_name, "read") || !strcmp(function_name, "read64")) {
rt.rank = rank;
rt.type = POSIX_READ;
rt.start_time = start_time;
char *file_path = (char*) malloc(sizeof(char) * 2048);
token = strtok(NULL, ", (");
strcpy(file_path, token);
// Throw out the buffer token = strtok(NULL, ", ()");
token = strtok(NULL, ", "); r_op.codes_op.u.close.file_id = atoi(token);
}
else if(!strcmp(function_name, "read") || !strcmp(function_name, "read64")) {
r_op.codes_op.op_type = CODES_WK_READ;
size_t size; token = strtok(NULL, ", (");
token = strtok(NULL, ", )"); r_op.codes_op.u.read.file_id = atoi(token);
size = atol(token);
strcpy(rt.trace_params.read.file_path, file_path); // Throw out the buffer
rt.trace_params.read.size = size; token = strtok(NULL, ", ");
} token = strtok(NULL, ", )");
else if(!strcmp(function_name, "write") || !strcmp(function_name, "write")) { r_op.codes_op.u.read.size = atol(token);
rt.rank = rank;
rt.type = POSIX_WRITE;
rt.start_time = start_time;
char *file_path = (char*) malloc(sizeof(char) * 2048); token = strtok(NULL, ", )");
token = strtok(NULL, ", ("); r_op.codes_op.u.read.offset = atol(token);
strcpy(file_path, token); }
else if(!strcmp(function_name, "write") || !strcmp(function_name, "write64")) {
r_op.codes_op.op_type = CODES_WK_WRITE;
// Throw out the buffer token = strtok(NULL, ", (");
token = strtok(NULL, ", "); r_op.codes_op.u.write.file_id = atoi(token);
size_t size; // Throw out the buffer
token = strtok(NULL, ", )"); token = strtok(NULL, ", ");
size = atol(token);
strcpy(rt.trace_params.write.file_path, file_path); token = strtok(NULL, ", )");
rt.trace_params.write.size = size; r_op.codes_op.u.write.size = atol(token);
} token = strtok(NULL, ", )");
else if(!strcmp(function_name, "MPI_Barrier")) { r_op.codes_op.u.write.offset = atol(token);
rt.rank = rank; }
rt.type = BARRIER; else if(!strcmp(function_name, "MPI_Barrier")) {
rt.start_time = start_time; r_op.codes_op.op_type = CODES_WK_BARRIER;
}
else{
continue;
}
new->traces[i] = rt; r_op.codes_op.u.barrier.count = nprocs;
r_op.codes_op.u.barrier.root = 0;
}
else{
continue;
} }
new->trace_ops[new->trace_list_ndx++] = r_op;
if (new->trace_list_ndx == 1024) break;
} }
new->last_trace_in_memory_index = i;
new->last_line_read = ftell(trace_file);
new->trace_list_ndx = 0;
fclose(trace_file);
/* reset ndx to 0 and set max to event count */
/* now we can read all events by counting through array from 0 - max */
new->trace_list_max = new->trace_list_ndx;
new->trace_list_ndx = 0;
/* initialize the hash table of rank contexts, if it has not been initialized */ /* initialize the hash table of rank contexts, if it has not been initialized */
if (!rank_tbl) { if (!rank_tbl) {
...@@ -270,7 +186,6 @@ static int recorder_io_workload_load(const char *params, int rank) ...@@ -270,7 +186,6 @@ static int recorder_io_workload_load(const char *params, int rank)
if (!rank_tbl) { if (!rank_tbl) {
free(new); free(new);
fclose(trace_file);
return -1; return -1;
} }
} }
...@@ -278,7 +193,6 @@ static int recorder_io_workload_load(const char *params, int rank) ...@@ -278,7 +193,6 @@ static int recorder_io_workload_load(const char *params, int rank)
/* add this rank context to the hash table */ /* add this rank context to the hash table */
qhash_add(rank_tbl, &(new->rank), &(new->hash_link)); qhash_add(rank_tbl, &(new->rank), &(new->hash_link));
rank_tbl_pop++; rank_tbl_pop++;
fclose(trace_file);
return 0; return 0;
} }
...@@ -286,14 +200,12 @@ static int recorder_io_workload_load(const char *params, int rank) ...@@ -286,14 +200,12 @@ static int recorder_io_workload_load(const char *params, int rank)
/* pull the next trace (independent or collective) for this rank from its trace context */ /* pull the next trace (independent or collective) for this rank from its trace context */
static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op) static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op)
{ {
struct recorder_trace next_trace;
struct qhash_head *hash_link = NULL; struct qhash_head *hash_link = NULL;
struct rank_traces_context *tmp = NULL; struct rank_traces_context *tmp = NULL;
/* Find event context for this rank in the rank hash table */ /* Find event context for this rank in the rank hash table */
hash_link = qhash_search(rank_tbl, &rank); hash_link = qhash_search(rank_tbl, &rank);
/* terminate the workload if there is no valid rank context */ /* terminate the workload if there is no valid rank context */
if(!hash_link) { if(!hash_link) {
...@@ -304,16 +216,12 @@ static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op ...@@ -304,16 +216,12 @@ static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op
tmp = qhash_entry(hash_link, struct rank_traces_context, hash_link); tmp = qhash_entry(hash_link, struct rank_traces_context, hash_link);
assert(tmp->rank == rank); assert(tmp->rank == rank);
/* TODO: read in more events if necessary */ if(tmp->trace_list_ndx == tmp->trace_list_max) {
if(tmp->trace_list_max == 0 && tmp->last_trace_in_memory_index == RECORDER_MAX_TRACE_READ_COUNT) {
}
if(tmp->trace_list_max != 0 && tmp->trace_list_max == tmp->trace_list_ndx) {
/* no more events -- just end the workload */ /* no more events -- just end the workload */
op->op_type = CODES_WK_END; op->op_type = CODES_WK_END;
qhash_del(hash_link); qhash_del(hash_link);
free(tmp); free(tmp);
rank_tbl_pop--; rank_tbl_pop--;
if(!rank_tbl_pop) if(!rank_tbl_pop)
{ {
...@@ -324,64 +232,18 @@ static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op ...@@ -324,64 +232,18 @@ static void recorder_io_workload_get_next(int rank, struct codes_workload_op *op
else { else {
/* return the next event */ /* return the next event */
/* TODO: Do I need to check for the delay like in Darshan? */ /* TODO: Do I need to check for the delay like in Darshan? */
next_trace = tmp->traces[tmp->trace_list_ndx]; *op = tmp->trace_ops[tmp->trace_list_ndx++].codes_op;
*op = recorder_trace_to_codes_workload_op(next_trace);
// tmp->last_event_time = next_event.end_time
tmp->trace_list_ndx++;
} }
return; return;
} }
/* take a recorder trace struct as input and convert it to a codes workload op */
static struct codes_workload_op recorder_trace_to_codes_workload_op(struct recorder_trace trace)
{
struct codes_workload_op codes_op;
switch (trace.type)
{
case POSIX_OPEN:
codes_op.op_type = CODES_WK_OPEN;
codes_op.u.open.file_id = trace.trace_params.open.file;
codes_op.u.open.create_flag = trace.trace_params.open.create_flag;
break;
case POSIX_CLOSE:
codes_op.op_type = CODES_WK_CLOSE;
codes_op.u.close.file_id = trace.trace_params.close.file;
break;
case POSIX_READ:
codes_op.op_type = CODES_WK_READ;
codes_op.u.read.file_id = trace.trace_params.read.file;
codes_op.u.read.offset = trace.trace_params.read.offset;
codes_op.u.read.size = trace.trace_params.read.size;
break;
case POSIX_WRITE:
codes_op.op_type = CODES_WK_WRITE;
codes_op.u.write.file_id = trace.trace_params.write.file;
codes_op.u.write.offset = trace.trace_params.write.offset;
codes_op.u.write.size = trace.trace_params.write.size;
break;
case BARRIER:
codes_op.op_type = CODES_WK_BARRIER;
codes_op.u.barrier.count = trace.trace_params.barrier.proc_count;
codes_op.u.barrier.root = trace.trace_params.barrier.root;
break;
default:
assert(0);
break;
}
return codes_op;
}
static int hash_rank_compare(void *key, struct qhash_head *link) static int hash_rank_compare(void *key, struct qhash_head *link)
{ {
int *in_rank = (int *)key; int *in_rank = (int *)key;
struct rank_traces_context *tmp; struct rank_traces_context *tmp;
tmp = qhash_entry(link, struct rank_traces_context, hash_link);
tmp = qhash_entry(link, struct rank_traces_context, hash_link);
if (tmp->rank == *in_rank) if (tmp->rank == *in_rank)
return 1; return 1;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment