Commit f67c6ce7 authored by Shane Snyder's avatar Shane Snyder
Browse files

apply no-mpi changes to posix module

parent c866a0af
......@@ -88,6 +88,7 @@
* avoid any reduction steps.
*/
typedef void (*darshan_module_redux)(
void *mod_buf, /* input parameter indicating module's buffer address */
MPI_Comm mod_comm, /* MPI communicator to run collectives with */
darshan_record_id *shared_recs, /* list of shared data record ids */
int shared_rec_count /* count of shared data records */
......
......@@ -563,8 +563,9 @@ void darshan_core_shutdown()
}
/* allow the module an opportunity to reduce shared files */
if(this_mod->mod_funcs.mod_redux_func)
this_mod->mod_funcs.mod_redux_func(MPI_COMM_WORLD, mod_shared_recs,
if(this_mod->mod_funcs.mod_redux_func && (mod_shared_recs > 0) &&
(!getenv("DARSHAN_DISABLE_SHARED_REDUCTION")))
this_mod->mod_funcs.mod_redux_func(mod_buf, MPI_COMM_WORLD, mod_shared_recs,
mod_shared_rec_cnt);
}
#endif
......
......@@ -153,17 +153,23 @@ static struct posix_aio_tracker* posix_aio_tracker_del(
int fd, void *aiocbp);
static void posix_finalize_file_records(
void *rec_ref_p);
#ifdef HAVE_MPI
static void posix_record_reduction_op(
void* infile_v, void* inoutfile_v, int *len, MPI_Datatype *datatype);
static void posix_shared_record_variance(
MPI_Comm mod_comm, struct darshan_posix_file *inrec_array,
struct darshan_posix_file *outrec_array, int shared_rec_count);
#endif
static void posix_cleanup_runtime(
void);
#ifdef HAVE_MPI
static void posix_mpi_redux(
void *posix_buf, MPI_Comm mod_comm,
darshan_record_id *shared_recs, int shared_rec_count);
#endif
static void posix_shutdown(
MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **posix_buf, int *posix_buf_sz);
void **posix_buf, int *posix_buf_sz);
/* extern DXT function defs */
extern void dxt_posix_write(darshan_record_id rec_id, int64_t offset,
......@@ -1479,6 +1485,12 @@ int DARSHAN_DECL(rename)(const char *oldpath, const char *newpath)
static void posix_runtime_initialize()
{
int psx_buf_size;
darshan_module_funcs mod_funcs = {
#ifdef HAVE_MPI
.mod_redux_func = &posix_mpi_redux,
#endif
.mod_shutdown_func = &posix_shutdown
};
/* try and store a default number of records for this module */
psx_buf_size = DARSHAN_DEF_MOD_REC_COUNT * sizeof(struct darshan_posix_file);
......@@ -1486,7 +1498,7 @@ static void posix_runtime_initialize()
/* register the POSIX module with darshan core */
darshan_core_register_module(
DARSHAN_POSIX_MOD,
&posix_shutdown,
mod_funcs,
&psx_buf_size,
&my_rank,
&darshan_mem_alignment);
......@@ -1628,6 +1640,7 @@ static void posix_finalize_file_records(void *rec_ref_p)
return;
}
#ifdef HAVE_MPI
static void posix_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype)
{
......@@ -1941,6 +1954,7 @@ static void posix_shared_record_variance(MPI_Comm mod_comm,
return;
}
#endif
static void posix_cleanup_runtime()
{
......@@ -2043,20 +2057,18 @@ void darshan_posix_shutdown_bench_setup(int test_case)
return;
}
/********************************************************************************
* shutdown function exported by this module for coordinating with darshan-core *
********************************************************************************/
/*********************************************************************************
* shutdown functions exported by this module for coordinating with darshan-core *
*********************************************************************************/
static void posix_shutdown(
MPI_Comm mod_comm,
darshan_record_id *shared_recs,
int shared_rec_count,
void **posix_buf,
int *posix_buf_sz)
#ifdef HAVE_MPI
static void posix_mpi_redux(
void *posix_buf, MPI_Comm mod_comm,
darshan_record_id *shared_recs, int shared_rec_count)
{
int posix_rec_count;
struct posix_file_record_ref *rec_ref;
struct darshan_posix_file *posix_rec_buf = *(struct darshan_posix_file **)posix_buf;
int posix_rec_count;
double posix_time;
struct darshan_posix_file *red_send_buf = NULL;
struct darshan_posix_file *red_recv_buf = NULL;
......@@ -2069,113 +2081,122 @@ static void posix_shutdown(
posix_rec_count = posix_runtime->file_rec_count;
/* perform any final transformations on POSIX file records before
* writing them out to log file
*/
darshan_iter_record_refs(posix_runtime->rec_id_hash, &posix_finalize_file_records);
/* necessary initialization of shared records */
for(i = 0; i < shared_rec_count; i++)
{
rec_ref = darshan_lookup_record_ref(posix_runtime->rec_id_hash,
&shared_recs[i], sizeof(darshan_record_id));
assert(rec_ref);
posix_time =
rec_ref->file_rec->fcounters[POSIX_F_READ_TIME] +
rec_ref->file_rec->fcounters[POSIX_F_WRITE_TIME] +
rec_ref->file_rec->fcounters[POSIX_F_META_TIME];
/* initialize fastest/slowest info prior to the reduction */
rec_ref->file_rec->counters[POSIX_FASTEST_RANK] =
rec_ref->file_rec->base_rec.rank;
rec_ref->file_rec->counters[POSIX_FASTEST_RANK_BYTES] =
rec_ref->file_rec->counters[POSIX_BYTES_READ] +
rec_ref->file_rec->counters[POSIX_BYTES_WRITTEN];
rec_ref->file_rec->fcounters[POSIX_F_FASTEST_RANK_TIME] =
posix_time;
/* until reduction occurs, we assume that this rank is both
* the fastest and slowest. It is up to the reduction operator
* to find the true min and max.
*/
rec_ref->file_rec->counters[POSIX_SLOWEST_RANK] =
rec_ref->file_rec->counters[POSIX_FASTEST_RANK];
rec_ref->file_rec->counters[POSIX_SLOWEST_RANK_BYTES] =
rec_ref->file_rec->counters[POSIX_FASTEST_RANK_BYTES];
rec_ref->file_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME] =
rec_ref->file_rec->fcounters[POSIX_F_FASTEST_RANK_TIME];
rec_ref->file_rec->base_rec.rank = -1;
}
/* if there are globally shared files, do a shared file reduction */
/* NOTE: the shared file reduction is also skipped if the
* DARSHAN_DISABLE_SHARED_REDUCTION environment variable is set.
/* sort the array of records so we get all of the shared records
* (marked by rank -1) in a contiguous portion at end of the array
*/
if(shared_rec_count && !getenv("DARSHAN_DISABLE_SHARED_REDUCTION"))
darshan_record_sort(posix_rec_buf, posix_rec_count,
sizeof(struct darshan_posix_file));
/* make send_buf point to the shared files at the end of sorted array */
red_send_buf = &(posix_rec_buf[posix_rec_count-shared_rec_count]);
/* allocate memory for the reduction output on rank 0 */
if(my_rank == 0)
{
/* necessary initialization of shared records */
for(i = 0; i < shared_rec_count; i++)
red_recv_buf = malloc(shared_rec_count * sizeof(struct darshan_posix_file));
if(!red_recv_buf)
{
rec_ref = darshan_lookup_record_ref(posix_runtime->rec_id_hash,
&shared_recs[i], sizeof(darshan_record_id));
assert(rec_ref);
posix_time =
rec_ref->file_rec->fcounters[POSIX_F_READ_TIME] +
rec_ref->file_rec->fcounters[POSIX_F_WRITE_TIME] +
rec_ref->file_rec->fcounters[POSIX_F_META_TIME];
/* initialize fastest/slowest info prior to the reduction */
rec_ref->file_rec->counters[POSIX_FASTEST_RANK] =
rec_ref->file_rec->base_rec.rank;
rec_ref->file_rec->counters[POSIX_FASTEST_RANK_BYTES] =
rec_ref->file_rec->counters[POSIX_BYTES_READ] +
rec_ref->file_rec->counters[POSIX_BYTES_WRITTEN];
rec_ref->file_rec->fcounters[POSIX_F_FASTEST_RANK_TIME] =
posix_time;
/* until reduction occurs, we assume that this rank is both
* the fastest and slowest. It is up to the reduction operator
* to find the true min and max.
*/
rec_ref->file_rec->counters[POSIX_SLOWEST_RANK] =
rec_ref->file_rec->counters[POSIX_FASTEST_RANK];
rec_ref->file_rec->counters[POSIX_SLOWEST_RANK_BYTES] =
rec_ref->file_rec->counters[POSIX_FASTEST_RANK_BYTES];
rec_ref->file_rec->fcounters[POSIX_F_SLOWEST_RANK_TIME] =
rec_ref->file_rec->fcounters[POSIX_F_FASTEST_RANK_TIME];
rec_ref->file_rec->base_rec.rank = -1;
POSIX_UNLOCK();
return;
}
}
/* sort the array of records so we get all of the shared records
* (marked by rank -1) in a contiguous portion at end of the array
*/
darshan_record_sort(posix_rec_buf, posix_rec_count,
sizeof(struct darshan_posix_file));
/* construct a datatype for a POSIX file record. This is serving no purpose
* except to make sure we can do a reduction on proper boundaries
*/
PMPI_Type_contiguous(sizeof(struct darshan_posix_file),
MPI_BYTE, &red_type);
PMPI_Type_commit(&red_type);
/* make send_buf point to the shared files at the end of sorted array */
red_send_buf = &(posix_rec_buf[posix_rec_count-shared_rec_count]);
/* register a POSIX file record reduction operator */
PMPI_Op_create(posix_record_reduction_op, 1, &red_op);
/* allocate memory for the reduction output on rank 0 */
if(my_rank == 0)
{
red_recv_buf = malloc(shared_rec_count * sizeof(struct darshan_posix_file));
if(!red_recv_buf)
{
POSIX_UNLOCK();
return;
}
}
/* reduce shared POSIX file records */
PMPI_Reduce(red_send_buf, red_recv_buf,
shared_rec_count, red_type, red_op, 0, mod_comm);
/* construct a datatype for a POSIX file record. This is serving no purpose
* except to make sure we can do a reduction on proper boundaries
*/
PMPI_Type_contiguous(sizeof(struct darshan_posix_file),
MPI_BYTE, &red_type);
PMPI_Type_commit(&red_type);
/* get the time and byte variances for shared files */
posix_shared_record_variance(mod_comm, red_send_buf, red_recv_buf,
shared_rec_count);
/* register a POSIX file record reduction operator */
PMPI_Op_create(posix_record_reduction_op, 1, &red_op);
/* clean up reduction state */
if(my_rank == 0)
{
int tmp_ndx = posix_rec_count - shared_rec_count;
memcpy(&(posix_rec_buf[tmp_ndx]), red_recv_buf,
shared_rec_count * sizeof(struct darshan_posix_file));
free(red_recv_buf);
}
else
{
posix_rec_count -= shared_rec_count;
}
/* reduce shared POSIX file records */
PMPI_Reduce(red_send_buf, red_recv_buf,
shared_rec_count, red_type, red_op, 0, mod_comm);
PMPI_Type_free(&red_type);
PMPI_Op_free(&red_op);
/* get the time and byte variances for shared files */
posix_shared_record_variance(mod_comm, red_send_buf, red_recv_buf,
shared_rec_count);
POSIX_UNLOCK();
return;
}
#endif
/* clean up reduction state */
if(my_rank == 0)
{
int tmp_ndx = posix_rec_count - shared_rec_count;
memcpy(&(posix_rec_buf[tmp_ndx]), red_recv_buf,
shared_rec_count * sizeof(struct darshan_posix_file));
free(red_recv_buf);
}
else
{
posix_rec_count -= shared_rec_count;
}
static void posix_shutdown(
void **posix_buf,
int *posix_buf_sz)
{
int posix_rec_count;
PMPI_Type_free(&red_type);
PMPI_Op_free(&red_op);
}
POSIX_LOCK();
assert(posix_runtime);
/* update output buffer size to account for shared file reduction */
*posix_buf_sz = posix_rec_count * sizeof(struct darshan_posix_file);
posix_rec_count = posix_runtime->file_rec_count;
/* perform any final transformations on POSIX file records before
* writing them out to log file
*/
darshan_iter_record_refs(posix_runtime->rec_id_hash, &posix_finalize_file_records);
/* shutdown internal structures used for instrumenting */
posix_cleanup_runtime();
/* update output buffer size to account for shared file reduction */
*posix_buf_sz = posix_rec_count * sizeof(struct darshan_posix_file);
POSIX_UNLOCK();
return;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment