Commit 8dd0baa4 authored by Shane Snyder's avatar Shane Snyder
Browse files

Darshan workload generator collective enhancement

Darshan workload generator now evenly distributes aggregators
acoss the entire rank-space. Also, individual operation sizes
are evenly distributed among aggregators, so aggregator
workloads are more evenly distributed.
parent 58648028
......@@ -935,7 +935,7 @@ static double generate_psx_coll_io_events(
int64_t tmp_rank;
int64_t next_ind_io_rank = 0;
int64_t io_cnt;
int64_t ranks_per_aggregator = nprocs / aggregator_cnt;
double ranks_per_aggregator = (double)(nprocs - 1) / (aggregator_cnt - 1);
int64_t ind_ops_remaining = 0;
double io_op_time;
double max_cur_time = 0.0;
......@@ -1036,15 +1036,15 @@ static double generate_psx_coll_io_events(
for (j = 0; j < io_cnt; j += aggregator_cnt)
{
int64_t tmp_coll_cnt = MIN(io_cnt - j, aggregator_cnt);
int64_t tmp_agg_ndx;
cur_time = generate_barrier_event(file, 0, cur_time, io_context);
if (((io_context->my_rank % ranks_per_aggregator) == 0) &&
(io_context->my_rank < (ranks_per_aggregator * aggregator_cnt)) &&
((io_context->my_rank / ranks_per_aggregator) < tmp_coll_cnt))
tmp_agg_ndx = (int64_t)round(io_context->my_rank / ranks_per_aggregator);
if ((round(tmp_agg_ndx * ranks_per_aggregator) == io_context->my_rank) &&
(tmp_agg_ndx < tmp_coll_cnt))
{
determine_coll_io_params(file, rw, io_cnt, tmp_coll_cnt,
(io_context->my_rank / ranks_per_aggregator) + 1,
determine_coll_io_params(file, rw, io_cnt, tmp_coll_cnt, tmp_agg_ndx + 1,
&io_sz, &io_off, io_context);
if (!rw)
{
......@@ -1236,12 +1236,16 @@ static void determine_coll_io_params(
static int64_t agg_size_bins[10] = { 0 };
static off_t agg_off = 0;
int i, j;
int start_ndx, end_ndx;
off_t *next_coll_off;
int64_t *all_size_bins = NULL;
int64_t *common_accesses = &(file->counters[CP_ACCESS1_ACCESS]); /* 4 common accesses */
int64_t *common_access_counts = &(file->counters[CP_ACCESS1_COUNT]); /* common access counts */
int64_t *total_io_size = NULL;
int64_t tmp_cnt;
int64_t switch_cnt;
int64_t leftover;
int64_t bins_to_use;
const int64_t size_bin_min_vals[10] = { 0, 100, 1024, 10 * 1024, 100 * 1024, 1024 * 1024,
4 * 1024 * 1024, 10 * 1024 * 1024, 100 * 1024 * 1024,
1024 * 1024 * 1024 };
......@@ -1327,46 +1331,100 @@ static void determine_coll_io_params(
assert(size_bins_left == coll_op_cnt);
ssize_t tmp_size;
int64_t tmp_agg_cnt = 1; /* start at aggregator 1 */
int64_t tmp_agg_ndx = 1; /* start at aggregator 1 */
int64_t tmp_agg_cnt;
int64_t tmp_common_cnts[4];
int64_t tmp_size_bins[10];
memcpy(tmp_common_cnts, common_access_counts, 4 * sizeof(int64_t));
memcpy(tmp_size_bins, agg_size_bins, 10 * sizeof(int64_t));
tmp_cnt = coll_op_cnt;
agg_off = *next_coll_off;
/* determine initial offset for this aggregator */
for (i = 9; i >= 0; i--)
while (tmp_cnt)
{
tmp_cnt = agg_size_bins[i];
while (tmp_cnt)
start_ndx = -1;
end_ndx = -1;
leftover = 0;
tmp_agg_cnt = MIN(tmp_cnt, agg_cnt);
for (i = 9; i >= 0; i--)
{
/* assign access sizes, starting with common and falling back to default */
tmp_size = -1;
for (j = 0; j < 4; j++)
if (tmp_size_bins[i])
{
if (tmp_common_cnts[j] &&
(common_accesses[j] > size_bin_min_vals[i]) &&
(common_accesses[j] <= size_bin_max_vals[i]))
if (start_ndx == -1) start_ndx = i;
tmp_agg_cnt -= tmp_size_bins[i];
if (tmp_agg_cnt <= 0)
{
tmp_size = common_accesses[j];
tmp_common_cnts[j]--;
end_ndx = i;
break;
}
}
}
if (tmp_size == -1)
i = start_ndx;
tmp_agg_cnt = MIN(tmp_cnt, agg_cnt);
while (tmp_agg_cnt)
{
if ((tmp_size_bins[i] >= tmp_agg_cnt) && !leftover)
{
tmp_size = ALIGN_BY_8((size_bin_max_vals[i] - size_bin_min_vals[i]) / 2);
switch_cnt = tmp_agg_cnt;
}
else if (tmp_size_bins[i])
{
bins_to_use = MIN(tmp_size_bins[i], tmp_agg_cnt - leftover);
switch_cnt = ceil((double)bins_to_use / (tmp_agg_cnt - bins_to_use));
/* only increment offset for aggregators less than me */
if (tmp_agg_cnt < agg_ndx)
leftover += (bins_to_use - switch_cnt);
}
else
{
agg_off += tmp_size;
switch_cnt = 0;
}
/* assign i/o operations until we need to move to the next size bin */
while (switch_cnt)
{
/* assign access sizes, starting with common and falling back to default */
tmp_size = -1;
for (j = 0; j < 4; j++)
{
if (tmp_common_cnts[j] &&
(common_accesses[j] > size_bin_min_vals[i]) &&
(common_accesses[j] <= size_bin_max_vals[i]))
{
tmp_size = common_accesses[j];
tmp_common_cnts[j]--;
break;
}
}
if (tmp_size == -1)
{
tmp_size = ALIGN_BY_8((size_bin_max_vals[i] - size_bin_min_vals[i]) / 2);
}
/* only increment offset for aggregators less than me */
if (tmp_agg_ndx < agg_ndx)
{
agg_off += tmp_size;
}
*next_coll_off += tmp_size;
tmp_cnt--;
tmp_agg_cnt--;
switch_cnt--;
tmp_size_bins[i]--;
tmp_agg_ndx++;
if (tmp_agg_ndx > agg_cnt) tmp_agg_ndx = 1;
}
*next_coll_off += tmp_size;
tmp_cnt--;
tmp_agg_cnt++;
if (tmp_agg_cnt > agg_cnt) tmp_agg_cnt = 1;
i--;
if (i < end_ndx)
{
i = start_ndx;
leftover = 0;
}
}
}
}
......@@ -1375,65 +1433,94 @@ static void determine_coll_io_params(
*io_sz = 0;
if (*total_io_size > 0)
{
size_bins_left -= agg_cnt;
tmp_cnt = agg_cnt;
start_ndx = -1;
end_ndx = -1;
leftover = 0;
int my_ndx = -1;
for (i = 9; i >= 0; i--)
{
if (agg_size_bins[i])
{
int my_ndx = -1;
if (start_ndx == -1) start_ndx = i;
/* decrement size bin counters to reflect this round of collective i/o */
size_bins_left -= agg_cnt;
tmp_cnt = agg_cnt;
while (tmp_cnt)
tmp_cnt -= agg_size_bins[i];
if (tmp_cnt <= 0)
{
if (my_ndx == -1)
{
if (agg_ndx <= agg_size_bins[i])
my_ndx = i;
else
agg_ndx -= agg_size_bins[i];
}
end_ndx = i;
break;
}
}
}
if (tmp_cnt > agg_size_bins[i])
{
tmp_cnt -= agg_size_bins[i];
agg_size_bins[i] = 0;
}
else
{
agg_size_bins[i] -= tmp_cnt;
tmp_cnt = 0;
}
i = start_ndx;
tmp_cnt = agg_cnt;
while (tmp_cnt)
{
if ((agg_size_bins[i] >= tmp_cnt) && !leftover)
{
switch_cnt = tmp_cnt;
}
else if (agg_size_bins[i])
{
bins_to_use = MIN(agg_size_bins[i], tmp_cnt - leftover);
switch_cnt = ceil((double)bins_to_use / (tmp_cnt - bins_to_use));
i--;
if (i < 0) i = 9;
}
assert((my_ndx >= 0) && (my_ndx < 10));
leftover += (bins_to_use - switch_cnt);
}
else
{
switch_cnt = 0;
}
/* first try a common access size */
for (j = 0; j < 4; j++)
if (switch_cnt)
{
if (my_ndx == -1)
{
if (common_access_counts[j] &&
(common_accesses[j] > size_bin_min_vals[my_ndx]) &&
(common_accesses[j] <= size_bin_max_vals[my_ndx]))
{
*io_sz = common_accesses[j];
common_access_counts[j]--;
break;
}
if (agg_ndx <= switch_cnt)
my_ndx = i;
else
agg_ndx -= switch_cnt;
}
/* if no common access size found, assign a default size */
if (j == 4)
{
/* default size is the median of the range, aligned to be multiple of 8 */
size_t gen_size = (size_bin_max_vals[my_ndx] - size_bin_min_vals[my_ndx]) / 2;
*io_sz = ALIGN_BY_8(gen_size);
}
agg_size_bins[i] -= switch_cnt;
if (tmp_cnt > switch_cnt)
tmp_cnt -= switch_cnt;
else
tmp_cnt = 0;
}
i--;
if (i < end_ndx)
{
i = start_ndx;
leftover = 0;
}
}
assert((my_ndx >= 0) && (my_ndx < 10));
/* first try a common access size */
for (j = 0; j < 4; j++)
{
if (common_access_counts[j] &&
(common_accesses[j] > size_bin_min_vals[my_ndx]) &&
(common_accesses[j] <= size_bin_max_vals[my_ndx]))
{
*io_sz = common_accesses[j];
common_access_counts[j]--;
break;
}
}
assert(*io_sz > 0);
/* if no common access size found, assign a default size */
if (j == 4)
{
/* default size is the median of the range, aligned to be multiple of 8 */
size_t gen_size = (size_bin_max_vals[my_ndx] - size_bin_min_vals[my_ndx]) / 2;
*io_sz = ALIGN_BY_8(gen_size);
}
}
*total_io_size -= *io_sz;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment