From 04e554e89daf89d5d4d526102920ac4344c3d705 Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Fri, 18 Apr 2014 13:51:39 -0500 Subject: [PATCH] Darshan collective buffering correction The collective buffering algorithm now appropriately assigns i/o sizes and offsets according to an aggregator's "file view". --- src/workload/codes-darshan-io-wrkld.c | 312 +++++++++++++++++++++++++- 1 file changed, 303 insertions(+), 9 deletions(-) diff --git a/src/workload/codes-darshan-io-wrkld.c b/src/workload/codes-darshan-io-wrkld.c index 61d9ee0..56670ad 100644 --- a/src/workload/codes-darshan-io-wrkld.c +++ b/src/workload/codes-darshan-io-wrkld.c @@ -37,6 +37,10 @@ struct rank_io_context int64_t my_rank; double last_op_time; void *io_op_dat; + + off_t next_coll_rd_off; + off_t next_coll_wr_off; + struct qhash_head hash_link; }; @@ -77,6 +81,9 @@ static double generate_psx_coll_io_events(struct darshan_file *file, int64_t ind double cur_time, struct rank_io_context *io_context); static void determine_io_params(struct darshan_file *file, int write_flag, int64_t io_this_op, int64_t proc_count, size_t *io_sz, off_t *io_off); +static void determine_coll_io_params(struct darshan_file *file, int write_flag, int64_t coll_op_cnt, + int64_t agg_cnt, int64_t agg_ndx, size_t *io_sz, off_t *io_off, + struct rank_io_context *io_context); static void calc_io_delays(struct darshan_file *file, int64_t num_opens, int64_t num_io_ops, double total_delay, double *first_io_delay, double *close_delay, double *inter_open_delay, double *inter_io_delay); @@ -139,6 +146,8 @@ static int darshan_io_workload_load(const char *params, int rank) my_ctx->my_rank = (int64_t)rank; my_ctx->last_op_time = 0.0; my_ctx->io_op_dat = darshan_init_io_op_dat(); + my_ctx->next_coll_rd_off = my_ctx->next_coll_wr_off = 0; + /* loop over all files contained in the log file */ while ((ret = darshan_log_getfile(logfile_fd, &job, &next_file)) > 0) @@ -637,6 +646,7 @@ void generate_psx_coll_file_events( if (!file->counters[CP_COLL_OPENS] && !file->counters[CP_INDEP_OPENS]) { + /* TODO: we probably want to use ind_io here */ cur_time = generate_psx_coll_io_events(file, 1, 0, nprocs, nprocs, 0.0, cur_time, io_context); extra_io_ops--; @@ -647,6 +657,7 @@ void generate_psx_coll_file_events( file->counters[CP_POSIX_OPENS]--; } + /* TODO: look at this, use ind_io */ while (ind_opens_this_cycle) { if (ind_opens_this_cycle >= nprocs) @@ -930,8 +941,6 @@ static double generate_psx_coll_io_events( static double rd_bw = 0.0, wr_bw = 0.0; int64_t psx_rw_ops_remaining = file->counters[CP_POSIX_READS] + file->counters[CP_POSIX_WRITES]; int64_t total_io_ops_this_cycle = ind_io_ops_this_cycle + coll_io_ops_this_cycle; - int64_t total_coll_io_ops = - (file->counters[CP_COLL_READS] + file->counters[CP_COLL_WRITES]) / nprocs; int64_t tmp_rank; int64_t next_ind_io_rank = 0; int64_t io_cnt; @@ -1033,13 +1042,83 @@ static double generate_psx_coll_io_events( else ind_ops_remaining = ind_io_ops_this_cycle; - cur_time = generate_barrier_event(file, 0, cur_time, io_context); + for (j = 0; j < io_cnt; j += aggregator_cnt) + { + int64_t tmp_coll_cnt = MIN(io_cnt - j, aggregator_cnt); + + cur_time = generate_barrier_event(file, 0, cur_time, io_context); + + if (((io_context->my_rank % ranks_per_aggregator) == 0) && + (io_context->my_rank < (ranks_per_aggregator * aggregator_cnt)) && + ((io_context->my_rank / ranks_per_aggregator) < tmp_coll_cnt)) + { + determine_coll_io_params(file, rw, io_cnt, tmp_coll_cnt, + (io_context->my_rank / ranks_per_aggregator) + 1, + &io_sz, &io_off, 
io_context); + if (!rw) + { + /* generate a read event */ + next_io_op.codes_op.op_type = CODES_WK_READ; + next_io_op.codes_op.u.read.file_id = file->hash; + next_io_op.codes_op.u.read.size = io_sz; + next_io_op.codes_op.u.read.offset = io_off; + next_io_op.start_time = cur_time; + + /* set the end time based on observed bandwidth and io size */ + if (rd_bw == 0.0) + io_op_time = 0.0; + else + io_op_time = (io_sz / rd_bw); + + next_io_op.end_time = cur_time + io_op_time; + file->counters[CP_POSIX_READS] -= tmp_coll_cnt; + } + else + { + /* generate a write event */ + next_io_op.codes_op.op_type = CODES_WK_WRITE; + next_io_op.codes_op.u.write.file_id = file->hash; + next_io_op.codes_op.u.write.size = io_sz; + next_io_op.codes_op.u.write.offset = io_off; + next_io_op.start_time = cur_time; + + /* set the end time based on observed bandwidth and io size */ + if (wr_bw == 0.0) + io_op_time = 0.0; + else + io_op_time = (io_sz / wr_bw); + + next_io_op.end_time = cur_time + io_op_time; + file->counters[CP_POSIX_WRITES] -= tmp_coll_cnt; + } + + darshan_insert_next_io_op(io_context->io_op_dat, &next_io_op); + + cur_time = next_io_op.end_time; + } + else + { + if (!rw) + { + file->counters[CP_POSIX_READS] -= tmp_coll_cnt; + } + else + { + file->counters[CP_POSIX_WRITES] -= tmp_coll_cnt; + } + } + psx_rw_ops_remaining -= tmp_coll_cnt; + assert(file->counters[CP_POSIX_READS] >= 0); + assert(file->counters[CP_POSIX_WRITES] >= 0); + } } + io_ops_this_rw--; + max_cur_time = cur_time; +#if 0 for (j = 0; j < io_cnt; j++) { - determine_io_params(file, rw, (ind_coll) ? io_cnt - j : ind_io_ops_this_cycle + 1, - aggregator_cnt, &io_sz, &io_off); + determine_coll_io_params(file, rw, io_cnt - j, &io_sz, &io_off); if (!rw) { /* generate a read event */ @@ -1061,7 +1140,7 @@ static double generate_psx_coll_io_events( else { /* generate a write event */ - next_io_op.codes_op.op_type = CODES_WK_WRITE; + ext_io_op.codes_op.op_type = CODES_WK_WRITE; next_io_op.codes_op.u.write.file_id = file->hash; next_io_op.codes_op.u.write.size = io_sz; next_io_op.codes_op.u.write.offset = io_off; @@ -1080,7 +1159,7 @@ static double generate_psx_coll_io_events( assert(file->counters[CP_POSIX_READS] >= 0); assert(file->counters[CP_POSIX_WRITES] >= 0); - /* store the i/o event */ + /* TODO store the i/o event */ if (tmp_rank == io_context->my_rank) darshan_insert_next_io_op(io_context->io_op_dat, &next_io_op); @@ -1097,11 +1176,10 @@ static double generate_psx_coll_io_events( } } io_ops_this_rw--; +#endif if (ind_coll) { - total_coll_io_ops--; - cur_time = max_cur_time; if (i != (total_io_ops_this_cycle - 1)) cur_time += inter_io_delay; @@ -1159,6 +1237,222 @@ static double generate_psx_coll_io_events( return cur_time; } +static void determine_coll_io_params( + struct darshan_file *file, int write_flag, int64_t coll_op_cnt, int64_t agg_cnt, + int64_t agg_ndx, size_t *io_sz, off_t *io_off, struct rank_io_context *io_context) +{ + static int64_t size_bins_left = 0; + static int64_t agg_size_bins[10] = { 0 }; + static off_t agg_off = 0; + int i, j; + off_t *next_coll_off; + int64_t *all_size_bins = NULL; + int64_t *common_accesses = &(file->counters[CP_ACCESS1_ACCESS]); /* 4 common accesses */ + int64_t *common_access_counts = &(file->counters[CP_ACCESS1_COUNT]); /* common access counts */ + int64_t *total_io_size = NULL; + int64_t tmp_cnt; + const int64_t size_bin_min_vals[10] = { 0, 100, 1024, 10 * 1024, 100 * 1024, 1024 * 1024, + 4 * 1024 * 1024, 10 * 1024 * 1024, 100 * 1024 * 1024, + 1024 * 1024 * 1024 }; + const int64_t 
size_bin_max_vals[10] = { 100, 1024, 10 * 1024, 100 * 1024, 1024 * 1024, + 4 * 1024 * 1024, 10 * 1024 * 1024, 100 * 1024 * 1024, + 1024 * 1024 * 1024, INT64_MAX }; + + if (write_flag) + { + all_size_bins = &(file->counters[CP_SIZE_WRITE_0_100]); + total_io_size = &(file->counters[CP_BYTES_WRITTEN]); + next_coll_off = &(io_context->next_coll_wr_off); + } + else + { + all_size_bins = &(file->counters[CP_SIZE_READ_0_100]); + total_io_size = &(file->counters[CP_BYTES_READ]); + next_coll_off = &(io_context->next_coll_rd_off); + } + + /* we enter this if statement if we have not yet calculated which size bins to use for the + * current collective I/O operation. + */ + if (!size_bins_left) + { + int64_t total_agg_size_bin_cnt = 0; + int tmp_ndx = 9; + + /* find some size bins that we can assign accesses out of. + * Note: we require the bins be reasonably close to each other in size, and be less + * than 100 MiB. + */ + for (i = 7; i >= 0; i--) + { + if (all_size_bins[i]) + { + if (total_agg_size_bin_cnt) + { + if ((tmp_ndx - 3) <= i) + { + tmp_ndx = i; + total_agg_size_bin_cnt += all_size_bins[i]; + } + break; + } + else + { + tmp_ndx = i; + total_agg_size_bin_cnt += all_size_bins[i]; + } + } + } + + /* assign accesses from found bins proportional to bin size */ + for (i = 7; i >= tmp_ndx; i--) + { + if (all_size_bins[i]) + { + agg_size_bins[i] = ((double)all_size_bins[i] / total_agg_size_bin_cnt) * coll_op_cnt; + size_bins_left += agg_size_bins[i]; + all_size_bins[i] -= agg_size_bins[i]; + } + + if (size_bins_left == coll_op_cnt) break; + } + + /* if we still haven't assigned enough accesses, just assign them stupidly */ + if (size_bins_left < coll_op_cnt) + { + for (i = 9; i >= 0; i--) + { + if (all_size_bins[i]) + { + tmp_cnt = MIN(all_size_bins[i], coll_op_cnt - size_bins_left); + agg_size_bins[i] += tmp_cnt; + size_bins_left += tmp_cnt; + all_size_bins[i] -= tmp_cnt; + } + + if (size_bins_left == coll_op_cnt) break; + } + } + assert(size_bins_left == coll_op_cnt); + + ssize_t tmp_size; + int64_t tmp_agg_cnt = 1; /* start at aggregator 1 */ + int64_t tmp_common_cnts[4]; + memcpy(tmp_common_cnts, common_access_counts, 4 * sizeof(int64_t)); + agg_off = *next_coll_off; + + /* determine initial offset for this aggregator */ + for (i = 9; i >= 0; i--) + { + tmp_cnt = agg_size_bins[i]; + while (tmp_cnt) + { + /* assign access sizes, starting with common and falling back to default */ + tmp_size = -1; + for (j = 0; j < 4; j++) + { + if (tmp_common_cnts[j] && + (common_accesses[j] > size_bin_min_vals[i]) && + (common_accesses[j] <= size_bin_max_vals[i])) + { + tmp_size = common_accesses[j]; + tmp_common_cnts[j]--; + break; + } + } + + if (tmp_size == -1) + { + tmp_size = ALIGN_BY_8((size_bin_max_vals[i] - size_bin_min_vals[i]) / 2); + } + + /* only increment offset for aggregators less than me */ + if (tmp_agg_cnt < agg_ndx) + { + agg_off += tmp_size; + } + *next_coll_off += tmp_size; + + tmp_cnt--; + tmp_agg_cnt++; + if (tmp_agg_cnt > agg_cnt) tmp_agg_cnt = 1; + } + } + } + + /* assign an actual access size, according to already initialized agg size bins */ + *io_sz = 0; + if (*total_io_size > 0) + { + for (i = 9; i >= 0; i--) + { + if (agg_size_bins[i]) + { + int my_ndx = -1; + + /* decrement size bin counters to reflect this round of collective i/o */ + size_bins_left -= agg_cnt; + tmp_cnt = agg_cnt; + while (tmp_cnt) + { + if (my_ndx == -1) + { + if (agg_ndx <= agg_size_bins[i]) + my_ndx = i; + else + agg_ndx -= agg_size_bins[i]; + } + + if (tmp_cnt > agg_size_bins[i]) + { + tmp_cnt -= 
agg_size_bins[i]; + agg_size_bins[i] = 0; + } + else + { + agg_size_bins[i] -= tmp_cnt; + tmp_cnt = 0; + } + + i--; + if (i < 0) i = 9; + } + assert((my_ndx >= 0) && (my_ndx < 10)); + + /* first try a common access size */ + for (j = 0; j < 4; j++) + { + if (common_access_counts[j] && + (common_accesses[j] > size_bin_min_vals[my_ndx]) && + (common_accesses[j] <= size_bin_max_vals[my_ndx])) + { + *io_sz = common_accesses[j]; + common_access_counts[j]--; + break; + } + } + + /* if no common access size found, assign a default size */ + if (j == 4) + { + /* default size is the median of the range, aligned to be multiple of 8 */ + size_t gen_size = (size_bin_max_vals[my_ndx] - size_bin_min_vals[my_ndx]) / 2; + *io_sz = ALIGN_BY_8(gen_size); + } + break; + } + } + assert(*io_sz > 0); + } + *total_io_size -= *io_sz; + + /* we simply assign offsets sequentially through an aggregator's file view */ + *io_off = agg_off; + agg_off += *io_sz; + + return; +} + /* WARNING: BRUTE FORCE */ static void determine_io_params( struct darshan_file *file, int write_flag, int64_t io_this_op, int64_t proc_count, -- 2.26.2
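
For reference, the offset behavior introduced above can be illustrated outside of the simulator. The sketch below is not part of the patch: struct agg_view, assign_round, and the 2-aggregator / 5-operation setup are made up, and it assumes every access in a round has the same size, whereas the patch draws per-aggregator sizes from Darshan's size histogram. It shows how each collective round hands one access to every participating aggregator, lays those accesses out back to back, and advances the shared collective offset past the whole round, so each aggregator's offsets move sequentially through its file view (much as next_coll_rd_off/next_coll_wr_off do above).

/*
 * Sketch (not part of the patch): per-aggregator offset assignment for one
 * collective cycle. All names here are hypothetical.
 */
#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>

#define MIN(a, b) (((a) < (b)) ? (a) : (b))

struct agg_view
{
    int64_t agg_ndx;   /* 1-based aggregator index, as in the patch */
    off_t   next_off;  /* next offset in this aggregator's file view */
};

/* Give each of the tmp_coll_cnt participating aggregators one access of
 * io_sz bytes, laid out back to back, then advance the shared collective
 * offset past everything assigned in this round. */
static void assign_round(struct agg_view *aggs, int64_t tmp_coll_cnt,
                         size_t io_sz, off_t *next_coll_off)
{
    int64_t k;

    for (k = 0; k < tmp_coll_cnt; k++)
    {
        aggs[k].next_off = *next_coll_off + (off_t)((size_t)k * io_sz);
        printf("aggregator %lld: size %zu at offset %lld\n",
               (long long)aggs[k].agg_ndx, io_sz,
               (long long)aggs[k].next_off);
    }
    *next_coll_off += (off_t)((size_t)tmp_coll_cnt * io_sz);
}

int main(void)
{
    /* hypothetical setup: 2 aggregators, 5 collective ops of 1 MiB each */
    int64_t aggregator_cnt = 2;
    int64_t io_cnt = 5;
    size_t io_sz = 1024 * 1024;
    off_t next_coll_off = 0;
    struct agg_view aggs[2] = { { 1, 0 }, { 2, 0 } };
    int64_t j;

    /* same round structure as the patch's loop: io_cnt operations are
     * consumed aggregator_cnt at a time, with a short final round when
     * io_cnt is not a multiple of aggregator_cnt */
    for (j = 0; j < io_cnt; j += aggregator_cnt)
    {
        int64_t tmp_coll_cnt = MIN(io_cnt - j, aggregator_cnt);
        assign_round(aggs, tmp_coll_cnt, io_sz, &next_coll_off);
    }
    return 0;
}

Running it prints one line per aggregator per round; the final round contains only one access because io_cnt is not a multiple of aggregator_cnt.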
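
The size selection in determine_coll_io_params draws access sizes from Darshan's per-file size histogram (the CP_SIZE_READ_* / CP_SIZE_WRITE_* counters). The sketch below is likewise standalone and hypothetical: it uses a simplified 4-bin histogram instead of the 10 real bins and ignores the common-access-size counters, but it shows the proportional-assignment idea used above, where each non-empty bin contributes accesses to the collective operation roughly in proportion to its count and a greedy pass assigns whatever is left over.

/*
 * Sketch (not part of the patch): proportional assignment of a collective
 * operation's accesses to histogram size bins. Bin layout is a simplified,
 * hypothetical stand-in for Darshan's CP_SIZE_* counters.
 */
#include <stdio.h>
#include <stdint.h>

int main(void)
{
    /* hypothetical per-file histogram: accesses observed in each bin */
    int64_t size_bins[4] = { 0, 6, 2, 0 };   /* 0-100, 100-1K, 1K-10K, 10K+ */
    const char *bin_names[4] = { "0-100", "100-1K", "1K-10K", "10K+" };
    int64_t coll_op_cnt = 4;                 /* accesses needed for this op */
    int64_t assigned[4] = { 0, 0, 0, 0 };
    int64_t total = 0;
    int64_t left = coll_op_cnt;
    int64_t share, extra;
    int i;

    for (i = 0; i < 4; i++)
        total += size_bins[i];

    /* proportional pass: each non-empty bin contributes accesses roughly in
     * proportion to its count, largest sizes first */
    for (i = 3; i >= 0 && left > 0; i--)
    {
        if (!size_bins[i])
            continue;
        share = (size_bins[i] * coll_op_cnt) / total;
        if (share > size_bins[i]) share = size_bins[i];
        if (share > left) share = left;
        assigned[i] = share;
        left -= share;
    }

    /* greedy pass: hand out whatever is still unassigned from any bin that
     * has accesses left, in the spirit of the patch's fallback loop */
    for (i = 3; i >= 0 && left > 0; i--)
    {
        extra = size_bins[i] - assigned[i];
        if (extra > left) extra = left;
        assigned[i] += extra;
        left -= extra;
    }

    for (i = 0; i < 4; i++)
        printf("bin %-7s: %lld of %lld accesses\n", bin_names[i],
               (long long)assigned[i], (long long)size_bins[i]);
    return 0;
}

With the sample histogram (6 accesses in the 100-1K bin, 2 in the 1K-10K bin) and coll_op_cnt = 4, the proportional pass assigns 3 and 1 accesses respectively and the greedy pass has nothing left to do.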