Commit 74058f51 authored by Shane Snyder's avatar Shane Snyder

Darshan workload generator development

Update the darshan workload generator to use the new collective open/close
generation algorithm. Previously, generating these operations was
done sequentially on one process, so the algorithm had to be updated
to give consistent results in a distributed environment.
parent 4d14c225
......@@ -460,7 +460,6 @@ void generate_psx_coll_file_events(
int64_t total_coll_io_ops;
int64_t ind_io_ops_this_cycle;
int64_t coll_io_ops_this_cycle;
int64_t rank_cnt;
int64_t aggregator_cnt;
int create_flag = 0;
double cur_time = file->fcounters[CP_F_OPEN_TIMESTAMP];
......@@ -470,7 +469,7 @@ void generate_psx_coll_file_events(
double inter_cycle_delay = 0.0;
double inter_io_delay = 0.0;
double meta_op_time;
int64_t i, j;
int64_t i;
/* the collective file was never opened (i.e., just stat-ed), so return */
if (!(file->counters[CP_POSIX_OPENS]))
......@@ -483,7 +482,15 @@ void generate_psx_coll_file_events(
if (file->counters[CP_COLL_OPENS] || file->counters[CP_INDEP_OPENS])
{
extra_opens = file->counters[CP_POSIX_OPENS] - file->counters[CP_COLL_OPENS] -
file->counters[CP_INDEP_OPENS];
total_coll_opens = file->counters[CP_COLL_OPENS];
total_ind_opens = file->counters[CP_POSIX_OPENS] - total_coll_opens - extra_opens;
if (total_coll_opens)
open_cycles = total_coll_opens / nprocs;
else
open_cycles = ceil((double)total_ind_opens / nprocs);
}
else
{
......@@ -497,9 +504,9 @@ void generate_psx_coll_file_events(
extra_io_ops = total_io_ops % nprocs;
}
total_ind_opens = file->counters[CP_POSIX_OPENS] - extra_opens;
total_coll_opens = 0;
open_cycles = ceil((double)(total_ind_opens) / nprocs);
total_ind_opens = file->counters[CP_POSIX_OPENS] - extra_opens;
open_cycles = ceil((double)total_ind_opens / nprocs);
}
assert(extra_opens <= open_cycles);
......@@ -523,29 +530,25 @@ void generate_psx_coll_file_events(
/* generate all events for this collectively opened file */
for (i = 0; i < open_cycles; i++)
{
if (file->counters[CP_COLL_OPENS])
{
}
else
{
ind_opens_this_cycle = ceil((double)total_ind_opens / open_cycles);
coll_opens_this_cycle = 0;
/* TODO: AGG COUNT */
ind_opens_this_cycle = ceil((double)total_ind_opens / (open_cycles - i));
coll_opens_this_cycle = total_coll_opens / (open_cycles - i);
total_ind_opens -= ind_opens_this_cycle;
}
total_coll_opens -= coll_opens_this_cycle;
/* assign any extra opens to rank 0 (these may correspond to file creations or
* header reads/writes)
*/
if (extra_opens && !((open_cycles - i) % extra_opens))
if (extra_opens && !(i % (open_cycles / extra_opens)))
{
assert(create_flag);
file->rank = 0;
cur_time = generate_psx_open_event(file, create_flag, meta_op_time, cur_time, io_context);
create_flag = 0;
/* TODO: any extra i/o here -- no delays */
cur_time = generate_psx_close_event(file, meta_op_time, cur_time, io_context);
file->rank = -1;
......@@ -558,6 +561,8 @@ void generate_psx_coll_file_events(
cur_time, io_context);
create_flag = 0;
/* TODO do delays, set i/o op counts, do i/o, update i/o op counts */
cur_time = generate_psx_close_event(file, meta_op_time, cur_time, io_context);
if (ind_opens_this_cycle >= nprocs)
......@@ -574,168 +579,23 @@ void generate_psx_coll_file_events(
while (coll_opens_this_cycle)
{
}
}
#if 0
if (file->counters[CP_COLL_OPENS])
aggregator_cnt = in_agg_cnt;
else
aggregator_cnt = nprocs;
/* assign any determined 'extra' opens to rank 0 at the beginning of the cycle */
if (extra_opens && !(i % extra_open_stride))
{
assert(create_flag);
/* generate the open/close events for creating the collective file */
file->rank = 0;
cur_time = generate_psx_open_event(file, create_flag, meta_op_time, cur_time, io_context);
ind_io_ops_this_cycle = ceil((double)extra_io_ops / extra_opens);
cur_time = generate_psx_coll_io_events(file, ind_io_ops_this_cycle, 0, nprocs,
aggregator_cnt, i, 0.0, meta_op_time,
cur_time, io_context);
extra_io_ops -= ind_io_ops_this_cycle;
cur_time = generate_psx_close_event(file, meta_op_time, cur_time, io_context);
create_flag = 0;
file->rank = -1;
file->counters[CP_POSIX_OPENS]--;
extra_opens--;
}
if (file->counters[CP_POSIX_OPENS] >= nprocs)
{
if (file->counters[CP_COLL_OPENS])
{
if (file->counters[CP_INDEP_OPENS] && !(i % ind_open_stride))
{
rank_cnt = ceil((double)file->counters[CP_INDEP_OPENS] /
(file->counters[CP_COLL_OPENS] / nprocs));
for (j = 0; j < rank_cnt; j++)
{
file->rank = j;
if (j != (rank_cnt - 1))
generate_psx_open_event(file, 0, meta_op_time, cur_time, io_context);
else
cur_time = generate_psx_open_event(file, 0, meta_op_time,
cur_time, io_context);
}
ind_io_ops_this_cycle = ceil(((double)total_ind_io_ops /
file->counters[CP_INDEP_OPENS]) * rank_cnt);
cur_time = generate_psx_coll_io_events(file, ind_io_ops_this_cycle, 0,
nprocs, aggregator_cnt, i, 0.0,
meta_op_time, cur_time, io_context);
total_ind_io_ops -= ind_io_ops_this_cycle;
for (j = 0; j < rank_cnt; j++)
{
file->rank = j;
if (j != (rank_cnt - 1))
generate_psx_close_event(file, meta_op_time, cur_time, io_context);
else
cur_time = generate_psx_close_event(file, meta_op_time,
cur_time, io_context);
}
file->rank = -1;
file->counters[CP_INDEP_OPENS] -= rank_cnt;
file->counters[CP_POSIX_OPENS] -= rank_cnt;
}
assert(!create_flag);
cur_time = generate_barrier_event(file, 0, cur_time, io_context);
coll_io_ops_this_cycle = ceil((double)total_coll_io_ops /
(file->counters[CP_COLL_OPENS] / nprocs));
if (file->counters[CP_INDEP_OPENS])
ind_io_ops_this_cycle = 0;
else
ind_io_ops_this_cycle = ceil((double)total_ind_io_ops /
(file->counters[CP_POSIX_OPENS] / nprocs));
}
else
{
coll_io_ops_this_cycle = 0;
ind_io_ops_this_cycle = ceil((double)total_ind_io_ops /
(file->counters[CP_POSIX_OPENS] / nprocs));
}
/* perform an open across all ranks (rank == -1) */
cur_time = generate_psx_open_event(file, create_flag, meta_op_time,
cur_time, io_context);
create_flag = 0;
/* account for potential delay between the open and first i/o */
cur_time += first_io_delay;
/* generate the io events */
cur_time = generate_psx_coll_io_events(file, ind_io_ops_this_cycle,
coll_io_ops_this_cycle, nprocs, aggregator_cnt,
i, inter_io_delay, meta_op_time,
cur_time, io_context);
total_ind_io_ops -= ind_io_ops_this_cycle;
total_coll_io_ops -= coll_io_ops_this_cycle;
/* account for potential delay between last i/o operation and file close */
cur_time += close_delay;
/* TODO: do delays, set i/o counts, do i/o, update i/o counts */
/* generate the corresponding close event for the open at the start of the loop */
cur_time = generate_psx_close_event(file, meta_op_time, cur_time, io_context);
file->counters[CP_POSIX_OPENS] -= nprocs;
if (file->counters[CP_COLL_OPENS])
file->counters[CP_COLL_OPENS] -= nprocs;
/* account for any delay between open-close cycles */
if (file->counters[CP_POSIX_OPENS])
cur_time += inter_cycle_delay;
coll_opens_this_cycle -= nprocs;
}
else
{
/* open the file across participating ranks */
rank_cnt = file->counters[CP_POSIX_OPENS];
for (j = 0; j < rank_cnt; j++)
{
file->rank = j;
if (j != (rank_cnt - 1))
generate_psx_open_event(file, 0, meta_op_time, cur_time, io_context);
else
cur_time = generate_psx_open_event(file, 0, meta_op_time, cur_time, io_context);
}
/* account for potential delay between the open and first i/o */
cur_time += first_io_delay;
ind_io_ops_this_cycle = ceil((double)total_ind_io_ops /
(file->counters[CP_POSIX_OPENS] / nprocs));
cur_time = generate_psx_coll_io_events(file, ind_io_ops_this_cycle, 0, nprocs,
rank_cnt, i, inter_io_delay, meta_op_time,
cur_time, io_context);
total_ind_io_ops -= ind_io_ops_this_cycle;
/* account for potential delay between last i/o operation and file close */
cur_time += close_delay;
/* close the file across participating ranks */
for (j = 0; j < rank_cnt; j++)
{
file->rank = j;
if (j != (rank_cnt - 1))
generate_psx_close_event(file, meta_op_time, cur_time, io_context);
else
cur_time = generate_psx_close_event(file, meta_op_time, cur_time, io_context);
}
file->rank = -1;
file->counters[CP_POSIX_OPENS] -= rank_cnt;
}
}
#endif
return;
}
......@@ -955,8 +815,7 @@ static double generate_psx_coll_io_events(
int64_t next_ind_io_rank = 0;
int64_t io_cnt;
int64_t ranks_per_aggregator = nprocs / aggregator_cnt;
int64_t ind_ops_so_far = 0;
double ind_coll_switch;
int64_t ind_ops_remaining = 0;
double io_op_time;
double max_cur_time = 0.0;
int ind_coll;
......@@ -1009,9 +868,23 @@ static double generate_psx_coll_io_events(
wr_bw = file->counters[CP_BYTES_WRITTEN] / file->fcounters[CP_F_POSIX_WRITE_TIME];
}
if (coll_io_ops_this_cycle)
ind_ops_remaining = ceil((double)ind_io_ops_this_cycle / coll_io_ops_this_cycle);
else
ind_ops_remaining = ind_io_ops_this_cycle;
for (i = 0; i < total_io_ops_this_cycle; i++)
{
ind_coll_switch = (double)ind_io_ops_this_cycle / (total_io_ops_this_cycle - i);
if (ind_ops_remaining)
{
ind_coll = 0;
}
else
{
ind_coll = 1;
}
#if 0
if (((double)rand() / (double)(RAND_MAX + 1.0)) < ind_coll_switch)
{
ind_coll = 0;
......@@ -1025,13 +898,6 @@ static double generate_psx_coll_io_events(
}
else
{
if (ind_ops_so_far)
{
cur_time = max_cur_time;
cur_time += inter_io_delay;
ind_ops_so_far = 0;
}
ind_coll = 1;
cur_time = generate_barrier_event(file, 0, cur_time, io_context);
tmp_rank = 0;
......@@ -1145,24 +1011,7 @@ static double generate_psx_coll_io_events(
}
}
if (ind_coll)
{
total_coll_io_ops--;
cur_time = max_cur_time;
if (i != (total_io_ops_this_cycle - 1))
cur_time += inter_io_delay;
}
else
{
if (ind_ops_so_far && ((ind_ops_so_far % nprocs) == nprocs - 1))
{
cur_time = max_cur_time;
if (i != (total_io_ops_this_cycle - 1))
cur_time += inter_io_delay;
}
ind_ops_so_far++;
}
#endif
}
/* reset the static rw flag if this is the last open-close cycle for this file */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment