Commit 4b369dec authored by Philip Carns's avatar Philip Carns
Browse files

complete update of workload generator for darshan3

notes:
- this generator will issue MPI-IO events, which cannot yet be handled
  directly by CODES simulations but are honored by the
  codes-workload-mpi-replay utility
- the codes-workload-mpi-replay utility now includes a --prep argument
  that will generate files that need to be read for subsequent replay
parent e8aa61e5
...@@ -47,7 +47,7 @@ endif ...@@ -47,7 +47,7 @@ endif
if USE_DARSHAN if USE_DARSHAN
AM_CPPFLAGS += ${DARSHAN_CFLAGS} -DUSE_DARSHAN=1 AM_CPPFLAGS += ${DARSHAN_CFLAGS} -DUSE_DARSHAN=1
src_libcodes_la_SOURCES += src/workload/methods/codes-darshan-posix-io-wrkld.c src_libcodes_la_SOURCES += src/workload/methods/codes-darshan3-io-wrkld.c
LDADD += ${DARSHAN_LIBS} LDADD += ${DARSHAN_LIBS}
TESTS += tests/workload/darshan-dump.sh TESTS += tests/workload/darshan-dump.sh
endif endif
......
...@@ -57,7 +57,6 @@ struct iolang_params ...@@ -57,7 +57,6 @@ struct iolang_params
struct darshan_params struct darshan_params
{ {
char log_file_path[MAX_NAME_LENGTH_WKLD]; char log_file_path[MAX_NAME_LENGTH_WKLD];
int64_t aggregator_cnt;
int app_cnt; int app_cnt;
}; };
...@@ -147,7 +146,24 @@ enum codes_workload_op_type ...@@ -147,7 +146,24 @@ enum codes_workload_op_type
/* for workloads that have events not yet handled /* for workloads that have events not yet handled
* (eg the workload language) */ * (eg the workload language) */
CODES_WK_IGNORE CODES_WK_IGNORE,
/* extended IO workload operations: MPI */
/* open */
CODES_WK_MPI_OPEN,
/* close */
CODES_WK_MPI_CLOSE,
/* write */
CODES_WK_MPI_WRITE,
/* read */
CODES_WK_MPI_READ,
/* collective open */
CODES_WK_MPI_COLL_OPEN,
/* collective_write */
CODES_WK_MPI_COLL_WRITE,
/* collective_read */
CODES_WK_MPI_COLL_READ,
}; };
/* I/O operation paramaters */ /* I/O operation paramaters */
...@@ -276,7 +292,7 @@ int codes_workload_load( ...@@ -276,7 +292,7 @@ int codes_workload_load(
const char* type, const char* type,
const char* params, const char* params,
int app_id, int app_id,
int rank, int *total_time); int rank);
/* Retrieves the next I/O operation to execute. the wkld_id is the /* Retrieves the next I/O operation to execute. the wkld_id is the
* identifier returned by the init() function. The op argument is a pointer * identifier returned by the init() function. The op argument is a pointer
...@@ -326,7 +342,7 @@ struct codes_workload_method ...@@ -326,7 +342,7 @@ struct codes_workload_method
void * (*codes_workload_read_config) ( void * (*codes_workload_read_config) (
ConfigHandle *handle, char const * section_name, ConfigHandle *handle, char const * section_name,
char const * annotation, int num_ranks); char const * annotation, int num_ranks);
int (*codes_workload_load)(const char* params, int app_id, int rank, int *total_time); int (*codes_workload_load)(const char* params, int app_id, int rank);
void (*codes_workload_get_next)(int app_id, int rank, struct codes_workload_op *op); void (*codes_workload_get_next)(int app_id, int rank, struct codes_workload_op *op);
void (*codes_workload_get_next_rc2)(int app_id, int rank); void (*codes_workload_get_next_rc2)(int app_id, int rank);
int (*codes_workload_get_rank_cnt)(const char* params, int app_id); int (*codes_workload_get_rank_cnt)(const char* params, int app_id);
......
...@@ -27,7 +27,6 @@ static struct option long_opts[] = ...@@ -27,7 +27,6 @@ static struct option long_opts[] =
{"num-ranks", required_argument, NULL, 'n'}, {"num-ranks", required_argument, NULL, 'n'},
{"start-rank", required_argument, NULL, 'r'}, {"start-rank", required_argument, NULL, 'r'},
{"d-log", required_argument, NULL, 'l'}, {"d-log", required_argument, NULL, 'l'},
{"d-aggregator-cnt", required_argument, NULL, 'a'},
{"i-meta", required_argument, NULL, 'm'}, {"i-meta", required_argument, NULL, 'm'},
{"i-use-relpath", no_argument, NULL, 'p'}, {"i-use-relpath", no_argument, NULL, 'p'},
{"r-trace-dir", required_argument, NULL, 'd'}, {"r-trace-dir", required_argument, NULL, 'd'},
...@@ -53,7 +52,6 @@ void usage(){ ...@@ -53,7 +52,6 @@ void usage(){
"-s: print final workload stats\n" "-s: print final workload stats\n"
"DARSHAN OPTIONS (darshan_io_workload)\n" "DARSHAN OPTIONS (darshan_io_workload)\n"
"--d-log: darshan log file\n" "--d-log: darshan log file\n"
"--d-aggregator-cnt: number of aggregators for collective I/O in darshan\n"
"IOLANG OPTIONS (iolang_workload)\n" "IOLANG OPTIONS (iolang_workload)\n"
"--i-meta: i/o language kernel meta file path\n" "--i-meta: i/o language kernel meta file path\n"
"--i-use-relpath: use i/o kernel path relative meta file path\n" "--i-use-relpath: use i/o kernel path relative meta file path\n"
...@@ -133,9 +131,6 @@ int main(int argc, char *argv[]) ...@@ -133,9 +131,6 @@ int main(int argc, char *argv[])
case 'l': case 'l':
strcpy(d_params.log_file_path, optarg); strcpy(d_params.log_file_path, optarg);
break; break;
case 'a':
d_params.aggregator_cnt = atol(optarg);
break;
case 'm': case 'm':
strcpy(i_params.io_kernel_meta_path, optarg); strcpy(i_params.io_kernel_meta_path, optarg);
break; break;
...@@ -206,11 +201,6 @@ int main(int argc, char *argv[]) ...@@ -206,11 +201,6 @@ int main(int argc, char *argv[])
usage(); usage();
return 1; return 1;
} }
else if (d_params.aggregator_cnt == 0){
fprintf(stderr, "Expected \"--d-aggregator-cnt\" argument for darshan workload\n");
usage();
return 1;
}
else{ else{
wparams = (char*)&d_params; wparams = (char*)&d_params;
} }
...@@ -308,8 +298,7 @@ int main(int argc, char *argv[]) ...@@ -308,8 +298,7 @@ int main(int argc, char *argv[])
for (i = start_rank ; i < start_rank+n; i++){ for (i = start_rank ; i < start_rank+n; i++){
struct codes_workload_op op; struct codes_workload_op op;
//printf("loading %s, %d\n", type, i); //printf("loading %s, %d\n", type, i);
int total_time; int id = codes_workload_load(type, wparams, 0, i);
int id = codes_workload_load(type, wparams, 0, i, &total_time);
double total_read_time = 0.0, total_write_time = 0.0; double total_read_time = 0.0, total_write_time = 0.0;
int64_t total_read_bytes = 0, total_written_bytes = 0; int64_t total_read_bytes = 0, total_written_bytes = 0;
codes_workload_get_time(type, wparams, 0, i, &total_read_time, &total_write_time, &total_read_bytes, &total_written_bytes); codes_workload_get_time(type, wparams, 0, i, &total_read_time, &total_write_time, &total_read_bytes, &total_written_bytes);
...@@ -328,16 +317,23 @@ int main(int argc, char *argv[]) ...@@ -328,16 +317,23 @@ int main(int argc, char *argv[])
num_barriers++; num_barriers++;
break; break;
case CODES_WK_OPEN: case CODES_WK_OPEN:
case CODES_WK_MPI_OPEN:
case CODES_WK_MPI_COLL_OPEN:
num_opens++; num_opens++;
break; break;
case CODES_WK_CLOSE: case CODES_WK_CLOSE:
case CODES_WK_MPI_CLOSE:
num_closes++; num_closes++;
break; break;
case CODES_WK_WRITE: case CODES_WK_WRITE:
case CODES_WK_MPI_WRITE:
case CODES_WK_MPI_COLL_WRITE:
num_writes++; num_writes++;
write_size += op.u.write.size; write_size += op.u.write.size;
break; break;
case CODES_WK_READ: case CODES_WK_READ:
case CODES_WK_MPI_READ:
case CODES_WK_MPI_COLL_READ:
num_reads++; num_reads++;
read_size += op.u.write.size; read_size += op.u.write.size;
break; break;
......
...@@ -160,7 +160,7 @@ int codes_workload_load( ...@@ -160,7 +160,7 @@ int codes_workload_load(
const char* type, const char* type,
const char* params, const char* params,
int app_id, int app_id,
int rank, int *total_time) int rank)
{ {
init_workload_methods(); init_workload_methods();
...@@ -173,7 +173,7 @@ int codes_workload_load( ...@@ -173,7 +173,7 @@ int codes_workload_load(
if(strcmp(method_array[i]->method_name, type) == 0) if(strcmp(method_array[i]->method_name, type) == 0)
{ {
/* load appropriate workload generator */ /* load appropriate workload generator */
ret = method_array[i]->codes_workload_load(params, app_id, rank, total_time); ret = method_array[i]->codes_workload_load(params, app_id, rank);
if(ret < 0) if(ret < 0)
{ {
return(-1); return(-1);
...@@ -338,6 +338,8 @@ void codes_workload_print_op( ...@@ -338,6 +338,8 @@ void codes_workload_print_op(
int app_id, int app_id,
int rank) int rank)
{ {
char *name;
switch(op->op_type){ switch(op->op_type){
case CODES_WK_END: case CODES_WK_END:
fprintf(f, "op: app:%d rank:%d type:end\n", app_id, rank); fprintf(f, "op: app:%d rank:%d type:end\n", app_id, rank);
...@@ -351,23 +353,41 @@ void codes_workload_print_op( ...@@ -351,23 +353,41 @@ void codes_workload_print_op(
app_id, rank, op->u.barrier.count, op->u.barrier.root); app_id, rank, op->u.barrier.count, op->u.barrier.root);
break; break;
case CODES_WK_OPEN: case CODES_WK_OPEN:
fprintf(f, "op: app:%d rank:%d type:open file_id:%llu flag:%d\n", case CODES_WK_MPI_OPEN:
app_id, rank, LLU(op->u.open.file_id), op->u.open.create_flag); case CODES_WK_MPI_COLL_OPEN:
if(op->op_type == CODES_WK_OPEN) name = "open";
if(op->op_type == CODES_WK_MPI_OPEN) name = "mpi_open";
if(op->op_type == CODES_WK_MPI_COLL_OPEN) name = "mpi_coll_open";
fprintf(f, "op: app:%d rank:%d type:%s file_id:%llu flag:%d\n",
app_id, rank, name, LLU(op->u.open.file_id), op->u.open.create_flag);
break; break;
case CODES_WK_CLOSE: case CODES_WK_CLOSE:
fprintf(f, "op: app:%d rank:%d type:close file_id:%llu\n", case CODES_WK_MPI_CLOSE:
app_id, rank, LLU(op->u.close.file_id)); if(op->op_type == CODES_WK_CLOSE) name = "close";
if(op->op_type == CODES_WK_MPI_CLOSE) name = "mpi_close";
fprintf(f, "op: app:%d rank:%d type:%s file_id:%llu\n",
app_id, rank, name, LLU(op->u.close.file_id));
break; break;
case CODES_WK_WRITE: case CODES_WK_WRITE:
fprintf(f, "op: app:%d rank:%d type:write " case CODES_WK_MPI_WRITE:
case CODES_WK_MPI_COLL_WRITE:
if(op->op_type == CODES_WK_WRITE) name = "write";
if(op->op_type == CODES_WK_MPI_WRITE) name = "mpi_write";
if(op->op_type == CODES_WK_MPI_COLL_WRITE) name = "mpi_coll_write";
fprintf(f, "op: app:%d rank:%d type:%s "
"file_id:%llu off:%llu size:%llu\n", "file_id:%llu off:%llu size:%llu\n",
app_id, rank, LLU(op->u.write.file_id), LLU(op->u.write.offset), app_id, rank, name, LLU(op->u.write.file_id), LLU(op->u.write.offset),
LLU(op->u.write.size)); LLU(op->u.write.size));
break; break;
case CODES_WK_READ: case CODES_WK_READ:
fprintf(f, "op: app:%d rank:%d type:read " case CODES_WK_MPI_READ:
case CODES_WK_MPI_COLL_READ:
if(op->op_type == CODES_WK_READ) name = "read";
if(op->op_type == CODES_WK_MPI_READ) name = "mpi_read";
if(op->op_type == CODES_WK_MPI_COLL_READ) name = "mpi_coll_read";
fprintf(f, "op: app:%d rank:%d type:%s "
"file_id:%llu off:%llu size:%llu\n", "file_id:%llu off:%llu size:%llu\n",
app_id, rank, LLU(op->u.read.file_id), LLU(op->u.read.offset), app_id, rank, name, LLU(op->u.read.file_id), LLU(op->u.read.offset),
LLU(op->u.read.size)); LLU(op->u.read.size));
break; break;
case CODES_WK_SEND: case CODES_WK_SEND:
......
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include <assert.h>
#include <math.h>
#include "codes/codes-workload.h"
#include "codes/quickhash.h"
#include "darshan-logutils.h"
#define DEF_INTER_IO_DELAY_PCT 0.2
#define DEF_INTER_CYC_DELAY_PCT 0.4
#define DARSHAN_NEGLIGIBLE_DELAY 0.00001
#define RANK_HASH_TABLE_SIZE 397
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#define ALIGN_BY_8(x) ((x) + ((x) % 8))
/* structure for storing a darshan workload operation (a codes op with 2 timestamps) */
struct darshan_io_op
{
struct codes_workload_op codes_op;
double start_time;
double end_time;
};
/* I/O context structure managed by each rank in the darshan workload */
struct rank_io_context
{
int64_t my_rank;
double last_op_time;
void *io_op_dat;
off_t next_off;
struct qhash_head hash_link;
};
static void * darshan_io_workload_read_config(
ConfigHandle * handle,
char const * section_name,
char const * annotation,
int num_ranks);
/* Darshan workload generator's implementation of the CODES workload API */
static int darshan_io_workload_load(const char *params, int app_id, int rank);
static void darshan_io_workload_get_next(int app_id, int rank, struct codes_workload_op *op);
static int darshan_io_workload_get_rank_cnt(const char *params, int app_id);
static int darshan_rank_hash_compare(void *key, struct qhash_head *link);
/* Darshan I/O op data structure access (insert, remove) abstraction */
static void *darshan_init_io_op_dat(void);
static void darshan_insert_next_io_op(void *io_op_dat, struct darshan_io_op *io_op);
static void darshan_remove_next_io_op(void *io_op_dat, struct darshan_io_op *io_op,
double last_op_time);
static void darshan_finalize_io_op_dat(void *io_op_dat);
static int darshan_io_op_compare(const void *p1, const void *p2);
/* Helper functions for implementing the Darshan workload generator */
static void generate_psx_ind_file_events(struct darshan_file *file,
struct rank_io_context *io_context);
static void generate_psx_coll_file_events(struct darshan_file *file,
struct rank_io_context *io_context,
int64_t nprocs, int64_t aggregator_cnt);
static double generate_psx_open_event(struct darshan_file *file, int create_flag,
double meta_op_time, double cur_time,
struct rank_io_context *io_context, int insert_flag);
static double generate_psx_close_event(struct darshan_file *file, double meta_op_time,
double cur_time, struct rank_io_context *io_context,
int insert_flag);
static double generate_barrier_event(struct darshan_file *file, int64_t root, double cur_time,
struct rank_io_context *io_context);
static double generate_psx_ind_io_events(struct darshan_file *file, int64_t io_ops_this_cycle,
double inter_io_delay, double cur_time,
struct rank_io_context *io_context);
static double generate_psx_coll_io_events(struct darshan_file *file, int64_t ind_io_ops_this_cycle,
int64_t coll_io_ops_this_cycle, int64_t nprocs,
int64_t aggregator_cnt, double inter_io_delay,
double meta_op_time, double cur_time,
struct rank_io_context *io_context);
static void determine_ind_io_params(struct darshan_file *file, int write_flag, size_t *io_sz,
off_t *io_off, struct rank_io_context *io_context);
static void determine_coll_io_params(struct darshan_file *file, int write_flag, int64_t coll_op_cnt,
int64_t agg_cnt, int64_t agg_ndx, size_t *io_sz, off_t *io_off,
struct rank_io_context *io_context);
static void calc_io_delays(struct darshan_file *file, int64_t num_opens, int64_t num_io_ops,
double total_delay, double *first_io_delay, double *close_delay,
double *inter_open_delay, double *inter_io_delay);
static void file_sanity_check(struct darshan_file *file, struct darshan_job *job);
/* workload method name and function pointers for the CODES workload API */
struct codes_workload_method darshan_io_workload_method =
{
.method_name = "darshan_io_workload",
.codes_workload_read_config = darshan_io_workload_read_config,
.codes_workload_load = darshan_io_workload_load,
.codes_workload_get_next = darshan_io_workload_get_next,
.codes_workload_get_rank_cnt = darshan_io_workload_get_rank_cnt,
};
static int total_rank_cnt = 0;
/* hash table to store per-rank workload contexts */
static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;
static void * darshan_io_workload_read_config(
ConfigHandle * handle,
char const * section_name,
char const * annotation,
int num_ranks)
{
darshan_params *d = malloc(sizeof(*d));
assert(d);
d->log_file_path[0] = '\0';
d->aggregator_cnt = -1;
int rc = configuration_get_value_relpath(handle, section_name,
"darshan_log_file", annotation, d->log_file_path,
MAX_NAME_LENGTH_WKLD);
assert(rc > 0);
int tmp;
rc = configuration_get_value_int(&config, "workload",
"darshan_aggregator_count", annotation, &tmp);
assert(rc == 0);
d->aggregator_cnt = tmp;
return d;
}
/* load the workload generator for this rank, given input params */
static int darshan_io_workload_load(const char *params, int app_id, int rank)
{
darshan_params *d_params = (darshan_params *)params;
darshan_fd logfile_fd;
struct darshan_job job;
struct darshan_file next_file;
struct rank_io_context *my_ctx;
int ret;
APP_ID_UNSUPPORTED(app_id, "darshan")
if (!d_params)
return -1;
/* open the darshan log to begin reading in file i/o info */
logfile_fd = darshan_log_open(d_params->log_file_path, "r");
if (logfile_fd < 0)
return -1;
/* get the per-job stats from the log */
ret = darshan_log_getjob(logfile_fd, &job);
if (ret < 0)
{
darshan_log_close(logfile_fd);
return -1;
}
if (!total_rank_cnt)
{
total_rank_cnt = job.nprocs;
}
assert(rank < total_rank_cnt);
/* allocate the i/o context needed by this rank */
my_ctx = malloc(sizeof(struct rank_io_context));
if (!my_ctx)
{
darshan_log_close(logfile_fd);
return -1;
}
my_ctx->my_rank = (int64_t)rank;
my_ctx->last_op_time = 0.0;
my_ctx->io_op_dat = darshan_init_io_op_dat();
my_ctx->next_off = 0;
/* loop over all files contained in the log file */
while ((ret = darshan_log_getfile(logfile_fd, &job, &next_file)) > 0)
{
/* generate all i/o events contained in this independent file */
if (next_file.rank == rank)
{
/* make sure the file i/o counters are valid */
file_sanity_check(&next_file, &job);
/* generate i/o events and store them in this rank's workload context */
generate_psx_ind_file_events(&next_file, my_ctx);
}
/* generate all i/o events involving this rank in this collective file */
else if (next_file.rank == -1)
{
/* make sure the file i/o counters are valid */
file_sanity_check(&next_file, &job);
/* generate collective i/o events and store them in the rank context */
generate_psx_coll_file_events(&next_file, my_ctx, job.nprocs, d_params->aggregator_cnt);
}
else if (next_file.rank < rank)
continue;
else
break;
assert(next_file.counters[CP_POSIX_OPENS] == 0);
assert(next_file.counters[CP_POSIX_READS] == 0);
assert(next_file.counters[CP_POSIX_WRITES] == 0);
}
if (ret < 0)
return -1;
darshan_log_close(logfile_fd);
/* finalize the rank's i/o context so i/o ops may be retrieved later (in order) */
darshan_finalize_io_op_dat(my_ctx->io_op_dat);
/* initialize the hash table of rank contexts, if it has not been initialized */
if (!rank_tbl)
{
rank_tbl = qhash_init(darshan_rank_hash_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SIZE);
if (!rank_tbl)
return -1;
}
/* add this rank context to the hash table */
qhash_add(rank_tbl, &(my_ctx->my_rank), &(my_ctx->hash_link));
rank_tbl_pop++;
return 0;
}
/* pull the next event (independent or collective) for this rank from its event context */
static void darshan_io_workload_get_next(int app_id, int rank, struct codes_workload_op *op)
{
int64_t my_rank = (int64_t)rank;
struct qhash_head *hash_link = NULL;
struct rank_io_context *tmp = NULL;
struct darshan_io_op next_io_op;
assert(rank < total_rank_cnt);
/* find i/o context for this rank in the rank hash table */
hash_link = qhash_search(rank_tbl, &my_rank);
/* terminate the workload if there is no valid rank context */
if (!hash_link)
{
op->op_type = CODES_WK_END;
return;
}
/* get access to the rank's io_context data */
tmp = qhash_entry(hash_link, struct rank_io_context, hash_link);
assert(tmp->my_rank == my_rank);
/* get the next darshan i/o op out of this rank's context */
darshan_remove_next_io_op(tmp->io_op_dat, &next_io_op, tmp->last_op_time);
/* free the rank's i/o context if this is the last i/o op */
if (next_io_op.codes_op.op_type == CODES_WK_END)
{
qhash_del(hash_link);
free(tmp);
rank_tbl_pop--;
if (!rank_tbl_pop)
{
qhash_finalize(rank_tbl);
rank_tbl = NULL;
}
}
else
{
/* else, set the last op time to be the end of the returned op */
tmp->last_op_time = next_io_op.end_time;
}
/* return the codes op contained in the darshan i/o op */
*op = next_io_op.codes_op;
return;
}
static int darshan_io_workload_get_rank_cnt(const char *params, int app_id)
{
darshan_params *d_params = (darshan_params *)params;
darshan_fd logfile_fd;
struct darshan_job job;
int ret;
if (!d_params)
return -1;
/* open the darshan log to begin reading in file i/o info */
logfile_fd = darshan_log_open(d_params->log_file_path, "r");
if (logfile_fd < 0)
return -1;
/* get the per-job stats from the log */
ret = darshan_log_getjob(logfile_fd, &job);
if (ret < 0)
{
darshan_log_close(logfile_fd);
return -1;
}
darshan_log_close(logfile_fd);
return job.nprocs;
}