Commit 7e0952c1 authored by Shane Snyder's avatar Shane Snyder
Browse files

cleanup dxt shutdown routines

parent 3d2de3ab
......@@ -41,12 +41,6 @@ typedef int64_t off64_t;
/* NOTE: when this size is exceeded, the buffer size is doubled */
#define IO_TRACE_BUF_SIZE 64
struct dxt_record_ref_tracker
{
void *rec_ref_p;
UT_hash_handle hlink;
};
/* The dxt_file_record_ref structure maintains necessary runtime metadata
* for the DXT file record (dxt_file_record structure, defined in
* darshan-dxt-log-format.h) pointed to by 'file_rec'. This metadata
......@@ -75,12 +69,16 @@ struct dxt_posix_runtime
{
void *rec_id_hash;
int file_rec_count;
char *record_buf;
int record_buf_size;
};
struct dxt_mpiio_runtime
{
void *rec_id_hash;
int file_rec_count;
char *record_buf;
int record_buf_size;
};
/* dxt read/write instrumentation wrappers for POSIX and MPI-IO */
......@@ -509,7 +507,7 @@ static struct dxt_file_record_ref *dxt_mpiio_track_new_file_record(
free(file_rec);
free(rec_ref);
DXT_UNLOCK();
return;
return(NULL);
}
dxt_mem_remaining -= sizeof(struct dxt_file_record);
......@@ -525,18 +523,19 @@ static struct dxt_file_record_ref *dxt_mpiio_track_new_file_record(
return(rec_ref);
}
static void dxt_free_trace_buffers(void *rec_ref_p)
static void dxt_free_record_data(void *rec_ref_p)
{
struct dxt_file_record_ref *dxt_rec_ref = (struct dxt_file_record_ref *)rec_ref_p;
/* TODO: update these pointer addresses once {write/read}_traces are moved to rec_ref structure */
free(dxt_rec_ref->file_rec->write_traces);
free(dxt_rec_ref->file_rec->read_traces);
free(dxt_rec_ref->file_rec);
}
static void dxt_posix_cleanup_runtime()
{
darshan_iter_record_refs(&(dxt_posix_runtime->rec_id_hash), dxt_free_trace_buffers);
darshan_iter_record_refs(dxt_posix_runtime->rec_id_hash, dxt_free_record_data);
darshan_clear_record_refs(&(dxt_posix_runtime->rec_id_hash), 1);
free(dxt_posix_runtime);
......@@ -547,7 +546,7 @@ static void dxt_posix_cleanup_runtime()
static void dxt_mpiio_cleanup_runtime()
{
darshan_iter_record_refs(&(dxt_mpiio_runtime->rec_id_hash), dxt_free_trace_buffers);
darshan_iter_record_refs(dxt_mpiio_runtime->rec_id_hash, dxt_free_record_data);
darshan_clear_record_refs(&(dxt_mpiio_runtime->rec_id_hash), 1);
free(dxt_mpiio_runtime);
......@@ -561,108 +560,109 @@ static void dxt_mpiio_cleanup_runtime()
* shutdown function exported by this module for coordinating with darshan-core *
********************************************************************************/
static void dxt_posix_shutdown(
MPI_Comm mod_comm,
darshan_record_id *shared_recs,
int shared_rec_count,
void **dxt_posix_buf,
int *dxt_posix_buf_sz)
static void dxt_serialize_posix_records(void *rec_ref_p)
{
struct dxt_file_record_ref *rec_ref;
struct dxt_file_record_ref *rec_ref = (struct dxt_file_record_ref *)rec_ref_p;
struct dxt_file_record *file_rec;
int i, idx;
int64_t offset;
int64_t length;
int64_t rank;
double start_time;
double end_time;
int64_t record_size = 0;
int64_t record_write_count = 0;
int64_t record_read_count = 0;
void *tmp_buf_ptr = *dxt_posix_buf;
void *tmp_buf_ptr;
assert(dxt_posix_runtime);
assert(rec_ref);
file_rec = rec_ref->file_rec;
assert(file_rec);
int dxt_rec_count = dxt_posix_runtime->file_rec_count;
struct dxt_record_ref_tracker *ref_tracker, *tmp;
struct dxt_record_ref_tracker *ref_tracker_head =
(struct dxt_record_ref_tracker *)(dxt_posix_runtime->rec_id_hash);
record_write_count = file_rec->write_count;
record_read_count = file_rec->read_count;
if (record_write_count == 0 && record_read_count == 0)
return;
*dxt_posix_buf_sz = 0;
/*
* Buffer format:
* dxt_file_record + ost_ids + write_traces + read_traces
*/
record_size = sizeof(struct dxt_file_record) +
(record_write_count + record_read_count) * sizeof(segment_info);
HASH_ITER(hlink, ref_tracker_head, ref_tracker, tmp) {
rec_ref = (struct dxt_file_record_ref *)ref_tracker->rec_ref_p;
assert(rec_ref);
tmp_buf_ptr = (void *)(dxt_posix_runtime->record_buf +
dxt_posix_runtime->record_buf_size);
file_rec = rec_ref->file_rec;
/*Copy struct dxt_file_record */
memcpy(tmp_buf_ptr, (void *)file_rec, sizeof(struct dxt_file_record));
tmp_buf_ptr = (void *)(tmp_buf_ptr + sizeof(struct dxt_file_record));
record_write_count = file_rec->write_count;
record_read_count = file_rec->read_count;
/*Copy write record */
memcpy(tmp_buf_ptr, (void *)(file_rec->write_traces),
record_write_count * sizeof(segment_info));
tmp_buf_ptr = (void *)(tmp_buf_ptr +
record_write_count * sizeof(segment_info));
if (record_write_count == 0 && record_read_count == 0)
continue;
/*Copy read record */
memcpy(tmp_buf_ptr, (void *)(file_rec->read_traces),
record_read_count * sizeof(segment_info));
tmp_buf_ptr = (void *)(tmp_buf_ptr +
record_read_count * sizeof(segment_info));
/*
* Buffer format:
* dxt_file_record + ost_ids + write_traces + read_traces
*/
record_size = sizeof(struct dxt_file_record) +
(record_write_count + record_read_count) * sizeof(segment_info);
dxt_posix_runtime->record_buf_size += record_size;
if (*dxt_posix_buf_sz == 0) {
*dxt_posix_buf = (void *)malloc(record_size);
} else {
*dxt_posix_buf = (void *)realloc((*dxt_posix_buf),
*dxt_posix_buf_sz + record_size);
}
tmp_buf_ptr = (void *)(*dxt_posix_buf) + *dxt_posix_buf_sz;
#if 0
int i;
int64_t offset;
int64_t length;
int64_t rank;
double start_time;
double end_time;
/*Copy struct dxt_file_record */
memcpy(tmp_buf_ptr, (void *)file_rec, sizeof(struct dxt_file_record));
tmp_buf_ptr = ((void *)tmp_buf_ptr) + sizeof(struct dxt_file_record);
printf("DXT, record_id: %" PRIu64 "\n", rec_ref->file_rec->base_rec.id);
printf("DXT, write_count is: %d read_count is: %d\n",
file_rec->write_count, file_rec->read_count);
/*Copy write record */
memcpy(tmp_buf_ptr, (void *)(file_rec->write_traces),
record_write_count * sizeof(segment_info));
tmp_buf_ptr = ((void *)tmp_buf_ptr) +
record_write_count * sizeof(segment_info);
for (i = 0; i < file_rec->write_count; i++) {
rank = file_rec->base_rec.rank;
offset = file_rec->write_traces[i].offset;
length = file_rec->write_traces[i].length;
start_time = file_rec->write_traces[i].start_time;
end_time = file_rec->write_traces[i].end_time;
/*Copy read record */
memcpy(tmp_buf_ptr, (void *)(file_rec->read_traces),
record_read_count * sizeof(segment_info));
tmp_buf_ptr = ((char *)tmp_buf_ptr) +
record_read_count * sizeof(segment_info);
printf("DXT, rank %d writes segment %lld [offset: %lld length: %lld start_time: %fs end_time: %fs]\n", rank, i, offset, length, start_time, end_time);
}
*dxt_posix_buf_sz += record_size;
for (i = 0; i < file_rec->read_count; i++) {
rank = file_rec->base_rec.rank;
offset = file_rec->read_traces[i].offset;
length = file_rec->read_traces[i].length;
start_time = file_rec->read_traces[i].start_time;
end_time = file_rec->read_traces[i].end_time;
#if 0
printf("DXT, record_id: %" PRIu64 "\n", rec_ref->file_rec->base_rec.id);
printf("DXT, write_count is: %d read_count is: %d\n",
file_rec->write_count, file_rec->read_count);
for (i = 0; i < file_rec->write_count; i++) {
rank = file_rec->base_rec.rank;
offset = file_rec->write_traces[i].offset;
length = file_rec->write_traces[i].length;
start_time = file_rec->write_traces[i].start_time;
end_time = file_rec->write_traces[i].end_time;
printf("DXT, rank %d writes segment %lld [offset: %lld length: %lld start_time: %fs end_time: %fs]\n", rank, i, offset, length, start_time, end_time);
}
printf("DXT, rank %d reads segment %lld [offset: %lld length: %lld start_time: %fs end_time: %fs]\n", rank, i, offset, length, start_time, end_time);
}
#endif
}
for (i = 0; i < file_rec->read_count; i++) {
rank = file_rec->base_rec.rank;
offset = file_rec->read_traces[i].offset;
length = file_rec->read_traces[i].length;
start_time = file_rec->read_traces[i].start_time;
end_time = file_rec->read_traces[i].end_time;
static void dxt_posix_shutdown(
MPI_Comm mod_comm,
darshan_record_id *shared_recs,
int shared_rec_count,
void **dxt_posix_buf,
int *dxt_posix_buf_sz)
{
assert(dxt_posix_runtime);
printf("DXT, rank %d reads segment %lld [offset: %lld length: %lld start_time: %fs end_time: %fs]\n", rank, i, offset, length, start_time, end_time);
}
#endif
}
*dxt_posix_buf_sz = 0;
dxt_posix_runtime->record_buf = malloc(DXT_IO_TRACE_MEM_MAX);
if(!(dxt_posix_runtime->record_buf))
return;
memset(dxt_posix_runtime->record_buf, 0, DXT_IO_TRACE_MEM_MAX);
dxt_posix_runtime->record_buf_size = 0;
/* iterate all dxt posix records and serialize them to the output buffer */
darshan_iter_record_refs(dxt_posix_runtime->rec_id_hash, dxt_serialize_posix_records);
/* set output */
*dxt_posix_buf = dxt_posix_runtime->record_buf;
*dxt_posix_buf_sz = dxt_posix_runtime->record_buf_size;
/* shutdown internal structures used for instrumenting */
dxt_posix_cleanup_runtime();
......@@ -673,108 +673,110 @@ static void dxt_posix_shutdown(
return;
}
static void dxt_mpiio_shutdown(
MPI_Comm mod_comm,
darshan_record_id *shared_recs,
int shared_rec_count,
void **dxt_mpiio_buf,
int *dxt_mpiio_buf_sz)
static void dxt_serialize_mpiio_records(void *rec_ref_p)
{
struct dxt_file_record_ref *rec_ref;
struct dxt_file_record_ref *rec_ref = (struct dxt_file_record_ref *)rec_ref_p;
struct dxt_file_record *file_rec;
int i, idx;
int64_t offset;
int64_t length;
int64_t rank;
double start_time;
double end_time;
int64_t record_size = 0;
int64_t record_write_count = 0;
int64_t record_read_count = 0;
void *tmp_buf_ptr = *dxt_mpiio_buf;
void *tmp_buf_ptr;
assert(dxt_mpiio_runtime);
assert(rec_ref);
file_rec = rec_ref->file_rec;
assert(file_rec);
int dxt_rec_count = dxt_mpiio_runtime->file_rec_count;
struct dxt_record_ref_tracker *ref_tracker, *tmp;
struct dxt_record_ref_tracker *ref_tracker_head =
(struct dxt_record_ref_tracker *)(dxt_mpiio_runtime->rec_id_hash);
record_write_count = file_rec->write_count;
record_read_count = file_rec->read_count;
if (record_write_count == 0 && record_read_count == 0)
return;
*dxt_mpiio_buf_sz = 0;
/*
* Buffer format:
* dxt_file_record + ost_ids + write_traces + read_traces
*/
record_size = sizeof(struct dxt_file_record) +
(record_write_count + record_read_count) * sizeof(segment_info);
HASH_ITER(hlink, ref_tracker_head, ref_tracker, tmp) {
rec_ref = (struct dxt_file_record_ref *)ref_tracker->rec_ref_p;
assert(rec_ref);
file_rec = rec_ref->file_rec;
record_write_count = file_rec->write_count;
record_read_count = file_rec->read_count;
if (record_write_count == 0 && record_read_count == 0)
continue;
/*
* Buffer format:
* dxt_file_record + ost_ids + write_traces + read_traces
*/
record_size = sizeof(struct dxt_file_record) +
(record_write_count + record_read_count) * sizeof(segment_info);
if (*dxt_mpiio_buf_sz == 0) {
*dxt_mpiio_buf = (void *)malloc(record_size);
} else {
*dxt_mpiio_buf = (void *)realloc((*dxt_mpiio_buf),
*dxt_mpiio_buf_sz + record_size);
}
tmp_buf_ptr = (void *)(*dxt_mpiio_buf) + *dxt_mpiio_buf_sz;
tmp_buf_ptr = (void *)(dxt_mpiio_runtime->record_buf +
dxt_mpiio_runtime->record_buf_size);
/*Copy struct dxt_file_record */
memcpy(tmp_buf_ptr, (void *)file_rec, sizeof(struct dxt_file_record));
tmp_buf_ptr = ((void *)tmp_buf_ptr) + sizeof(struct dxt_file_record);
/*Copy struct dxt_file_record */
memcpy(tmp_buf_ptr, (void *)file_rec, sizeof(struct dxt_file_record));
tmp_buf_ptr = (void *)(tmp_buf_ptr + sizeof(struct dxt_file_record));
/*Copy write record */
memcpy(tmp_buf_ptr, (void *)(file_rec->write_traces),
/*Copy write record */
memcpy(tmp_buf_ptr, (void *)(file_rec->write_traces),
record_write_count * sizeof(segment_info));
tmp_buf_ptr = (void *)(tmp_buf_ptr +
record_write_count * sizeof(segment_info));
tmp_buf_ptr = ((void *)tmp_buf_ptr) +
record_write_count * sizeof(segment_info);
/*Copy read record */
memcpy(tmp_buf_ptr, (void *)(file_rec->read_traces),
/*Copy read record */
memcpy(tmp_buf_ptr, (void *)(file_rec->read_traces),
record_read_count * sizeof(segment_info));
tmp_buf_ptr = (void *)(tmp_buf_ptr +
record_read_count * sizeof(segment_info));
tmp_buf_ptr = ((char *)tmp_buf_ptr) +
record_read_count * sizeof(segment_info);
*dxt_mpiio_buf_sz += record_size;
dxt_mpiio_runtime->record_buf_size += record_size;
#if 0
printf("Cong, record_id: %" PRIu64 "\n", rec_ref->file_rec->base_rec.id);
printf("DXT, file_rec->write_count is: %d\n",
file_rec->write_count);
for (i = 0; i < file_rec->write_count; i++) {
rank = file_rec->base_rec.rank;
offset = file_rec->write_traces[i].offset;
length = file_rec->write_traces[i].length;
start_time = file_rec->write_traces[i].start_time;
end_time = file_rec->write_traces[i].end_time;
printf("DXT, rank %d writes segment %lld [offset: %lld length: %lld start_time: %fs end_time: %fs]\n", rank, i, offset, length, start_time, end_time);
}
int i;
int64_t offset;
int64_t length;
int64_t rank;
double start_time;
double end_time;
for (i = 0; i < file_rec->read_count; i++) {
rank = file_rec->base_rec.rank;
offset = file_rec->read_traces[i].offset;
length = file_rec->read_traces[i].length;
start_time = file_rec->read_traces[i].start_time;
end_time = file_rec->read_traces[i].end_time;
printf("Cong, record_id: %" PRIu64 "\n", rec_ref->file_rec->base_rec.id);
printf("DXT, file_rec->write_count is: %d\n",
file_rec->write_count);
printf("DXT, rank %d reads segment %lld [offset: %lld length: %lld start_time: %fs end_time: %fs]\n", rank, i, offset, length, start_time, end_time);
}
#endif
for (i = 0; i < file_rec->write_count; i++) {
rank = file_rec->base_rec.rank;
offset = file_rec->write_traces[i].offset;
length = file_rec->write_traces[i].length;
start_time = file_rec->write_traces[i].start_time;
end_time = file_rec->write_traces[i].end_time;
printf("DXT, rank %d writes segment %lld [offset: %lld length: %lld start_time: %fs end_time: %fs]\n", rank, i, offset, length, start_time, end_time);
}
for (i = 0; i < file_rec->read_count; i++) {
rank = file_rec->base_rec.rank;
offset = file_rec->read_traces[i].offset;
length = file_rec->read_traces[i].length;
start_time = file_rec->read_traces[i].start_time;
end_time = file_rec->read_traces[i].end_time;
printf("DXT, rank %d reads segment %lld [offset: %lld length: %lld start_time: %fs end_time: %fs]\n", rank, i, offset, length, start_time, end_time);
}
#endif
}
static void dxt_mpiio_shutdown(
MPI_Comm mod_comm,
darshan_record_id *shared_recs,
int shared_rec_count,
void **dxt_mpiio_buf,
int *dxt_mpiio_buf_sz)
{
assert(dxt_mpiio_runtime);
*dxt_mpiio_buf_sz = 0;
dxt_mpiio_runtime->record_buf = malloc(DXT_IO_TRACE_MEM_MAX);
if(!(dxt_mpiio_runtime->record_buf))
return;
memset(dxt_mpiio_runtime->record_buf, 0, DXT_IO_TRACE_MEM_MAX);
dxt_mpiio_runtime->record_buf_size = 0;
/* iterate all dxt posix records and serialize them to the output buffer */
darshan_iter_record_refs(dxt_mpiio_runtime->rec_id_hash, dxt_serialize_mpiio_records);
/* set output */
*dxt_mpiio_buf = dxt_mpiio_runtime->record_buf;
*dxt_mpiio_buf_sz = dxt_mpiio_runtime->record_buf_size;
/* shutdown internal structures used for instrumenting */
dxt_mpiio_cleanup_runtime();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment