Commit 54110809 authored by Shane Snyder's avatar Shane Snyder

Update darshan workload generator source

Now, the darshan generator can generate independent file events, as
well as basic collective file events. Still working on rewriting
collective events.
parent 5e2c35d2
......@@ -93,6 +93,7 @@ struct codes_workload_method darshan_io_workload_method =
/* hash table to store per-rank workload contexts */
static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;
/* load the workload generator for this rank, given input params */
static int darshan_io_workload_load(const char *params, int rank)
......@@ -107,9 +108,6 @@ static int darshan_io_workload_load(const char *params, int rank)
if (!d_params)
return -1;
/* (re)seed the random number generator */
srand(time(NULL));
/* open the darshan log to begin reading in file i/o info */
logfile_fd = darshan_log_open(d_params->log_file_path, "r");
if (logfile_fd < 0)
......@@ -174,15 +172,7 @@ static int darshan_io_workload_load(const char *params, int rank)
/* add this rank context to the hash table */
qhash_add(rank_tbl, &(my_ctx->my_rank), &(my_ctx->hash_link));
/* fill out the info required for this workload group */
if (darshan_workload_info.group_id == -1)
{
darshan_workload_info.group_id = 1;
darshan_workload_info.min_rank = 0;
darshan_workload_info.max_rank = job.nprocs - 1;
darshan_workload_info.num_lrank = job.nprocs;
}
rank_tbl_pop++;
return 0;
}
......@@ -217,6 +207,10 @@ static void darshan_io_workload_get_next(int rank, struct codes_workload_op *op)
{
qhash_del(hash_link);
free(tmp);
rank_tbl_pop--;
if (!rank_tbl_pop)
qhash_finalize(rank_tbl);
}
else
{
......@@ -454,11 +448,14 @@ void generate_psx_coll_file_events(
struct darshan_file *file, struct rank_io_context *io_context,
int64_t nprocs, int64_t in_agg_cnt)
{
int64_t total_io_ops = file->counters[CP_POSIX_READS] + file->counters[CP_POSIX_WRITES];
int64_t open_cycles;
int64_t total_ind_opens;
int64_t total_coll_opens;
int64_t ind_opens_this_cycle;
int64_t coll_opens_this_cycle;
int64_t extra_opens = 0;
int64_t extra_open_stride = 0;
int64_t extra_io_ops = 0;
int64_t ind_open_stride = 0;
int64_t total_io_ops = file->counters[CP_POSIX_READS] + file->counters[CP_POSIX_WRITES];
int64_t total_ind_io_ops;
int64_t total_coll_io_ops;
int64_t ind_io_ops_this_cycle;
......@@ -484,86 +481,104 @@ void generate_psx_coll_file_events(
*/
assert(file->counters[CP_POSIX_OPENS] >= nprocs);
if (file->counters[CP_COLL_OPENS] || file->counters[CP_INDEP_OPENS])
{
}
else
{
extra_opens = file->counters[CP_POSIX_OPENS] % nprocs;
if (extra_opens && ((file->counters[CP_POSIX_OPENS] / nprocs) % extra_opens))
{
extra_opens = 0;
}
else
{
extra_io_ops = total_io_ops % nprocs;
}
total_ind_opens = file->counters[CP_POSIX_OPENS] - extra_opens;
total_coll_opens = 0;
open_cycles = ceil((double)(total_ind_opens) / nprocs);
}
assert(extra_opens <= open_cycles);
/* determine delay information */
delay_per_cycle = (file->fcounters[CP_F_CLOSE_TIMESTAMP] -
file->fcounters[CP_F_OPEN_TIMESTAMP] -
(file->fcounters[CP_F_POSIX_READ_TIME] / nprocs) -
(file->fcounters[CP_F_POSIX_WRITE_TIME] / nprocs) -
(file->fcounters[CP_F_POSIX_META_TIME] / nprocs)) /
(file->counters[CP_POSIX_OPENS] / nprocs);
(file->fcounters[CP_F_POSIX_META_TIME] / nprocs)) / open_cycles;
/* calculate average meta op time (for i/o and opens/closes) */
meta_op_time = file->fcounters[CP_F_POSIX_META_TIME] / ((2 * file->counters[CP_POSIX_OPENS]) +
file->counters[CP_POSIX_READS] + file->counters[CP_POSIX_WRITES]);
/* TODO TIMING */
calc_io_delays(file, (file->counters[CP_POSIX_OPENS] / nprocs),
round((double)total_io_ops / nprocs), delay_per_cycle,
&first_io_delay, &close_delay, &inter_cycle_delay, &inter_io_delay);
if (file->counters[CP_COLL_OPENS] && file->counters[CP_INDEP_OPENS])
{
if (((file->counters[CP_COLL_OPENS] / nprocs) % file->counters[CP_INDEP_OPENS]) == 0)
{
ind_open_stride = (file->counters[CP_COLL_OPENS] / nprocs) /
file->counters[CP_INDEP_OPENS];
}
else
/* it is rare to overwrite existing files, so set the create flag */
if (file->counters[CP_BYTES_WRITTEN])
{
ind_open_stride = 1;
}
create_flag = 1;
}
if (file->counters[CP_COLL_OPENS] || file->counters[CP_INDEP_OPENS])
{
extra_opens = file->counters[CP_POSIX_OPENS] - file->counters[CP_COLL_OPENS] -
file->counters[CP_INDEP_OPENS];
assert(extra_opens <= ((file->counters[CP_COLL_OPENS] / nprocs) +
file->counters[CP_INDEP_OPENS]));
if (extra_opens)
/* generate all events for this collectively opened file */
for (i = 0; i < open_cycles; i++)
{
if ((((file->counters[CP_COLL_OPENS] / nprocs) + file->counters[CP_INDEP_OPENS]) %
extra_opens) == 0)
if (file->counters[CP_COLL_OPENS])
{
extra_open_stride = ((file->counters[CP_COLL_OPENS] / nprocs) +
file->counters[CP_INDEP_OPENS]) / extra_opens;
}
else
{
extra_open_stride = 1;
}
extra_io_ops = 0;
ind_opens_this_cycle = ceil((double)total_ind_opens / open_cycles);
coll_opens_this_cycle = 0;
total_ind_opens -= ind_opens_this_cycle;
}
total_ind_io_ops = file->counters[CP_INDEP_READS] + file->counters[CP_INDEP_WRITES];
total_coll_io_ops = (file->counters[CP_COLL_READS] + file->counters[CP_COLL_WRITES]) / nprocs;
/* assign any extra opens to rank 0 (these may correspond to file creations or
* header reads/writes)
*/
if (extra_opens && !((open_cycles - i) % extra_opens))
{
assert(create_flag);
file->rank = 0;
cur_time = generate_psx_open_event(file, create_flag, meta_op_time, cur_time, io_context);
create_flag = 0;
cur_time = generate_psx_close_event(file, meta_op_time, cur_time, io_context);
file->rank = -1;
file->counters[CP_POSIX_OPENS]--;
}
else
while (ind_opens_this_cycle > io_context->my_rank)
{
extra_opens = file->counters[CP_POSIX_OPENS] % nprocs;
if (extra_opens && (((file->counters[CP_POSIX_OPENS] / nprocs) % extra_opens) == 0))
cur_time = generate_psx_open_event(file, create_flag, meta_op_time,
cur_time, io_context);
create_flag = 0;
cur_time = generate_psx_close_event(file, meta_op_time, cur_time, io_context);
if (ind_opens_this_cycle >= nprocs)
{
extra_open_stride = (file->counters[CP_POSIX_OPENS] / nprocs) / extra_opens;
extra_io_ops = total_io_ops % nprocs;
file->counters[CP_POSIX_OPENS] -= nprocs;
ind_opens_this_cycle -= nprocs;
}
else
{
extra_opens = 0;
file->counters[CP_POSIX_OPENS] -= ind_opens_this_cycle;
ind_opens_this_cycle = 0;
}
total_ind_io_ops = total_io_ops - extra_io_ops;
total_coll_io_ops = 0;
}
/* it is rare to overwrite existing files, so set the create flag */
if (file->counters[CP_BYTES_WRITTEN])
while (coll_opens_this_cycle)
{
create_flag = 1;
}
}
/* generate all events for this collectively opened file */
for (i = 0; file->counters[CP_POSIX_OPENS]; i++)
{
#if 0
if (file->counters[CP_COLL_OPENS])
aggregator_cnt = in_agg_cnt;
else
......@@ -719,6 +734,7 @@ void generate_psx_coll_file_events(
file->counters[CP_POSIX_OPENS] -= rank_cnt;
}
}
#endif
return;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment