Commit 04e554e8 authored by Shane Snyder's avatar Shane Snyder
Browse files

Darshan collective buffering correction

The collective buffering algorithm now correctly assigns I/O
sizes and offsets according to each aggregator's "file view".
parent cba56542
...@@ -37,6 +37,10 @@ struct rank_io_context ...@@ -37,6 +37,10 @@ struct rank_io_context
int64_t my_rank; int64_t my_rank;
double last_op_time; double last_op_time;
void *io_op_dat; void *io_op_dat;
off_t next_coll_rd_off;
off_t next_coll_wr_off;
struct qhash_head hash_link; struct qhash_head hash_link;
}; };
...@@ -77,6 +81,9 @@ static double generate_psx_coll_io_events(struct darshan_file *file, int64_t ind ...@@ -77,6 +81,9 @@ static double generate_psx_coll_io_events(struct darshan_file *file, int64_t ind
double cur_time, struct rank_io_context *io_context); double cur_time, struct rank_io_context *io_context);
static void determine_io_params(struct darshan_file *file, int write_flag, int64_t io_this_op, static void determine_io_params(struct darshan_file *file, int write_flag, int64_t io_this_op,
int64_t proc_count, size_t *io_sz, off_t *io_off); int64_t proc_count, size_t *io_sz, off_t *io_off);
static void determine_coll_io_params(struct darshan_file *file, int write_flag, int64_t coll_op_cnt,
int64_t agg_cnt, int64_t agg_ndx, size_t *io_sz, off_t *io_off,
struct rank_io_context *io_context);
static void calc_io_delays(struct darshan_file *file, int64_t num_opens, int64_t num_io_ops, static void calc_io_delays(struct darshan_file *file, int64_t num_opens, int64_t num_io_ops,
double total_delay, double *first_io_delay, double *close_delay, double total_delay, double *first_io_delay, double *close_delay,
double *inter_open_delay, double *inter_io_delay); double *inter_open_delay, double *inter_io_delay);
...@@ -139,6 +146,8 @@ static int darshan_io_workload_load(const char *params, int rank) ...@@ -139,6 +146,8 @@ static int darshan_io_workload_load(const char *params, int rank)
my_ctx->my_rank = (int64_t)rank; my_ctx->my_rank = (int64_t)rank;
my_ctx->last_op_time = 0.0; my_ctx->last_op_time = 0.0;
my_ctx->io_op_dat = darshan_init_io_op_dat(); my_ctx->io_op_dat = darshan_init_io_op_dat();
my_ctx->next_coll_rd_off = my_ctx->next_coll_wr_off = 0;
/* loop over all files contained in the log file */ /* loop over all files contained in the log file */
while ((ret = darshan_log_getfile(logfile_fd, &job, &next_file)) > 0) while ((ret = darshan_log_getfile(logfile_fd, &job, &next_file)) > 0)
...@@ -637,6 +646,7 @@ void generate_psx_coll_file_events( ...@@ -637,6 +646,7 @@ void generate_psx_coll_file_events(
if (!file->counters[CP_COLL_OPENS] && !file->counters[CP_INDEP_OPENS]) if (!file->counters[CP_COLL_OPENS] && !file->counters[CP_INDEP_OPENS])
{ {
/* TODO: we probably want to use ind_io here */
cur_time = generate_psx_coll_io_events(file, 1, 0, nprocs, nprocs, 0.0, cur_time = generate_psx_coll_io_events(file, 1, 0, nprocs, nprocs, 0.0,
cur_time, io_context); cur_time, io_context);
extra_io_ops--; extra_io_ops--;
...@@ -647,6 +657,7 @@ void generate_psx_coll_file_events( ...@@ -647,6 +657,7 @@ void generate_psx_coll_file_events(
file->counters[CP_POSIX_OPENS]--; file->counters[CP_POSIX_OPENS]--;
} }
/* TODO: look at this, use ind_io */
while (ind_opens_this_cycle) while (ind_opens_this_cycle)
{ {
if (ind_opens_this_cycle >= nprocs) if (ind_opens_this_cycle >= nprocs)
...@@ -930,8 +941,6 @@ static double generate_psx_coll_io_events( ...@@ -930,8 +941,6 @@ static double generate_psx_coll_io_events(
static double rd_bw = 0.0, wr_bw = 0.0; static double rd_bw = 0.0, wr_bw = 0.0;
int64_t psx_rw_ops_remaining = file->counters[CP_POSIX_READS] + file->counters[CP_POSIX_WRITES]; int64_t psx_rw_ops_remaining = file->counters[CP_POSIX_READS] + file->counters[CP_POSIX_WRITES];
int64_t total_io_ops_this_cycle = ind_io_ops_this_cycle + coll_io_ops_this_cycle; int64_t total_io_ops_this_cycle = ind_io_ops_this_cycle + coll_io_ops_this_cycle;
int64_t total_coll_io_ops =
(file->counters[CP_COLL_READS] + file->counters[CP_COLL_WRITES]) / nprocs;
int64_t tmp_rank; int64_t tmp_rank;
int64_t next_ind_io_rank = 0; int64_t next_ind_io_rank = 0;
int64_t io_cnt; int64_t io_cnt;
...@@ -1033,13 +1042,83 @@ static double generate_psx_coll_io_events( ...@@ -1033,13 +1042,83 @@ static double generate_psx_coll_io_events(
else else
ind_ops_remaining = ind_io_ops_this_cycle; ind_ops_remaining = ind_io_ops_this_cycle;
cur_time = generate_barrier_event(file, 0, cur_time, io_context); for (j = 0; j < io_cnt; j += aggregator_cnt)
{
int64_t tmp_coll_cnt = MIN(io_cnt - j, aggregator_cnt);
cur_time = generate_barrier_event(file, 0, cur_time, io_context);
if (((io_context->my_rank % ranks_per_aggregator) == 0) &&
(io_context->my_rank < (ranks_per_aggregator * aggregator_cnt)) &&
((io_context->my_rank / ranks_per_aggregator) < tmp_coll_cnt))
{
determine_coll_io_params(file, rw, io_cnt, tmp_coll_cnt,
(io_context->my_rank / ranks_per_aggregator) + 1,
&io_sz, &io_off, io_context);
if (!rw)
{
/* generate a read event */
next_io_op.codes_op.op_type = CODES_WK_READ;
next_io_op.codes_op.u.read.file_id = file->hash;
next_io_op.codes_op.u.read.size = io_sz;
next_io_op.codes_op.u.read.offset = io_off;
next_io_op.start_time = cur_time;
/* set the end time based on observed bandwidth and io size */
if (rd_bw == 0.0)
io_op_time = 0.0;
else
io_op_time = (io_sz / rd_bw);
next_io_op.end_time = cur_time + io_op_time;
file->counters[CP_POSIX_READS] -= tmp_coll_cnt;
}
else
{
/* generate a write event */
next_io_op.codes_op.op_type = CODES_WK_WRITE;
next_io_op.codes_op.u.write.file_id = file->hash;
next_io_op.codes_op.u.write.size = io_sz;
next_io_op.codes_op.u.write.offset = io_off;
next_io_op.start_time = cur_time;
/* set the end time based on observed bandwidth and io size */
if (wr_bw == 0.0)
io_op_time = 0.0;
else
io_op_time = (io_sz / wr_bw);
next_io_op.end_time = cur_time + io_op_time;
file->counters[CP_POSIX_WRITES] -= tmp_coll_cnt;
}
darshan_insert_next_io_op(io_context->io_op_dat, &next_io_op);
cur_time = next_io_op.end_time;
}
else
{
if (!rw)
{
file->counters[CP_POSIX_READS] -= tmp_coll_cnt;
}
else
{
file->counters[CP_POSIX_WRITES] -= tmp_coll_cnt;
}
}
psx_rw_ops_remaining -= tmp_coll_cnt;
assert(file->counters[CP_POSIX_READS] >= 0);
assert(file->counters[CP_POSIX_WRITES] >= 0);
}
} }
io_ops_this_rw--;
max_cur_time = cur_time;
#if 0
for (j = 0; j < io_cnt; j++) for (j = 0; j < io_cnt; j++)
{ {
determine_io_params(file, rw, (ind_coll) ? io_cnt - j : ind_io_ops_this_cycle + 1, determine_coll_io_params(file, rw, io_cnt - j, &io_sz, &io_off);
aggregator_cnt, &io_sz, &io_off);
if (!rw) if (!rw)
{ {
/* generate a read event */ /* generate a read event */
...@@ -1061,7 +1140,7 @@ static double generate_psx_coll_io_events( ...@@ -1061,7 +1140,7 @@ static double generate_psx_coll_io_events(
else else
{ {
/* generate a write event */ /* generate a write event */
next_io_op.codes_op.op_type = CODES_WK_WRITE; ext_io_op.codes_op.op_type = CODES_WK_WRITE;
next_io_op.codes_op.u.write.file_id = file->hash; next_io_op.codes_op.u.write.file_id = file->hash;
next_io_op.codes_op.u.write.size = io_sz; next_io_op.codes_op.u.write.size = io_sz;
next_io_op.codes_op.u.write.offset = io_off; next_io_op.codes_op.u.write.offset = io_off;
...@@ -1080,7 +1159,7 @@ static double generate_psx_coll_io_events( ...@@ -1080,7 +1159,7 @@ static double generate_psx_coll_io_events(
assert(file->counters[CP_POSIX_READS] >= 0); assert(file->counters[CP_POSIX_READS] >= 0);
assert(file->counters[CP_POSIX_WRITES] >= 0); assert(file->counters[CP_POSIX_WRITES] >= 0);
/* store the i/o event */ /* TODO store the i/o event */
if (tmp_rank == io_context->my_rank) if (tmp_rank == io_context->my_rank)
darshan_insert_next_io_op(io_context->io_op_dat, &next_io_op); darshan_insert_next_io_op(io_context->io_op_dat, &next_io_op);
...@@ -1097,11 +1176,10 @@ static double generate_psx_coll_io_events( ...@@ -1097,11 +1176,10 @@ static double generate_psx_coll_io_events(
} }
} }
io_ops_this_rw--; io_ops_this_rw--;
#endif
if (ind_coll) if (ind_coll)
{ {
total_coll_io_ops--;
cur_time = max_cur_time; cur_time = max_cur_time;
if (i != (total_io_ops_this_cycle - 1)) if (i != (total_io_ops_this_cycle - 1))
cur_time += inter_io_delay; cur_time += inter_io_delay;
...@@ -1159,6 +1237,222 @@ static double generate_psx_coll_io_events( ...@@ -1159,6 +1237,222 @@ static double generate_psx_coll_io_events(
return cur_time; return cur_time;
} }
/* Determine the I/O access size and file offset for one aggregator's share of a
 * collective I/O operation.
 *
 * file       - Darshan file record whose CP_* counters are consumed/updated.
 * write_flag - nonzero selects the write-side counters/offsets, zero the read side.
 * coll_op_cnt- total number of aggregator-level accesses in this collective op.
 * agg_cnt    - number of aggregators participating in this round.
 * agg_ndx    - this aggregator's index (1-based; see caller, which passes
 *              (rank / ranks_per_aggregator) + 1).
 * io_sz/io_off - out parameters for the chosen access size and offset.
 * io_context - per-rank context holding the next collective read/write offset.
 *
 * Side effects: decrements the file's size-bin counters, common-access counts,
 * and CP_BYTES_READ/CP_BYTES_WRITTEN; advances io_context->next_coll_{rd,wr}_off.
 *
 * NOTE(review): the static locals persist across calls -- this assumes
 * single-threaded use and that all calls for one collective operation are made
 * back-to-back before the next operation begins; confirm against caller.
 */
static void determine_coll_io_params(
    struct darshan_file *file, int write_flag, int64_t coll_op_cnt, int64_t agg_cnt,
    int64_t agg_ndx, size_t *io_sz, off_t *io_off, struct rank_io_context *io_context)
{
    /* accesses remaining in agg_size_bins from a previous call's distribution */
    static int64_t size_bins_left = 0;
    /* per-size-bin access counts reserved for the current collective operation */
    static int64_t agg_size_bins[10] = { 0 };
    /* this aggregator's running offset within its file view */
    static off_t agg_off = 0;
    int i, j;
    off_t *next_coll_off;
    int64_t *all_size_bins = NULL;
    /* NOTE(review): assumes the 4 common-access size counters are laid out
     * contiguously starting at CP_ACCESS1_ACCESS -- verify against the
     * darshan_file counter enum. */
    int64_t *common_accesses = &(file->counters[CP_ACCESS1_ACCESS]); /* 4 common accesses */
    int64_t *common_access_counts = &(file->counters[CP_ACCESS1_COUNT]); /* common access counts */
    int64_t *total_io_size = NULL;
    int64_t tmp_cnt;
    /* lower/upper bounds of Darshan's 10 access-size histogram bins */
    const int64_t size_bin_min_vals[10] = { 0, 100, 1024, 10 * 1024, 100 * 1024, 1024 * 1024,
                                            4 * 1024 * 1024, 10 * 1024 * 1024, 100 * 1024 * 1024,
                                            1024 * 1024 * 1024 };
    const int64_t size_bin_max_vals[10] = { 100, 1024, 10 * 1024, 100 * 1024, 1024 * 1024,
                                            4 * 1024 * 1024, 10 * 1024 * 1024, 100 * 1024 * 1024,
                                            1024 * 1024 * 1024, INT64_MAX };

    /* select read- or write-side counters and the matching collective offset */
    if (write_flag)
    {
        all_size_bins = &(file->counters[CP_SIZE_WRITE_0_100]);
        total_io_size = &(file->counters[CP_BYTES_WRITTEN]);
        next_coll_off = &(io_context->next_coll_wr_off);
    }
    else
    {
        all_size_bins = &(file->counters[CP_SIZE_READ_0_100]);
        total_io_size = &(file->counters[CP_BYTES_READ]);
        next_coll_off = &(io_context->next_coll_rd_off);
    }

    /* we enter this if statement if we have not yet calculated which size bins to use for the
     * current collective I/O operation.
     */
    if (!size_bins_left)
    {
        int64_t total_agg_size_bin_cnt = 0;
        int tmp_ndx = 9;

        /* find some size bins that we can assign accesses out of.
         * Note: we require the bins be reasonably close to each other in size, and be less
         * than 100 MiB.
         */
        for (i = 7; i >= 0; i--)
        {
            if (all_size_bins[i])
            {
                if (total_agg_size_bin_cnt)
                {
                    /* a second populated bin is only used if it is within 3
                     * bins of the first one found */
                    if ((tmp_ndx - 3) <= i)
                    {
                        tmp_ndx = i;
                        total_agg_size_bin_cnt += all_size_bins[i];
                    }
                    break;
                }
                else
                {
                    tmp_ndx = i;
                    total_agg_size_bin_cnt += all_size_bins[i];
                }
            }
        }

        /* assign accesses from found bins proportional to bin size */
        for (i = 7; i >= tmp_ndx; i--)
        {
            if (all_size_bins[i])
            {
                agg_size_bins[i] = ((double)all_size_bins[i] / total_agg_size_bin_cnt) * coll_op_cnt;
                size_bins_left += agg_size_bins[i];
                all_size_bins[i] -= agg_size_bins[i];
            }

            if (size_bins_left == coll_op_cnt) break;
        }

        /* if we still haven't assigned enough accesses, just assign them stupidly */
        if (size_bins_left < coll_op_cnt)
        {
            for (i = 9; i >= 0; i--)
            {
                if (all_size_bins[i])
                {
                    tmp_cnt = MIN(all_size_bins[i], coll_op_cnt - size_bins_left);
                    agg_size_bins[i] += tmp_cnt;
                    size_bins_left += tmp_cnt;
                    all_size_bins[i] -= tmp_cnt;
                }

                if (size_bins_left == coll_op_cnt) break;
            }
        }
        assert(size_bins_left == coll_op_cnt);

        ssize_t tmp_size;
        int64_t tmp_agg_cnt = 1; /* start at aggregator 1 */
        int64_t tmp_common_cnts[4];
        /* work on a copy so the real common-access counts are only consumed
         * later, when actual access sizes are handed out */
        memcpy(tmp_common_cnts, common_access_counts, 4 * sizeof(int64_t));
        agg_off = *next_coll_off;

        /* determine initial offset for this aggregator */
        /* walk the reserved accesses largest-bin-first, dealing them round-robin
         * to aggregators 1..agg_cnt; only sizes dealt to aggregators *before*
         * this one advance agg_off, while every size advances the shared
         * next_coll_off for the following collective operation */
        for (i = 9; i >= 0; i--)
        {
            tmp_cnt = agg_size_bins[i];
            while (tmp_cnt)
            {
                /* assign access sizes, starting with common and falling back to default */
                tmp_size = -1;
                for (j = 0; j < 4; j++)
                {
                    if (tmp_common_cnts[j] &&
                        (common_accesses[j] > size_bin_min_vals[i]) &&
                        (common_accesses[j] <= size_bin_max_vals[i]))
                    {
                        tmp_size = common_accesses[j];
                        tmp_common_cnts[j]--;
                        break;
                    }
                }

                if (tmp_size == -1)
                {
                    /* no common size fits this bin: use the bin's midpoint,
                     * aligned to a multiple of 8 */
                    tmp_size = ALIGN_BY_8((size_bin_max_vals[i] - size_bin_min_vals[i]) / 2);
                }

                /* only increment offset for aggregators less than me */
                if (tmp_agg_cnt < agg_ndx)
                {
                    agg_off += tmp_size;
                }
                *next_coll_off += tmp_size;

                tmp_cnt--;
                tmp_agg_cnt++;
                if (tmp_agg_cnt > agg_cnt) tmp_agg_cnt = 1;
            }
        }
    }

    /* assign an actual access size, according to already initialized agg size bins */
    *io_sz = 0;
    if (*total_io_size > 0)
    {
        for (i = 9; i >= 0; i--)
        {
            if (agg_size_bins[i])
            {
                int my_ndx = -1;

                /* decrement size bin counters to reflect this round of collective i/o */
                size_bins_left -= agg_cnt;
                tmp_cnt = agg_cnt;
                /* consume agg_cnt reserved accesses starting at bin i, wrapping
                 * through lower bins; my_ndx latches the bin this aggregator's
                 * (1-based) index falls in. NOTE: the for-loop index i is
                 * deliberately advanced inside this while loop. */
                while (tmp_cnt)
                {
                    if (my_ndx == -1)
                    {
                        if (agg_ndx <= agg_size_bins[i])
                            my_ndx = i;
                        else
                            agg_ndx -= agg_size_bins[i];
                    }

                    if (tmp_cnt > agg_size_bins[i])
                    {
                        tmp_cnt -= agg_size_bins[i];
                        agg_size_bins[i] = 0;
                    }
                    else
                    {
                        agg_size_bins[i] -= tmp_cnt;
                        tmp_cnt = 0;
                    }

                    i--;
                    if (i < 0) i = 9;
                }
                assert((my_ndx >= 0) && (my_ndx < 10));

                /* first try a common access size */
                for (j = 0; j < 4; j++)
                {
                    if (common_access_counts[j] &&
                        (common_accesses[j] > size_bin_min_vals[my_ndx]) &&
                        (common_accesses[j] <= size_bin_max_vals[my_ndx]))
                    {
                        *io_sz = common_accesses[j];
                        common_access_counts[j]--;
                        break;
                    }
                }

                /* if no common access size found, assign a default size */
                if (j == 4)
                {
                    /* default size is the median of the range, aligned to be multiple of 8 */
                    size_t gen_size = (size_bin_max_vals[my_ndx] - size_bin_min_vals[my_ndx]) / 2;
                    *io_sz = ALIGN_BY_8(gen_size);
                }

                break;
            }
        }
        assert(*io_sz > 0);
    }
    *total_io_size -= *io_sz;

    /* we simply assign offsets sequentially through an aggregator's file view */
    *io_off = agg_off;
    agg_off += *io_sz;

    return;
}
/* WARNING: BRUTE FORCE */ /* WARNING: BRUTE FORCE */
static void determine_io_params( static void determine_io_params(
struct darshan_file *file, int write_flag, int64_t io_this_op, int64_t proc_count, struct darshan_file *file, int write_flag, int64_t io_this_op, int64_t proc_count,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment