Commit 9f202697 authored by Shane Snyder's avatar Shane Snyder
Browse files

update logutils to use new compression format

parent b2268e7b
......@@ -30,49 +30,48 @@ char *hdf5_f_counter_names[] = {
};
#undef X
static int darshan_log_get_hdf5_file(darshan_fd fd, void** hdf5_buf_p,
int* bytes_left, void** file_rec, darshan_record_id* rec_id);
static void darshan_log_print_hdf5_file(void *file_rec,
char *file_name, char *mnt_pt, char *fs_type);
struct darshan_mod_logutil_funcs hdf5_logutils =
{
.log_get_record = &darshan_log_get_hdf5_file,
.log_print_record = &darshan_log_print_hdf5_file,
};
int darshan_log_get_hdf5_file(darshan_fd fd, void **file_rec,
darshan_record_id *rec_id)
static int darshan_log_get_hdf5_file(darshan_fd fd, void** hdf5_buf_p,
int* bytes_left, void** file_rec, darshan_record_id* rec_id)
{
int i;
int ret;
struct darshan_hdf5_file *file = NULL;
*rec_id = 0;
struct darshan_hdf5_file *file = (struct darshan_hdf5_file *)
(*hdf5_buf_p);
file = malloc(sizeof(*file));
if(!file)
if(*bytes_left < sizeof(struct darshan_hdf5_file))
return(-1);
memset(file, 0, sizeof(*file));
ret = darshan_log_getmod(fd, DARSHAN_HDF5_MOD,
(void *)file, sizeof(*file));
if(ret == 1)
if(fd->swap_flag)
{
if(fd->swap_flag)
{
/* swap bytes if necessary */
DARSHAN_BSWAP64(&file->f_id);
DARSHAN_BSWAP64(&file->rank);
for(i=0; i<HDF5_NUM_INDICES; i++)
DARSHAN_BSWAP64(&file->counters[i]);
for(i=0; i<HDF5_F_NUM_INDICES; i++)
DARSHAN_BSWAP64(&file->fcounters[i]);
}
/* pass the file record back */
*file_rec = (void *)file;
*rec_id = file->f_id;
/* swap bytes if necessary */
DARSHAN_BSWAP64(&file->f_id);
DARSHAN_BSWAP64(&file->rank);
for(i=0; i<HDF5_NUM_INDICES; i++)
DARSHAN_BSWAP64(&file->counters[i]);
for(i=0; i<HDF5_F_NUM_INDICES; i++)
DARSHAN_BSWAP64(&file->fcounters[i]);
}
return(ret);
/* update/set output variables */
*file_rec = (void *)file;
*rec_id = file->f_id;
*hdf5_buf_p = (file + 1); /* increment input buf by size of file record */
*bytes_left -= sizeof(struct darshan_hdf5_file);
return(0);
}
void darshan_log_print_hdf5_file(void *file_rec, char *file_name,
static void darshan_log_print_hdf5_file(void *file_rec, char *file_name,
char *mnt_pt, char *fs_type)
{
int i;
......
......@@ -15,9 +15,4 @@ extern char *hdf5_f_counter_names[];
extern struct darshan_mod_logutil_funcs hdf5_logutils;
int darshan_log_get_hdf5_file(darshan_fd fd, void **file_rec,
darshan_record_id *rec_id);
void darshan_log_print_hdf5_file(void *file_rec, char *file_name,
char *mnt_pt, char *fs_type);
#endif
......@@ -19,9 +19,13 @@
#include "darshan-logutils.h"
/* TODO: for log reads, we need to make sure the header has been read prior */
static int darshan_log_seek(darshan_fd fd, off_t offset);
static int darshan_log_read(darshan_fd fd, void *buf, int len);
static int darshan_log_write(darshan_fd fd, void *buf, int len);
static int darshan_decompress_buf(char* comp_buf, int comp_buf_sz,
char* decomp_buf, int* inout_decomp_buf_sz);
/* TODO: can we make this s.t. we don't care about ordering (i.e., X macro it ) */
struct darshan_mod_logutil_funcs *mod_logutils[DARSHAN_MAX_MODS] =
......@@ -52,20 +56,34 @@ struct darshan_mod_logutil_funcs *mod_logutils[DARSHAN_MAX_MODS] =
*/
darshan_fd darshan_log_open(const char *name, const char *mode)
{
int o_flags;
darshan_fd tmp_fd;
/* we only allows "w" or "r" modes, nothing fancy */
/* we only allow "w" or "r" modes, nothing fancy */
assert(strlen(mode) == 1);
assert(mode[0] == 'r' || mode[0] == 'w');
if(mode[0] == 'r')
{
o_flags = O_RDONLY;
}
else if (mode[0] == 'w')
{
/* when writing, we create the log file making sure not to overwrite
* an existing log
*/
o_flags = O_WRONLY | O_CREAT | O_EXCL;
}
tmp_fd = malloc(sizeof(*tmp_fd));
if(!tmp_fd)
return(NULL);
memset(tmp_fd, 0, sizeof(*tmp_fd));
tmp_fd->gzf = gzopen(name, mode);
if(!tmp_fd->gzf)
tmp_fd->fildes = open(name, o_flags);
if(tmp_fd->fildes < 0)
{
perror("darshan_log_open: ");
free(tmp_fd);
tmp_fd = NULL;
}
......@@ -77,6 +95,7 @@ darshan_fd darshan_log_open(const char *name, const char *mode)
*
* read the header of the darshan log and set internal data structures
* NOTE: this function must be called before reading other portions of the log
* NOTE: this is the only portion of the darshan log which is uncompressed
*
* returns 0 on success, -1 on failure
*/
......@@ -92,7 +111,7 @@ int darshan_log_getheader(darshan_fd fd, struct darshan_header *header)
return(-1);
}
/* read header from log file */
/* read uncompressed header from log file */
ret = darshan_log_read(fd, header, sizeof(*header));
if(ret != sizeof(*header))
{
......@@ -134,12 +153,15 @@ int darshan_log_getheader(darshan_fd fd, struct darshan_header *header)
}
/* save the mapping of data within log file to this file descriptor */
fd->job_map.off = sizeof(struct darshan_header);
fd->job_map.len = header->rec_map.off - fd->job_map.off;
memcpy(&fd->rec_map, &header->rec_map, sizeof(struct darshan_log_map));
memcpy(&fd->mod_map, &header->mod_map, DARSHAN_MAX_MODS * sizeof(struct darshan_log_map));
return(0);
}
#if 0
/* darshan_log_putheader()
*
* write a darshan header to log file
......@@ -171,6 +193,7 @@ int darshan_log_putheader(darshan_fd fd, struct darshan_header *header)
return(0);
}
#endif
/* darshan_log_getjob()
*
......@@ -180,22 +203,45 @@ int darshan_log_putheader(darshan_fd fd, struct darshan_header *header)
*/
int darshan_log_getjob(darshan_fd fd, struct darshan_job *job)
{
char *comp_buf;
char job_buf[DARSHAN_JOB_RECORD_SIZE] = {0};
int job_buf_sz = DARSHAN_JOB_RECORD_SIZE;
int ret;
ret = darshan_log_seek(fd, sizeof(struct darshan_header));
/* allocate buffer to store compressed job info */
comp_buf = malloc(fd->job_map.len);
if(!comp_buf)
return(-1);
ret = darshan_log_seek(fd, fd->job_map.off);
if(ret < 0)
{
fprintf(stderr, "Error: unable to seek in darshan log file.\n");
free(comp_buf);
return(-1);
}
/* read the job data from the log file */
ret = darshan_log_read(fd, job, sizeof(*job));
if(ret != sizeof(*job))
/* read the compressed job data from the log file */
ret = darshan_log_read(fd, comp_buf, fd->job_map.len);
if(ret != fd->job_map.len)
{
fprintf(stderr, "Error: failed to read darshan log file job data.\n");
free(comp_buf);
return(-1);
}
/* decompress the job data */
ret = darshan_decompress_buf(comp_buf, fd->job_map.len, job_buf, &job_buf_sz);
if(ret < 0)
{
fprintf(stderr, "Error: failed to decompress darshan job data.\n");
free(comp_buf);
return(-1);
}
assert(job_buf_sz == DARSHAN_JOB_RECORD_SIZE);
free(comp_buf);
memcpy(job, job_buf, sizeof(*job));
if(fd->swap_flag)
{
......@@ -207,9 +253,17 @@ int darshan_log_getjob(darshan_fd fd, struct darshan_job *job)
DARSHAN_BSWAP64(&job->jobid);
}
/* save trailing exe & mount information, so it can be retrieved later */
if(!fd->exe_mnt_data)
fd->exe_mnt_data = malloc(DARSHAN_EXE_LEN+1);
if(!fd->exe_mnt_data)
return(-1);
memcpy(fd->exe_mnt_data, &job_buf[sizeof(*job)], DARSHAN_EXE_LEN+1);
return(0);
}
#if 0
/* darshan_log_putjob()
*
* write job level metadat to darshan log file
......@@ -253,6 +307,7 @@ int darshan_log_putjob(darshan_fd fd, struct darshan_job *job)
return(0);
}
#endif
/* darshan_log_getexe()
*
......@@ -262,33 +317,30 @@ int darshan_log_putjob(darshan_fd fd, struct darshan_job *job)
*/
int darshan_log_getexe(darshan_fd fd, char *buf)
{
int tmp_off = sizeof(struct darshan_header) + sizeof(struct darshan_job);
int ret;
char *newline;
int ret;
ret = darshan_log_seek(fd, tmp_off);
if(ret < 0)
/* if the exe/mount data has not been saved yet, read in the job info */
if(!fd->exe_mnt_data)
{
fprintf(stderr, "Error: unable to seek in darshan log file.\n");
return(-1);
}
struct darshan_job job;
ret = darshan_log_getjob(fd, &job);
/* read the trailing exe data from the darshan log */
ret = darshan_log_read(fd, buf, DARSHAN_EXE_LEN+1);
if(ret != DARSHAN_EXE_LEN+1)
{
fprintf(stderr, "Error: failed to read exe string from darshan log file.\n");
return(-1);
if(ret < 0 || !fd->exe_mnt_data)
return(-1);
}
/* mount info is stored after the exe string, so truncate there */
newline = strchr(buf, '\n');
/* exe string is located before the first line break */
newline = strchr(fd->exe_mnt_data, '\n');
/* copy over the exe string */
if(newline)
*newline = '\0';
memcpy(buf, fd->exe_mnt_data, (newline - fd->exe_mnt_data));
return (0);
}
#if 0
/* darshan_log_putexe()
*
* wrties the application exe name to darshan log file
......@@ -319,6 +371,7 @@ int darshan_log_putexe(darshan_fd fd, char *buf)
return(0);
}
#endif
/* darshan_log_getmounts()
*
......@@ -331,30 +384,23 @@ int darshan_log_putexe(darshan_fd fd, char *buf)
int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts,
char*** fs_types, int* count)
{
int tmp_off = sizeof(struct darshan_header) + sizeof(struct darshan_job);
int ret;
char *pos;
int array_index = 0;
char buf[DARSHAN_EXE_LEN+1];
int ret;
ret = darshan_log_seek(fd, tmp_off);
if(ret < 0)
/* if the exe/mount data has not been saved yet, read in the job info */
if(!fd->exe_mnt_data)
{
fprintf(stderr, "Error: unable to seek in darshan log file.\n");
return(-1);
}
struct darshan_job job;
ret = darshan_log_getjob(fd, &job);
/* read the trailing mount data from the darshan log */
ret = darshan_log_read(fd, buf, DARSHAN_EXE_LEN+1);
if(ret != DARSHAN_EXE_LEN+1)
{
fprintf(stderr, "Error: failed to read mount info from darshan log file.\n");
return(-1);
if(ret < 0 || !fd->exe_mnt_data)
return(-1);
}
/* count entries */
*count = 0;
pos = buf;
pos = fd->exe_mnt_data;
while((pos = strchr(pos, '\n')) != NULL)
{
pos++;
......@@ -376,7 +422,7 @@ int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts,
/* work backwards through the table and parse each line (except for
* first, which holds command line information)
*/
while((pos = strrchr(buf, '\n')) != NULL)
while((pos = strrchr(fd->exe_mnt_data, '\n')) != NULL)
{
/* overestimate string lengths */
(*mnt_pts)[array_index] = malloc(DARSHAN_EXE_LEN);
......@@ -399,6 +445,7 @@ int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts,
return(0);
}
#if 0
/* darshan_log_putmounts()
*
* writes mount information to the darshan log file
......@@ -436,6 +483,7 @@ int darshan_log_putmounts(darshan_fd fd, char** mnt_pts, char** fs_types, int co
return(0);
}
#endif
/* darshan_log_gethash()
*
......@@ -445,8 +493,9 @@ int darshan_log_putmounts(darshan_fd fd, char** mnt_pts, char** fs_types, int co
*/
int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
{
char *comp_buf;
char *hash_buf;
int hash_buf_sz = fd->rec_map.len;
int hash_buf_sz;
char *buf_ptr;
darshan_record_id *rec_id_ptr;
uint32_t *path_len_ptr;
......@@ -454,24 +503,43 @@ int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
struct darshan_record_ref *ref;
int ret;
/* allocate buffers to store compressed and decompressed record
* hash data
*/
comp_buf = malloc(fd->rec_map.len);
hash_buf = malloc(DARSHAN_DEF_COMP_BUF_SZ);
if(!comp_buf || !hash_buf)
return(-1);
hash_buf_sz = DARSHAN_DEF_COMP_BUF_SZ;
memset(hash_buf, 0, DARSHAN_DEF_COMP_BUF_SZ);
ret = darshan_log_seek(fd, fd->rec_map.off);
if(ret < 0)
{
fprintf(stderr, "Error: unable to seek in darshan log file.\n");
free(comp_buf);
return(-1);
}
hash_buf = malloc(hash_buf_sz);
if(!hash_buf)
return(-1);
/* read the record hash from the log file */
ret = darshan_log_read(fd, hash_buf, fd->rec_map.len);
ret = darshan_log_read(fd, comp_buf, fd->rec_map.len);
if(ret != fd->rec_map.len)
{
fprintf(stderr, "Error: failed to read record hash from darshan log file.\n");
free(comp_buf);
return(-1);
}
/* decompress the record hash buffer */
ret = darshan_decompress_buf(comp_buf, fd->rec_map.len,
hash_buf, &hash_buf_sz);
if(ret < 0)
{
fprintf(stderr, "Error: unable to decompress darshan job data.\n");
free(comp_buf);
return(-1);
}
free(comp_buf);
buf_ptr = hash_buf;
while(buf_ptr < (hash_buf + hash_buf_sz))
......@@ -519,9 +587,11 @@ int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
}
}
free(hash_buf);
return(0);
}
#if 0
/* darshan_log_puthash()
*
* writes the hash table of records to the darshan log file
......@@ -609,60 +679,66 @@ int darshan_log_puthash(darshan_fd fd, struct darshan_record_ref *hash)
return(0);
}
#endif
/* darshan_log_getmod()
*
* reads a chunk of data for a corresponding darshan instrumentation module
* NOTE: mod_buf_sz is the uncompressed size of data we wish to read for the given
* module. chunks of data can be read sequentially for a given module by
* repeatedly calling this function with the desired chunk sizes.
*
* returns 1 on successful read of data, 0 on end of module region, -1 on failure
* returns 1 on successful read of module data, 0 on no data, -1 on failure
*/
int darshan_log_getmod(darshan_fd fd, darshan_module_id mod_id,
void *mod_buf, int mod_buf_sz)
void *mod_buf, int *mod_buf_sz)
{
int mod_buf_end = fd->mod_map[mod_id].off + fd->mod_map[mod_id].len;
char *comp_buf;
int ret;
if(!fd->mod_map[mod_id].len || fd->pos == mod_buf_end)
return(0); /* no (more) data corresponding to this mod_id */
/* only seek to start of module data if current log file position
* is not within the given mod_id's range. This allows one to
* repeatedly call this function and get chunks of a module's
* data piecemeal.
*/
if((fd->pos < fd->mod_map[mod_id].off) || (fd->pos > mod_buf_end))
if(mod_id < 0 || mod_id >= DARSHAN_MAX_MODS)
{
ret = darshan_log_seek(fd, fd->mod_map[mod_id].off);
if(ret < 0)
{
fprintf(stderr, "Error: unable to seek in darshan log file.\n");
return(-1);
}
fprintf(stderr, "Error: invalid Darshan module id.\n");
return(-1);
}
/* if the requested amount of data violates mapping info, error out */
if(mod_buf_sz > (mod_buf_end - fd->pos))
{
fprintf(stderr, "Error: module data read violates stored mapping information.\n");
if(fd->mod_map[mod_id].len == 0)
return(0); /* no data corresponding to this mod_id */
comp_buf = malloc(fd->mod_map[mod_id].len);
if(!comp_buf)
return(-1);
ret = darshan_log_seek(fd, fd->mod_map[mod_id].off);
if(ret < 0)
{
fprintf(stderr, "Error: unable to seek in darshan log file.\n");
free(comp_buf);
return(ret);
}
/* read the module chunk from the log file */
ret = darshan_log_read(fd, mod_buf, mod_buf_sz);
if(ret != mod_buf_sz)
/* read this module's data from the log file */
ret = darshan_log_read(fd, comp_buf, fd->mod_map[mod_id].len);
if(ret != fd->mod_map[mod_id].len)
{
fprintf(stderr,
"Error: failed to read module %s data from darshan log file.\n",
darshan_module_names[mod_id]);
free(comp_buf);
return(-1);
}
/* decompress this module's data */
ret = darshan_decompress_buf(comp_buf, fd->mod_map[mod_id].len,
mod_buf, mod_buf_sz);
if(ret < 0)
{
fprintf(stderr, "Error: unable to decompress module %s data.\n",
darshan_module_names[mod_id]);
free(comp_buf);
return(-1);
}
free(comp_buf);
return(1);
}
#if 0
/* darshan_log_putmod()
*
* write a chunk of module data to the darshan log file
......@@ -715,6 +791,7 @@ int darshan_log_putmod(darshan_fd fd, darshan_module_id mod_id,
return(0);
}
#endif
/* darshan_log_close()
*
......@@ -724,8 +801,11 @@ int darshan_log_putmod(darshan_fd fd, darshan_module_id mod_id,
*/
void darshan_log_close(darshan_fd fd)
{
if(fd->gzf)
gzclose(fd->gzf);
if(fd->fildes)
close(fd->fildes);
if(fd->exe_mnt_data)
free(fd->exe_mnt_data);
free(fd);
......@@ -738,22 +818,16 @@ void darshan_log_close(darshan_fd fd)
*/
static int darshan_log_seek(darshan_fd fd, off_t offset)
{
z_off_t zoff = 0;
z_off_t zoff_ret = 0;
off_t ret_off;
if(fd->pos == offset)
return(0);
if(fd->gzf)
ret_off = lseek(fd->fildes, offset, SEEK_SET);
if(ret_off == offset)
{
zoff += offset;
zoff_ret = gzseek(fd->gzf, zoff, SEEK_SET);
if(zoff_ret == zoff)
{
fd->pos = offset;
return(0);
}
return(-1);
fd->pos = offset;
return(0);
}
return(-1);
......@@ -765,15 +839,12 @@ static int darshan_log_read(darshan_fd fd, void* buf, int len)
{
int ret;
if(fd->gzf)
{
ret = gzread(fd->gzf, buf, len);
if(ret > 0)
fd->pos += ret;
return(ret);
}
/* read data from the log file using the given map */
ret = read(fd->fildes, buf, len);
if(ret > 0)
fd->pos += ret;
return(-1);
return(ret);
}
/* return amount written on success, -1 on failure.
......@@ -782,15 +853,76 @@ static int darshan_log_write(darshan_fd fd, void* buf, int len)
{
int ret;
if(fd->gzf)
ret = write(fd->fildes, buf, len);
if(ret > 0)
fd->pos += ret;
return(ret);
}
/* return 0 on successful decompression, -1 on failure
*/
static int darshan_decompress_buf(char* comp_buf, int comp_buf_sz,