Commit c68713b9 authored by Shane Snyder

slight change to darshan generator logic

Some refactoring and a bug fix for regenerating i/o in situations
where independent and collective i/o are used on the same file
(which is typical of HDF5 applications that use headers, etc.).
parent 2db831df
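In essence, the two per-mode collective offsets in rank_io_context
(next_coll_rd_off / next_coll_wr_off) are collapsed into a single
next_off cursor that both independent and collective operations
advance, and that the close event resets. A minimal, hypothetical
sketch of that idea (simplified types, not the actual codes/darshan
workload code):

    #include <assert.h>
    #include <stdio.h>
    #include <stddef.h>
    #include <sys/types.h>

    /* Simplified stand-in for the per-rank context in this diff: one
     * shared offset cursor replaces the separate collective read/write
     * cursors, so interleaved independent and collective i/o on the
     * same file (e.g. an HDF5 header write followed by collective data
     * writes) is laid out sequentially in one file view. */
    struct rank_io_context
    {
        off_t next_off;   /* next offset to hand out for this rank/file */
    };

    /* Assign an offset for one i/o op and advance the shared cursor. */
    static off_t assign_offset(struct rank_io_context *ctx, size_t io_sz)
    {
        off_t off = ctx->next_off;
        ctx->next_off += (off_t)io_sz;
        return off;
    }

    int main(void)
    {
        struct rank_io_context ctx = { .next_off = 0 };

        /* independent write (e.g. a header), then a collective write */
        off_t hdr_off  = assign_offset(&ctx, 512);
        off_t data_off = assign_offset(&ctx, 4096);
        printf("ind off=%lld coll off=%lld\n",
               (long long)hdr_off, (long long)data_off);

        /* per the diff, the cursor is reset when the file is closed */
        ctx.next_off = 0;
        assert(ctx.next_off == 0);
        return 0;
    }

This mirrors the diff's split of determine_io_params() into
determine_ind_io_params() and determine_coll_io_params(), both of
which now draw their offsets from io_context->next_off.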
@@ -38,8 +38,7 @@ struct rank_io_context
double last_op_time;
void *io_op_dat;
off_t next_coll_rd_off;
off_t next_coll_wr_off;
off_t next_off;
struct qhash_head hash_link;
};
@@ -80,8 +79,8 @@ static double generate_psx_coll_io_events(struct darshan_file *file, int64_t ind
int64_t aggregator_cnt, double inter_io_delay,
double meta_op_time, double cur_time,
struct rank_io_context *io_context);
static void determine_io_params(struct darshan_file *file, int write_flag, int64_t io_this_op,
int64_t proc_count, size_t *io_sz, off_t *io_off);
static void determine_ind_io_params(struct darshan_file *file, int write_flag, size_t *io_sz,
off_t *io_off, struct rank_io_context *io_context);
static void determine_coll_io_params(struct darshan_file *file, int write_flag, int64_t coll_op_cnt,
int64_t agg_cnt, int64_t agg_ndx, size_t *io_sz, off_t *io_off,
struct rank_io_context *io_context);
@@ -146,8 +145,7 @@ static int darshan_io_workload_load(const char *params, int rank)
my_ctx->my_rank = (int64_t)rank;
my_ctx->last_op_time = 0.0;
my_ctx->io_op_dat = darshan_init_io_op_dat();
my_ctx->next_coll_rd_off = my_ctx->next_coll_wr_off = 0;
my_ctx->next_off = 0;
/* loop over all files contained in the log file */
while ((ret = darshan_log_getfile(logfile_fd, &job, &next_file)) > 0)
@@ -548,14 +546,6 @@ void generate_psx_coll_file_events(
if (file->counters[CP_COLL_OPENS] || file->counters[CP_INDEP_OPENS])
{
/* Subtract a portion of MPI r/w overhead from the total delay.
* This accounts for time spent synchronizing, since we generate barrier events.
* Note: 60% of mpi read/write time seems to be an appropriate approx of sync duration.
*/
// total_delay -= ((((file->fcounters[CP_F_MPI_READ_TIME] +
// file->fcounters[CP_F_MPI_WRITE_TIME]) / nprocs) -
// file->fcounters[CP_F_SLOWEST_RANK_TIME]) * .4);
extra_opens = file->counters[CP_POSIX_OPENS] - file->counters[CP_COLL_OPENS] -
file->counters[CP_INDEP_OPENS];
@@ -635,7 +625,6 @@ void generate_psx_coll_file_events(
if (!file->counters[CP_COLL_OPENS] && !file->counters[CP_INDEP_OPENS])
{
/* TODO: we probably want to use ind_io here */
cur_time = generate_psx_coll_io_events(file, 1, 0, nprocs, nprocs, 0.0,
meta_op_time, cur_time, io_context);
extra_io_ops--;
@@ -646,7 +635,6 @@ void generate_psx_coll_file_events(
file->counters[CP_POSIX_OPENS]--;
}
/* TODO: look at this, use ind_io */
while (ind_opens_this_cycle)
{
if (ind_opens_this_cycle >= nprocs)
@@ -766,6 +754,8 @@ static double generate_psx_close_event(
if (insert_flag)
darshan_insert_next_io_op(io_context->io_op_dat, &next_io_op);
io_context->next_off = 0;
return cur_time;
}
@@ -841,7 +831,7 @@ static double generate_psx_ind_io_events(
for (i = 0; i < io_ops_this_cycle; i++)
{
/* calculate what value to use for i/o size and offset */
determine_io_params(file, rw, 1, 1, &io_sz, &io_off);
determine_ind_io_params(file, rw, &io_sz, &io_off, io_context);
if (!rw)
{
/* generate a read event */
@@ -939,7 +929,6 @@ static double generate_psx_coll_io_events(
int64_t ind_ops_remaining = 0;
double io_op_time;
double max_cur_time = 0.0;
int ind_coll;
size_t io_sz;
off_t io_off;
int64_t i, j;
@@ -998,21 +987,73 @@ static double generate_psx_coll_io_events(
{
if (ind_ops_remaining)
{
ind_coll = 0;
tmp_rank = (next_ind_io_rank++) % nprocs;
io_cnt = 1;
ind_io_ops_this_cycle--;
ind_ops_remaining--;
if (!ind_ops_remaining) next_ind_io_rank = 0;
determine_ind_io_params(file, rw, &io_sz, &io_off, io_context);
if (!rw)
{
/* generate a read event */
next_io_op.codes_op.op_type = CODES_WK_READ;
next_io_op.codes_op.u.read.file_id = file->hash;
next_io_op.codes_op.u.read.size = io_sz;
next_io_op.codes_op.u.read.offset = io_off;
/* set the end time based on observed bandwidth and io size */
if (rd_bw == 0.0)
io_op_time = 0.0;
else
io_op_time = (io_sz / rd_bw);
file->counters[CP_POSIX_READS]--;
file->counters[CP_INDEP_READS]--;
}
else
{
/* generate a write event */
next_io_op.codes_op.op_type = CODES_WK_WRITE;
next_io_op.codes_op.u.write.file_id = file->hash;
next_io_op.codes_op.u.write.size = io_sz;
next_io_op.codes_op.u.write.offset = io_off;
/* set the end time based on observed bandwidth and io size */
if (wr_bw == 0.0)
io_op_time = 0.0;
else
io_op_time = (io_sz / wr_bw);
file->counters[CP_POSIX_WRITES]--;
file->counters[CP_INDEP_WRITES]--;
}
next_io_op.start_time = cur_time;
next_io_op.end_time = cur_time + io_op_time + meta_op_time;
psx_rw_ops_remaining--;
assert(file->counters[CP_POSIX_READS] >= 0);
assert(file->counters[CP_POSIX_WRITES] >= 0);
/* store the io operation if it belongs to this rank */
if (tmp_rank == io_context->my_rank)
darshan_insert_next_io_op(io_context->io_op_dat, &next_io_op);
if (next_io_op.end_time > max_cur_time)
max_cur_time = next_io_op.end_time;
if ((tmp_rank == (nprocs - 1)) || (i == (total_io_ops_this_cycle - 1)))
{
cur_time = max_cur_time;
if (i != (total_io_ops_this_cycle - 1))
cur_time += inter_io_delay;
}
}
else
{
ind_coll = 1;
tmp_rank = 0;
coll_io_ops_this_cycle--;
if (!rw)
{
io_cnt = ceil((double)(file->counters[CP_POSIX_READS] -
@@ -1053,7 +1094,6 @@ static double generate_psx_coll_io_events(
next_io_op.codes_op.u.read.file_id = file->hash;
next_io_op.codes_op.u.read.size = io_sz;
next_io_op.codes_op.u.read.offset = io_off;
next_io_op.start_time = cur_time;
/* set the end time based on observed bandwidth and io size */
if (rd_bw == 0.0)
@@ -1061,7 +1101,6 @@ static double generate_psx_coll_io_events(
else
io_op_time = (io_sz / rd_bw);
next_io_op.end_time = cur_time + io_op_time + meta_op_time;
file->counters[CP_POSIX_READS] -= tmp_coll_cnt;
}
else
@@ -1071,7 +1110,6 @@ static double generate_psx_coll_io_events(
next_io_op.codes_op.u.write.file_id = file->hash;
next_io_op.codes_op.u.write.size = io_sz;
next_io_op.codes_op.u.write.offset = io_off;
next_io_op.start_time = cur_time;
/* set the end time based on observed bandwidth and io size */
if (wr_bw == 0.0)
@@ -1079,9 +1117,10 @@ static double generate_psx_coll_io_events(
else
io_op_time = (io_sz / wr_bw);
next_io_op.end_time = cur_time + io_op_time + meta_op_time;
file->counters[CP_POSIX_WRITES] -= tmp_coll_cnt;
}
next_io_op.start_time = cur_time;
next_io_op.end_time = cur_time + io_op_time + meta_op_time;
darshan_insert_next_io_op(io_context->io_op_dat, &next_io_op);
@@ -1101,90 +1140,12 @@ static double generate_psx_coll_io_events(
psx_rw_ops_remaining -= tmp_coll_cnt;
assert(file->counters[CP_POSIX_READS] >= 0);
assert(file->counters[CP_POSIX_WRITES] >= 0);
}
}
io_ops_this_rw--;
max_cur_time = cur_time;
#if 0
for (j = 0; j < io_cnt; j++)
{
determine_coll_io_params(file, rw, io_cnt - j, &io_sz, &io_off);
if (!rw)
{
/* generate a read event */
next_io_op.codes_op.op_type = CODES_WK_READ;
next_io_op.codes_op.u.read.file_id = file->hash;
next_io_op.codes_op.u.read.size = io_sz;
next_io_op.codes_op.u.read.offset = io_off;
next_io_op.start_time = cur_time;
/* set the end time based on observed bandwidth and io size */
if (rd_bw == 0.0)
io_op_time = 0.0;
else
io_op_time = (io_sz / rd_bw);
next_io_op.end_time = cur_time + io_op_time;
file->counters[CP_POSIX_READS]--;
}
else
{
/* generate a write event */
ext_io_op.codes_op.op_type = CODES_WK_WRITE;
next_io_op.codes_op.u.write.file_id = file->hash;
next_io_op.codes_op.u.write.size = io_sz;
next_io_op.codes_op.u.write.offset = io_off;
next_io_op.start_time = cur_time;
/* set the end time based on observed bandwidth and io size */
if (wr_bw == 0.0)
io_op_time = 0.0;
else
io_op_time = (io_sz / wr_bw);
next_io_op.end_time = cur_time + io_op_time;
file->counters[CP_POSIX_WRITES]--;
}
psx_rw_ops_remaining--;
assert(file->counters[CP_POSIX_READS] >= 0);
assert(file->counters[CP_POSIX_WRITES] >= 0);
/* TODO store the i/o event */
if (tmp_rank == io_context->my_rank)
darshan_insert_next_io_op(io_context->io_op_dat, &next_io_op);
if (next_io_op.end_time > max_cur_time)
max_cur_time = next_io_op.end_time;
tmp_rank += ranks_per_aggregator;
if (ind_coll && (tmp_rank >= (ranks_per_aggregator * aggregator_cnt)))
{
tmp_rank = 0;
cur_time = max_cur_time;
if (j != io_cnt -1)
cur_time = generate_barrier_event(file, 0, cur_time, io_context);
}
}
io_ops_this_rw--;
#endif
if (ind_coll)
{
cur_time = max_cur_time;
if (i != (total_io_ops_this_cycle - 1))
cur_time += inter_io_delay;
}
else
{
if (tmp_rank == (nprocs - 1) || (i == (total_io_ops_this_cycle - 1)))
{
cur_time = max_cur_time;
if (i != (total_io_ops_this_cycle - 1))
cur_time += inter_io_delay;
}
}
io_ops_this_rw--;
/* determine whether to toggle between reads and writes */
if (!io_ops_this_rw && psx_rw_ops_remaining)
@@ -1237,7 +1198,6 @@ static void determine_coll_io_params(
static off_t agg_off = 0;
int i, j;
int start_ndx, end_ndx;
off_t *next_coll_off;
int64_t *all_size_bins = NULL;
int64_t *common_accesses = &(file->counters[CP_ACCESS1_ACCESS]); /* 4 common accesses */
int64_t *common_access_counts = &(file->counters[CP_ACCESS1_COUNT]); /* common access counts */
@@ -1257,13 +1217,11 @@ static void determine_coll_io_params(
{
all_size_bins = &(file->counters[CP_SIZE_WRITE_0_100]);
total_io_size = &(file->counters[CP_BYTES_WRITTEN]);
next_coll_off = &(io_context->next_coll_wr_off);
}
else
{
all_size_bins = &(file->counters[CP_SIZE_READ_0_100]);
total_io_size = &(file->counters[CP_BYTES_READ]);
next_coll_off = &(io_context->next_coll_rd_off);
}
/* we enter this if statement if we have not yet calculated which size bins to use for the
@@ -1304,7 +1262,8 @@ static void determine_coll_io_params(
{
if (all_size_bins[i])
{
agg_size_bins[i] = ((double)all_size_bins[i] / total_agg_size_bin_cnt) * coll_op_cnt;
agg_size_bins[i] = ((double)all_size_bins[i] / total_agg_size_bin_cnt) *
MIN(total_agg_size_bin_cnt, coll_op_cnt);
size_bins_left += agg_size_bins[i];
all_size_bins[i] -= agg_size_bins[i];
}
@@ -1338,7 +1297,7 @@ static void determine_coll_io_params(
memcpy(tmp_common_cnts, common_access_counts, 4 * sizeof(int64_t));
memcpy(tmp_size_bins, agg_size_bins, 10 * sizeof(int64_t));
tmp_cnt = coll_op_cnt;
agg_off = *next_coll_off;
agg_off = io_context->next_off;
while (tmp_cnt)
{
@@ -1409,7 +1368,7 @@ static void determine_coll_io_params(
{
agg_off += tmp_size;
}
*next_coll_off += tmp_size;
io_context->next_off += tmp_size;
tmp_cnt--;
tmp_agg_cnt--;
@@ -1433,7 +1392,6 @@ static void determine_coll_io_params(
*io_sz = 0;
if (*total_io_size > 0)
{
size_bins_left -= agg_cnt;
tmp_cnt = agg_cnt;
start_ndx = -1;
end_ndx = -1;
@@ -1524,6 +1482,11 @@ static void determine_coll_io_params(
}
*total_io_size -= *io_sz;
/* decrement size bins counter */
size_bins_left -= agg_cnt;
if (size_bins_left < agg_ndx)
size_bins_left = 0;
/* we simply assign offsets sequentially through an aggregator's file view */
*io_off = agg_off;
agg_off += *io_sz;
@@ -1531,25 +1494,17 @@ static void determine_coll_io_params(
return;
}
/* WARNING: BRUTE FORCE */
static void determine_io_params(
struct darshan_file *file, int write_flag, int64_t io_this_op, int64_t proc_count,
size_t *io_sz, off_t *io_off)
static void determine_ind_io_params(
struct darshan_file *file, int write_flag, size_t *io_sz, off_t *io_off,
struct rank_io_context *io_context)
{
static uint64_t next_rd_off = 0;
static uint64_t next_wr_off = 0;
static int size_bin_ndx = -1;
static int64_t io_this_size_bin = 0;
static int64_t rd_common_counts[4];
static int64_t wr_common_counts[4];
int size_bin_ndx = 0;
int64_t *rd_size_bins = &(file->counters[CP_SIZE_READ_0_100]);
int64_t *wr_size_bins = &(file->counters[CP_SIZE_WRITE_0_100]);
int64_t *size_bins = NULL;
int64_t *common_accesses = &(file->counters[CP_ACCESS1_ACCESS]); /* 4 common accesses */
int64_t *common_access_counts = &(file->counters[CP_ACCESS1_COUNT]); /* common access counts */
int64_t *total_io_size = NULL;
int64_t last_io_byte;
int look_for_small_bin = 0;
int i, j = 0;
const int64_t size_bin_min_vals[10] = { 0, 100, 1024, 10 * 1024, 100 * 1024, 1024 * 1024,
4 * 1024 * 1024, 10 * 1024 * 1024, 100 * 1024 * 1024,
@@ -1558,206 +1513,58 @@ static void determine_io_params(
4 * 1024 * 1024, 10 * 1024 * 1024, 100 * 1024 * 1024,
1024 * 1024 * 1024, INT64_MAX };
assert(io_this_op);
if (size_bin_ndx == -1)
{
for (i = 0; i < 4; i++)
{
for (j = 0; j < 10; j++)
{
if ((common_accesses[i] >= size_bin_min_vals[j]) &&
(common_accesses[i] <= size_bin_max_vals[j]))
{
if (rd_size_bins[j] && wr_size_bins[j])
{
rd_common_counts[i] = MIN(common_access_counts[i] / 2, rd_size_bins[j]);
wr_common_counts[i] = common_access_counts[i] - rd_common_counts[i];
}
else if (rd_size_bins[j])
{
rd_common_counts[i] = common_access_counts[i];
wr_common_counts[i] = 0;
}
else if (wr_size_bins[j])
{
rd_common_counts[i] = 0;
wr_common_counts[i] = common_access_counts[i];
}
else
{
rd_common_counts[i] = wr_common_counts[i] = 0;
}
break;
}
}
}
}
/* assign data values depending on whether the operation is a read or write */
if (write_flag)
{
total_io_size = &(file->counters[CP_BYTES_WRITTEN]);
last_io_byte = file->counters[CP_MAX_BYTE_WRITTEN];
size_bins = wr_size_bins;
common_access_counts = wr_common_counts;
}
else
{
total_io_size = &(file->counters[CP_BYTES_READ]);
last_io_byte = file->counters[CP_MAX_BYTE_READ];
size_bins = rd_size_bins;
common_access_counts = rd_common_counts;
}
if (!io_this_size_bin)
for (i = 0; i < 10; i++)
{
if (io_this_op < proc_count)
if (size_bins[i])
{
look_for_small_bin = 1;
for (i = 0; i < 10; i++)
{
if (size_bins[i] % proc_count)
{
if (!io_this_size_bin)
{
size_bin_ndx = i;
io_this_size_bin = MIN(size_bins[i] % proc_count, io_this_op);
}
else if ((size_bins[i] % proc_count) < io_this_size_bin)
{
size_bin_ndx = i;
io_this_size_bin = size_bins[i] % proc_count;
}
}
}
}
else
{
for (i = 0; i < 10; i++)
{
if (size_bins[i] && ((size_bins[i] % proc_count) == 0))
{
if (!io_this_size_bin ||
(io_this_size_bin && (size_bins[i] > size_bins[size_bin_ndx])))
{
size_bin_ndx = i;
io_this_size_bin = proc_count;
}
}
}
}
if (!io_this_size_bin)
{
for (i = 0; i < 10; i++)
{
if (size_bins[i])
{
size_bin_ndx = i;
io_this_size_bin = size_bins[i];
if (io_this_size_bin > io_this_op)
{
io_this_size_bin = io_this_op;
}
break;
}
}
size_bin_ndx = i;
break;
}
}
*io_sz = 0;
if (*total_io_size > 0)
{
if ((write_flag && (file->counters[CP_POSIX_WRITES] == 1)) ||
(!write_flag && (file->counters[CP_POSIX_READS] == 1)))
{
*io_sz = ALIGN_BY_8(*total_io_size);
}
else
/* try to assign a common access first (intelligently) */
for (j = 0; j < 4; j++)
{
/* try to assign a common access first (intelligently) */
for (j = 0; j < 4; j++)
if (common_access_counts[j] &&
(common_accesses[j] >= size_bin_min_vals[size_bin_ndx]) &&
(common_accesses[j] <= size_bin_max_vals[size_bin_ndx]))
{
if (common_access_counts[j] &&
(common_accesses[j] >= size_bin_min_vals[size_bin_ndx]) &&
(common_accesses[j] <= size_bin_max_vals[size_bin_ndx]))
{
if (look_for_small_bin)
{
if (common_access_counts[j] == io_this_op)
{
*io_sz = common_accesses[j];
common_access_counts[j]--;
break;
}
}
else
{
*io_sz = common_accesses[j];
common_access_counts[j]--;
break;
}
}
if ((j == 3) && look_for_small_bin)
{
j = 0;
look_for_small_bin = 0;
}
*io_sz = common_accesses[j];
common_access_counts[j]--;
break;
}
}
/* if no common accesses left, then assign default size for this bin */
if (*io_sz == 0)
{
size_t gen_size;
gen_size = (size_bin_max_vals[size_bin_ndx] - size_bin_min_vals[size_bin_ndx]) / 2;
*io_sz = ALIGN_BY_8(gen_size);
}
/* if no common accesses left, then assign default size for this bin */
if (*io_sz == 0)
{
size_t gen_size;
gen_size = (size_bin_max_vals[size_bin_ndx] - size_bin_min_vals[size_bin_ndx]) / 2;
*io_sz = ALIGN_BY_8(gen_size);
}
assert(*io_sz);
}
*total_io_size -= *io_sz;
size_bins[size_bin_ndx]--;
io_this_size_bin--;
/* next, determine the offset to use */
/* for now we just assign a random offset that makes sure not to write past the recorded
* last byte written in the file.
*/
if (*io_sz == 0)
{
*io_off = last_io_byte + 1;
}
else
{
if (write_flag)
{
if ((next_wr_off + *io_sz) > (last_io_byte + 1))
next_wr_off = 0;
*io_off = next_wr_off;
next_wr_off += *io_sz;
}
else
{
if ((next_rd_off + *io_sz) > (last_io_byte + 1))
next_rd_off = 0;
*io_off = next_rd_off;
next_rd_off += *io_sz;
}
}
/* reset static variable if this is the last i/o op for this file */
if ((file->counters[CP_POSIX_READS] + file->counters[CP_POSIX_WRITES]) == 1)
{
io_this_size_bin = 0;
size_bin_ndx = -1;
next_rd_off = next_wr_off = 0;
}
*io_off = io_context->next_off;
io_context->next_off += *io_sz;
return;
}