diff --git a/src/workload/codes-darshan-io-wrkld.c b/src/workload/codes-darshan-io-wrkld.c index 19c73472ed77bc09432f933a0a716450d62d1b1a..fd6466586c5aaf841bfabe4b2dbb176c1e39472d 100644 --- a/src/workload/codes-darshan-io-wrkld.c +++ b/src/workload/codes-darshan-io-wrkld.c @@ -22,7 +22,9 @@ #define IO_IS_IN_SIZE_BIN_RANGE(size, bin_ndx, bin_min_sizes) \ ((bin_ndx == 9) ? \ (size >= bin_min_sizes[bin_ndx]) : \ - ((size >= bin_min_sizes[bin_ndx]) && (size < bin_min_sizes[bin_ndx + 1]))) + ((size >= bin_min_sizes[bin_ndx]) && (size <= bin_min_sizes[bin_ndx + 1]))) + +#define MIN(a, b) (((a) < (b)) ? (a) : (b)) /* structure for storing a darshan workload operation (a codes op with 2 timestamps) */ struct darshan_io_op @@ -77,8 +79,8 @@ static double generate_psx_coll_io_events(struct darshan_file *file, int64_t ind int64_t aggregator_cnt, double inter_io_delay, double meta_op_time, double cur_time, struct rank_io_context *io_context); -static void determine_io_params(struct darshan_file *file, int write_flag, int coll_flag, - int64_t io_cycles, size_t *io_sz, off_t *io_off); +static void determine_io_params(struct darshan_file *file, int write_flag, int64_t io_this_op, + int64_t proc_count, size_t *io_sz, off_t *io_off); static void calc_io_delays(struct darshan_file *file, int64_t num_opens, int64_t num_io_ops, double total_delay, double *first_io_delay, double *close_delay, double *inter_open_delay, double *inter_io_delay); @@ -403,7 +405,8 @@ void darshan_print_io_ops(void *io_op_dat, int rank, FILE *log_stream) } else if (event_list[i].codes_op.op_type == CODES_WK_READ) { - fprintf(log_stream, "Rank %d READ %"PRIu64" [sz = %"PRId64", off = %"PRId64"] (%lf - %lf)\n", + fprintf(log_stream, "Rank %d READ %"PRIu64" [sz = %"PRId64", off = %"PRId64 + "] (%lf - %lf)\n", rank, event_list[i].codes_op.u.read.file_id, (int64_t)event_list[i].codes_op.u.read.size, @@ -413,7 +416,8 @@ void darshan_print_io_ops(void *io_op_dat, int rank, FILE *log_stream) } else if (event_list[i].codes_op.op_type == CODES_WK_WRITE) { - fprintf(log_stream, "Rank %d WRITE %"PRIu64" [sz = %"PRId64", off = %"PRId64"] (%lf - %lf)\n", + fprintf(log_stream, "Rank %d WRITE %"PRIu64" [sz = %"PRId64", off = %"PRId64 + "] (%lf - %lf)\n", rank, event_list[i].codes_op.u.write.file_id, (int64_t)event_list[i].codes_op.u.write.size, @@ -867,7 +871,7 @@ static double generate_psx_ind_io_events( for (i = 0; i < io_ops_this_cycle; i++) { /* calculate what value to use for i/o size and offset */ - determine_io_params(file, rw, 0, file->counters[CP_POSIX_OPENS], &io_sz, &io_off); + determine_io_params(file, rw, 1, 1, &io_sz, &io_off); if (!rw) { /* generate a read event */ @@ -1066,8 +1070,8 @@ static double generate_psx_coll_io_events( for (j = 0; j < io_cnt; j++) { - determine_io_params(file, rw, ind_coll, (ind_coll) ? total_coll_io_ops : - file->counters[CP_POSIX_OPENS] / nprocs, &io_sz, &io_off); + determine_io_params(file, rw, (ind_coll) ? io_cnt - j : ind_io_ops_this_cycle + 1, + aggregator_cnt, &io_sz, &io_off); if (!rw) { /* generate a read event */ @@ -1186,23 +1190,23 @@ static double generate_psx_coll_io_events( } static void determine_io_params( - struct darshan_file *file, int write_flag, int coll_flag, int64_t io_cycles, + struct darshan_file *file, int write_flag, int64_t io_this_op, int64_t proc_count, size_t *io_sz, off_t *io_off) { - static int seq_rd_flag = -1; - static int seq_wr_flag = -1; static uint64_t next_rd_off = 0; static uint64_t next_wr_off = 0; - static int64_t rd_common_accesses[4] = { 0, 0, 0, 0 }; - static int64_t wr_common_accesses[4] = { 0, 0, 0, 0 }; - static int all_common_flag = -1; - int64_t *size_bins; /* 10 size bins for io operations */ + static int size_bin_ndx = -1; + static int64_t io_this_size_bin = 0; + static int64_t rd_common_counts[4]; + static int64_t wr_common_counts[4]; + int64_t *rd_size_bins = &(file->counters[CP_SIZE_READ_0_100]); + int64_t *wr_size_bins = &(file->counters[CP_SIZE_WRITE_0_100]); + int64_t *size_bins; int64_t *common_accesses = &(file->counters[CP_ACCESS1_ACCESS]); /* 4 common accesses */ int64_t *common_access_counts = &(file->counters[CP_ACCESS1_COUNT]); /* common access counts */ int64_t *total_io_size; int64_t last_io_byte; - int64_t tmp_byte_counter = 0; - int size_bin_ndx; + int look_for_small_bin = 0; int i, j = 0; int64_t bin_min_size[10] = { 0, 100, 1024, 10 * 1024, 100 * 1024, 1024 * 1024, 4 * 1024 * 1024, 10 * 1024 * 1024, 100 * 1024 * 1024, 1024 * 1024 * 1024 }; @@ -1210,163 +1214,163 @@ static void determine_io_params( 6 * 1024 * 1024, 40 * 1024 * 1024, 400 * 1024 * 1024, 1 * 1024 * 1024 * 1024 }; - /* determine how to assign common access counters to reads and/or writes */ - if (all_common_flag == -1) + assert(io_this_op); + + if (size_bin_ndx == -1) { for (i = 0; i < 4; i++) { - tmp_byte_counter += (common_accesses[i] * common_access_counts[i]); - } - - if (tmp_byte_counter == (file->counters[CP_BYTES_WRITTEN] + file->counters[CP_BYTES_READ])) - { - all_common_flag = 1; - } - else - { - all_common_flag = 0; + for (j = 0; j < 10; j++) + { + if (IO_IS_IN_SIZE_BIN_RANGE(common_accesses[i], j, bin_min_size)) + { + if (rd_size_bins[j] && wr_size_bins[j]) + { + rd_common_counts[i] = MIN(common_access_counts[i] / 2, rd_size_bins[j]); + wr_common_counts[i] = common_access_counts[i] - rd_common_counts[i]; + } + else if (rd_size_bins[j]) + { + rd_common_counts[i] = common_access_counts[i]; + wr_common_counts[i] = 0; + } + else if (wr_size_bins[j]) + { + rd_common_counts[i] = 0; + wr_common_counts[i] = common_access_counts[i]; + } + else + { + rd_common_counts[i] = wr_common_counts[i] = 0; + } + break; + } + } } } /* assign data values depending on whether the operation is a read or write */ if (write_flag) { - size_bins = &(file->counters[CP_SIZE_WRITE_0_100]); total_io_size = &(file->counters[CP_BYTES_WRITTEN]); last_io_byte = file->counters[CP_MAX_BYTE_WRITTEN]; - - if (seq_wr_flag == -1) - { - if ((file->counters[CP_POSIX_WRITES] - - ((*total_io_size - last_io_byte - 1) / (last_io_byte + 1)) - 1) == - file->counters[CP_SEQ_WRITES]) - { - seq_wr_flag = 1; - } - else - { - seq_wr_flag = 0; - } - } + size_bins = wr_size_bins; + common_access_counts = wr_common_counts; } else { - size_bins = &(file->counters[CP_SIZE_READ_0_100]); total_io_size = &(file->counters[CP_BYTES_READ]); last_io_byte = file->counters[CP_MAX_BYTE_READ]; - - if (seq_rd_flag == -1) - { - if ((file->counters[CP_POSIX_READS] - - ((*total_io_size - last_io_byte - 1) / (last_io_byte + 1)) - 1) == - file->counters[CP_SEQ_READS]) - { - seq_rd_flag = 1; - } - else - { - seq_rd_flag = 0; - } - } + size_bins = rd_size_bins; + common_access_counts = rd_common_counts; } - *io_sz = 0; - if ((*total_io_size == 0) || (write_flag && (file->counters[CP_POSIX_WRITES] == 1)) || - (!write_flag && (file->counters[CP_POSIX_READS] == 1))) - { - if (*total_io_size >= 0) - *io_sz = *total_io_size; - } - else if (all_common_flag) + if (!io_this_size_bin) { - for (i = 0; i < 4; i++) + if (io_this_op < proc_count) { - if (!write_flag && rd_common_accesses[i]) + look_for_small_bin = 1; + for (i = 0; i < 10; i++) { - *io_sz = common_accesses[i]; - rd_common_accesses[i]--; - common_access_counts[i]--; - break; - } - else if (write_flag && wr_common_accesses[i]) - { - *io_sz = common_accesses[i]; - wr_common_accesses[i]--; - common_access_counts[i]--; - break; + if (size_bins[i] % proc_count) + { + if (!io_this_size_bin) + { + size_bin_ndx = i; + io_this_size_bin = MIN(size_bins[i] % proc_count, io_this_op); + } + else if ((size_bins[i] % proc_count) < io_this_size_bin) + { + size_bin_ndx = i; + io_this_size_bin = size_bins[i] % proc_count; + } + } } } - - if (*io_sz == 0) + else { - for (i = 0; i < 4; i++) + for (i = 0; i < 10; i++) { - if (write_flag) + if (size_bins[i] && ((size_bins[i] % proc_count) == 0)) { - wr_common_accesses[i] = (common_access_counts[i] / io_cycles); - if ((*io_sz == 0) && wr_common_accesses[i]) + if (!io_this_size_bin || + (io_this_size_bin && (size_bins[i] > size_bins[size_bin_ndx]))) { - *io_sz = common_accesses[i]; - wr_common_accesses[i]--; - common_access_counts[i]--; + size_bin_ndx = i; + io_this_size_bin = proc_count; } } - else + } + } + + if (!io_this_size_bin) + { + for (i = 0; i < 10; i++) + { + if (size_bins[i]) { - rd_common_accesses[i] = (common_access_counts[i] / io_cycles); - if ((*io_sz == 0) && rd_common_accesses[i]) + size_bin_ndx = i; + io_this_size_bin = size_bins[i]; + if (io_this_size_bin > io_this_op) { - *io_sz = common_accesses[i]; - rd_common_accesses[i]--; - common_access_counts[i]--; + io_this_size_bin = io_this_op; } + break; } } } - assert(*io_sz); } - else + + *io_sz = 0; + if (*total_io_size > 0) { - /* try to assign a common access first */ - for (i = 0; i < 10; i++) + if ((write_flag && (file->counters[CP_POSIX_WRITES] == 1)) || + (!write_flag && (file->counters[CP_POSIX_READS] == 1))) + { + *io_sz = *total_io_size; + } + else { + /* try to assign a common access first (intelligently) */ for (j = 0; j < 4; j++) { - if (size_bins[i] && common_access_counts[j] && - IO_IS_IN_SIZE_BIN_RANGE(common_accesses[j], i, bin_min_size)) + if (common_access_counts[j] && + IO_IS_IN_SIZE_BIN_RANGE(common_accesses[j], size_bin_ndx, bin_min_size)) { - *io_sz = common_accesses[j]; - common_access_counts[j]--; - break; + if (look_for_small_bin) + { + if (common_access_counts[j] == io_this_op) + { + *io_sz = common_accesses[j]; + common_access_counts[j]--; + break; + } + } + else + { + *io_sz = common_accesses[j]; + common_access_counts[j]--; + break; + } } - } - if (*io_sz) - break; - } - /* if no common accesses left, then assign a random io size */ - if (*io_sz == 0) - { - size_bin_ndx = rand() % 10; - for (i = 0; i < 10; i++) - { - if (size_bins[size_bin_ndx]) + if ((j == 3) && look_for_small_bin) { - *io_sz = bin_def_size[size_bin_ndx]; - break; + j = 0; + look_for_small_bin = 0; } - size_bin_ndx = (size_bin_ndx + 1) % 10; } + + /* if no common accesses left, then assign default size for this bin */ + if (*io_sz == 0) + *io_sz = bin_def_size[size_bin_ndx]; } assert(*io_sz); } *total_io_size -= *io_sz; - for (i = 0; i < 10; i++) - { - if (IO_IS_IN_SIZE_BIN_RANGE(*io_sz, i, bin_min_size)) - size_bins[i]--; - } + size_bins[size_bin_ndx]--; + io_this_size_bin--; /* next, determine the offset to use */ @@ -1377,39 +1381,32 @@ static void determine_io_params( { *io_off = last_io_byte + 1; } - else if (write_flag && seq_wr_flag) + else { - if ((next_wr_off + *io_sz) > (last_io_byte + 1)) - next_wr_off = 0; + if (write_flag) + { + if ((next_wr_off + *io_sz) > (last_io_byte + 1)) + next_wr_off = 0; - *io_off = next_wr_off; - next_wr_off += *io_sz; - } - else if (!write_flag && seq_rd_flag) - { - if ((next_rd_off + *io_sz) > (last_io_byte + 1)) - next_rd_off = 0; + *io_off = next_wr_off; + next_wr_off += *io_sz; + } + else + { + if ((next_rd_off + *io_sz) > (last_io_byte + 1)) + next_rd_off = 0; - *io_off = next_rd_off; - next_rd_off += *io_sz; - } - else if (*io_sz < last_io_byte) - { - *io_off = (off_t)rand() % (last_io_byte - *io_sz); - } - else - { - *io_off = 0; + *io_off = next_rd_off; + next_rd_off += *io_sz; + } } /* reset static variable if this is the last i/o op for this file */ if ((file->counters[CP_POSIX_READS] + file->counters[CP_POSIX_WRITES]) == 1) { + io_this_size_bin = 0; + size_bin_ndx = -1; next_rd_off = next_wr_off = 0; - seq_wr_flag = seq_rd_flag = -1; - all_common_flag = -1; - for (i = 0; i < 4; i++) - rd_common_accesses[i] = wr_common_accesses[i] = 0; } return; diff --git a/tests/workload/codes-workload-mpi-replay.c b/tests/workload/codes-workload-mpi-replay.c index 0a91e9af102427dcfafa0f8dfd7d4f9e4828d6a9..61499434ba205d5b97d53aae91a43ec71dcc426b 100644 --- a/tests/workload/codes-workload-mpi-replay.c +++ b/tests/workload/codes-workload-mpi-replay.c @@ -24,6 +24,8 @@ #include "codes/quickhash.h" #include "codes/configuration.h" +#define WORKLOAD_PRINT 0 + /* hash table entry for looking up file descriptor of a workload file id */ struct file_info { @@ -136,8 +138,9 @@ int load_workload(char *conf_path, int rank) configuration_get_value(&config, "PARAMS", "aggregator_count", aggregator_count, 10); d_params.aggregator_cnt = atoi(aggregator_count); +#if WORKLOAD_PRINT d_params.stream = NULL; -#if 0 +#else d_params.stream = log_stream; opt_verbose = 0; #endif @@ -274,9 +277,11 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i switch (replay_op.op_type) { case CODES_WK_DELAY: +#if WORKLOAD_PRINT if (opt_verbose) fprintf(log_stream, "[Rank %d] Operation %lld : DELAY %lf seconds\n", rank, op_number, replay_op.u.delay.seconds); +#endif if (!opt_noop) { @@ -303,8 +308,10 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i } return 0; case CODES_WK_BARRIER: +#if WORKLOAD_PRINT if (opt_verbose) fprintf(log_stream, "[Rank %d] Operation %lld : BARRIER\n", rank, op_number); +#endif if (!opt_noop) { @@ -320,9 +327,11 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i } return 0; case CODES_WK_OPEN: +#if WORKLOAD_PRINT if (opt_verbose) fprintf(log_stream, "[Rank %d] Operation %lld: %s file %"PRIu64"\n", rank, op_number, (replay_op.u.open.create_flag) ? "CREATE" : "OPEN", replay_op.u.open.file_id); +#endif if (!opt_noop) { @@ -357,9 +366,11 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i } return 0; case CODES_WK_CLOSE: +#if WORKLOAD_PRINT if (opt_verbose) fprintf(log_stream, "[Rank %d] Operation %lld : CLOSE file %"PRIu64"\n", rank, op_number, replay_op.u.close.file_id); +#endif if (!opt_noop) { @@ -381,11 +392,13 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i } return 0; case CODES_WK_WRITE: +#if WORKLOAD_PRINT if (opt_verbose) fprintf(log_stream, "[Rank %d] Operation %lld : WRITE file %"PRIu64" (sz = %"PRId64 ", off = %"PRId64")\n", rank, op_number, replay_op.u.write.file_id, replay_op.u.write.size, replay_op.u.write.offset); +#endif if (!opt_noop) { @@ -413,11 +426,12 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i } return 0; case CODES_WK_READ: +#if WORKLOAD_PRINT if (opt_verbose) fprintf(log_stream, "[Rank %d] Operation %lld : READ file %"PRIu64" (sz = %"PRId64 ", off = %"PRId64")\n", rank, op_number, replay_op.u.read.file_id, replay_op.u.read.size, replay_op.u.read.offset); - +#endif if (!opt_noop) { /* search for the corresponding file descriptor in the hash table */