Commit 081c74bf authored by Shane Snyder

Update generation of darshan collective events

There is now logic for generating collective/independent i/o events
from darshan logs. A couple of minor timing and i/o parameter
bugs still need to be worked out.
parent 1386ce65
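
For context on the new logic, here is a minimal sketch (not part of this commit) of the counter arithmetic the patch uses to split a collectively opened file's i/o workload: independent-op counters feed the independent total directly, while the shared-file collective counters are divided by the process count. The counter names mirror the patch; the numeric values and the standalone program around them are made up for illustration.

#include <stdio.h>
#include <inttypes.h>

int main(void)
{
    int64_t nprocs = 4;            /* job size (assumed) */
    int64_t indep_reads = 8;       /* CP_INDEP_READS   (hypothetical) */
    int64_t indep_writes = 0;      /* CP_INDEP_WRITES  (hypothetical) */
    int64_t coll_reads = 16;       /* CP_COLL_READS    (hypothetical) */
    int64_t coll_writes = 16;      /* CP_COLL_WRITES   (hypothetical) */

    /* independent ops count directly toward the independent total ... */
    int64_t total_ind_io_ops = indep_reads + indep_writes;

    /* ... while collective ops are scaled down by nprocs, as the patch does
     * when the file has collective opens */
    int64_t total_coll_io_ops = (coll_reads + coll_writes) / nprocs;

    printf("total_ind_io_ops = %" PRId64 ", total_coll_io_ops = %" PRId64 "\n",
           total_ind_io_ops, total_coll_io_ops);
    return 0;
}
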
......@@ -68,13 +68,12 @@ static double generate_psx_close_event(struct darshan_file *file, double meta_op
static double generate_barrier_event(struct darshan_file *file, int64_t root, double cur_time,
struct rank_io_context *io_context);
static double generate_psx_ind_io_events(struct darshan_file *file, int64_t io_ops_this_cycle,
int64_t open_ndx, double inter_io_delay,
double meta_op_time, double cur_time,
struct rank_io_context *io_context);
double inter_io_delay, double meta_op_time,
double cur_time, struct rank_io_context *io_context);
static double generate_psx_coll_io_events(struct darshan_file *file, int64_t ind_io_ops_this_cycle,
int64_t coll_io_ops_this_cycle, int64_t nprocs,
int64_t aggregator_cnt, int64_t open_ndx,
double inter_io_delay, double meta_op_time, double cur_time,
int64_t aggregator_cnt, double inter_io_delay,
double meta_op_time, double cur_time,
struct rank_io_context *io_context);
static void determine_io_params(struct darshan_file *file, int write_flag, int coll_flag,
int64_t io_cycles, size_t *io_sz, off_t *io_off);
......@@ -153,6 +152,10 @@ static int darshan_io_workload_load(const char *params, int rank)
/* generate collective i/o events and store them in the rank context */
generate_psx_coll_file_events(&next_file, my_ctx, job.nprocs, d_params->aggregator_cnt);
}
assert(next_file.counters[CP_POSIX_OPENS] == 0);
assert(next_file.counters[CP_POSIX_READS] == 0);
assert(next_file.counters[CP_POSIX_WRITES] == 0);
}
if (ret < 0)
return -1;
......@@ -424,7 +427,7 @@ static void generate_psx_ind_file_events(
file->counters[CP_POSIX_OPENS]);
/* perform the calculated number of i/o operations for this file open */
cur_time = generate_psx_ind_io_events(file, io_ops_this_cycle, i, inter_io_delay,
cur_time = generate_psx_ind_io_events(file, io_ops_this_cycle, inter_io_delay,
meta_op_time, cur_time, io_context);
/* account for potential delay from last io to close */
......@@ -460,7 +463,7 @@ void generate_psx_coll_file_events(
int64_t total_coll_io_ops;
int64_t ind_io_ops_this_cycle;
int64_t coll_io_ops_this_cycle;
int64_t aggregator_cnt;
int64_t rank_cnt;
int create_flag = 0;
double cur_time = file->fcounters[CP_F_OPEN_TIMESTAMP];
double delay_per_cycle;
......@@ -491,6 +494,9 @@ void generate_psx_coll_file_events(
open_cycles = total_coll_opens / nprocs;
else
open_cycles = ceil((double)total_ind_opens / nprocs);
total_ind_io_ops = file->counters[CP_INDEP_READS] + file->counters[CP_INDEP_WRITES];
total_coll_io_ops = (file->counters[CP_COLL_READS] + file->counters[CP_COLL_WRITES]) / nprocs;
}
else
{
......@@ -502,11 +508,19 @@ void generate_psx_coll_file_events(
else
{
extra_io_ops = total_io_ops % nprocs;
if (extra_io_ops != extra_opens)
{
extra_opens = 0;
extra_io_ops = 0;
}
}
total_coll_opens = 0;
total_ind_opens = file->counters[CP_POSIX_OPENS] - extra_opens;
open_cycles = ceil((double)total_ind_opens / nprocs);
total_ind_io_ops = total_io_ops - extra_io_ops;
total_coll_io_ops = 0;
}
assert(extra_opens <= open_cycles);
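/*
 * [Editorial worked example -- not part of the patch]  With hypothetical
 * values nprocs = 4, CP_POSIX_OPENS = 9, total_io_ops = 18, and
 * extra_opens = 1 supplied by the collapsed context above this hunk:
 *
 *     extra_io_ops = 18 % 4 = 2;             // != extra_opens (1)
 *     => extra_opens = 0, extra_io_ops = 0;  // mismatch: drop both extras
 *
 *     total_ind_opens  = 9 - 0 = 9
 *     open_cycles      = ceil(9 / 4.0) = 3
 *     total_ind_io_ops = 18 - 0 = 18
 *
 * If instead total_io_ops were 17 (so extra_io_ops = 1 == extra_opens), the
 * leftover open and i/o op would later be assigned to rank 0, as noted in
 * the comments further down.
 */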
......@@ -521,6 +535,8 @@ void generate_psx_coll_file_events(
meta_op_time = file->fcounters[CP_F_POSIX_META_TIME] / ((2 * file->counters[CP_POSIX_OPENS]) +
file->counters[CP_POSIX_READS] + file->counters[CP_POSIX_WRITES]);
/* TODO calc delays */
/* it is rare to overwrite existing files, so set the create flag */
if (file->counters[CP_BYTES_WRITTEN])
{
......@@ -530,12 +546,8 @@ void generate_psx_coll_file_events(
/* generate all events for this collectively opened file */
for (i = 0; i < open_cycles; i++)
{
/* TODO: AGG COUNT */
ind_opens_this_cycle = ceil((double)total_ind_opens / (open_cycles - i));
coll_opens_this_cycle = total_coll_opens / (open_cycles - i);
total_ind_opens -= ind_opens_this_cycle;
total_coll_opens -= coll_opens_this_cycle;
/* assign any extra opens to rank 0 (these may correspond to file creations or
* header reads/writes)
......@@ -547,7 +559,12 @@ void generate_psx_coll_file_events(
cur_time = generate_psx_open_event(file, create_flag, meta_op_time, cur_time, io_context);
create_flag = 0;
/* TODO: any extra i/o here -- no delays */
if (!file->counters[CP_COLL_OPENS] && !file->counters[CP_INDEP_OPENS])
{
cur_time = generate_psx_coll_io_events(file, 1, 0, nprocs, nprocs, 0.0,
meta_op_time, cur_time, io_context);
extra_io_ops--;
}
cur_time = generate_psx_close_event(file, meta_op_time, cur_time, io_context);
......@@ -555,26 +572,35 @@ void generate_psx_coll_file_events(
file->counters[CP_POSIX_OPENS]--;
}
while (ind_opens_this_cycle > io_context->my_rank)
while (ind_opens_this_cycle)
{
if (ind_opens_this_cycle >= nprocs)
rank_cnt = nprocs;
else
rank_cnt = ind_opens_this_cycle;
cur_time = generate_psx_open_event(file, create_flag, meta_op_time,
cur_time, io_context);
create_flag = 0;
/* TODO do delays, set i/o op counts, do i/o, update i/o op counts */
cur_time += first_io_delay;
ind_io_ops_this_cycle = ceil(((double)total_ind_io_ops / total_ind_opens) * rank_cnt);
cur_time = generate_psx_coll_io_events(file, ind_io_ops_this_cycle, 0, nprocs,
rank_cnt, inter_io_delay, meta_op_time,
cur_time, io_context);
total_ind_io_ops -= ind_io_ops_this_cycle;
cur_time += close_delay;
cur_time = generate_psx_close_event(file, meta_op_time, cur_time, io_context);
if (ind_opens_this_cycle >= nprocs)
{
file->counters[CP_POSIX_OPENS] -= nprocs;
ind_opens_this_cycle -= nprocs;
}
else
{
file->counters[CP_POSIX_OPENS] -= ind_opens_this_cycle;
ind_opens_this_cycle = 0;
}
file->counters[CP_POSIX_OPENS] -= rank_cnt;
ind_opens_this_cycle -= rank_cnt;
total_ind_opens -= rank_cnt;
if (file->counters[CP_POSIX_OPENS])
cur_time += inter_cycle_delay;
}
while (coll_opens_this_cycle)
......@@ -586,13 +612,34 @@ void generate_psx_coll_file_events(
cur_time = generate_psx_open_event(file, create_flag, meta_op_time,
cur_time, io_context);
/* TODO: do delays, set i/o counts, do i/o, update i/o counts */
cur_time += first_io_delay;
if (file->counters[CP_INDEP_OPENS])
ind_io_ops_this_cycle = 0;
else
ind_io_ops_this_cycle = ceil((double)total_ind_io_ops /
(file->counters[CP_COLL_OPENS] / nprocs));
coll_io_ops_this_cycle = ceil((double)total_coll_io_ops /
(file->counters[CP_COLL_OPENS] / nprocs));
cur_time = generate_psx_coll_io_events(file, ind_io_ops_this_cycle,
coll_io_ops_this_cycle, nprocs, in_agg_cnt,
inter_io_delay, meta_op_time,
cur_time, io_context);
total_ind_io_ops -= ind_io_ops_this_cycle;
total_coll_io_ops -= coll_io_ops_this_cycle;
cur_time += close_delay;
cur_time = generate_psx_close_event(file, meta_op_time, cur_time, io_context);
file->counters[CP_POSIX_OPENS] -= nprocs;
file->counters[CP_COLL_OPENS] -= nprocs;
coll_opens_this_cycle -= nprocs;
total_coll_opens -= nprocs;
if (file->counters[CP_POSIX_OPENS])
cur_time += inter_cycle_delay;
}
}
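
The loop above spreads the remaining independent and collective i/o operations over the remaining collective open/close cycles using a ceil-of-remaining division. A minimal standalone sketch of that distribution, assuming no CP_INDEP_OPENS and made-up counter values, is:

#include <stdio.h>
#include <inttypes.h>
#include <math.h>

int main(void)
{
    int64_t nprocs = 4;              /* job size (assumed) */
    int64_t coll_opens = 8;          /* CP_COLL_OPENS    (hypothetical) */
    int64_t total_ind_io_ops = 6;    /* remaining independent ops (hypothetical) */
    int64_t total_coll_io_ops = 10;  /* remaining collective ops (hypothetical) */

    while (coll_opens)
    {
        /* ceil-of-remaining split, mirroring generate_psx_coll_file_events() */
        int64_t cycles_left = coll_opens / nprocs;
        int64_t ind_this_cycle = (int64_t)ceil((double)total_ind_io_ops / cycles_left);
        int64_t coll_this_cycle = (int64_t)ceil((double)total_coll_io_ops / cycles_left);

        printf("cycle: %" PRId64 " ind ops, %" PRId64 " coll ops\n",
               ind_this_cycle, coll_this_cycle);

        total_ind_io_ops -= ind_this_cycle;
        total_coll_io_ops -= coll_this_cycle;
        coll_opens -= nprocs;        /* each cycle consumes one collective open per rank */
    }
    return 0;
}

Dividing by the number of cycles still to come front-loads any remainder, so no work is left over after the last cycle; the real code additionally zeroes the per-cycle independent count when CP_INDEP_OPENS is nonzero.
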
......@@ -670,8 +717,8 @@ static double generate_barrier_event(
/* generate all i/o events for one independent file open and store them with the rank context */
static double generate_psx_ind_io_events(
struct darshan_file *file, int64_t io_ops_this_cycle, int64_t open_ndx,
double inter_io_delay, double meta_op_time, double cur_time, struct rank_io_context *io_context)
struct darshan_file *file, int64_t io_ops_this_cycle, double inter_io_delay,
double meta_op_time, double cur_time, struct rank_io_context *io_context)
{
static int rw = -1; /* rw = 1 for write, 0 for read, -1 for uninitialized */
static int64_t io_ops_this_rw;
......@@ -801,8 +848,8 @@ static double generate_psx_ind_io_events(
static double generate_psx_coll_io_events(
struct darshan_file *file, int64_t ind_io_ops_this_cycle, int64_t coll_io_ops_this_cycle,
int64_t nprocs, int64_t aggregator_cnt, int64_t open_ndx, double inter_io_delay,
double meta_op_time, double cur_time, struct rank_io_context *io_context)
int64_t nprocs, int64_t aggregator_cnt, double inter_io_delay, double meta_op_time,
double cur_time, struct rank_io_context *io_context)
{
static int rw = -1; /* rw = 1 for write, 0 for read, -1 for uninitialized */
static int64_t io_ops_this_rw;
......@@ -839,28 +886,6 @@ static double generate_psx_coll_io_events(
rw = (file->fcounters[CP_F_READ_START_TIMESTAMP] <
file->fcounters[CP_F_WRITE_START_TIMESTAMP]) ? 0 : 1;
/* determine how many io ops to do before next rw switch */
if (!rw)
{
if (file->counters[CP_COLL_OPENS])
io_ops_this_rw =
((file->counters[CP_COLL_READS] / nprocs) + file->counters[CP_INDEP_READS]) /
((file->counters[CP_RW_SWITCHES] / (2 * aggregator_cnt)) + 1);
else
io_ops_this_rw = file->counters[CP_POSIX_READS] /
((file->counters[CP_RW_SWITCHES] / (2 * aggregator_cnt)) + 1);
}
else
{
if (file->counters[CP_COLL_OPENS])
io_ops_this_rw =
((file->counters[CP_COLL_WRITES] / nprocs) + file->counters[CP_INDEP_WRITES]) /
((file->counters[CP_RW_SWITCHES] / (2 * aggregator_cnt)) + 1);
else
io_ops_this_rw = file->counters[CP_POSIX_WRITES] /
((file->counters[CP_RW_SWITCHES] / (2 * aggregator_cnt)) + 1);
}
/* initialize the rd and wr bandwidth values using total io size and time */
if (file->fcounters[CP_F_POSIX_READ_TIME])
rd_bw = file->counters[CP_BYTES_READ] / file->fcounters[CP_F_POSIX_READ_TIME];
......@@ -868,6 +893,28 @@ static double generate_psx_coll_io_events(
wr_bw = file->counters[CP_BYTES_WRITTEN] / file->fcounters[CP_F_POSIX_WRITE_TIME];
}
/* determine how many io ops to do before next rw switch */
if (!rw)
{
if (file->counters[CP_COLL_OPENS])
io_ops_this_rw =
((file->counters[CP_COLL_READS] / nprocs) + file->counters[CP_INDEP_READS]) /
((file->counters[CP_RW_SWITCHES] / (2 * aggregator_cnt)) + 1);
else
io_ops_this_rw = file->counters[CP_POSIX_READS] /
((file->counters[CP_RW_SWITCHES] / (2 * aggregator_cnt)) + 1);
}
else
{
if (file->counters[CP_COLL_OPENS])
io_ops_this_rw =
((file->counters[CP_COLL_WRITES] / nprocs) + file->counters[CP_INDEP_WRITES]) /
((file->counters[CP_RW_SWITCHES] / (2 * aggregator_cnt)) + 1);
else
io_ops_this_rw = file->counters[CP_POSIX_WRITES] /
((file->counters[CP_RW_SWITCHES] / (2 * aggregator_cnt)) + 1);
}
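/*
 * [Editorial worked example -- not part of the patch]  When CP_COLL_OPENS is
 * nonzero, the read branch above with hypothetical values CP_COLL_READS = 8,
 * CP_INDEP_READS = 2, CP_RW_SWITCHES = 4, nprocs = 4, aggregator_cnt = 2
 * gives:
 *
 *     io_ops_this_rw = ((8 / 4) + 2) / ((4 / (2 * 2)) + 1)
 *                    = 4 / 2
 *                    = 2
 *
 * i.e. two read operations are generated before the next read/write switch
 * is considered.
 */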
if (coll_io_ops_this_cycle)
ind_ops_remaining = ceil((double)ind_io_ops_this_cycle / coll_io_ops_this_cycle);
else
......@@ -876,46 +923,44 @@ static double generate_psx_coll_io_events(
for (i = 0; i < total_io_ops_this_cycle; i++)
{
if (ind_ops_remaining)
{
ind_coll = 0;
}
else
{
ind_coll = 1;
}
#if 0
if (((double)rand() / (double)(RAND_MAX + 1.0)) < ind_coll_switch)
{
ind_coll = 0;
tmp_rank = (next_ind_io_rank++) % nprocs;
io_cnt = 1;
ind_io_ops_this_cycle--;
ind_ops_remaining--;
if (!rw)
file->counters[CP_INDEP_READS]--;
else
file->counters[CP_INDEP_WRITES]--;
ind_io_ops_this_cycle--;
}
else
{
ind_coll = 1;
cur_time = generate_barrier_event(file, 0, cur_time, io_context);
tmp_rank = 0;
coll_io_ops_this_cycle--;
if (!rw)
{
io_cnt = ceil((double)(file->counters[CP_POSIX_READS] -
file->counters[CP_INDEP_READS]) /
file->counters[CP_INDEP_READS]) /
(file->counters[CP_COLL_READS] / nprocs));
file->counters[CP_COLL_READS] -= nprocs;
}
else
{
io_cnt = ceil((double)(file->counters[CP_POSIX_WRITES] -
file->counters[CP_INDEP_WRITES]) /
file->counters[CP_INDEP_WRITES]) /
(file->counters[CP_COLL_WRITES] / nprocs));
file->counters[CP_COLL_WRITES] -= nprocs;
}
coll_io_ops_this_cycle--;
assert(io_cnt <= io_ops_this_rw);
if (coll_io_ops_this_cycle)
ind_ops_remaining = ceil((double)ind_io_ops_this_cycle / coll_io_ops_this_cycle);
else
ind_ops_remaining = ind_io_ops_this_cycle;
cur_time = generate_barrier_event(file, 0, cur_time, io_context);
}
for (j = 0; j < io_cnt; j++)
......@@ -970,7 +1015,7 @@ static double generate_psx_coll_io_events(
max_cur_time = next_io_op.end_time;
tmp_rank += ranks_per_aggregator;
if (tmp_rank >= (ranks_per_aggregator * aggregator_cnt))
if (ind_coll && (tmp_rank >= (ranks_per_aggregator * aggregator_cnt)))
{
tmp_rank = 0;
cur_time = max_cur_time;
......@@ -979,6 +1024,25 @@ static double generate_psx_coll_io_events(
}
io_ops_this_rw--;
if (ind_coll)
{
total_coll_io_ops--;
cur_time = max_cur_time;
if (i != (total_io_ops_this_cycle - 1))
cur_time += inter_io_delay;
}
else
{
if (tmp_rank == (nprocs - 1))
cur_time = max_cur_time;
if (i != (total_io_ops_this_cycle - 1))
cur_time += inter_io_delay;
else
cur_time = max_cur_time;
}
/* determine whether to toggle between reads and writes */
if (!io_ops_this_rw && psx_rw_ops_remaining)
{
......@@ -1010,8 +1074,6 @@ static double generate_psx_coll_io_events(
((file->counters[CP_RW_SWITCHES] / (2 * aggregator_cnt)) + 1);
}
}
#endif
}
/* reset the static rw flag if this is the last open-close cycle for this file */
......@@ -1020,10 +1082,7 @@ static double generate_psx_coll_io_events(
rw = -1;
}
if (max_cur_time > cur_time)
return max_cur_time;
else
return cur_time;
return cur_time;
}
static void determine_io_params(
......