Commit d4a3d112 authored by Shane Snyder's avatar Shane Snyder
Browse files

add variance counter reduction logic to logutils

parent f26baefc
...@@ -11,8 +11,6 @@ ...@@ -11,8 +11,6 @@
#define DEF_MOD_BUF_SIZE 1024 /* 1 KiB is enough for all current mod records ... */ #define DEF_MOD_BUF_SIZE 1024 /* 1 KiB is enough for all current mod records ... */
/* TODO: set job end timestamp? */
struct darshan_shared_record_ref struct darshan_shared_record_ref
{ {
darshan_record_id id; darshan_record_id id;
...@@ -97,7 +95,7 @@ int build_mod_shared_rec_hash(char **infile_list, int n_infiles, ...@@ -97,7 +95,7 @@ int build_mod_shared_rec_hash(char **infile_list, int n_infiles,
darshan_fd in_fd; darshan_fd in_fd;
struct darshan_base_record *base_rec; struct darshan_base_record *base_rec;
struct darshan_shared_record_ref *ref, *tmp; struct darshan_shared_record_ref *ref, *tmp;
int init = 0; int init_rank = -1;
int ret; int ret;
int i; int i;
...@@ -116,9 +114,11 @@ int build_mod_shared_rec_hash(char **infile_list, int n_infiles, ...@@ -116,9 +114,11 @@ int build_mod_shared_rec_hash(char **infile_list, int n_infiles,
while((ret = mod_logutils[mod_id]->log_get_record(in_fd, mod_buf)) == 1) while((ret = mod_logutils[mod_id]->log_get_record(in_fd, mod_buf)) == 1)
{ {
base_rec = (struct darshan_base_record *)mod_buf; base_rec = (struct darshan_base_record *)mod_buf;
if(init_rank == -1)
init_rank = base_rec->rank;
/* initialize the hash with the first rank's records */ /* initialize the hash with the first rank's records */
if(!init) if(base_rec->rank == init_rank)
{ {
struct darshan_base_record *agg_base; struct darshan_base_record *agg_base;
...@@ -140,10 +140,9 @@ int build_mod_shared_rec_hash(char **infile_list, int n_infiles, ...@@ -140,10 +140,9 @@ int build_mod_shared_rec_hash(char **infile_list, int n_infiles,
ref->id = base_rec->id; ref->id = base_rec->id;
ref->ref_cnt = 1; ref->ref_cnt = 1;
HASH_ADD(hlink, *shared_rec_hash, id, sizeof(darshan_record_id), ref); HASH_ADD(hlink, *shared_rec_hash, id, sizeof(darshan_record_id), ref);
init = 1;
} }
else else
{ {
/* search for this record in shared record hash */ /* search for this record in shared record hash */
HASH_FIND(hlink, *shared_rec_hash, &(base_rec->id), HASH_FIND(hlink, *shared_rec_hash, &(base_rec->id),
sizeof(darshan_record_id), ref); sizeof(darshan_record_id), ref);
......
...@@ -224,6 +224,14 @@ static void darshan_log_print_mpiio_file_diff(void *file_rec1, char *file_name1, ...@@ -224,6 +224,14 @@ static void darshan_log_print_mpiio_file_diff(void *file_rec1, char *file_name1,
return; return;
} }
/* simple helper struct for determining time & byte variances */
struct var_t
{
double n;
double M;
double S;
};
static void darshan_log_agg_mpiio_files(void *rec, void *agg_rec, int init_flag) static void darshan_log_agg_mpiio_files(void *rec, void *agg_rec, int init_flag)
{ {
struct darshan_mpiio_file *mpi_rec = (struct darshan_mpiio_file *)rec; struct darshan_mpiio_file *mpi_rec = (struct darshan_mpiio_file *)rec;
...@@ -232,31 +240,16 @@ static void darshan_log_agg_mpiio_files(void *rec, void *agg_rec, int init_flag) ...@@ -232,31 +240,16 @@ static void darshan_log_agg_mpiio_files(void *rec, void *agg_rec, int init_flag)
int set; int set;
int min_ndx; int min_ndx;
int64_t min; int64_t min;
double old_M;
double mpi_time = mpi_rec->fcounters[MPIIO_F_READ_TIME] + double mpi_time = mpi_rec->fcounters[MPIIO_F_READ_TIME] +
mpi_rec->fcounters[MPIIO_F_WRITE_TIME] + mpi_rec->fcounters[MPIIO_F_WRITE_TIME] +
mpi_rec->fcounters[MPIIO_F_META_TIME]; mpi_rec->fcounters[MPIIO_F_META_TIME];
double mpi_bytes = (double)mpi_rec->counters[MPIIO_BYTES_READ] +
/* special case initialization of shared record for mpi_rec->counters[MPIIO_BYTES_WRITTEN];
* first call of this function struct var_t *var_time_p = (struct var_t *)
*/ ((char *)rec + sizeof(struct darshan_mpiio_file));
if(init_flag) struct var_t *var_bytes_p = (struct var_t *)
{ ((char *)var_time_p + sizeof(struct var_t));
/* set fastest/slowest rank counters according to root rank.
* these counters will be determined as the aggregation progresses.
*/
agg_mpi_rec->counters[MPIIO_FASTEST_RANK] = mpi_rec->base_rec.rank;
agg_mpi_rec->counters[MPIIO_FASTEST_RANK_BYTES] =
mpi_rec->counters[MPIIO_BYTES_READ] +
mpi_rec->counters[MPIIO_BYTES_WRITTEN];
agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME] = mpi_time;
agg_mpi_rec->counters[MPIIO_SLOWEST_RANK] =
agg_mpi_rec->counters[MPIIO_FASTEST_RANK];
agg_mpi_rec->counters[MPIIO_SLOWEST_RANK_BYTES] =
agg_mpi_rec->counters[MPIIO_FASTEST_RANK_BYTES];
agg_mpi_rec->fcounters[MPIIO_F_SLOWEST_RANK_TIME] =
agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME];
}
for(i = 0; i < MPIIO_NUM_INDICES; i++) for(i = 0; i < MPIIO_NUM_INDICES; i++)
{ {
...@@ -403,27 +396,80 @@ static void darshan_log_agg_mpiio_files(void *rec, void *agg_rec, int init_flag) ...@@ -403,27 +396,80 @@ static void darshan_log_agg_mpiio_files(void *rec, void *agg_rec, int init_flag)
} }
break; break;
case MPIIO_F_FASTEST_RANK_TIME: case MPIIO_F_FASTEST_RANK_TIME:
if(init_flag)
{
/* set fastest rank counters according to root rank. these counters
* will be determined as the aggregation progresses.
*/
agg_mpi_rec->counters[MPIIO_FASTEST_RANK] = mpi_rec->base_rec.rank;
agg_mpi_rec->counters[MPIIO_FASTEST_RANK_BYTES] = mpi_bytes;
agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME] = mpi_time;
}
if(mpi_time < agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME]) if(mpi_time < agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME])
{ {
agg_mpi_rec->counters[MPIIO_FASTEST_RANK] = mpi_rec->base_rec.rank; agg_mpi_rec->counters[MPIIO_FASTEST_RANK] = mpi_rec->base_rec.rank;
agg_mpi_rec->counters[MPIIO_FASTEST_RANK_BYTES] = agg_mpi_rec->counters[MPIIO_FASTEST_RANK_BYTES] = mpi_bytes;
mpi_rec->counters[MPIIO_BYTES_READ] +
mpi_rec->counters[MPIIO_BYTES_WRITTEN];
agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME] = mpi_time; agg_mpi_rec->fcounters[MPIIO_F_FASTEST_RANK_TIME] = mpi_time;
} }
break; break;
case MPIIO_F_SLOWEST_RANK_TIME: case MPIIO_F_SLOWEST_RANK_TIME:
if(init_flag)
{
/* set slowest rank counters according to root rank. these counters
* will be determined as the aggregation progresses.
*/
agg_mpi_rec->counters[MPIIO_SLOWEST_RANK] = mpi_rec->base_rec.rank;
agg_mpi_rec->counters[MPIIO_SLOWEST_RANK_BYTES] = mpi_bytes;
agg_mpi_rec->fcounters[MPIIO_F_SLOWEST_RANK_TIME] = mpi_time;
}
if(mpi_time > agg_mpi_rec->fcounters[MPIIO_F_SLOWEST_RANK_TIME]) if(mpi_time > agg_mpi_rec->fcounters[MPIIO_F_SLOWEST_RANK_TIME])
{ {
agg_mpi_rec->counters[MPIIO_SLOWEST_RANK] = mpi_rec->base_rec.rank; agg_mpi_rec->counters[MPIIO_SLOWEST_RANK] = mpi_rec->base_rec.rank;
agg_mpi_rec->counters[MPIIO_SLOWEST_RANK_BYTES] = agg_mpi_rec->counters[MPIIO_SLOWEST_RANK_BYTES] = mpi_bytes;
mpi_rec->counters[MPIIO_BYTES_READ] +
mpi_rec->counters[MPIIO_BYTES_WRITTEN];
agg_mpi_rec->fcounters[MPIIO_F_SLOWEST_RANK_TIME] = mpi_time; agg_mpi_rec->fcounters[MPIIO_F_SLOWEST_RANK_TIME] = mpi_time;
} }
break; break;
case MPIIO_F_VARIANCE_RANK_TIME:
if(init_flag)
{
var_time_p->n = 1;
var_time_p->M = mpi_time;
var_time_p->S = 0;
}
else
{
old_M = var_time_p->M;
var_time_p->n++;
var_time_p->M += (mpi_time - var_time_p->M) / var_time_p->n;
var_time_p->S += (mpi_time - var_time_p->M) * (mpi_time - old_M);
agg_mpi_rec->fcounters[MPIIO_F_VARIANCE_RANK_TIME] =
var_time_p->S / var_time_p->n;
}
break;
case MPIIO_F_VARIANCE_RANK_BYTES:
if(init_flag)
{
var_bytes_p->n = 1;
var_bytes_p->M = mpi_bytes;
var_bytes_p->S = 0;
}
else
{
old_M = var_bytes_p->M;
var_bytes_p->n++;
var_bytes_p->M += (mpi_bytes - var_bytes_p->M) / var_bytes_p->n;
var_bytes_p->S += (mpi_bytes - var_bytes_p->M) * (mpi_bytes - old_M);
agg_mpi_rec->fcounters[MPIIO_F_VARIANCE_RANK_BYTES] =
var_bytes_p->S / var_bytes_p->n;
}
break;
default: default:
/* TODO: variance */
agg_mpi_rec->fcounters[i] = -1; agg_mpi_rec->fcounters[i] = -1;
break; break;
} }
......
...@@ -225,6 +225,14 @@ static void darshan_log_print_posix_file_diff(void *file_rec1, char *file_name1, ...@@ -225,6 +225,14 @@ static void darshan_log_print_posix_file_diff(void *file_rec1, char *file_name1,
return; return;
} }
/* simple helper struct for determining time & byte variances */
struct var_t
{
double n;
double M;
double S;
};
static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag) static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag)
{ {
struct darshan_posix_file *psx_rec = (struct darshan_posix_file *)rec; struct darshan_posix_file *psx_rec = (struct darshan_posix_file *)rec;
...@@ -233,31 +241,16 @@ static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag) ...@@ -233,31 +241,16 @@ static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag)
int set; int set;
int min_ndx; int min_ndx;
int64_t min; int64_t min;
double old_M;
double psx_time = psx_rec->fcounters[POSIX_F_READ_TIME] + double psx_time = psx_rec->fcounters[POSIX_F_READ_TIME] +
psx_rec->fcounters[POSIX_F_WRITE_TIME] + psx_rec->fcounters[POSIX_F_WRITE_TIME] +
psx_rec->fcounters[POSIX_F_META_TIME]; psx_rec->fcounters[POSIX_F_META_TIME];
double psx_bytes = (double)psx_rec->counters[POSIX_BYTES_READ] +
/* special case initialization of shared record for psx_rec->counters[POSIX_BYTES_WRITTEN];
* first call of this function struct var_t *var_time_p = (struct var_t *)
*/ ((char *)rec + sizeof(struct darshan_posix_file));
if(init_flag) struct var_t *var_bytes_p = (struct var_t *)
{ ((char *)var_time_p + sizeof(struct var_t));
/* set fastest/slowest rank counters according to root rank.
* these counters will be determined as the aggregation progresses.
*/
agg_psx_rec->counters[POSIX_FASTEST_RANK] = psx_rec->base_rec.rank;
agg_psx_rec->counters[POSIX_FASTEST_RANK_BYTES] =
psx_rec->counters[POSIX_BYTES_READ] +
psx_rec->counters[POSIX_BYTES_WRITTEN];
agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME] = psx_time;
agg_psx_rec->counters[POSIX_SLOWEST_RANK] =
agg_psx_rec->counters[POSIX_FASTEST_RANK];
agg_psx_rec->counters[POSIX_SLOWEST_RANK_BYTES] =
agg_psx_rec->counters[POSIX_FASTEST_RANK_BYTES];
agg_psx_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME] =
agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME];
}
for(i = 0; i < POSIX_NUM_INDICES; i++) for(i = 0; i < POSIX_NUM_INDICES; i++)
{ {
...@@ -427,27 +420,80 @@ static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag) ...@@ -427,27 +420,80 @@ static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag)
} }
break; break;
case POSIX_F_FASTEST_RANK_TIME: case POSIX_F_FASTEST_RANK_TIME:
if(init_flag)
{
/* set fastest rank counters according to root rank. these counters
* will be determined as the aggregation progresses.
*/
agg_psx_rec->counters[POSIX_FASTEST_RANK] = psx_rec->base_rec.rank;
agg_psx_rec->counters[POSIX_FASTEST_RANK_BYTES] = psx_bytes;
agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME] = psx_time;
}
if(psx_time < agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME]) if(psx_time < agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME])
{ {
agg_psx_rec->counters[POSIX_FASTEST_RANK] = psx_rec->base_rec.rank; agg_psx_rec->counters[POSIX_FASTEST_RANK] = psx_rec->base_rec.rank;
agg_psx_rec->counters[POSIX_FASTEST_RANK_BYTES] = agg_psx_rec->counters[POSIX_FASTEST_RANK_BYTES] = psx_bytes;
psx_rec->counters[POSIX_BYTES_READ] +
psx_rec->counters[POSIX_BYTES_WRITTEN];
agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME] = psx_time; agg_psx_rec->fcounters[POSIX_F_FASTEST_RANK_TIME] = psx_time;
} }
break; break;
case POSIX_F_SLOWEST_RANK_TIME: case POSIX_F_SLOWEST_RANK_TIME:
if(init_flag)
{
/* set slowest rank counters according to root rank. these counters
* will be determined as the aggregation progresses.
*/
agg_psx_rec->counters[POSIX_SLOWEST_RANK] = psx_rec->base_rec.rank;
agg_psx_rec->counters[POSIX_SLOWEST_RANK_BYTES] = psx_bytes;
agg_psx_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME] = psx_time;
}
if(psx_time > agg_psx_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME]) if(psx_time > agg_psx_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME])
{ {
agg_psx_rec->counters[POSIX_SLOWEST_RANK] = psx_rec->base_rec.rank; agg_psx_rec->counters[POSIX_SLOWEST_RANK] = psx_rec->base_rec.rank;
agg_psx_rec->counters[POSIX_SLOWEST_RANK_BYTES] = agg_psx_rec->counters[POSIX_SLOWEST_RANK_BYTES] = psx_bytes;
psx_rec->counters[POSIX_BYTES_READ] +
psx_rec->counters[POSIX_BYTES_WRITTEN];
agg_psx_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME] = psx_time; agg_psx_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME] = psx_time;
} }
break; break;
case POSIX_F_VARIANCE_RANK_TIME:
if(init_flag)
{
var_time_p->n = 1;
var_time_p->M = psx_time;
var_time_p->S = 0;
}
else
{
old_M = var_time_p->M;
var_time_p->n++;
var_time_p->M += (psx_time - var_time_p->M) / var_time_p->n;
var_time_p->S += (psx_time - var_time_p->M) * (psx_time - old_M);
agg_psx_rec->fcounters[POSIX_F_VARIANCE_RANK_TIME] =
var_time_p->S / var_time_p->n;
}
break;
case POSIX_F_VARIANCE_RANK_BYTES:
if(init_flag)
{
var_bytes_p->n = 1;
var_bytes_p->M = psx_bytes;
var_bytes_p->S = 0;
}
else
{
old_M = var_bytes_p->M;
var_bytes_p->n++;
var_bytes_p->M += (psx_bytes - var_bytes_p->M) / var_bytes_p->n;
var_bytes_p->S += (psx_bytes - var_bytes_p->M) * (psx_bytes - old_M);
agg_psx_rec->fcounters[POSIX_F_VARIANCE_RANK_BYTES] =
var_bytes_p->S / var_bytes_p->n;
}
break;
default: default:
/* TODO: variance */
agg_psx_rec->fcounters[i] = -1; agg_psx_rec->fcounters[i] = -1;
break; break;
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment