Commit 4e6d0c89 authored by Shane Snyder's avatar Shane Snyder
Browse files

move shared file reduction code to get_output func

parent f4df41a3
......@@ -56,38 +56,28 @@
/* module developers provide the following functions to darshan-core */
struct darshan_module_funcs
{
/* perform any necessary pre-shutdown steps */
/* perform any necessary pre-shutdown steps
*
* NOTE: this typically includes disabling wrapper functions so
* darshan-core can shutdown in a consistent state.
*/
void (*begin_shutdown)(void);
/* retrieve module data to write to log file */
/* retrieve module data to write to log file
*
* NOTE: module developers can use this function to run collective
* MPI operations at shutdown time. Typically this functionality
* has been used to reduce records shared globally (given in the
* 'shared_recs' array) into a single data record.
*/
void (*get_output_data)(
void** buf, /* output parameter to save module buffer address */
int* size /* output parameter to save module buffer size */
MPI_Comm mod_comm, /* MPI communicator to run collectives with */
darshan_record_id *shared_recs, /* list of shared data record ids */
int shared_rec_count, /* count of shared data records */
void** mod_buf, /* output parameter to save module buffer address */
int* mod_buf_sz /* output parameter to save module buffer size */
);
/* shutdown module data structures */
void (*shutdown)(void);
/* (OPTIONAL) perform any necessary steps prior to performing a reduction
* of shared Darshan I/O records. To bypass shared file reduction mechanism,
* set this pointer to NULL.
*/
void (*setup_reduction)(
darshan_record_id *shared_recs, /* input list of shared records */
int *shared_rec_count, /* in/out shared record count */
void **send_buf, /* send buffer for shared file reduction */
void **recv_buf, /* recv buffer for shared file reduction (root only) */
int *rec_size /* size of records being stored for this module */
);
/* (OPTIONAL) perform the actual shared file reduction operation. This
* operation follows the prototype of MPI_Op_create, which allows the
* specification of user-defined combination functions which may be used
* directly by MPI. To bypass shared file reduction mechanism, set this
* pointer to NULL.
*/
void (*record_reduction_op)(
void* infile_v,
void* inoutfile_v,
int *len,
MPI_Datatype *datatype
);
};
/* paths that darshan will not trace */
......
......@@ -452,8 +452,9 @@ void darshan_core_shutdown()
for(i = 0; i < DARSHAN_MAX_MODS; i++)
{
struct darshan_core_module* this_mod = final_core->mod_array[i];
darshan_record_id mod_shared_recs[DARSHAN_CORE_MAX_RECORDS];
struct darshan_core_record_ref *ref = NULL;
darshan_record_id mod_shared_recs[DARSHAN_CORE_MAX_RECORDS];
int mod_shared_rec_cnt = 0;
void* mod_buf = NULL;
int mod_buf_sz = 0;
int j;
......@@ -469,63 +470,30 @@ void darshan_core_shutdown()
}
if(internal_timing_flag)
mod1[i] = DARSHAN_MPI_CALL(PMPI_Wtime)();
/* if all processes used this module, prepare to do a shared file reduction */
if(global_mod_use_count[i] == nprocs)
{
int shared_rec_count = 0;
int rec_sz = 0;
void *red_send_buf = NULL, *red_recv_buf = NULL;
MPI_Datatype red_type;
MPI_Op red_op;
/* set the shared file list for this module */
memset(mod_shared_recs, 0, DARSHAN_CORE_MAX_RECORDS * sizeof(darshan_record_id));
for(j = 0; j < DARSHAN_CORE_MAX_RECORDS && shared_recs[j] != 0; j++)
{
HASH_FIND(hlink, final_core->rec_hash, &shared_recs[j],
sizeof(darshan_record_id), ref);
assert(ref);
if(DARSHAN_CORE_MOD_ISSET(ref->global_mod_flags, i))
{
mod_shared_recs[shared_rec_count++] = shared_recs[j];
}
}
mod1[i] = DARSHAN_MPI_CALL(PMPI_Wtime)();
/* if there are globally shared files, do a shared file reduction */
if(shared_rec_count && this_mod->mod_funcs.setup_reduction &&
this_mod->mod_funcs.record_reduction_op)
/* set the shared file list for this module */
memset(mod_shared_recs, 0, DARSHAN_CORE_MAX_RECORDS * sizeof(darshan_record_id));
for(j = 0; j < DARSHAN_CORE_MAX_RECORDS && shared_recs[j] != 0; j++)
{
HASH_FIND(hlink, final_core->rec_hash, &shared_recs[j],
sizeof(darshan_record_id), ref);
assert(ref);
if(DARSHAN_CORE_MOD_ISSET(ref->global_mod_flags, i))
{
this_mod->mod_funcs.setup_reduction(mod_shared_recs, &shared_rec_count,
&red_send_buf, &red_recv_buf, &rec_sz);
if(shared_rec_count)
{
/* construct a datatype for a file record. This is serving no purpose
* except to make sure we can do a reduction on proper boundaries
*/
DARSHAN_MPI_CALL(PMPI_Type_contiguous)(rec_sz, MPI_BYTE, &red_type);
DARSHAN_MPI_CALL(PMPI_Type_commit)(&red_type);
/* register a reduction operator for this module */
DARSHAN_MPI_CALL(PMPI_Op_create)(this_mod->mod_funcs.record_reduction_op,
1, &red_op);
/* reduce shared file records for this module */
DARSHAN_MPI_CALL(PMPI_Reduce)(red_send_buf, red_recv_buf,
shared_rec_count, red_type, red_op, 0, MPI_COMM_WORLD);
DARSHAN_MPI_CALL(PMPI_Type_free)(&red_type);
DARSHAN_MPI_CALL(PMPI_Op_free)(&red_op);
}
mod_shared_recs[mod_shared_rec_cnt++] = shared_recs[j];
}
}
/* if module is registered locally, get the corresponding output buffer */
/* if module is registered locally, get the corresponding output buffer
*
* NOTE: this function can be used to run collective operations across
* modules, if there are file records shared globally.
*/
if(this_mod)
{
/* get output buffer from module */
this_mod->mod_funcs.get_output_data(&mod_buf, &mod_buf_sz);
this_mod->mod_funcs.get_output_data(MPI_COMM_WORLD, mod_shared_recs,
mod_shared_rec_cnt, &mod_buf, &mod_buf_sz);
}
final_core->log_header.mod_map[i].off = tmp_off;
......
......@@ -107,8 +107,6 @@ struct mpiio_runtime
int file_array_ndx;
struct mpiio_file_runtime* file_hash;
struct mpiio_file_runtime_ref* fh_hash;
void *red_buf;
int shared_rec_count;
};
static struct mpiio_runtime *mpiio_runtime = NULL;
......@@ -122,13 +120,12 @@ static struct mpiio_file_runtime* mpiio_file_by_name_setfh(const char* name, MPI
static struct mpiio_file_runtime* mpiio_file_by_fh(MPI_File fh);
static void mpiio_file_close_fh(MPI_File fh);
static int mpiio_record_compare(const void* a, const void* b);
static void mpiio_begin_shutdown(void);
static void mpiio_setup_reduction(darshan_record_id *shared_recs, int *shared_rec_count,
void **send_buf, void **recv_buf, int *rec_size);
static void mpiio_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype);
static void mpiio_get_output_data(void **buffer, int *size);
static void mpiio_begin_shutdown(void);
static void mpiio_get_output_data(MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **mpiio_buf, int *mpiio_buf_sz);
static void mpiio_shutdown(void);
#define MPIIO_LOCK() pthread_mutex_lock(&mpiio_runtime_mutex)
......@@ -829,8 +826,6 @@ static void mpiio_runtime_initialize()
struct darshan_module_funcs mpiio_mod_fns =
{
.begin_shutdown = &mpiio_begin_shutdown,
.setup_reduction = &mpiio_setup_reduction,
.record_reduction_op = &mpiio_record_reduction_op,
.get_output_data = &mpiio_get_output_data,
.shutdown = &mpiio_shutdown
};
......@@ -1022,110 +1017,6 @@ static int mpiio_record_compare(const void* a_p, const void* b_p)
return 0;
}
/**************************************************************************
* Functions exported by MPI-IO module for coordinating with darshan-core *
**************************************************************************/
/* Disable MPI-IO instrumentation and finalize per-file common-value
 * counters ahead of darshan-core shutdown.
 */
static void mpiio_begin_shutdown()
{
    struct mpiio_file_runtime* rt_file;
    int ndx;

    assert(mpiio_runtime);

    MPIIO_LOCK();

    /* stop tracing new operations while darshan-core tears down */
    instrumentation_disabled = 1;

    /* record the 4 most frequently observed access sizes for each file */
    for(ndx = 0; ndx < mpiio_runtime->file_array_ndx; ndx++)
    {
        rt_file = &(mpiio_runtime->file_runtime_array[ndx]);
        darshan_walk_common_vals(rt_file->access_root,
            &(rt_file->file_record->counters[MPIIO_ACCESS1_ACCESS]),
            &(rt_file->file_record->counters[MPIIO_ACCESS1_COUNT]));
    }

    MPIIO_UNLOCK();
    return;
}
/* Prepare this module's globally shared records for the darshan-core
 * shared file reduction.
 *
 * Initializes fastest/slowest rank counters in each shared record, marks
 * shared records with rank -1, sorts the record array so shared records
 * occupy a contiguous region at its end, and exposes that region as the
 * reduction send buffer. Rank 0 additionally allocates the receive buffer.
 */
static void mpiio_setup_reduction(
    darshan_record_id *shared_recs, /* input list of globally shared record ids */
    int *shared_rec_count,          /* in/out count of shared records */
    void **send_buf,                /* out: send buffer for the reduction */
    void **recv_buf,                /* out: recv buffer (allocated on rank 0 only) */
    int *rec_size)                  /* out: size of one MPI-IO file record */
{
    struct mpiio_file_runtime *file;
    int i;
    double mpiio_time;

    assert(mpiio_runtime);

    /* necessary initialization of shared records (e.g., change rank to -1) */
    for(i = 0; i < *shared_rec_count; i++)
    {
        HASH_FIND(hlink, mpiio_runtime->file_hash, &shared_recs[i],
            sizeof(darshan_record_id), file);
        assert(file);

        /* this rank's total MPI-IO time for the file (read + write + meta) */
        mpiio_time =
            file->file_record->fcounters[MPIIO_F_READ_TIME] +
            file->file_record->fcounters[MPIIO_F_WRITE_TIME] +
            file->file_record->fcounters[MPIIO_F_META_TIME];

        /* initialize fastest/slowest info prior to the reduction */
        file->file_record->counters[MPIIO_FASTEST_RANK] =
            file->file_record->rank;
        file->file_record->counters[MPIIO_FASTEST_RANK_BYTES] =
            file->file_record->counters[MPIIO_BYTES_READ] +
            file->file_record->counters[MPIIO_BYTES_WRITTEN];
        file->file_record->fcounters[MPIIO_F_FASTEST_RANK_TIME] =
            mpiio_time;

        /* until reduction occurs, we assume that this rank is both
         * the fastest and slowest. It is up to the reduction operator
         * to find the true min and max.
         */
        file->file_record->counters[MPIIO_SLOWEST_RANK] =
            file->file_record->counters[MPIIO_FASTEST_RANK];
        file->file_record->counters[MPIIO_SLOWEST_RANK_BYTES] =
            file->file_record->counters[MPIIO_FASTEST_RANK_BYTES];
        file->file_record->fcounters[MPIIO_F_SLOWEST_RANK_TIME] =
            file->file_record->fcounters[MPIIO_F_FASTEST_RANK_TIME];

        /* rank -1 marks the record as shared; the qsort below pushes
         * rank -1 records to the tail of the record array */
        file->file_record->rank = -1;
    }

    /* sort the array of files descending by rank so that we get all of the
     * shared files (marked by rank -1) in a contiguous portion at end
     * of the array
     */
    qsort(mpiio_runtime->file_record_array, mpiio_runtime->file_array_ndx,
        sizeof(struct darshan_mpiio_file), mpiio_record_compare);

    /* make *send_buf point to the shared files at the end of sorted array */
    *send_buf =
        &(mpiio_runtime->file_record_array[mpiio_runtime->file_array_ndx-(*shared_rec_count)]);

    /* allocate memory for the reduction output on rank 0 */
    if(my_rank == 0)
    {
        *recv_buf = malloc(*shared_rec_count * sizeof(struct darshan_mpiio_file));
        if(!(*recv_buf))
            /* NOTE(review): this early return leaves *rec_size unset on
             * allocation failure — confirm the caller tolerates that */
            return;

        /* TODO: cleaner way to do this? */
        /* stash the recv buffer so get_output_data can copy/free it later */
        mpiio_runtime->red_buf = *recv_buf;
    }

    *rec_size = sizeof(struct darshan_mpiio_file);
    mpiio_runtime->shared_rec_count = *shared_rec_count;

    return;
}
static void mpiio_record_reduction_op(
void* infile_v,
void* inoutfile_v,
......@@ -1305,27 +1196,141 @@ static void mpiio_record_reduction_op(
return;
}
/**************************************************************************
* Functions exported by MPI-IO module for coordinating with darshan-core *
**************************************************************************/
/* Called by darshan-core before shutdown; turns off the MPI-IO wrappers
 * so the core can finalize in a consistent state.
 */
static void mpiio_begin_shutdown()
{
    assert(mpiio_runtime);

    MPIIO_LOCK();
    instrumentation_disabled = 1; /* no further tracing during shutdown */
    MPIIO_UNLOCK();

    return;
}
static void mpiio_get_output_data(
void **buffer,
int *size)
MPI_Comm mod_comm,
darshan_record_id *shared_recs,
int shared_rec_count,
void **mpiio_buf,
int *mpiio_buf_sz)
{
struct mpiio_file_runtime *file;
struct mpiio_file_runtime* tmp;
int i;
double mpiio_time;
void *red_send_buf = NULL;
void *red_recv_buf = NULL;
MPI_Datatype red_type;
MPI_Op red_op;
assert(mpiio_runtime);
/* clean up reduction state */
if(my_rank == 0)
/* go through and set the 4 most common access sizes for MPI-IO */
for(i = 0; i < mpiio_runtime->file_array_ndx; i++)
{
int tmp_ndx = mpiio_runtime->file_array_ndx - mpiio_runtime->shared_rec_count;
memcpy(&(mpiio_runtime->file_record_array[tmp_ndx]), mpiio_runtime->red_buf,
mpiio_runtime->shared_rec_count * sizeof(struct darshan_mpiio_file));
free(mpiio_runtime->red_buf);
tmp = &(mpiio_runtime->file_runtime_array[i]);
/* common access sizes */
darshan_walk_common_vals(tmp->access_root,
&(tmp->file_record->counters[MPIIO_ACCESS1_ACCESS]),
&(tmp->file_record->counters[MPIIO_ACCESS1_COUNT]));
}
else
/* if there are globally shared files, do a shared file reduction */
if(shared_rec_count)
{
mpiio_runtime->file_array_ndx -= mpiio_runtime->shared_rec_count;
/* necessary initialization of shared records */
for(i = 0; i < shared_rec_count; i++)
{
HASH_FIND(hlink, mpiio_runtime->file_hash, &shared_recs[i],
sizeof(darshan_record_id), file);
assert(file);
mpiio_time =
file->file_record->fcounters[MPIIO_F_READ_TIME] +
file->file_record->fcounters[MPIIO_F_WRITE_TIME] +
file->file_record->fcounters[MPIIO_F_META_TIME];
/* initialize fastest/slowest info prior to the reduction */
file->file_record->counters[MPIIO_FASTEST_RANK] =
file->file_record->rank;
file->file_record->counters[MPIIO_FASTEST_RANK_BYTES] =
file->file_record->counters[MPIIO_BYTES_READ] +
file->file_record->counters[MPIIO_BYTES_WRITTEN];
file->file_record->fcounters[MPIIO_F_FASTEST_RANK_TIME] =
mpiio_time;
/* until reduction occurs, we assume that this rank is both
* the fastest and slowest. It is up to the reduction operator
* to find the true min and max.
*/
file->file_record->counters[MPIIO_SLOWEST_RANK] =
file->file_record->counters[MPIIO_FASTEST_RANK];
file->file_record->counters[MPIIO_SLOWEST_RANK_BYTES] =
file->file_record->counters[MPIIO_FASTEST_RANK_BYTES];
file->file_record->fcounters[MPIIO_F_SLOWEST_RANK_TIME] =
file->file_record->fcounters[MPIIO_F_FASTEST_RANK_TIME];
file->file_record->rank = -1;
}
/* sort the array of files descending by rank so that we get all of the
* shared files (marked by rank -1) in a contiguous portion at end
* of the array
*/
qsort(mpiio_runtime->file_record_array, mpiio_runtime->file_array_ndx,
sizeof(struct darshan_mpiio_file), mpiio_record_compare);
/* make *send_buf point to the shared files at the end of sorted array */
red_send_buf =
&(mpiio_runtime->file_record_array[mpiio_runtime->file_array_ndx-(shared_rec_count)]);
/* allocate memory for the reduction output on rank 0 */
if(my_rank == 0)
{
red_recv_buf = malloc(shared_rec_count * sizeof(struct darshan_mpiio_file));
if(!red_recv_buf)
return;
}
/* construct a datatype for a MPIIO file record. This is serving no purpose
* except to make sure we can do a reduction on proper boundaries
*/
DARSHAN_MPI_CALL(PMPI_Type_contiguous)(sizeof(struct darshan_mpiio_file),
MPI_BYTE, &red_type);
DARSHAN_MPI_CALL(PMPI_Type_commit)(&red_type);
/* register a MPIIO file record reduction operator */
DARSHAN_MPI_CALL(PMPI_Op_create)(mpiio_record_reduction_op, 1, &red_op);
/* reduce shared MPIIO file records */
DARSHAN_MPI_CALL(PMPI_Reduce)(red_send_buf, red_recv_buf,
shared_rec_count, red_type, red_op, 0, mod_comm);
/* clean up reduction state */
if(my_rank == 0)
{
int tmp_ndx = mpiio_runtime->file_array_ndx - shared_rec_count;
memcpy(&(mpiio_runtime->file_record_array[tmp_ndx]), red_recv_buf,
shared_rec_count * sizeof(struct darshan_mpiio_file));
free(red_recv_buf);
}
else
{
mpiio_runtime->file_array_ndx -= shared_rec_count;
}
DARSHAN_MPI_CALL(PMPI_Type_free)(&red_type);
DARSHAN_MPI_CALL(PMPI_Op_free)(&red_op);
}
*buffer = (void *)(mpiio_runtime->file_record_array);
*size = mpiio_runtime->file_array_ndx * sizeof(struct darshan_mpiio_file);
*mpiio_buf = (void *)(mpiio_runtime->file_record_array);
*mpiio_buf_sz = mpiio_runtime->file_array_ndx * sizeof(struct darshan_mpiio_file);
return;
}
......
......@@ -119,7 +119,8 @@ static struct null_record_runtime* null_record_by_name(const char *name);
/* forward declaration for module functions needed to interface with darshan-core */
static void null_begin_shutdown(void);
static void null_get_output_data(void **buffer, int *size);
static void null_get_output_data(MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **null_buf, int *null_buf_sz);
static void null_shutdown(void);
/* macros for obtaining/releasing the "NULL" module lock */
......@@ -203,8 +204,6 @@ static void null_runtime_initialize()
struct darshan_module_funcs null_mod_fns =
{
.begin_shutdown = &null_begin_shutdown,
.setup_reduction = NULL, /* "NULL" module does not do reductions */
.record_reduction_op = NULL, /* "NULL" module does not do reductions */
.get_output_data = &null_get_output_data,
.shutdown = &null_shutdown
};
......@@ -334,17 +333,30 @@ static void null_begin_shutdown()
/* Pass output data for the "NULL" module back to darshan-core to log to file. */
static void null_get_output_data(
void **buffer,
int *size)
MPI_Comm mod_comm,
darshan_record_id *shared_recs,
int shared_rec_count,
void **null_buf,
int *null_buf_sz)
{
assert(null_runtime);
/* NOTE: this function can be used to run collective operations prior to
* shutting down the module, as implied by the MPI communicator passed in
* as the first agrument. Typically, module developers will want to run a
* reduction on shared data records (passed in in the 'shared_recs' array),
* but other collective routines can be run here as well. For a detailed
* example illustrating how to run shared file reductions, consider the
* POSIX or MPIIO instrumentation modules, as they both implement this
* functionality.
*/
/* Just set the output buffer to point at the array of the "NULL" module's
* I/O records, and set the output size according to the number of records
* currently being tracked.
*/
*buffer = (void *)(null_runtime->record_array);
*size = null_runtime->rec_array_ndx * sizeof(struct darshan_null_record);
*null_buf = (void *)(null_runtime->record_array);
*null_buf_sz = null_runtime->rec_array_ndx * sizeof(struct darshan_null_record);
return;
}
......
......@@ -176,8 +176,6 @@ struct posix_runtime
int file_array_ndx;
struct posix_file_runtime* file_hash;
struct posix_file_runtime_ref* fd_hash;
void *red_buf;
int shared_rec_count;
};
static struct posix_runtime *posix_runtime = NULL;
......@@ -194,13 +192,12 @@ static void posix_file_close_fd(int fd);
static int posix_record_compare(const void* a, const void* b);
static void posix_aio_tracker_add(int fd, void *aiocbp);
static struct posix_aio_tracker* posix_aio_tracker_del(int fd, void *aiocbp);
static void posix_begin_shutdown(void);
static void posix_setup_reduction(darshan_record_id *shared_recs, int *shared_rec_count,
void **send_buf, void **recv_buf, int *rec_size);
static void posix_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype);
static void posix_get_output_data(void **buffer, int *size);
static void posix_begin_shutdown(void);
static void posix_get_output_data(MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **posix_buf, int *posix_buf_sz);
static void posix_shutdown(void);
#define POSIX_LOCK() pthread_mutex_lock(&posix_runtime_mutex)
......@@ -1454,8 +1451,6 @@ static void posix_runtime_initialize()
struct darshan_module_funcs posix_mod_fns =
{
.begin_shutdown = &posix_begin_shutdown,
.setup_reduction = &posix_setup_reduction,
.record_reduction_op = &posix_record_reduction_op,
.get_output_data = &posix_get_output_data,
.shutdown = &posix_shutdown
};
......@@ -1699,118 +1694,6 @@ static struct posix_aio_tracker* posix_aio_tracker_del(int fd, void *aiocbp)
return(tracker);
}
/************************************************************************
* Functions exported by this module for coordinating with darshan-core *
************************************************************************/
/* Called by darshan-core prior to shutdown: disables the POSIX wrappers
 * and folds each file's access/stride histograms into the fixed
 * "4 most common" counters of its record.
 */
static void posix_begin_shutdown()
{
    int i;
    struct posix_file_runtime* tmp;

    assert(posix_runtime);

    POSIX_LOCK();

    /* disable further instrumentation while Darshan shuts down */
    instrumentation_disabled = 1;

    /* go through file access data for each record and set the 4 most common
     * stride/access size counters.
     */
    for(i = 0; i < posix_runtime->file_array_ndx; i++)
    {
        tmp = &(posix_runtime->file_runtime_array[i]);

        /* common accesses */
        darshan_walk_common_vals(tmp->access_root,
            &(tmp->file_record->counters[POSIX_ACCESS1_ACCESS]),
            &(tmp->file_record->counters[POSIX_ACCESS1_COUNT]));
        /* common strides */
        darshan_walk_common_vals(tmp->stride_root,
            &(tmp->file_record->counters[POSIX_STRIDE1_STRIDE]),
            &(tmp->file_record->counters[POSIX_STRIDE1_COUNT]));
    }

    POSIX_UNLOCK();
    return;
}
static void posix_setup_reduction(
darshan_record_id *shared_recs,
int *shared_rec_count,
void **send_buf,
void **recv_buf,
int *rec_size)
{
struct posix_file_runtime *file;
int i;
double posix_time;
assert(posix_runtime);
/* necessary initialization of shared records (e.g., change rank to -1) */
for(i = 0; i < *shared_rec_count; i++)
{
HASH_FIND(hlink, posix_runtime->file_hash, &shared_recs[i],
sizeof(darshan_record_id), file);
assert(file);
posix_time =
file->file_record->fcounters[POSIX_F_READ_TIME] +
file->file_record->fcounters[POSIX_F_WRITE_TIME] +
file->file_record->fcounters[POSIX_F_META_TIME];
/* initialize fastest/slowest info prior to the reduction */
file->file_record->counters[POSIX_FASTEST_RANK] =
file->file_record->rank;
file->file_record->counters[POSIX_FASTEST_RANK_BYTES] =