Commit 03e86bd8 authored by Shane Snyder's avatar Shane Snyder
Browse files

reorganize dxt & posix module interactions

parent 56a61bac
......@@ -34,11 +34,12 @@
typedef int64_t off64_t;
#endif
#ifdef DARSHAN_LUSTRE
#include <lustre/lustre_user.h>
#endif
/* maximum amount of memory to use for storing DXT records */
#define DXT_IO_TRACE_MEM_MAX (4 * 1024 * 1024) /* 4 MiB */
#define IO_TRACE_BUF_SIZE 1024
/* initial size of read/write trace buffer (in number of segments) */
/* NOTE: when this size is exceeded, the buffer size is doubled */
#define IO_TRACE_BUF_SIZE 64
struct dxt_record_ref_tracker
{
......@@ -74,7 +75,6 @@ struct dxt_file_record_ref
struct dxt_posix_runtime
{
void *rec_id_hash;
void *fd_hash;
int file_rec_count;
};
......@@ -85,20 +85,25 @@ struct dxt_mpiio_runtime
int file_rec_count;
};
void dxt_posix_runtime_initialize(
/* dxt read/write instrumentation wrappers for POSIX and MPI-IO */
void dxt_posix_write(darshan_record_id rec_id, int64_t offset,
int64_t length, double start_time, double end_time);
void dxt_posix_read(darshan_record_id rec_id, int64_t offset,
int64_t length, double start_time, double end_time);
static void check_wr_trace_buf(
struct dxt_file_record_ref *rec_ref);
static void check_rd_trace_buf(
struct dxt_file_record_ref *rec_ref);
static void dxt_posix_runtime_initialize(
void);
void dxt_mpiio_runtime_initialize(
void);
void dxt_posix_track_new_file_record(
darshan_record_id rec_id, const char *path);
static struct dxt_file_record_ref *dxt_posix_track_new_file_record(
darshan_record_id rec_id);
void dxt_mpiio_track_new_file_record(
darshan_record_id rec_id, const char *path);
void dxt_posix_add_record_ref(darshan_record_id rec_id, int fd);
void dxt_mpiio_add_record_ref(darshan_record_id rec_id, MPI_File fh);
#if 0
static void dxt_instrument_fs_data(
darshan_record_id rec_id, int fs_type, struct dxt_file_record *file_rec);
#endif
static void dxt_posix_cleanup_runtime(
void);
static void dxt_mpiio_cleanup_runtime(
......@@ -111,39 +116,26 @@ static void dxt_mpiio_shutdown(
MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **dxt_buf, int *dxt_buf_sz);
#ifdef DARSHAN_LUSTRE
/* XXX modules don't expose an API for other modules, so use extern to get
* Lustre instrumentation function
*/
extern void dxt_get_lustre_stripe_info(
darshan_record_id rec_id, struct dxt_file_record *file_rec);
#endif
static struct dxt_posix_runtime *dxt_posix_runtime = NULL;
static struct dxt_mpiio_runtime *dxt_mpiio_runtime = NULL;
static pthread_mutex_t dxt_posix_runtime_mutex =
PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static pthread_mutex_t dxt_mpiio_runtime_mutex =
static pthread_mutex_t dxt_runtime_mutex =
PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static int posix_my_rank = -1;
static int mpiio_my_rank = -1;
static int instrumentation_disabled = 0;
static int darshan_mem_alignment = 1;
static int dxt_mem_remaining = DXT_IO_TRACE_MEM_MAX;
#define DXT_POSIX_LOCK() pthread_mutex_lock(&dxt_posix_runtime_mutex)
#define DXT_POSIX_UNLOCK() pthread_mutex_unlock(&dxt_posix_runtime_mutex)
#define DXT_MPIIO_LOCK() pthread_mutex_lock(&dxt_mpiio_runtime_mutex)
#define DXT_MPIIO_UNLOCK() pthread_mutex_unlock(&dxt_mpiio_runtime_mutex)
#define DXT_LOCK() pthread_mutex_lock(&dxt_runtime_mutex)
#define DXT_UNLOCK() pthread_mutex_unlock(&dxt_runtime_mutex)
/**********************************************************
* Wrappers for DXT I/O functions of interest *
**********************************************************/
void check_io_trace_buf(struct dxt_file_record_ref *rec_ref)
static void check_wr_trace_buf(struct dxt_file_record_ref *rec_ref)
{
struct dxt_file_record *file_rec = rec_ref->file_rec;
......@@ -151,47 +143,92 @@ void check_io_trace_buf(struct dxt_file_record_ref *rec_ref)
int write_available_buf = rec_ref->write_available_buf;
if (write_count >= write_available_buf) {
write_available_buf += IO_TRACE_BUF_SIZE;
file_rec->write_traces =
(segment_info *)realloc(file_rec->write_traces,
write_available_buf * sizeof(segment_info));
rec_ref->write_available_buf = write_available_buf;
int write_count_inc;
if(write_available_buf == 0)
write_count_inc = IO_TRACE_BUF_SIZE;
else
write_count_inc = write_available_buf;
DXT_LOCK();
if((write_count_inc * sizeof(segment_info)) > dxt_mem_remaining)
write_count_inc = dxt_mem_remaining / sizeof(segment_info);
dxt_mem_remaining -= (write_count_inc * sizeof(segment_info));
DXT_UNLOCK();
if(write_count_inc > 0)
{
write_available_buf += write_count_inc;
file_rec->write_traces =
(segment_info *)realloc(file_rec->write_traces,
write_available_buf * sizeof(segment_info));
rec_ref->write_available_buf = write_available_buf;
}
}
}
static void check_rd_trace_buf(struct dxt_file_record_ref *rec_ref)
{
struct dxt_file_record *file_rec = rec_ref->file_rec;
int read_count = file_rec->read_count;
int read_available_buf = rec_ref->read_available_buf;
if (read_count >= read_available_buf) {
read_available_buf += IO_TRACE_BUF_SIZE;
file_rec->read_traces =
(segment_info *)realloc(file_rec->read_traces,
read_available_buf * sizeof(segment_info));
rec_ref->read_available_buf = read_available_buf;
int read_count_inc;
if(read_available_buf == 0)
read_count_inc = IO_TRACE_BUF_SIZE;
else
read_count_inc = read_available_buf;
DXT_LOCK();
if((read_count_inc * sizeof(segment_info)) > dxt_mem_remaining)
read_count_inc = dxt_mem_remaining / sizeof(segment_info);
dxt_mem_remaining -= (read_count_inc * sizeof(segment_info));
DXT_UNLOCK();
if(read_count_inc > 0)
{
read_available_buf += read_count_inc;
file_rec->read_traces =
(segment_info *)realloc(file_rec->read_traces,
read_available_buf * sizeof(segment_info));
rec_ref->read_available_buf = read_available_buf;
}
}
}
void dxt_posix_write(int fd, int64_t offset, int64_t length,
double start_time, double end_time)
void dxt_posix_write(darshan_record_id rec_id, int64_t offset,
int64_t length, double start_time, double end_time)
{
struct dxt_file_record_ref* rec_ref = NULL;
struct dxt_file_record *file_rec;
rec_ref = darshan_lookup_record_ref(dxt_posix_runtime->fd_hash,
&fd, sizeof(int));
if (!rec_ref) {
fprintf(stderr, "Error: dxt_posix_write unable to find rec_ref.\n");
return;
/* make sure dxt posix runtime is initialized properly */
if(instrumentation_disabled) return;
if(!dxt_posix_runtime)
{
dxt_posix_runtime_initialize();
if(!dxt_posix_runtime) return;
}
file_rec = rec_ref->file_rec;
if (dxt_posix_runtime) {
check_io_trace_buf(rec_ref);
rec_ref = darshan_lookup_record_ref(dxt_posix_runtime->rec_id_hash,
&rec_id, sizeof(darshan_record_id));
if(!rec_ref)
{
/* track new dxt file record */
rec_ref = dxt_posix_track_new_file_record(rec_id);
if(!rec_ref) return;
}
file_rec = rec_ref->file_rec;
check_wr_trace_buf(rec_ref);
if(file_rec->write_count == rec_ref->write_available_buf)
return; /* no more memory for i/o segments ... back out */
file_rec->write_traces[file_rec->write_count].offset = offset;
file_rec->write_traces[file_rec->write_count].length = length;
file_rec->write_traces[file_rec->write_count].start_time = start_time;
......@@ -199,23 +236,32 @@ void dxt_posix_write(int fd, int64_t offset, int64_t length,
file_rec->write_count += 1;
}
void dxt_posix_read(int fd, int64_t offset, int64_t length,
double start_time, double end_time)
void dxt_posix_read(darshan_record_id rec_id, int64_t offset,
int64_t length, double start_time, double end_time)
{
struct dxt_file_record_ref* rec_ref = NULL;
struct dxt_file_record *file_rec;
rec_ref = darshan_lookup_record_ref(dxt_posix_runtime->fd_hash,
&fd, sizeof(int));
/* make sure dxt posix runtime is initialized properly */
if(instrumentation_disabled) return;
if(!dxt_posix_runtime)
{
dxt_posix_runtime_initialize();
if(!dxt_posix_runtime) return;
}
rec_ref = darshan_lookup_record_ref(dxt_posix_runtime->rec_id_hash,
&rec_id, sizeof(darshan_record_id));
if (!rec_ref) {
fprintf(stderr, "Error: dxt_posix_read unable to find rec_ref.\n");
return;
/* track new dxt file record */
rec_ref = dxt_posix_track_new_file_record(rec_id);
if(!rec_ref) return;
}
file_rec = rec_ref->file_rec;
if (dxt_posix_runtime) {
check_io_trace_buf(rec_ref);
}
check_rd_trace_buf(rec_ref);
if(file_rec->read_count == rec_ref->read_available_buf)
return; /* no more memory for i/o segments ... back out */
file_rec->read_traces[file_rec->read_count].offset = offset;
file_rec->read_traces[file_rec->read_count].length = length;
......@@ -239,7 +285,7 @@ void dxt_mpiio_write(MPI_File fh, int64_t length,
file_rec = rec_ref->file_rec;
if (dxt_mpiio_runtime) {
check_io_trace_buf(rec_ref);
check_wr_trace_buf(rec_ref);
}
file_rec->write_traces[file_rec->write_count].length = length;
......@@ -263,7 +309,7 @@ void dxt_mpiio_read(MPI_File fh, int64_t length,
file_rec = rec_ref->file_rec;
if (dxt_mpiio_runtime) {
check_io_trace_buf(rec_ref);
check_rd_trace_buf(rec_ref);
}
file_rec->read_traces[file_rec->read_count].length = length;
......@@ -278,7 +324,7 @@ void dxt_mpiio_read(MPI_File fh, int64_t length,
**********************************************************/
/* initialize internal DXT module data structures and register with darshan-core */
void dxt_posix_runtime_initialize()
static void dxt_posix_runtime_initialize()
{
/* DXT modules request 0 memory -- buffers will be managed internally by DXT
* and passed back to darshan-core at shutdown time to allow DXT more control
......@@ -346,66 +392,60 @@ void dxt_mpiio_runtime_initialize()
return;
}
void dxt_posix_track_new_file_record(
darshan_record_id rec_id, const char *path)
static struct dxt_file_record_ref *dxt_posix_track_new_file_record(
darshan_record_id rec_id)
{
struct dxt_file_record_ref *rec_ref = NULL;
struct dxt_file_record *file_rec = NULL;
struct darshan_fs_info fs_info;
int ret;
/* check if we have enough room for a new DXT record */
DXT_LOCK();
if(dxt_mem_remaining < sizeof(struct dxt_file_record))
{
DXT_UNLOCK();
return(NULL);
}
rec_ref = malloc(sizeof(*rec_ref));
if(!rec_ref)
return;
{
DXT_UNLOCK();
return(NULL);
}
memset(rec_ref, 0, sizeof(*rec_ref));
file_rec = malloc(sizeof(*file_rec));
if(!file_rec)
{
free(rec_ref);
DXT_UNLOCK();
return(NULL);
}
memset(file_rec, 0, sizeof(*file_rec));
/* add a reference to this file record based on record id */
ret = darshan_add_record_ref(&(dxt_posix_runtime->rec_id_hash), &rec_id,
sizeof(darshan_record_id), rec_ref);
if(ret == 0)
{
free(file_rec);
free(rec_ref);
return;
DXT_UNLOCK();
return(NULL);
}
/* register the actual file record with darshan-core so it is persisted
* in the log file
*/
file_rec = darshan_core_register_record(
rec_id,
path,
DXT_POSIX_MOD,
sizeof(struct dxt_file_record),
&fs_info);
if(!file_rec)
{
darshan_delete_record_ref(&(dxt_posix_runtime->rec_id_hash),
&rec_id, sizeof(darshan_record_id));
free(rec_ref);
return;
}
dxt_mem_remaining -= sizeof(struct dxt_file_record);
DXT_UNLOCK();
/*
* Registering this file record was successful, so initialize
* some fields */
/* initialize record and record reference fields */
file_rec->base_rec.id = rec_id;
file_rec->base_rec.rank = posix_my_rank;
file_rec->write_count = 0;
file_rec->write_traces = malloc(IO_TRACE_BUF_SIZE *
sizeof(segment_info));
file_rec->read_count = 0;
file_rec->read_traces = malloc(IO_TRACE_BUF_SIZE *
sizeof(segment_info));
rec_ref->file_rec = file_rec;
rec_ref->write_available_buf = IO_TRACE_BUF_SIZE;
rec_ref->read_available_buf = IO_TRACE_BUF_SIZE;
rec_ref->fs_type = fs_info.fs_type;
dxt_posix_runtime->file_rec_count++;
return(rec_ref);
}
void dxt_mpiio_track_new_file_record(
......@@ -470,26 +510,6 @@ void dxt_mpiio_track_new_file_record(
dxt_mpiio_runtime->file_rec_count++;
}
void dxt_posix_add_record_ref(darshan_record_id rec_id, int fd)
{
struct dxt_file_record_ref *rec_ref = NULL;
struct dxt_file_record *file_rec;
int i;
rec_ref = darshan_lookup_record_ref(dxt_posix_runtime->rec_id_hash, &rec_id,
sizeof(darshan_record_id));
assert(rec_ref);
darshan_add_record_ref(&(dxt_posix_runtime->fd_hash), &fd,
sizeof(int), rec_ref);
#if 0
/* get Lustre stripe information */
file_rec = rec_ref->file_rec;
dxt_instrument_fs_data(rec_id, rec_ref->fs_type, file_rec);
#endif
}
void dxt_mpiio_add_record_ref(darshan_record_id rec_id, MPI_File fh)
{
struct dxt_file_record_ref *rec_ref = NULL;
......@@ -502,22 +522,6 @@ void dxt_mpiio_add_record_ref(darshan_record_id rec_id, MPI_File fh)
sizeof(MPI_File), rec_ref);
}
#if 0
static void dxt_instrument_fs_data(
darshan_record_id rec_id, int fs_type, struct dxt_file_record *file_rec)
{
#ifdef DARSHAN_LUSTRE
/* allow lustre to generate a record if we configured with lustre support */
if(fs_type == LL_SUPER_MAGIC)
{
dxt_get_lustre_stripe_info(rec_id, file_rec);
return;
}
#endif
return;
}
#endif
void dxt_clear_record_refs(void **hash_head_p, int free_flag)
{
struct dxt_record_ref_tracker *ref_tracker, *tmp;
......@@ -553,7 +557,6 @@ void dxt_clear_record_refs(void **hash_head_p, int free_flag)
static void dxt_posix_cleanup_runtime()
{
dxt_clear_record_refs(&(dxt_posix_runtime->fd_hash), 0);
dxt_clear_record_refs(&(dxt_posix_runtime->rec_id_hash), 1);
free(dxt_posix_runtime);
......@@ -602,7 +605,6 @@ static void dxt_posix_shutdown(
assert(dxt_posix_runtime);
DXT_POSIX_LOCK();
int dxt_rec_count = dxt_posix_runtime->file_rec_count;
struct dxt_record_ref_tracker *ref_tracker, *tmp;
struct dxt_record_ref_tracker *ref_tracker_head =
......@@ -688,8 +690,6 @@ static void dxt_posix_shutdown(
/* disable further instrumentation */
instrumentation_disabled = 1;
DXT_POSIX_UNLOCK();
return;
}
......@@ -717,7 +717,6 @@ static void dxt_mpiio_shutdown(
assert(dxt_mpiio_runtime);
DXT_MPIIO_LOCK();
int dxt_rec_count = dxt_mpiio_runtime->file_rec_count;
struct dxt_record_ref_tracker *ref_tracker, *tmp;
struct dxt_record_ref_tracker *ref_tracker_head =
......@@ -802,8 +801,6 @@ static void dxt_mpiio_shutdown(
/* disable further instrumentation */
instrumentation_disabled = 1;
DXT_MPIIO_UNLOCK();
return;
}
......
......@@ -157,22 +157,18 @@ static void posix_shutdown(
MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **posix_buf, int *posix_buf_sz);
/* DXT */
extern void dxt_posix_runtime_initialize();
extern void dxt_posix_track_new_file_record(
darshan_record_id rec_id, const char *path);
extern void dxt_posix_add_record_ref(
darshan_record_id rec_id, int fd);
extern void dxt_posix_write(int fd, int64_t offset, int64_t length,
double start_time, double end_time);
extern void dxt_posix_read(int fd, int64_t offset, int64_t length,
double start_time, double end_time);
/* extern DXT function defs */
extern void dxt_posix_write(darshan_record_id rec_id, int64_t offset,
int64_t length, double start_time, double end_time);
extern void dxt_posix_read(darshan_record_id rec_id, int64_t offset,
int64_t length, double start_time, double end_time);
static struct posix_runtime *posix_runtime = NULL;
static pthread_mutex_t posix_runtime_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static int instrumentation_disabled = 0;
static int my_rank = -1;
static int darshan_mem_alignment = 1;
static int enable_dxt_io_trace = 0;
#define POSIX_LOCK() pthread_mutex_lock(&posix_runtime_mutex)
#define POSIX_UNLOCK() pthread_mutex_unlock(&posix_runtime_mutex)
......@@ -182,10 +178,6 @@ static int darshan_mem_alignment = 1;
if(!instrumentation_disabled) { \
if(!posix_runtime) { \
posix_runtime_initialize(); \
/* DXT */ \
if (getenv("ENABLE_DXT_IO_TRACE")) { \
dxt_posix_runtime_initialize(); \
} \
} \
if(posix_runtime) break; \
} \
......@@ -210,13 +202,7 @@ static int darshan_mem_alignment = 1;
} \
rec_id = darshan_core_gen_record_id(newpath); \
rec_ref = darshan_lookup_record_ref(posix_runtime->rec_id_hash, &rec_id, sizeof(darshan_record_id)); \
if(!rec_ref) { \
rec_ref = posix_track_new_file_record(rec_id, newpath); \
/* DXT */ \
if (getenv("ENABLE_DXT_IO_TRACE")) { \
dxt_posix_track_new_file_record(rec_id, newpath); \
} \
} \
if(!rec_ref) rec_ref = posix_track_new_file_record(rec_id, newpath); \
if(!rec_ref) { \
if(newpath != __path) free(newpath); \
break; \
......@@ -236,10 +222,6 @@ static int darshan_mem_alignment = 1;
darshan_add_record_ref(&(posix_runtime->fd_hash), &__ret, sizeof(int), rec_ref); \
darshan_instrument_fs_data(rec_ref->fs_type, newpath, __ret); \
if(newpath != __path) free(newpath); \
/* DXT */ \
if (getenv("ENABLE_DXT_IO_TRACE")) { \
dxt_posix_add_record_ref(rec_id, __ret); \
} \
} while(0)
#define POSIX_RECORD_READ(__ret, __fd, __pread_flag, __pread_offset, __aligned, __tm1, __tm2) do { \
......@@ -256,8 +238,8 @@ static int darshan_mem_alignment = 1;
else \
this_offset = rec_ref->offset; \
/* DXT to record detailed read tracing information */ \
if (getenv("ENABLE_DXT_IO_TRACE")) { \
dxt_posix_read(__fd, this_offset, __ret, __tm1, __tm2); \
if(enable_dxt_io_trace) { \
dxt_posix_read(rec_ref->file_rec->base_rec.id, this_offset, __ret, __tm1, __tm2); \
} \
if(this_offset > rec_ref->last_byte_read) \
rec_ref->file_rec->counters[POSIX_SEQ_READS] += 1; \
......@@ -314,8 +296,8 @@ static int darshan_mem_alignment = 1;
else \
this_offset = rec_ref->offset; \
/* DXT to record detailed write tracing information */ \
if (getenv("ENABLE_DXT_IO_TRACE")) { \
dxt_posix_write(__fd, this_offset, __ret, __tm1, __tm2); \
if(enable_dxt_io_trace) { \
dxt_posix_write(rec_ref->file_rec->base_rec.id, this_offset, __ret, __tm1, __tm2); \
} \
if(this_offset > rec_ref->last_byte_written) \
rec_ref->file_rec->counters[POSIX_SEQ_WRITES] += 1; \
......@@ -1292,6 +1274,11 @@ static void posix_runtime_initialize()
}
memset(posix_runtime, 0, sizeof(*posix_runtime));
/* check if DXT (Darshan extended tracing) should be enabled */
if (getenv("ENABLE_DXT_IO_TRACE")) {
enable_dxt_io_trace = 1;
}
return;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment