Commit 8f6db6ba authored by Shane Snyder's avatar Shane Snyder

Optimizations for generating access size sequences

parent 08cb4d92
......@@ -22,7 +22,9 @@
#define IO_IS_IN_SIZE_BIN_RANGE(size, bin_ndx, bin_min_sizes) \
((bin_ndx == 9) ? \
(size >= bin_min_sizes[bin_ndx]) : \
((size >= bin_min_sizes[bin_ndx]) && (size < bin_min_sizes[bin_ndx + 1])))
((size >= bin_min_sizes[bin_ndx]) && (size <= bin_min_sizes[bin_ndx + 1])))
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
/* structure for storing a darshan workload operation (a codes op with 2 timestamps) */
struct darshan_io_op
......@@ -77,8 +79,8 @@ static double generate_psx_coll_io_events(struct darshan_file *file, int64_t ind
int64_t aggregator_cnt, double inter_io_delay,
double meta_op_time, double cur_time,
struct rank_io_context *io_context);
static void determine_io_params(struct darshan_file *file, int write_flag, int coll_flag,
int64_t io_cycles, size_t *io_sz, off_t *io_off);
static void determine_io_params(struct darshan_file *file, int write_flag, int64_t io_this_op,
int64_t proc_count, size_t *io_sz, off_t *io_off);
static void calc_io_delays(struct darshan_file *file, int64_t num_opens, int64_t num_io_ops,
double total_delay, double *first_io_delay, double *close_delay,
double *inter_open_delay, double *inter_io_delay);
......@@ -403,7 +405,8 @@ void darshan_print_io_ops(void *io_op_dat, int rank, FILE *log_stream)
}
else if (event_list[i].codes_op.op_type == CODES_WK_READ)
{
fprintf(log_stream, "Rank %d READ %"PRIu64" [sz = %"PRId64", off = %"PRId64"] (%lf - %lf)\n",
fprintf(log_stream, "Rank %d READ %"PRIu64" [sz = %"PRId64", off = %"PRId64
"] (%lf - %lf)\n",
rank,
event_list[i].codes_op.u.read.file_id,
(int64_t)event_list[i].codes_op.u.read.size,
......@@ -413,7 +416,8 @@ void darshan_print_io_ops(void *io_op_dat, int rank, FILE *log_stream)
}
else if (event_list[i].codes_op.op_type == CODES_WK_WRITE)
{
fprintf(log_stream, "Rank %d WRITE %"PRIu64" [sz = %"PRId64", off = %"PRId64"] (%lf - %lf)\n",
fprintf(log_stream, "Rank %d WRITE %"PRIu64" [sz = %"PRId64", off = %"PRId64
"] (%lf - %lf)\n",
rank,
event_list[i].codes_op.u.write.file_id,
(int64_t)event_list[i].codes_op.u.write.size,
......@@ -867,7 +871,7 @@ static double generate_psx_ind_io_events(
for (i = 0; i < io_ops_this_cycle; i++)
{
/* calculate what value to use for i/o size and offset */
determine_io_params(file, rw, 0, file->counters[CP_POSIX_OPENS], &io_sz, &io_off);
determine_io_params(file, rw, 1, 1, &io_sz, &io_off);
if (!rw)
{
/* generate a read event */
......@@ -1066,8 +1070,8 @@ static double generate_psx_coll_io_events(
for (j = 0; j < io_cnt; j++)
{
determine_io_params(file, rw, ind_coll, (ind_coll) ? total_coll_io_ops :
file->counters[CP_POSIX_OPENS] / nprocs, &io_sz, &io_off);
determine_io_params(file, rw, (ind_coll) ? io_cnt - j : ind_io_ops_this_cycle + 1,
aggregator_cnt, &io_sz, &io_off);
if (!rw)
{
/* generate a read event */
......@@ -1186,23 +1190,23 @@ static double generate_psx_coll_io_events(
}
static void determine_io_params(
struct darshan_file *file, int write_flag, int coll_flag, int64_t io_cycles,
struct darshan_file *file, int write_flag, int64_t io_this_op, int64_t proc_count,
size_t *io_sz, off_t *io_off)
{
static int seq_rd_flag = -1;
static int seq_wr_flag = -1;
static uint64_t next_rd_off = 0;
static uint64_t next_wr_off = 0;
static int64_t rd_common_accesses[4] = { 0, 0, 0, 0 };
static int64_t wr_common_accesses[4] = { 0, 0, 0, 0 };
static int all_common_flag = -1;
int64_t *size_bins; /* 10 size bins for io operations */
static int size_bin_ndx = -1;
static int64_t io_this_size_bin = 0;
static int64_t rd_common_counts[4];
static int64_t wr_common_counts[4];
int64_t *rd_size_bins = &(file->counters[CP_SIZE_READ_0_100]);
int64_t *wr_size_bins = &(file->counters[CP_SIZE_WRITE_0_100]);
int64_t *size_bins;
int64_t *common_accesses = &(file->counters[CP_ACCESS1_ACCESS]); /* 4 common accesses */
int64_t *common_access_counts = &(file->counters[CP_ACCESS1_COUNT]); /* common access counts */
int64_t *total_io_size;
int64_t last_io_byte;
int64_t tmp_byte_counter = 0;
int size_bin_ndx;
int look_for_small_bin = 0;
int i, j = 0;
int64_t bin_min_size[10] = { 0, 100, 1024, 10 * 1024, 100 * 1024, 1024 * 1024, 4 * 1024 * 1024,
10 * 1024 * 1024, 100 * 1024 * 1024, 1024 * 1024 * 1024 };
......@@ -1210,163 +1214,163 @@ static void determine_io_params(
6 * 1024 * 1024, 40 * 1024 * 1024, 400 * 1024 * 1024,
1 * 1024 * 1024 * 1024 };
/* determine how to assign common access counters to reads and/or writes */
if (all_common_flag == -1)
assert(io_this_op);
if (size_bin_ndx == -1)
{
for (i = 0; i < 4; i++)
{
tmp_byte_counter += (common_accesses[i] * common_access_counts[i]);
}
if (tmp_byte_counter == (file->counters[CP_BYTES_WRITTEN] + file->counters[CP_BYTES_READ]))
{
all_common_flag = 1;
}
else
{
all_common_flag = 0;
for (j = 0; j < 10; j++)
{
if (IO_IS_IN_SIZE_BIN_RANGE(common_accesses[i], j, bin_min_size))
{
if (rd_size_bins[j] && wr_size_bins[j])
{
rd_common_counts[i] = MIN(common_access_counts[i] / 2, rd_size_bins[j]);
wr_common_counts[i] = common_access_counts[i] - rd_common_counts[i];
}
else if (rd_size_bins[j])
{
rd_common_counts[i] = common_access_counts[i];
wr_common_counts[i] = 0;
}
else if (wr_size_bins[j])
{
rd_common_counts[i] = 0;
wr_common_counts[i] = common_access_counts[i];
}
else
{
rd_common_counts[i] = wr_common_counts[i] = 0;
}
break;
}
}
}
}
/* assign data values depending on whether the operation is a read or write */
if (write_flag)
{
size_bins = &(file->counters[CP_SIZE_WRITE_0_100]);
total_io_size = &(file->counters[CP_BYTES_WRITTEN]);
last_io_byte = file->counters[CP_MAX_BYTE_WRITTEN];
if (seq_wr_flag == -1)
{
if ((file->counters[CP_POSIX_WRITES] -
((*total_io_size - last_io_byte - 1) / (last_io_byte + 1)) - 1) ==
file->counters[CP_SEQ_WRITES])
{
seq_wr_flag = 1;
}
else
{
seq_wr_flag = 0;
}
}
size_bins = wr_size_bins;
common_access_counts = wr_common_counts;
}
else
{
size_bins = &(file->counters[CP_SIZE_READ_0_100]);
total_io_size = &(file->counters[CP_BYTES_READ]);
last_io_byte = file->counters[CP_MAX_BYTE_READ];
if (seq_rd_flag == -1)
{
if ((file->counters[CP_POSIX_READS] -
((*total_io_size - last_io_byte - 1) / (last_io_byte + 1)) - 1) ==
file->counters[CP_SEQ_READS])
{
seq_rd_flag = 1;
}
else
{
seq_rd_flag = 0;
}
}
size_bins = rd_size_bins;
common_access_counts = rd_common_counts;
}
*io_sz = 0;
if ((*total_io_size == 0) || (write_flag && (file->counters[CP_POSIX_WRITES] == 1)) ||
(!write_flag && (file->counters[CP_POSIX_READS] == 1)))
{
if (*total_io_size >= 0)
*io_sz = *total_io_size;
}
else if (all_common_flag)
if (!io_this_size_bin)
{
for (i = 0; i < 4; i++)
if (io_this_op < proc_count)
{
if (!write_flag && rd_common_accesses[i])
look_for_small_bin = 1;
for (i = 0; i < 10; i++)
{
*io_sz = common_accesses[i];
rd_common_accesses[i]--;
common_access_counts[i]--;
break;
}
else if (write_flag && wr_common_accesses[i])
{
*io_sz = common_accesses[i];
wr_common_accesses[i]--;
common_access_counts[i]--;
break;
if (size_bins[i] % proc_count)
{
if (!io_this_size_bin)
{
size_bin_ndx = i;
io_this_size_bin = MIN(size_bins[i] % proc_count, io_this_op);
}
else if ((size_bins[i] % proc_count) < io_this_size_bin)
{
size_bin_ndx = i;
io_this_size_bin = size_bins[i] % proc_count;
}
}
}
}
if (*io_sz == 0)
else
{
for (i = 0; i < 4; i++)
for (i = 0; i < 10; i++)
{
if (write_flag)
if (size_bins[i] && ((size_bins[i] % proc_count) == 0))
{
wr_common_accesses[i] = (common_access_counts[i] / io_cycles);
if ((*io_sz == 0) && wr_common_accesses[i])
if (!io_this_size_bin ||
(io_this_size_bin && (size_bins[i] > size_bins[size_bin_ndx])))
{
*io_sz = common_accesses[i];
wr_common_accesses[i]--;
common_access_counts[i]--;
size_bin_ndx = i;
io_this_size_bin = proc_count;
}
}
else
}
}
if (!io_this_size_bin)
{
for (i = 0; i < 10; i++)
{
if (size_bins[i])
{
rd_common_accesses[i] = (common_access_counts[i] / io_cycles);
if ((*io_sz == 0) && rd_common_accesses[i])
size_bin_ndx = i;
io_this_size_bin = size_bins[i];
if (io_this_size_bin > io_this_op)
{
*io_sz = common_accesses[i];
rd_common_accesses[i]--;
common_access_counts[i]--;
io_this_size_bin = io_this_op;
}
break;
}
}
}
assert(*io_sz);
}
else
*io_sz = 0;
if (*total_io_size > 0)
{
/* try to assign a common access first */
for (i = 0; i < 10; i++)
if ((write_flag && (file->counters[CP_POSIX_WRITES] == 1)) ||
(!write_flag && (file->counters[CP_POSIX_READS] == 1)))
{
*io_sz = *total_io_size;
}
else
{
/* try to assign a common access first (intelligently) */
for (j = 0; j < 4; j++)
{
if (size_bins[i] && common_access_counts[j] &&
IO_IS_IN_SIZE_BIN_RANGE(common_accesses[j], i, bin_min_size))
if (common_access_counts[j] &&
IO_IS_IN_SIZE_BIN_RANGE(common_accesses[j], size_bin_ndx, bin_min_size))
{
*io_sz = common_accesses[j];
common_access_counts[j]--;
break;
if (look_for_small_bin)
{
if (common_access_counts[j] == io_this_op)
{
*io_sz = common_accesses[j];
common_access_counts[j]--;
break;
}
}
else
{
*io_sz = common_accesses[j];
common_access_counts[j]--;
break;
}
}
}
if (*io_sz)
break;
}
/* if no common accesses left, then assign a random io size */
if (*io_sz == 0)
{
size_bin_ndx = rand() % 10;
for (i = 0; i < 10; i++)
{
if (size_bins[size_bin_ndx])
if ((j == 3) && look_for_small_bin)
{
*io_sz = bin_def_size[size_bin_ndx];
break;
j = 0;
look_for_small_bin = 0;
}
size_bin_ndx = (size_bin_ndx + 1) % 10;
}
/* if no common accesses left, then assign default size for this bin */
if (*io_sz == 0)
*io_sz = bin_def_size[size_bin_ndx];
}
assert(*io_sz);
}
*total_io_size -= *io_sz;
for (i = 0; i < 10; i++)
{
if (IO_IS_IN_SIZE_BIN_RANGE(*io_sz, i, bin_min_size))
size_bins[i]--;
}
size_bins[size_bin_ndx]--;
io_this_size_bin--;
/* next, determine the offset to use */
......@@ -1377,39 +1381,32 @@ static void determine_io_params(
{
*io_off = last_io_byte + 1;
}
else if (write_flag && seq_wr_flag)
else
{
if ((next_wr_off + *io_sz) > (last_io_byte + 1))
next_wr_off = 0;
if (write_flag)
{
if ((next_wr_off + *io_sz) > (last_io_byte + 1))
next_wr_off = 0;
*io_off = next_wr_off;
next_wr_off += *io_sz;
}
else if (!write_flag && seq_rd_flag)
{
if ((next_rd_off + *io_sz) > (last_io_byte + 1))
next_rd_off = 0;
*io_off = next_wr_off;
next_wr_off += *io_sz;
}
else
{
if ((next_rd_off + *io_sz) > (last_io_byte + 1))
next_rd_off = 0;
*io_off = next_rd_off;
next_rd_off += *io_sz;
}
else if (*io_sz < last_io_byte)
{
*io_off = (off_t)rand() % (last_io_byte - *io_sz);
}
else
{
*io_off = 0;
*io_off = next_rd_off;
next_rd_off += *io_sz;
}
}
/* reset static variable if this is the last i/o op for this file */
if ((file->counters[CP_POSIX_READS] + file->counters[CP_POSIX_WRITES]) == 1)
{
io_this_size_bin = 0;
size_bin_ndx = -1;
next_rd_off = next_wr_off = 0;
seq_wr_flag = seq_rd_flag = -1;
all_common_flag = -1;
for (i = 0; i < 4; i++)
rd_common_accesses[i] = wr_common_accesses[i] = 0;
}
return;
......
......@@ -24,6 +24,8 @@
#include "codes/quickhash.h"
#include "codes/configuration.h"
#define WORKLOAD_PRINT 0
/* hash table entry for looking up file descriptor of a workload file id */
struct file_info
{
......@@ -136,8 +138,9 @@ int load_workload(char *conf_path, int rank)
configuration_get_value(&config, "PARAMS", "aggregator_count", aggregator_count, 10);
d_params.aggregator_cnt = atoi(aggregator_count);
#if WORKLOAD_PRINT
d_params.stream = NULL;
#if 0
#else
d_params.stream = log_stream;
opt_verbose = 0;
#endif
......@@ -274,9 +277,11 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
switch (replay_op.op_type)
{
case CODES_WK_DELAY:
#if WORKLOAD_PRINT
if (opt_verbose)
fprintf(log_stream, "[Rank %d] Operation %lld : DELAY %lf seconds\n",
rank, op_number, replay_op.u.delay.seconds);
#endif
if (!opt_noop)
{
......@@ -303,8 +308,10 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
}
return 0;
case CODES_WK_BARRIER:
#if WORKLOAD_PRINT
if (opt_verbose)
fprintf(log_stream, "[Rank %d] Operation %lld : BARRIER\n", rank, op_number);
#endif
if (!opt_noop)
{
......@@ -320,9 +327,11 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
}
return 0;
case CODES_WK_OPEN:
#if WORKLOAD_PRINT
if (opt_verbose)
fprintf(log_stream, "[Rank %d] Operation %lld: %s file %"PRIu64"\n", rank, op_number,
(replay_op.u.open.create_flag) ? "CREATE" : "OPEN", replay_op.u.open.file_id);
#endif
if (!opt_noop)
{
......@@ -357,9 +366,11 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
}
return 0;
case CODES_WK_CLOSE:
#if WORKLOAD_PRINT
if (opt_verbose)
fprintf(log_stream, "[Rank %d] Operation %lld : CLOSE file %"PRIu64"\n",
rank, op_number, replay_op.u.close.file_id);
#endif
if (!opt_noop)
{
......@@ -381,11 +392,13 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
}
return 0;
case CODES_WK_WRITE:
#if WORKLOAD_PRINT
if (opt_verbose)
fprintf(log_stream, "[Rank %d] Operation %lld : WRITE file %"PRIu64" (sz = %"PRId64
", off = %"PRId64")\n",
rank, op_number, replay_op.u.write.file_id, replay_op.u.write.size,
replay_op.u.write.offset);
#endif
if (!opt_noop)
{
......@@ -413,11 +426,12 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
}
return 0;
case CODES_WK_READ:
#if WORKLOAD_PRINT
if (opt_verbose)
fprintf(log_stream, "[Rank %d] Operation %lld : READ file %"PRIu64" (sz = %"PRId64
", off = %"PRId64")\n", rank, op_number, replay_op.u.read.file_id,
replay_op.u.read.size, replay_op.u.read.offset);
#endif
if (!opt_noop)
{
/* search for the corresponding file descriptor in the hash table */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment