Commit 3d7cc3c6 authored by Philip Carns

implement shared reduction for stdio records

parent 5fd3c5c1
......@@ -5,7 +5,6 @@
*/
/* TODO list (general) for this module:
* - implement reduction operator
* - add stdio page to darshan-job-summary
* - add regression test cases for all functions captured here
* - especially the scanf and printf variants with variable arguments
......@@ -220,6 +219,9 @@ static void stdio_begin_shutdown(void);
/* hands the module's record array and its size in bytes back through
 * stdio_buf / stdio_buf_sz, reducing globally shared records first
 */
static void stdio_get_output_data(MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **stdio_buf, int *stdio_buf_sz);
static void stdio_shutdown(void);
/* user-defined MPI reduction operator (registered with PMPI_Op_create) that
 * merges shared STDIO file records
 */
static void stdio_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype);
/* qsort() comparator that orders STDIO file records by descending rank */
static int stdio_record_compare(const void* a, const void* b);
/* acquire / release stdio_runtime_mutex */
#define STDIO_LOCK() pthread_mutex_lock(&stdio_runtime_mutex)
#define STDIO_UNLOCK() pthread_mutex_unlock(&stdio_runtime_mutex)
......@@ -1152,14 +1154,168 @@ static void stdio_get_output_data(
void **stdio_buf,
int *stdio_buf_sz)
{
    /* Produce this rank's final STDIO output buffer.
     *
     * Records named in shared_recs are marked with rank -1, sorted to the
     * tail of the record array, and merged across mod_comm onto rank 0 with
     * stdio_record_reduction_op(); every other rank drops those records from
     * its own output. The reduction is skipped when shared_rec_count is 0 or
     * when the DARSHAN_DISABLE_SHARED_REDUCTION environment variable is set.
     *
     * On return *stdio_buf points at the module's record array and
     * *stdio_buf_sz holds its length in bytes.
     */
    struct stdio_file_runtime *file;
    struct darshan_stdio_record *shared_start = NULL;
    MPI_Datatype red_type;
    MPI_Op red_op;
    int i;

    assert(stdio_runtime);

    /* if there are globally shared files, do a shared file reduction */
    if(shared_rec_count && !getenv("DARSHAN_DISABLE_SHARED_REDUCTION"))
    {
        /* necessary initialization of shared records */
        for(i = 0; i < shared_rec_count; i++)
        {
            HASH_FIND(hlink, stdio_runtime->file_hash, &shared_recs[i],
                sizeof(darshan_record_id), file);
            assert(file);

            file->file_record->rank = -1;
        }

        /* sort the array of files descending by rank so that we get all of the
         * shared files (marked by rank -1) in a contiguous portion at end
         * of the array
         */
        qsort(stdio_runtime->file_record_array, stdio_runtime->file_array_ndx,
            sizeof(struct darshan_stdio_record), stdio_record_compare);

        /* the shared files now occupy the last shared_rec_count slots */
        shared_start =
            &(stdio_runtime->file_record_array[stdio_runtime->file_array_ndx-shared_rec_count]);

        /* construct a datatype for a STDIO file record. This is serving no purpose
         * except to make sure we can do a reduction on proper boundaries
         */
        DARSHAN_MPI_CALL(PMPI_Type_contiguous)(sizeof(struct darshan_stdio_record),
            MPI_BYTE, &red_type);
        DARSHAN_MPI_CALL(PMPI_Type_commit)(&red_type);

        /* register a STDIO file record reduction operator */
        DARSHAN_MPI_CALL(PMPI_Op_create)(stdio_record_reduction_op, 1, &red_op);

        /* reduce shared STDIO file records onto rank 0.
         *
         * The root reduces in place: with MPI_IN_PLACE as the send buffer, MPI
         * takes the root's contribution from the receive buffer and overwrites
         * it with the result. No separate receive buffer has to be allocated
         * on rank 0, so there is no allocation-failure path on which rank 0
         * could leave the function before entering the collective (stranding
         * the other ranks inside it) or return without setting the outputs.
         */
        if(my_rank == 0)
        {
            DARSHAN_MPI_CALL(PMPI_Reduce)(MPI_IN_PLACE, shared_start,
                shared_rec_count, red_type, red_op, 0, mod_comm);
        }
        else
        {
            DARSHAN_MPI_CALL(PMPI_Reduce)(shared_start, NULL,
                shared_rec_count, red_type, red_op, 0, mod_comm);

            /* shared records are reported by rank 0 only */
            stdio_runtime->file_array_ndx -= shared_rec_count;
        }

        /* clean up reduction state */
        DARSHAN_MPI_CALL(PMPI_Type_free)(&red_type);
        DARSHAN_MPI_CALL(PMPI_Op_free)(&red_op);
    }

    *stdio_buf = (void *)(stdio_runtime->file_record_array);
    *stdio_buf_sz = stdio_runtime->file_array_ndx * sizeof(struct darshan_stdio_record);

    return;
}
/* qsort() comparator for STDIO file records: yields a descending-by-rank
 * ordering (returns 1 when a's rank is lower than b's, -1 when it is higher,
 * 0 when the ranks are equal)
 */
static int stdio_record_compare(const void* a_p, const void* b_p)
{
    const struct darshan_stdio_record *lhs = a_p;
    const struct darshan_stdio_record *rhs = b_p;

    /* three-way comparison with the operands swapped to get descending order */
    return (rhs->rank > lhs->rank) - (rhs->rank < lhs->rank);
}
/* MPI user-defined reduction operator for shared STDIO file records.
 *
 * Combines each of the *len records in infile_v with the record at the same
 * position in inoutfile_v and stores the result back into inoutfile_v:
 *   - counters STDIO_OPENS .. STDIO_BYTES_READ are summed
 *   - counters STDIO_MAX_BYTE_READ .. STDIO_MAX_BYTE_WRITTEN keep the maximum
 *   - fcounters STDIO_F_META_TIME .. STDIO_F_READ_TIME are summed
 *   - fcounters STDIO_F_OPEN_START_TIMESTAMP .. STDIO_F_READ_START_TIMESTAMP
 *     keep the smallest non-zero value when one is available
 *   - fcounters STDIO_F_OPEN_END_TIMESTAMP .. STDIO_F_READ_END_TIMESTAMP keep
 *     the maximum
 * Each merged record takes its f_id from the incoming record and is given
 * rank -1. The datatype argument is not used.
 */
static void stdio_record_reduction_op(void* infile_v, void* inoutfile_v,
    int *len, MPI_Datatype *datatype)
{
    struct darshan_stdio_record *src = infile_v;
    struct darshan_stdio_record *dst = inoutfile_v;
    struct darshan_stdio_record merged;
    int rec, c;

    assert(stdio_runtime);

    for(rec = 0; rec < *len; rec++, src++, dst++)
    {
        /* build the result in a scratch record so that anything not set
         * explicitly below ends up zeroed
         */
        memset(&merged, 0, sizeof(merged));
        merged.f_id = src->f_id;
        merged.rank = -1;

        /* totals */
        for(c = STDIO_OPENS; c <= STDIO_BYTES_READ; c++)
            merged.counters[c] = src->counters[c] + dst->counters[c];

        /* largest offsets */
        for(c = STDIO_MAX_BYTE_READ; c <= STDIO_MAX_BYTE_WRITTEN; c++)
            merged.counters[c] = (src->counters[c] > dst->counters[c]) ?
                src->counters[c] : dst->counters[c];

        /* cumulative times */
        for(c = STDIO_F_META_TIME; c <= STDIO_F_READ_TIME; c++)
            merged.fcounters[c] = src->fcounters[c] + dst->fcounters[c];

        /* earliest start: take the incoming value when the accumulated one is
         * still zero, or when the incoming one is non-zero and smaller
         */
        for(c = STDIO_F_OPEN_START_TIMESTAMP; c <= STDIO_F_READ_START_TIMESTAMP; c++)
            merged.fcounters[c] =
                (dst->fcounters[c] == 0 ||
                 (src->fcounters[c] > 0 && src->fcounters[c] < dst->fcounters[c])) ?
                src->fcounters[c] : dst->fcounters[c];

        /* latest end */
        for(c = STDIO_F_OPEN_END_TIMESTAMP; c <= STDIO_F_READ_END_TIMESTAMP; c++)
            merged.fcounters[c] = (src->fcounters[c] > dst->fcounters[c]) ?
                src->fcounters[c] : dst->fcounters[c];

        /* publish the merged record */
        *dst = merged;
    }
}
static void stdio_shutdown()
{
struct stdio_file_runtime_ref *ref, *tmp;
......
......@@ -13,48 +13,48 @@
/* X-macro list of the integer STDIO counters; each entry appears exactly once.
 *
 * The order is significant: stdio_record_reduction_op() sums the contiguous
 * range STDIO_OPENS .. STDIO_BYTES_READ and takes the maximum over
 * STDIO_MAX_BYTE_READ .. STDIO_MAX_BYTE_WRITTEN, so the cumulative counters
 * must come first and the max-offset counters last.
 */
#define STDIO_COUNTERS \
    /* count of fopens */\
    X(STDIO_OPENS) \
    /* number of reads */ \
    X(STDIO_READS) \
    /* number of writes */ \
    X(STDIO_WRITES) \
    /* count of seeks */\
    X(STDIO_SEEKS) \
    /* count of flushes */\
    X(STDIO_FLUSHES) \
    /* total bytes written */ \
    X(STDIO_BYTES_WRITTEN) \
    /* total bytes read */ \
    X(STDIO_BYTES_READ) \
    /* maximum byte (offset) read */\
    X(STDIO_MAX_BYTE_READ) \
    /* maximum byte (offset) written */\
    X(STDIO_MAX_BYTE_WRITTEN) \
    /* end of counters */\
    X(STDIO_NUM_INDICES)
/* X-macro list of the floating point STDIO counters; each entry appears
 * exactly once.
 *
 * The order is significant: stdio_record_reduction_op() walks three contiguous
 * ranges -- the start timestamps (minimum non-zero), the end timestamps
 * (maximum) and the cumulative times (sum) -- so the members of each group
 * must stay adjacent.
 */
#define STDIO_F_COUNTERS \
    /* timestamp of first open */\
    X(STDIO_F_OPEN_START_TIMESTAMP) \
    /* timestamp of first close */\
    X(STDIO_F_CLOSE_START_TIMESTAMP) \
    /* timestamp of first write */\
    X(STDIO_F_WRITE_START_TIMESTAMP) \
    /* timestamp of first read */\
    X(STDIO_F_READ_START_TIMESTAMP) \
    /* timestamp of last open completion */\
    X(STDIO_F_OPEN_END_TIMESTAMP) \
    /* timestamp of last close completion */\
    X(STDIO_F_CLOSE_END_TIMESTAMP) \
    /* timestamp of last write completion */\
    X(STDIO_F_WRITE_END_TIMESTAMP) \
    /* timestamp of last read completion */\
    X(STDIO_F_READ_END_TIMESTAMP) \
    /* cumulative meta time */\
    X(STDIO_F_META_TIME) \
    /* cumulative write time */\
    X(STDIO_F_WRITE_TIME) \
    /* cumulative read time */\
    X(STDIO_F_READ_TIME) \
    /* end of counters */\
    X(STDIO_F_NUM_INDICES)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment