Commit 9b40a778 authored by Shane Snyder's avatar Shane Snyder

reorganize dxt & mpiio module interactions

parent 03e86bd8
......@@ -65,7 +65,6 @@ struct dxt_file_record_ref
struct dxt_file_record *file_rec;
int64_t write_available_buf;
int64_t read_available_buf;
int fs_type; /* same as darshan_fs_info->fs_type */
};
/* The dxt_runtime structure maintains necessary state for storing
......@@ -81,7 +80,6 @@ struct dxt_posix_runtime
struct dxt_mpiio_runtime
{
void *rec_id_hash;
void *fh_hash;
int file_rec_count;
};
......@@ -90,6 +88,10 @@ void dxt_posix_write(darshan_record_id rec_id, int64_t offset,
int64_t length, double start_time, double end_time);
void dxt_posix_read(darshan_record_id rec_id, int64_t offset,
int64_t length, double start_time, double end_time);
void dxt_mpiio_write(darshan_record_id rec_id, int64_t length,
double start_time, double end_time);
void dxt_mpiio_read(darshan_record_id rec_id, int64_t length,
double start_time, double end_time);
static void check_wr_trace_buf(
struct dxt_file_record_ref *rec_ref);
......@@ -97,13 +99,12 @@ static void check_rd_trace_buf(
struct dxt_file_record_ref *rec_ref);
static void dxt_posix_runtime_initialize(
void);
void dxt_mpiio_runtime_initialize(
static void dxt_mpiio_runtime_initialize(
void);
static struct dxt_file_record_ref *dxt_posix_track_new_file_record(
darshan_record_id rec_id);
void dxt_mpiio_track_new_file_record(
darshan_record_id rec_id, const char *path);
void dxt_mpiio_add_record_ref(darshan_record_id rec_id, MPI_File fh);
static struct dxt_file_record_ref *dxt_mpiio_track_new_file_record(
darshan_record_id rec_id);
static void dxt_posix_cleanup_runtime(
void);
static void dxt_mpiio_cleanup_runtime(
......@@ -270,48 +271,68 @@ void dxt_posix_read(darshan_record_id rec_id, int64_t offset,
file_rec->read_count += 1;
}
void dxt_mpiio_write(MPI_File fh, int64_t length,
void dxt_mpiio_write(darshan_record_id rec_id, int64_t length,
double start_time, double end_time)
{
struct dxt_file_record_ref* rec_ref = NULL;
struct dxt_file_record *file_rec;
rec_ref = darshan_lookup_record_ref(dxt_mpiio_runtime->fh_hash,
&fh, sizeof(MPI_File));
if (!rec_ref) {
fprintf(stderr, "Error: dxt_mpiio_write unable to find rec_ref.\n");
return;
/* make sure dxt mpiio runtime is initialized properly */
if(instrumentation_disabled) return;
if(!dxt_mpiio_runtime)
{
dxt_mpiio_runtime_initialize();
if(!dxt_mpiio_runtime) return;
}
file_rec = rec_ref->file_rec;
if (dxt_mpiio_runtime) {
check_wr_trace_buf(rec_ref);
rec_ref = darshan_lookup_record_ref(dxt_mpiio_runtime->rec_id_hash,
&rec_id, sizeof(darshan_record_id));
if(!rec_ref)
{
/* track new dxt file record */
rec_ref = dxt_mpiio_track_new_file_record(rec_id);
if(!rec_ref) return;
}
file_rec = rec_ref->file_rec;
check_wr_trace_buf(rec_ref);
if(file_rec->write_count == rec_ref->write_available_buf)
return; /* no more memory for i/o segments ... back out */
file_rec->write_traces[file_rec->write_count].length = length;
file_rec->write_traces[file_rec->write_count].start_time = start_time;
file_rec->write_traces[file_rec->write_count].end_time = end_time;
file_rec->write_count += 1;
}
void dxt_mpiio_read(MPI_File fh, int64_t length,
void dxt_mpiio_read(darshan_record_id rec_id, int64_t length,
double start_time, double end_time)
{
struct dxt_file_record_ref* rec_ref = NULL;
struct dxt_file_record *file_rec;
rec_ref = darshan_lookup_record_ref(dxt_mpiio_runtime->fh_hash,
&fh, sizeof(MPI_File));
if (!rec_ref) {
fprintf(stderr, "Error: dxt_mpiio_read unable to find rec_ref.\n");
return;
/* make sure dxt mpiio runtime is initialized properly */
if(instrumentation_disabled) return;
if(!dxt_mpiio_runtime)
{
dxt_mpiio_runtime_initialize();
if(!dxt_mpiio_runtime) return;
}
file_rec = rec_ref->file_rec;
if (dxt_mpiio_runtime) {
check_rd_trace_buf(rec_ref);
rec_ref = darshan_lookup_record_ref(dxt_mpiio_runtime->rec_id_hash,
&rec_id, sizeof(darshan_record_id));
if(!rec_ref)
{
/* track new dxt file record */
rec_ref = dxt_mpiio_track_new_file_record(rec_id);
if(!rec_ref) return;
}
file_rec = rec_ref->file_rec;
check_rd_trace_buf(rec_ref);
if(file_rec->read_count == rec_ref->read_available_buf)
return; /* no more memory for i/o segments ... back out */
file_rec->read_traces[file_rec->read_count].length = length;
file_rec->read_traces[file_rec->read_count].start_time = start_time;
file_rec->read_traces[file_rec->read_count].end_time = end_time;
......@@ -448,78 +469,60 @@ static struct dxt_file_record_ref *dxt_posix_track_new_file_record(
return(rec_ref);
}
void dxt_mpiio_track_new_file_record(
darshan_record_id rec_id, const char *path)
static struct dxt_file_record_ref *dxt_mpiio_track_new_file_record(
darshan_record_id rec_id)
{
struct dxt_file_record *file_rec = NULL;
struct dxt_file_record_ref *rec_ref = NULL;
struct darshan_fs_info fs_info;
int ret;
/* check if we have enough room for a new DXT record */
DXT_LOCK();
if(dxt_mem_remaining < sizeof(struct dxt_file_record))
{
DXT_UNLOCK();
return(NULL);
}
rec_ref = malloc(sizeof(*rec_ref));
if(!rec_ref)
return;
{
DXT_UNLOCK();
return(NULL);
}
memset(rec_ref, 0, sizeof(*rec_ref));
file_rec = malloc(sizeof(*file_rec));
if(!file_rec)
{
free(rec_ref);
DXT_UNLOCK();
return(NULL);
}
memset(file_rec, 0, sizeof(*file_rec));
/* add a reference to this file record based on record id */
ret = darshan_add_record_ref(&(dxt_mpiio_runtime->rec_id_hash), &rec_id,
sizeof(darshan_record_id), rec_ref);
if(ret == 0)
{
free(file_rec);
free(rec_ref);
DXT_UNLOCK();
return;
}
/* register the actual file record with darshan-core so it is persisted
* in the log file
*/
file_rec = darshan_core_register_record(
rec_id,
path,
DXT_MPIIO_MOD,
sizeof(struct dxt_file_record),
&fs_info);
if(!file_rec)
{
darshan_delete_record_ref(&(dxt_mpiio_runtime->rec_id_hash),
&rec_id, sizeof(darshan_record_id));
free(rec_ref);
return;
}
dxt_mem_remaining -= sizeof(struct dxt_file_record);
DXT_UNLOCK();
/* registering this file record was successful, so initialize
* some fields
*/
/* initialize record and record reference fields */
file_rec->base_rec.id = rec_id;
file_rec->base_rec.rank = mpiio_my_rank;
file_rec->write_count = 0;
file_rec->write_traces = malloc(IO_TRACE_BUF_SIZE *
sizeof(segment_info));
file_rec->read_count = 0;
file_rec->read_traces = malloc(IO_TRACE_BUF_SIZE *
sizeof(segment_info));
rec_ref->file_rec = file_rec;
rec_ref->write_available_buf = IO_TRACE_BUF_SIZE;
rec_ref->read_available_buf = IO_TRACE_BUF_SIZE;
rec_ref->fs_type = fs_info.fs_type;
dxt_mpiio_runtime->file_rec_count++;
}
void dxt_mpiio_add_record_ref(darshan_record_id rec_id, MPI_File fh)
{
struct dxt_file_record_ref *rec_ref = NULL;
rec_ref = darshan_lookup_record_ref(dxt_mpiio_runtime->rec_id_hash, &rec_id,
sizeof(darshan_record_id));
assert(rec_ref);
darshan_add_record_ref(&(dxt_mpiio_runtime->fh_hash), &fh,
sizeof(MPI_File), rec_ref);
return(rec_ref);
}
void dxt_clear_record_refs(void **hash_head_p, int free_flag)
......@@ -567,7 +570,6 @@ static void dxt_posix_cleanup_runtime()
static void dxt_mpiio_cleanup_runtime()
{
dxt_clear_record_refs(&(dxt_mpiio_runtime->fh_hash), 0);
dxt_clear_record_refs(&(dxt_mpiio_runtime->rec_id_hash), 1);
free(dxt_mpiio_runtime);
......
......@@ -87,21 +87,17 @@ static void mpiio_shutdown(
MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **mpiio_buf, int *mpiio_buf_sz);
/* DXT */
extern void dxt_mpiio_runtime_initialize();
extern void dxt_mpiio_track_new_file_record(
darshan_record_id rec_id, const char *path);
extern void dxt_mpiio_add_record_ref(
darshan_record_id rec_id, MPI_File fh);
extern void dxt_mpiio_write(MPI_File fh, int64_t length,
/* extern DXT function defs */
extern void dxt_mpiio_write(darshan_record_id rec_id, int64_t length,
double start_time, double end_time);
extern void dxt_mpiio_read(MPI_File fh, int64_t length,
extern void dxt_mpiio_read(darshan_record_id rec_id, int64_t length,
double start_time, double end_time);
static struct mpiio_runtime *mpiio_runtime = NULL;
static pthread_mutex_t mpiio_runtime_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static int instrumentation_disabled = 0;
static int my_rank = -1;
static int enable_dxt_io_trace = 0;
#define MPIIO_LOCK() pthread_mutex_lock(&mpiio_runtime_mutex)
#define MPIIO_UNLOCK() pthread_mutex_unlock(&mpiio_runtime_mutex)
......@@ -111,10 +107,6 @@ static int my_rank = -1;
if(!instrumentation_disabled) { \
if(!mpiio_runtime) { \
mpiio_runtime_initialize(); \
/* DXT */ \
if (getenv("ENABLE_DXT_IO_TRACE")) { \
dxt_mpiio_runtime_initialize(); \
} \
} \
if(mpiio_runtime) break; \
} \
......@@ -140,13 +132,7 @@ static int my_rank = -1;
} \
rec_id = darshan_core_gen_record_id(newpath); \
rec_ref = darshan_lookup_record_ref(mpiio_runtime->rec_id_hash, &rec_id, sizeof(darshan_record_id)); \
if(!rec_ref) { \
rec_ref = mpiio_track_new_file_record(rec_id, newpath); \
/* DXT */ \
if (getenv("ENABLE_DXT_IO_TRACE")) { \
dxt_mpiio_track_new_file_record(rec_id, newpath); \
} \
} \
if(!rec_ref) rec_ref = mpiio_track_new_file_record(rec_id, newpath); \
if(!rec_ref) { \
if(newpath != __path) free(newpath); \
break; \
......@@ -166,10 +152,6 @@ static int my_rank = -1;
__tm1, __tm2, rec_ref->last_meta_end); \
darshan_add_record_ref(&(mpiio_runtime->fh_hash), &__fh, sizeof(MPI_File), rec_ref); \
if(newpath != __path) free(newpath); \
/* DXT */ \
if (getenv("ENABLE_DXT_IO_TRACE")) { \
dxt_mpiio_add_record_ref(rec_id, __fh); \
} \
} while(0)
#define MPIIO_RECORD_READ(__ret, __fh, __count, __datatype, __counter, __tm1, __tm2) do { \
......@@ -182,8 +164,8 @@ static int my_rank = -1;
DARSHAN_MPI_CALL(PMPI_Type_size)(__datatype, &size); \
size = size * __count; \
/* DXT to record detailed read tracing information */ \
if (getenv("ENABLE_DXT_IO_TRACE")) { \
dxt_mpiio_read(__fh, size, __tm1, __tm2); \
if(enable_dxt_io_trace) { \
dxt_mpiio_read(rec_ref->file_rec->base_rec.id, size, __tm1, __tm2); \
} \
DARSHAN_BUCKET_INC(&(rec_ref->file_rec->counters[MPIIO_SIZE_READ_AGG_0_100]), size); \
darshan_common_val_counter(&rec_ref->access_root, &rec_ref->access_count, size, \
......@@ -215,8 +197,8 @@ static int my_rank = -1;
DARSHAN_MPI_CALL(PMPI_Type_size)(__datatype, &size); \
size = size * __count; \
/* DXT to record detailed write tracing information */ \
if (getenv("ENABLE_DXT_IO_TRACE")) { \
dxt_mpiio_write(__fh, size, __tm1, __tm2); \
if(enable_dxt_io_trace) { \
dxt_mpiio_write(rec_ref->file_rec->base_rec.id, size, __tm1, __tm2); \
} \
DARSHAN_BUCKET_INC(&(rec_ref->file_rec->counters[MPIIO_SIZE_WRITE_AGG_0_100]), size); \
darshan_common_val_counter(&rec_ref->access_root, &rec_ref->access_count, size, \
......@@ -879,6 +861,11 @@ static void mpiio_runtime_initialize()
}
memset(mpiio_runtime, 0, sizeof(*mpiio_runtime));
/* check if DXT (Darshan extended tracing) should be enabled */
if (getenv("ENABLE_DXT_IO_TRACE")) {
enable_dxt_io_trace = 1;
}
return;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment