Commit aa46bf39 authored by Shane Snyder's avatar Shane Snyder

add time/size variance reductions to mpiio/posix

parent 4e6d0c89
......@@ -115,6 +115,10 @@
/* total i/o and meta time for fastest/slowest ranks */\
X(MPIIO_F_FASTEST_RANK_TIME) \
X(MPIIO_F_SLOWEST_RANK_TIME) \
/* variance of total i/o time and bytes moved across all ranks */\
/* NOTE: for shared records only */\
X(MPIIO_F_VARIANCE_RANK_TIME) \
X(MPIIO_F_VARIANCE_RANK_BYTES) \
/* end of counters*/\
X(MPIIO_F_NUM_INDICES)
......
......@@ -8,11 +8,6 @@
#include "darshan-log-format.h"
/* TODO we need to be able to run more reduction operations to get
* time and byte variances for shared files. currently, darshan-core
* just runs a single reduction, which is used to reduce all other
* shared record fields. (VARIANCE_RANK_TIME, VARIANCE_RANK_BYTES) */
#define POSIX_COUNTERS \
/* count of posix opens */\
X(POSIX_OPENS) \
......@@ -144,6 +139,10 @@
/* total i/o and meta time consumed for fastest/slowest ranks */\
X(POSIX_F_FASTEST_RANK_TIME) \
X(POSIX_F_SLOWEST_RANK_TIME) \
/* variance of total i/o time and bytes moved across all ranks */\
/* NOTE: for shared records only */\
X(POSIX_F_VARIANCE_RANK_TIME) \
X(POSIX_F_VARIANCE_RANK_BYTES) \
/* end of counters */\
X(POSIX_F_NUM_INDICES)
......
......@@ -118,6 +118,14 @@ enum darshan_io_type
DARSHAN_IO_WRITE = 2,
};
/* struct used for calculating variances */
/* Running aggregate for a parallel variance computation; one instance
 * per shared record is contributed by each rank and instances are
 * merged pairwise by darshan_variance_reduce():
 *   n -- sample count folded into this partial (each rank starts at 1)
 *   T -- running sum of the sample values
 *   S -- running sum of squared deviations; after the final merge the
 *        (population) variance is S / n
 */
struct darshan_variance_dt
{
    double n;
    double T;
    double S;
};
/***********************************************
* darshan-common functions for darshan modules *
***********************************************/
......@@ -164,4 +172,21 @@ void darshan_walk_common_vals(
int64_t* val_p,
int64_t* cnt_p);
/* darshan_variance_reduce()
*
* MPI reduction operation to calculate variances on counters in
* data records which are shared across all processes. This could
* be used, for instance, to find the variance in I/O time or total
* bytes moved for a given data record. This function needs to be
* passed to MPI_Op_create to obtain a corresponding MPI operation
* which can be used to complete the reduction. For more details,
* consult the documentation for MPI_Op_create. Example use cases
* can be found in the POSIX and MPIIO modules.
*/
void darshan_variance_reduce(
void *invec,
void *inoutvec,
int *len,
MPI_Datatype *dt);
#endif /* __DARSHAN_COMMON_H */
......@@ -173,6 +173,27 @@ static int darshan_common_val_compare(const void* a_p, const void* b_p)
return(0);
}
/* darshan_variance_reduce()
 *
 * User-defined MPI reduction handler: merges two arrays of running
 * variance partials (struct darshan_variance_dt) elementwise, writing
 * each merged partial back into inoutvec. Uses the pairwise update of
 * Chan, Golub & LeVeque; once fully reduced, variance = S / n.
 * Signature matches MPI_User_function for use with MPI_Op_create.
 */
void darshan_variance_reduce(void *invec, void *inoutvec, int *len,
    MPI_Datatype *dt)
{
    struct darshan_variance_dt *in = invec;
    struct darshan_variance_dt *inout = inoutvec;
    int i;

    for(i = 0; i < *len; i++)
    {
        struct darshan_variance_dt merged;

        /* combine counts, sums, and squared-deviation sums; the cross
         * term accounts for the difference between the two partial means
         * (expression order kept exactly as derived to preserve
         * floating-point results) */
        merged.n = in->n + inout->n;
        merged.T = in->T + inout->T;
        merged.S = in->S + inout->S + (in->n/(inout->n*merged.n)) *
            ((inout->n/in->n)*in->T - inout->T) *
            ((inout->n/in->n)*in->T - inout->T);

        *inout = merged;
        in++;
        inout++;
    }

    return;
}
/*
* Local variables:
* c-indent-level: 4
......
......@@ -122,6 +122,9 @@ static void mpiio_file_close_fh(MPI_File fh);
static int mpiio_record_compare(const void* a, const void* b);
static void mpiio_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype);
static void mpiio_shared_record_variance(MPI_Comm mod_comm,
struct darshan_mpiio_file *inrec_array, struct darshan_mpiio_file *outrec_array,
int shared_rec_count);
static void mpiio_begin_shutdown(void);
static void mpiio_get_output_data(MPI_Comm mod_comm, darshan_record_id *shared_recs,
......@@ -1196,6 +1199,88 @@ static void mpiio_record_reduction_op(
return;
}
/* mpiio_shared_record_variance()
 *
 * Compute, for each shared MPI-IO file record, the variance across all
 * ranks of (a) total I/O + metadata time and (b) total bytes moved,
 * storing the results in rank 0's output records
 * (MPIIO_F_VARIANCE_RANK_TIME / MPIIO_F_VARIANCE_RANK_BYTES).
 * Collective over mod_comm: every rank contributes one variance partial
 * per record and the partials are merged with darshan_variance_reduce.
 *
 * Fix: on allocation failure the original returned immediately, leaking
 * var_send_buf as well as the committed MPI datatype and the created
 * MPI op. All exit paths now go through a single cleanup section.
 */
static void mpiio_shared_record_variance(MPI_Comm mod_comm,
    struct darshan_mpiio_file *inrec_array, struct darshan_mpiio_file *outrec_array,
    int shared_rec_count)
{
    MPI_Datatype var_dt;
    MPI_Op var_op;
    int i;
    struct darshan_variance_dt *var_send_buf = NULL;
    struct darshan_variance_dt *var_recv_buf = NULL;

    /* ship each partial as raw bytes; the custom op interprets them */
    DARSHAN_MPI_CALL(PMPI_Type_contiguous)(sizeof(struct darshan_variance_dt),
        MPI_BYTE, &var_dt);
    DARSHAN_MPI_CALL(PMPI_Type_commit)(&var_dt);

    DARSHAN_MPI_CALL(PMPI_Op_create)(darshan_variance_reduce, 1, &var_op);

    var_send_buf = malloc(shared_rec_count * sizeof(struct darshan_variance_dt));
    if(!var_send_buf)
        goto cleanup;

    if(my_rank == 0)
    {
        var_recv_buf = malloc(shared_rec_count * sizeof(struct darshan_variance_dt));
        if(!var_recv_buf)
            goto cleanup;
    }

    /* NOTE(review): if an allocation fails on only a subset of ranks,
     * those ranks skip the collective reduces below while the others
     * enter them -- same behavior as the original code; a fully robust
     * fix would agree on an error flag across mod_comm first. */

    /* get total i/o time variances for shared records */
    for(i=0; i<shared_rec_count; i++)
    {
        var_send_buf[i].n = 1;
        var_send_buf[i].S = 0;
        var_send_buf[i].T = inrec_array[i].fcounters[MPIIO_F_READ_TIME] +
                            inrec_array[i].fcounters[MPIIO_F_WRITE_TIME] +
                            inrec_array[i].fcounters[MPIIO_F_META_TIME];
    }

    DARSHAN_MPI_CALL(PMPI_Reduce)(var_send_buf, var_recv_buf, shared_rec_count,
        var_dt, var_op, 0, mod_comm);

    if(my_rank == 0)
    {
        for(i=0; i<shared_rec_count; i++)
        {
            /* population variance of per-rank totals */
            outrec_array[i].fcounters[MPIIO_F_VARIANCE_RANK_TIME] =
                (var_recv_buf[i].S / var_recv_buf[i].n);
        }
    }

    /* get total bytes moved variances for shared records */
    for(i=0; i<shared_rec_count; i++)
    {
        var_send_buf[i].n = 1;
        var_send_buf[i].S = 0;
        var_send_buf[i].T = (double)
                            inrec_array[i].counters[MPIIO_BYTES_READ] +
                            inrec_array[i].counters[MPIIO_BYTES_WRITTEN];
    }

    DARSHAN_MPI_CALL(PMPI_Reduce)(var_send_buf, var_recv_buf, shared_rec_count,
        var_dt, var_op, 0, mod_comm);

    if(my_rank == 0)
    {
        for(i=0; i<shared_rec_count; i++)
        {
            outrec_array[i].fcounters[MPIIO_F_VARIANCE_RANK_BYTES] =
                (var_recv_buf[i].S / var_recv_buf[i].n);
        }
    }

cleanup:
    DARSHAN_MPI_CALL(PMPI_Type_free)(&var_dt);
    DARSHAN_MPI_CALL(PMPI_Op_free)(&var_op);
    free(var_send_buf);
    free(var_recv_buf);   /* free(NULL) is a no-op on non-root ranks */

    return;
}
/**************************************************************************
* Functions exported by MPI-IO module for coordinating with darshan-core *
**************************************************************************/
......@@ -1312,6 +1397,10 @@ static void mpiio_get_output_data(
DARSHAN_MPI_CALL(PMPI_Reduce)(red_send_buf, red_recv_buf,
shared_rec_count, red_type, red_op, 0, mod_comm);
/* get the time and byte variances for shared files */
mpiio_shared_record_variance(mod_comm, red_send_buf, red_recv_buf,
shared_rec_count);
/* clean up reduction state */
if(my_rank == 0)
{
......
......@@ -189,11 +189,14 @@ static struct posix_file_runtime* posix_file_by_name(const char *name);
static struct posix_file_runtime* posix_file_by_name_setfd(const char* name, int fd);
static struct posix_file_runtime* posix_file_by_fd(int fd);
static void posix_file_close_fd(int fd);
static int posix_record_compare(const void* a, const void* b);
static void posix_aio_tracker_add(int fd, void *aiocbp);
static struct posix_aio_tracker* posix_aio_tracker_del(int fd, void *aiocbp);
static int posix_record_compare(const void* a, const void* b);
static void posix_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype);
static void posix_shared_record_variance(MPI_Comm mod_comm,
struct darshan_posix_file *inrec_array, struct darshan_posix_file *outrec_array,
int shared_rec_count);
static void posix_begin_shutdown(void);
static void posix_get_output_data(MPI_Comm mod_comm, darshan_record_id *shared_recs,
......@@ -1646,27 +1649,6 @@ static int posix_record_compare(const void* a_p, const void* b_p)
return 0;
}
/* adds a tracker for the given aio operation */
static void posix_aio_tracker_add(int fd, void *aiocbp)
{
    struct posix_file_runtime* file;
    struct posix_aio_tracker* tracker;

    /* nothing to do if this fd is not instrumented */
    file = posix_file_by_fd(fd);
    if (!file)
        return;

    tracker = malloc(sizeof(*tracker));
    if (!tracker)
        return;  /* best-effort: silently drop the tracker on OOM */

    tracker->tm1 = darshan_core_wtime();
    tracker->aiocbp = aiocbp;
    LL_PREPEND(file->aio_list, tracker);

    return;
}
/* finds the tracker structure for a given aio operation, removes it from
* the linked list for the darshan_file structure, and returns a pointer.
*
......@@ -1694,11 +1676,29 @@ static struct posix_aio_tracker* posix_aio_tracker_del(int fd, void *aiocbp)
return(tracker);
}
static void posix_record_reduction_op(
void* infile_v,
void* inoutfile_v,
int *len,
MPI_Datatype *datatype)
/* adds a tracker for the given aio operation */
/* Records the start time of an async I/O request on fd so that the
 * matching completion (see posix_aio_tracker_del) can attribute the
 * elapsed time to this file. A failed lookup or allocation is silently
 * ignored -- tracking is best-effort. */
static void posix_aio_tracker_add(int fd, void *aiocbp)
{
    struct posix_aio_tracker* tracker;
    struct posix_file_runtime* file;

    file = posix_file_by_fd(fd);
    if (file)
    {
        tracker = malloc(sizeof(*tracker));
        if (tracker)
        {
            /* timestamp taken at submission; aiocbp is the lookup key */
            tracker->tm1 = darshan_core_wtime();
            tracker->aiocbp = aiocbp;
            LL_PREPEND(file->aio_list, tracker);
        }
    }

    return;
}
static void posix_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype)
{
struct darshan_posix_file tmp_file;
struct darshan_posix_file *infile = infile_v;
......@@ -1927,6 +1927,88 @@ static void posix_record_reduction_op(
return;
}
/* posix_shared_record_variance()
 *
 * Compute, for each shared POSIX file record, the variance across all
 * ranks of (a) total I/O + metadata time and (b) total bytes moved,
 * storing the results in rank 0's output records
 * (POSIX_F_VARIANCE_RANK_TIME / POSIX_F_VARIANCE_RANK_BYTES).
 * Collective over mod_comm: every rank contributes one variance partial
 * per record and the partials are merged with darshan_variance_reduce.
 *
 * Fix: on allocation failure the original returned immediately, leaking
 * var_send_buf as well as the committed MPI datatype and the created
 * MPI op. All exit paths now go through a single cleanup section.
 */
static void posix_shared_record_variance(MPI_Comm mod_comm,
    struct darshan_posix_file *inrec_array, struct darshan_posix_file *outrec_array,
    int shared_rec_count)
{
    MPI_Datatype var_dt;
    MPI_Op var_op;
    int i;
    struct darshan_variance_dt *var_send_buf = NULL;
    struct darshan_variance_dt *var_recv_buf = NULL;

    /* ship each partial as raw bytes; the custom op interprets them */
    DARSHAN_MPI_CALL(PMPI_Type_contiguous)(sizeof(struct darshan_variance_dt),
        MPI_BYTE, &var_dt);
    DARSHAN_MPI_CALL(PMPI_Type_commit)(&var_dt);

    DARSHAN_MPI_CALL(PMPI_Op_create)(darshan_variance_reduce, 1, &var_op);

    var_send_buf = malloc(shared_rec_count * sizeof(struct darshan_variance_dt));
    if(!var_send_buf)
        goto cleanup;

    if(my_rank == 0)
    {
        var_recv_buf = malloc(shared_rec_count * sizeof(struct darshan_variance_dt));
        if(!var_recv_buf)
            goto cleanup;
    }

    /* NOTE(review): if an allocation fails on only a subset of ranks,
     * those ranks skip the collective reduces below while the others
     * enter them -- same behavior as the original code; a fully robust
     * fix would agree on an error flag across mod_comm first. */

    /* get total i/o time variances for shared records */
    for(i=0; i<shared_rec_count; i++)
    {
        var_send_buf[i].n = 1;
        var_send_buf[i].S = 0;
        var_send_buf[i].T = inrec_array[i].fcounters[POSIX_F_READ_TIME] +
                            inrec_array[i].fcounters[POSIX_F_WRITE_TIME] +
                            inrec_array[i].fcounters[POSIX_F_META_TIME];
    }

    DARSHAN_MPI_CALL(PMPI_Reduce)(var_send_buf, var_recv_buf, shared_rec_count,
        var_dt, var_op, 0, mod_comm);

    if(my_rank == 0)
    {
        for(i=0; i<shared_rec_count; i++)
        {
            /* population variance of per-rank totals */
            outrec_array[i].fcounters[POSIX_F_VARIANCE_RANK_TIME] =
                (var_recv_buf[i].S / var_recv_buf[i].n);
        }
    }

    /* get total bytes moved variances for shared records */
    for(i=0; i<shared_rec_count; i++)
    {
        var_send_buf[i].n = 1;
        var_send_buf[i].S = 0;
        var_send_buf[i].T = (double)
                            inrec_array[i].counters[POSIX_BYTES_READ] +
                            inrec_array[i].counters[POSIX_BYTES_WRITTEN];
    }

    DARSHAN_MPI_CALL(PMPI_Reduce)(var_send_buf, var_recv_buf, shared_rec_count,
        var_dt, var_op, 0, mod_comm);

    if(my_rank == 0)
    {
        for(i=0; i<shared_rec_count; i++)
        {
            outrec_array[i].fcounters[POSIX_F_VARIANCE_RANK_BYTES] =
                (var_recv_buf[i].S / var_recv_buf[i].n);
        }
    }

cleanup:
    DARSHAN_MPI_CALL(PMPI_Type_free)(&var_dt);
    DARSHAN_MPI_CALL(PMPI_Op_free)(&var_op);
    free(var_send_buf);
    free(var_recv_buf);   /* free(NULL) is a no-op on non-root ranks */

    return;
}
/************************************************************************
* Functions exported by this module for coordinating with darshan-core *
************************************************************************/
......@@ -1954,8 +2036,8 @@ static void posix_get_output_data(
struct posix_file_runtime *tmp;
int i;
double posix_time;
void *red_send_buf = NULL;
void *red_recv_buf = NULL;
struct darshan_posix_file *red_send_buf = NULL;
struct darshan_posix_file *red_recv_buf = NULL;
MPI_Datatype red_type;
MPI_Op red_op;
......@@ -2049,6 +2131,10 @@ static void posix_get_output_data(
DARSHAN_MPI_CALL(PMPI_Reduce)(red_send_buf, red_recv_buf,
shared_rec_count, red_type, red_op, 0, mod_comm);
/* get the time and byte variances for shared files */
posix_shared_record_variance(mod_comm, red_send_buf, red_recv_buf,
shared_rec_count);
/* clean up reduction state */
if(my_rank == 0)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment