Commit 1a856178 authored by Shane Snyder's avatar Shane Snyder

shared record reduction enhancements

parent 969f0b8d
......@@ -16,16 +16,13 @@
/* TODO: this goes where ? -- shared libs */
#define DARSHAN_MPI_CALL(func) func
#define DARSHAN_CORE_MAX_RECORDS 1024
/* TODO: revisit this default size if we change memory per module */
#define DARSHAN_CORE_COMP_BUF_SIZE (2 * 1024 * 1024)
#define DARSHAN_CORE_MOD_SET(flags, id) (flags | (1 << id))
#define DARSHAN_CORE_MOD_UNSET(flags, id) (flags & ~(1 << id))
#define DARSHAN_CORE_MOD_ISSET(flags, id) (flags & (1 << id))
/* in memory structure to keep up with job level data */
......
......@@ -40,7 +40,7 @@ struct darshan_module_funcs
int *rec_size /* size of records being stored for this module */
);
/* reduce records which are shared globally across this module */
void (*reduce_records)(
void (*record_reduction_op)(
void* infile_v,
void* inoutfile_v,
int *len,
......
......@@ -508,7 +508,8 @@ static void darshan_core_shutdown()
}
/* if there are globally shared files, do a shared file reduction */
if(shared_rec_count)
if(shared_rec_count && this_mod->mod_funcs.prepare_for_reduction &&
this_mod->mod_funcs.record_reduction_op)
{
this_mod->mod_funcs.prepare_for_reduction(mod_shared_recs, &shared_rec_count,
&red_send_buf, &red_recv_buf, &rec_sz);
......@@ -522,7 +523,7 @@ static void darshan_core_shutdown()
DARSHAN_MPI_CALL(PMPI_Type_commit)(&red_type);
/* register a reduction operator for this module */
DARSHAN_MPI_CALL(PMPI_Op_create)(this_mod->mod_funcs.reduce_records,
DARSHAN_MPI_CALL(PMPI_Op_create)(this_mod->mod_funcs.record_reduction_op,
1, &red_op);
/* reduce shared file records for this module */
......
......@@ -171,7 +171,7 @@ static void mpiio_runtime_initialize()
{
.disable_instrumentation = &mpiio_disable_instrumentation,
.prepare_for_reduction = NULL,
.reduce_records = NULL,
.record_reduction_op = NULL,
.get_output_data = &mpiio_get_output_data,
.shutdown = &mpiio_shutdown
};
......@@ -275,7 +275,7 @@ static void posix_file_close_fd(int fd);
static void posix_prepare_for_reduction(darshan_record_id *shared_recs,
int *shared_rec_count, void **send_buf, void **recv_buf, int *rec_size);
static void posix_reduce_records(void* infile_v, void* inoutfile_v,
static void posix_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype);
static void posix_get_output_data(void **buffer, int *size);
static void posix_shutdown(void);
......@@ -451,7 +451,7 @@ static void posix_runtime_initialize()
{
.disable_instrumentation = &posix_disable_instrumentation,
.prepare_for_reduction = &posix_prepare_for_reduction,
.reduce_records = &posix_reduce_records,
.record_reduction_op = &posix_record_reduction_op,
.get_output_data = &posix_get_output_data,
.shutdown = &posix_shutdown
};
......@@ -691,7 +691,7 @@ static void posix_prepare_for_reduction(
return;
}
static void posix_reduce_records(
static void posix_record_reduction_op(
void* infile_v,
void* inoutfile_v,
int *len,
......
......@@ -144,7 +144,7 @@ static void posix_file_close_fd(int fd);
static void posix_disable_instrumentation(void);
static void posix_prepare_for_reduction(darshan_record_id *shared_recs,
int *shared_rec_count, void **send_buf, void **recv_buf, int *rec_size);
static void posix_reduce_records(void* infile_v, void* inoutfile_v,
static void posix_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype);
static void posix_get_output_data(void **buffer, int *size);
static void posix_shutdown(void);
......@@ -648,7 +648,7 @@ static void posix_runtime_initialize()
{
.disable_instrumentation = &posix_disable_instrumentation,
.prepare_for_reduction = &posix_prepare_for_reduction,
.reduce_records = &posix_reduce_records,
.record_reduction_op = &posix_record_reduction_op,
.get_output_data = &posix_get_output_data,
.shutdown = &posix_shutdown
};
......@@ -899,7 +899,7 @@ static void posix_prepare_for_reduction(
return;
}
static void posix_reduce_records(
static void posix_record_reduction_op(
void* infile_v,
void* inoutfile_v,
int *len,
......@@ -909,29 +909,81 @@ static void posix_reduce_records(
struct darshan_posix_file *infile = infile_v;
struct darshan_posix_file *inoutfile = inoutfile_v;
int i;
int j;
assert(posix_runtime);
for(i = 0; i < *len; i++)
for(i=0; i<*len; i++)
{
memset(&tmp_file, 0, sizeof(struct darshan_posix_file));
tmp_file.f_id = infile->f_id;
tmp_file.rank = -1;
tmp_file.counters[POSIX_OPENS] = infile->counters[POSIX_OPENS] +
inoutfile->counters[POSIX_OPENS];
if((infile->fcounters[POSIX_F_OPEN_TIMESTAMP] > inoutfile->fcounters[POSIX_F_OPEN_TIMESTAMP]) &&
(inoutfile->fcounters[POSIX_F_OPEN_TIMESTAMP] > 0))
tmp_file.fcounters[POSIX_F_OPEN_TIMESTAMP] = inoutfile->fcounters[POSIX_F_OPEN_TIMESTAMP];
/* sum */
for(j=POSIX_OPENS; j<=POSIX_FWRITES; j++)
{
tmp_file.counters[j] = infile->counters[j] + inoutfile->counters[j];
}
tmp_file.counters[POSIX_MODE] = infile->counters[POSIX_MODE];
/* min non-zero (if available) value */
for(j=POSIX_F_OPEN_TIMESTAMP; j<=POSIX_F_WRITE_START_TIMESTAMP; j++)
{
if(infile->fcounters[j] > inoutfile->fcounters[j] && inoutfile->fcounters[j] > 0)
tmp_file.fcounters[j] = inoutfile->fcounters[j];
else
tmp_file.fcounters[j] = infile->fcounters[j];
}
/* max */
for(j=POSIX_F_READ_END_TIMESTAMP; j<=POSIX_F_CLOSE_TIMESTAMP; j++)
{
if(infile->fcounters[j] > inoutfile->fcounters[j])
tmp_file.fcounters[j] = infile->fcounters[j];
else
tmp_file.fcounters[j] = inoutfile->fcounters[j];
}
/* sum */
for(j=POSIX_F_READ_TIME; j<=POSIX_F_META_TIME; j++)
{
tmp_file.fcounters[j] = infile->fcounters[j] + inoutfile->fcounters[j];
}
/* max (special case) */
if(infile->fcounters[POSIX_F_MAX_READ_TIME] >
inoutfile->fcounters[POSIX_F_MAX_READ_TIME])
{
tmp_file.fcounters[POSIX_F_MAX_READ_TIME] =
infile->fcounters[POSIX_F_MAX_READ_TIME];
tmp_file.counters[POSIX_MAX_READ_TIME_SIZE] =
infile->counters[POSIX_MAX_READ_TIME_SIZE];
}
else
tmp_file.fcounters[POSIX_F_OPEN_TIMESTAMP] = infile->fcounters[POSIX_F_OPEN_TIMESTAMP];
if(infile->fcounters[POSIX_F_CLOSE_TIMESTAMP] > inoutfile->fcounters[POSIX_F_CLOSE_TIMESTAMP])
tmp_file.fcounters[POSIX_F_CLOSE_TIMESTAMP] = infile->fcounters[POSIX_F_CLOSE_TIMESTAMP];
{
tmp_file.fcounters[POSIX_F_MAX_READ_TIME] =
inoutfile->fcounters[POSIX_F_MAX_READ_TIME];
tmp_file.counters[POSIX_MAX_READ_TIME_SIZE] =
inoutfile->counters[POSIX_MAX_READ_TIME_SIZE];
}
if(infile->fcounters[POSIX_F_MAX_WRITE_TIME] >
inoutfile->fcounters[POSIX_F_MAX_WRITE_TIME])
{
tmp_file.fcounters[POSIX_F_MAX_WRITE_TIME] =
infile->fcounters[POSIX_F_MAX_WRITE_TIME];
tmp_file.counters[POSIX_MAX_WRITE_TIME_SIZE] =
infile->counters[POSIX_MAX_WRITE_TIME_SIZE];
}
else
tmp_file.fcounters[POSIX_F_CLOSE_TIMESTAMP] = inoutfile->fcounters[POSIX_F_CLOSE_TIMESTAMP];
{
tmp_file.fcounters[POSIX_F_MAX_WRITE_TIME] =
inoutfile->fcounters[POSIX_F_MAX_WRITE_TIME];
tmp_file.counters[POSIX_MAX_WRITE_TIME_SIZE] =
inoutfile->counters[POSIX_MAX_WRITE_TIME_SIZE];
}
/* update pointers */
*inoutfile = tmp_file;
......
......@@ -22,8 +22,6 @@
static int darshan_log_seek(darshan_fd fd, off_t offset);
static int darshan_log_read(darshan_fd fd, void *buf, int len);
static int darshan_log_write(darshan_fd fd, void *buf, int len);
static int darshan_decompress_buffer(char *comp_buf, int comp_buf_sz,
char *decomp_buf, int *inout_decomp_buf_sz);
/* darshan_log_open()
*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment