Commit 3b0c668b authored by Shane Snyder
Browse files

update logutils to read uncompressed log files

parent bdb2a5f0
...@@ -77,9 +77,9 @@ static char * const darshan_module_names[] = ...@@ -77,9 +77,9 @@ static char * const darshan_module_names[] =
/* compression method used on darshan log file
 *
 * The selected value is copied into the log header's comp_type field
 * when a log is written and read back from it when a log is opened, so
 * the numeric values are pinned explicitly: reordering the enumerators
 * would change how existing logs are interpreted.
 */
enum darshan_comp_type
{
    DARSHAN_ZLIB_COMP = 0,  /* libz (deflate/inflate) streams */
    DARSHAN_BZIP2_COMP = 1, /* bzip2 streams */
    DARSHAN_NO_COMP = 2,    /* log regions stored uncompressed */
};
typedef uint64_t darshan_record_id; typedef uint64_t darshan_record_id;
......
...@@ -250,7 +250,7 @@ int main(int argc, char **argv) ...@@ -250,7 +250,7 @@ int main(int argc, char **argv)
if(!infile) if(!infile)
return(-1); return(-1);
comp_type = bzip2 ? comp_type = DARSHAN_BZIP2_COMP : DARSHAN_ZLIB_COMP; comp_type = bzip2 ? DARSHAN_BZIP2_COMP : DARSHAN_ZLIB_COMP;
outfile = darshan_log_create(outfile_name, comp_type, infile->partial_flag); outfile = darshan_log_create(outfile_name, comp_type, infile->partial_flag);
if(!outfile) if(!outfile)
{ {
......
...@@ -31,16 +31,18 @@ ...@@ -31,16 +31,18 @@
/* per-log state used to stage (de)compressed data between the caller
 * and the log file
 */
struct darshan_dz_state
{
    /* pointer to arbitrary data structure used for managing
     * compression/decompression state (e.g., z_stream
     * structure needed for libz); for uncompressed logs this
     * points at a heap-allocated int holding an offset into
     * the staging buffer. Released with free() when the log
     * is torn down.
     */
    void *comp_dat;
    /* buffer for staging compressed data to/from log file */
    unsigned char *buf;
    /* size of staging buffer
     * NOTE(review): initialized to 0 right after buf is allocated,
     * so this presumably tracks the number of staged bytes rather
     * than the allocation size -- confirm against dzload/dzunload
     */
    unsigned int size;
    /* for reading logs, flag indicating end of log file region */
    int eor;
    /* the region id we last tried reading/writing */
    int prev_reg_id;
};
...@@ -53,8 +55,6 @@ struct darshan_fd_int_state ...@@ -53,8 +55,6 @@ struct darshan_fd_int_state
int64_t pos; int64_t pos;
/* flag indicating whether log file was created (and written) */ /* flag indicating whether log file was created (and written) */
int creat_flag; int creat_flag;
/* compression type used on log file (libz or bzip2) */
enum darshan_comp_type comp_type;
/* log file path name */ /* log file path name */
char logfile_path[PATH_MAX]; char logfile_path[PATH_MAX];
/* pointer to exe & mount data in darshan job data structure */ /* pointer to exe & mount data in darshan job data structure */
...@@ -62,7 +62,7 @@ struct darshan_fd_int_state ...@@ -62,7 +62,7 @@ struct darshan_fd_int_state
/* whether previous file operations have failed */ /* whether previous file operations have failed */
int err; int err;
/* compression/decompression state */ /* compression/decompression stream read/write state */
struct darshan_dz_state dz; struct darshan_dz_state dz;
}; };
...@@ -71,20 +71,26 @@ static int darshan_log_putheader(darshan_fd fd); ...@@ -71,20 +71,26 @@ static int darshan_log_putheader(darshan_fd fd);
static int darshan_log_seek(darshan_fd fd, off_t offset); static int darshan_log_seek(darshan_fd fd, off_t offset);
static int darshan_log_read(darshan_fd fd, void *buf, int len); static int darshan_log_read(darshan_fd fd, void *buf, int len);
static int darshan_log_write(darshan_fd fd, void *buf, int len); static int darshan_log_write(darshan_fd fd, void *buf, int len);
static int darshan_log_dzinit(struct darshan_fd_int_state *state); static int darshan_log_dzinit(darshan_fd fd);
static void darshan_log_dzdestroy(struct darshan_fd_int_state *state); static void darshan_log_dzdestroy(darshan_fd fd);
static int darshan_log_dzread(darshan_fd fd, int region_id, void *buf, int len); static int darshan_log_dzread(darshan_fd fd, int region_id, void *buf, int len);
static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len); static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len);
static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int len); static int darshan_log_libz_read(darshan_fd fd, struct darshan_log_map map,
static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int len); void *buf, int len, int reset_strm_flag);
static int darshan_log_libz_write(darshan_fd fd, struct darshan_log_map *map_p,
void *buf, int len, int flush_strm_flag);
static int darshan_log_libz_flush(darshan_fd fd, int region_id); static int darshan_log_libz_flush(darshan_fd fd, int region_id);
#ifdef HAVE_LIBBZ2 #ifdef HAVE_LIBBZ2
static int darshan_log_bzip2_read(darshan_fd fd, int region_id, void *buf, int len); static int darshan_log_bzip2_read(darshan_fd fd, struct darshan_log_map map,
static int darshan_log_bzip2_write(darshan_fd fd, int region_id, void *buf, int len); void *buf, int len, int reset_strm_flag);
static int darshan_log_bzip2_write(darshan_fd fd, struct darshan_log_map *map_p,
void *buf, int len, int flush_strm_flag);
static int darshan_log_bzip2_flush(darshan_fd fd, int region_id); static int darshan_log_bzip2_flush(darshan_fd fd, int region_id);
#endif #endif
static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map); static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map);
static int darshan_log_dzunload(darshan_fd fd, struct darshan_log_map *map_p); static int darshan_log_dzunload(darshan_fd fd, struct darshan_log_map *map_p);
static int darshan_log_noz_read(darshan_fd fd, struct darshan_log_map map,
void *buf, int len, int reset_strm_flag);
/* each module's implementation of the darshan logutil functions */ /* each module's implementation of the darshan logutil functions */
#define X(a, b, c) c, #define X(a, b, c) c,
...@@ -140,7 +146,7 @@ darshan_fd darshan_log_open(const char *name) ...@@ -140,7 +146,7 @@ darshan_fd darshan_log_open(const char *name)
} }
/* initialize compression data structures */ /* initialize compression data structures */
ret = darshan_log_dzinit(tmp_fd->state); ret = darshan_log_dzinit(tmp_fd);
if(ret < 0) if(ret < 0)
{ {
fprintf(stderr, "Error: failed to initialize decompression data structures.\n"); fprintf(stderr, "Error: failed to initialize decompression data structures.\n");
...@@ -177,6 +183,7 @@ darshan_fd darshan_log_create(const char *name, enum darshan_comp_type comp_type ...@@ -177,6 +183,7 @@ darshan_fd darshan_log_create(const char *name, enum darshan_comp_type comp_type
return(NULL); return(NULL);
} }
memset(tmp_fd->state, 0, sizeof(struct darshan_fd_int_state)); memset(tmp_fd->state, 0, sizeof(struct darshan_fd_int_state));
tmp_fd->comp_type = comp_type;
/* create the log for writing, making sure to not overwrite existing log */ /* create the log for writing, making sure to not overwrite existing log */
tmp_fd->state->fildes = creat(name, 0400); tmp_fd->state->fildes = creat(name, 0400);
...@@ -188,7 +195,6 @@ darshan_fd darshan_log_create(const char *name, enum darshan_comp_type comp_type ...@@ -188,7 +195,6 @@ darshan_fd darshan_log_create(const char *name, enum darshan_comp_type comp_type
return(NULL); return(NULL);
} }
tmp_fd->state->creat_flag = 1; tmp_fd->state->creat_flag = 1;
tmp_fd->state->comp_type = comp_type;
tmp_fd->partial_flag = partial_flag; tmp_fd->partial_flag = partial_flag;
strncpy(tmp_fd->state->logfile_path, name, PATH_MAX); strncpy(tmp_fd->state->logfile_path, name, PATH_MAX);
...@@ -208,7 +214,7 @@ darshan_fd darshan_log_create(const char *name, enum darshan_comp_type comp_type ...@@ -208,7 +214,7 @@ darshan_fd darshan_log_create(const char *name, enum darshan_comp_type comp_type
} }
/* initialize compression data structures */ /* initialize compression data structures */
ret = darshan_log_dzinit(tmp_fd->state); ret = darshan_log_dzinit(tmp_fd);
if(ret < 0) if(ret < 0)
{ {
fprintf(stderr, "Error: failed to initialize compression data structures.\n"); fprintf(stderr, "Error: failed to initialize compression data structures.\n");
...@@ -299,7 +305,7 @@ int darshan_log_putjob(darshan_fd fd, struct darshan_job *job) ...@@ -299,7 +305,7 @@ int darshan_log_putjob(darshan_fd fd, struct darshan_job *job)
/* write the compressed job data to log file */ /* write the compressed job data to log file */
ret = darshan_log_dzwrite(fd, DARSHAN_JOB_REGION_ID, &job_copy, sizeof(*job)); ret = darshan_log_dzwrite(fd, DARSHAN_JOB_REGION_ID, &job_copy, sizeof(*job));
if(ret != (int)sizeof(*job)) if(ret != sizeof(*job))
{ {
state->err = -1; state->err = -1;
fprintf(stderr, "Error: failed to write darshan log file job data.\n"); fprintf(stderr, "Error: failed to write darshan log file job data.\n");
...@@ -774,7 +780,7 @@ void darshan_log_close(darshan_fd fd) ...@@ -774,7 +780,7 @@ void darshan_log_close(darshan_fd fd)
if(state->creat_flag) if(state->creat_flag)
{ {
/* flush the last region of the log to file */ /* flush the last region of the log to file */
switch(state->comp_type) switch(fd->comp_type)
{ {
case DARSHAN_ZLIB_COMP: case DARSHAN_ZLIB_COMP:
ret = darshan_log_libz_flush(fd, state->dz.prev_reg_id); ret = darshan_log_libz_flush(fd, state->dz.prev_reg_id);
...@@ -812,7 +818,7 @@ void darshan_log_close(darshan_fd fd) ...@@ -812,7 +818,7 @@ void darshan_log_close(darshan_fd fd)
unlink(state->logfile_path); unlink(state->logfile_path);
} }
darshan_log_dzdestroy(state); darshan_log_dzdestroy(fd);
if(state->exe_mnt_data) if(state->exe_mnt_data)
free(state->exe_mnt_data); free(state->exe_mnt_data);
free(state); free(state);
...@@ -830,7 +836,6 @@ void darshan_log_close(darshan_fd fd) ...@@ -830,7 +836,6 @@ void darshan_log_close(darshan_fd fd)
*/ */
static int darshan_log_getheader(darshan_fd fd) static int darshan_log_getheader(darshan_fd fd)
{ {
struct darshan_fd_int_state *state = fd->state;
struct darshan_header header; struct darshan_header header;
int i; int i;
int ret; int ret;
...@@ -883,7 +888,7 @@ static int darshan_log_getheader(darshan_fd fd) ...@@ -883,7 +888,7 @@ static int darshan_log_getheader(darshan_fd fd)
} }
} }
state->comp_type = header.comp_type; fd->comp_type = header.comp_type;
fd->partial_flag = header.partial_flag; fd->partial_flag = header.partial_flag;
/* save the mapping of data within log file to this file descriptor */ /* save the mapping of data within log file to this file descriptor */
...@@ -928,7 +933,6 @@ static int darshan_log_getheader(darshan_fd fd) ...@@ -928,7 +933,6 @@ static int darshan_log_getheader(darshan_fd fd)
*/ */
static int darshan_log_putheader(darshan_fd fd) static int darshan_log_putheader(darshan_fd fd)
{ {
struct darshan_fd_int_state *state = fd->state;
struct darshan_header header; struct darshan_header header;
int ret; int ret;
...@@ -942,7 +946,7 @@ static int darshan_log_putheader(darshan_fd fd) ...@@ -942,7 +946,7 @@ static int darshan_log_putheader(darshan_fd fd)
memset(&header, 0, sizeof(header)); memset(&header, 0, sizeof(header));
strcpy(header.version_string, DARSHAN_LOG_VERSION); strcpy(header.version_string, DARSHAN_LOG_VERSION);
header.magic_nr = DARSHAN_MAGIC_NR; header.magic_nr = DARSHAN_MAGIC_NR;
header.comp_type = state->comp_type; header.comp_type = fd->comp_type;
header.partial_flag = fd->partial_flag; header.partial_flag = fd->partial_flag;
/* copy the mapping information to the header */ /* copy the mapping information to the header */
...@@ -986,13 +990,20 @@ static int darshan_log_read(darshan_fd fd, void* buf, int len) ...@@ -986,13 +990,20 @@ static int darshan_log_read(darshan_fd fd, void* buf, int len)
{ {
struct darshan_fd_int_state *state = fd->state; struct darshan_fd_int_state *state = fd->state;
int ret; int ret;
unsigned int read_so_far = 0;
/* read data from the log file using the given map */ do
ret = read(state->fildes, buf, len); {
if(ret > 0) ret = read(state->fildes, buf + read_so_far, len - read_so_far);
state->pos += ret; if(ret <= 0)
break;
read_so_far += ret;
} while(read_so_far < len);
if(ret < 0)
return(-1);
return(ret); state->pos += read_so_far;
return(read_so_far);
} }
/* return amount written on success, -1 on failure. /* return amount written on success, -1 on failure.
...@@ -1001,26 +1012,37 @@ static int darshan_log_write(darshan_fd fd, void* buf, int len) ...@@ -1001,26 +1012,37 @@ static int darshan_log_write(darshan_fd fd, void* buf, int len)
{ {
struct darshan_fd_int_state *state = fd->state; struct darshan_fd_int_state *state = fd->state;
int ret; int ret;
unsigned int wrote_so_far = 0;
ret = write(state->fildes, buf, len); do
if(ret > 0) {
state->pos += ret; ret = write(state->fildes, buf + wrote_so_far, len - wrote_so_far);
if(ret <= 0)
break;
wrote_so_far += ret;
} while(wrote_so_far < len);
if(ret < 0)
return(-1);
return(ret); state->pos += wrote_so_far;
return(wrote_so_far);
} }
static int darshan_log_dzinit(struct darshan_fd_int_state *state) static int darshan_log_dzinit(darshan_fd fd)
{ {
struct darshan_fd_int_state *state = fd->state;
int ret; int ret;
/* initialize buffers for staging compressed data to/from log file */ /* initialize buffers for staging compressed data
* to/from log file
*/
state->dz.buf = malloc(DARSHAN_DEF_COMP_BUF_SZ); state->dz.buf = malloc(DARSHAN_DEF_COMP_BUF_SZ);
if(state->dz.buf == NULL) if(state->dz.buf == NULL)
return(-1); return(-1);
state->dz.size = 0;
state->dz.prev_reg_id = DARSHAN_HEADER_REGION_ID; state->dz.prev_reg_id = DARSHAN_HEADER_REGION_ID;
switch(state->comp_type) switch(fd->comp_type)
{ {
case DARSHAN_ZLIB_COMP: case DARSHAN_ZLIB_COMP:
{ {
...@@ -1055,7 +1077,7 @@ static int darshan_log_dzinit(struct darshan_fd_int_state *state) ...@@ -1055,7 +1077,7 @@ static int darshan_log_dzinit(struct darshan_fd_int_state *state)
free(state->dz.buf); free(state->dz.buf);
return(-1); return(-1);
} }
state->dz.strm = tmp_zstrm; state->dz.comp_dat = tmp_zstrm;
break; break;
} }
#ifdef HAVE_LIBBZ2 #ifdef HAVE_LIBBZ2
...@@ -1071,9 +1093,9 @@ static int darshan_log_dzinit(struct darshan_fd_int_state *state) ...@@ -1071,9 +1093,9 @@ static int darshan_log_dzinit(struct darshan_fd_int_state *state)
tmp_bzstrm->bzfree = NULL; tmp_bzstrm->bzfree = NULL;
tmp_bzstrm->opaque = NULL; tmp_bzstrm->opaque = NULL;
tmp_bzstrm->avail_in = 0; tmp_bzstrm->avail_in = 0;
tmp_bzstrm->next_in = Z_NULL; tmp_bzstrm->next_in = NULL;
if(state->creat_flag) if(!(state->creat_flag))
{ {
/* read only file, init decompress algorithm */ /* read only file, init decompress algorithm */
ret = BZ2_bzDecompressInit(tmp_bzstrm, 1, 0); ret = BZ2_bzDecompressInit(tmp_bzstrm, 1, 0);
...@@ -1091,10 +1113,18 @@ static int darshan_log_dzinit(struct darshan_fd_int_state *state) ...@@ -1091,10 +1113,18 @@ static int darshan_log_dzinit(struct darshan_fd_int_state *state)
free(state->dz.buf); free(state->dz.buf);
return(-1); return(-1);
} }
state->dz.strm = tmp_bzstrm; state->dz.comp_dat = tmp_bzstrm;
break; break;
} }
#endif #endif
case DARSHAN_NO_COMP:
{
/* we just track an offset into the staging buffers for no_comp */
int *buf_off = malloc(sizeof(int));
*buf_off = 0;
state->dz.comp_dat = buf_off;
break;
}
default: default:
fprintf(stderr, "Error: invalid compression type.\n"); fprintf(stderr, "Error: invalid compression type.\n");
return(-1); return(-1);
...@@ -1103,30 +1133,36 @@ static int darshan_log_dzinit(struct darshan_fd_int_state *state) ...@@ -1103,30 +1133,36 @@ static int darshan_log_dzinit(struct darshan_fd_int_state *state)
return(0); return(0);
} }
/* tear down the compression/decompression state attached to a log
 * file descriptor
 *
 * Ends the libz/bzip2 stream that matches the descriptor's compression
 * type (the inflate/decompress variant for logs opened for reading, the
 * deflate/compress variant for logs that were created), then releases
 * the stream bookkeeping structure and the staging buffer.
 */
static void darshan_log_dzdestroy(darshan_fd fd)
{
    struct darshan_fd_int_state *state = fd->state;

    switch(fd->comp_type)
    {
        case DARSHAN_ZLIB_COMP:
            if(!(state->creat_flag))
                inflateEnd((z_stream *)state->dz.comp_dat);
            else
                deflateEnd((z_stream *)state->dz.comp_dat);
            break;
#ifdef HAVE_LIBBZ2
        case DARSHAN_BZIP2_COMP:
            if(!(state->creat_flag))
                BZ2_bzDecompressEnd((bz_stream *)state->dz.comp_dat);
            else
                BZ2_bzCompressEnd((bz_stream *)state->dz.comp_dat);
            break;
#endif
        case DARSHAN_NO_COMP:
            /* no stream to end; comp_dat is just an offset counter */
            break;
        default:
            fprintf(stderr, "Error: invalid compression type.\n");
    }

    /* comp_dat is heap-allocated for every compression type, so it is
     * released here once rather than in each case above; clear the
     * pointers so a repeated teardown cannot double-free them
     */
    free(state->dz.comp_dat);
    state->dz.comp_dat = NULL;
    free(state->dz.buf);
    state->dz.buf = NULL;
}
...@@ -1134,74 +1170,111 @@ static void darshan_log_dzdestroy(struct darshan_fd_int_state *state) ...@@ -1134,74 +1170,111 @@ static void darshan_log_dzdestroy(struct darshan_fd_int_state *state)
/* read (and, if needed, decompress) up to len bytes of the given log
 * region into buf
 *
 * region_id selects the job region, the record map region, or -- for
 * any other non-negative value -- the module region stored at that
 * index of fd->mod_map. The region's map is handed to the reader that
 * matches the log's compression type.
 *
 * returns the value produced by the underlying reader, or -1 if the
 * region id or the compression type is invalid.
 */
static int darshan_log_dzread(darshan_fd fd, int region_id, void *buf, int len)
{
    struct darshan_fd_int_state *state = fd->state;
    struct darshan_log_map map;
    int reset_strm_flag = 0;
    int ret;

    /* if new log region, we reload buffers and clear eor flag */
    if(region_id != state->dz.prev_reg_id)
    {
        state->dz.eor = 0;
        reset_strm_flag = 1; /* reset libz/bzip2 streams */
    }

    if(region_id == DARSHAN_JOB_REGION_ID)
    {
        map = fd->job_map;
    }
    else if(region_id == DARSHAN_REC_MAP_REGION_ID)
    {
        map = fd->rec_map;
    }
    else if(region_id < 0)
    {
        /* any other negative id would index before the start of the
         * module map array
         */
        fprintf(stderr, "Error: invalid log region id.\n");
        return(-1);
    }
    else
    {
        map = fd->mod_map[region_id];
    }

    switch(fd->comp_type)
    {
        case DARSHAN_ZLIB_COMP:
            ret = darshan_log_libz_read(fd, map, buf, len, reset_strm_flag);
            break;
#ifdef HAVE_LIBBZ2
        case DARSHAN_BZIP2_COMP:
            ret = darshan_log_bzip2_read(fd, map, buf, len, reset_strm_flag);
            break;
#endif
        case DARSHAN_NO_COMP:
            ret = darshan_log_noz_read(fd, map, buf, len, reset_strm_flag);
            break;
        default:
            fprintf(stderr, "Error: invalid compression type.\n");
            return(-1);
    }

    /* remember the region so the next call can tell whether the
     * stream needs to be reset
     */
    state->dz.prev_reg_id = region_id;
    return(ret);
}
/* compress and write len bytes from buf into the given log region
 *
 * Regions must be written in non-decreasing region id order. When the
 * caller moves on to a new region (and the previous one was not the
 * header), the writer is told to finish and flush the previous
 * region's stream first. region_id selects the job region, the record
 * map region, or -- for any other non-negative value -- the module
 * region stored at that index of fd->mod_map.
 *
 * returns the value produced by the underlying writer, or -1 if the
 * region id moves backwards or is invalid, if the log is uncompressed
 * (writing uncompressed logs is not supported), or if the compression
 * type is invalid.
 */
static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len)
{
    struct darshan_fd_int_state *state = fd->state;
    struct darshan_log_map *map_p;
    int flush_strm_flag = 0;
    int ret;

    /* if new log region, finish prev region's zstream and flush to log file */
    if(region_id != state->dz.prev_reg_id)
    {
        /* error out if the region we are writing to precedes the previous
         * region we wrote -- we shouldn't be moving backwards in the log
         */
        if(region_id < state->dz.prev_reg_id)
            return(-1);

        if(state->dz.prev_reg_id != DARSHAN_HEADER_REGION_ID)
            flush_strm_flag = 1;
    }

    if(region_id == DARSHAN_JOB_REGION_ID)
    {
        map_p = &(fd->job_map);
    }
    else if(region_id == DARSHAN_REC_MAP_REGION_ID)
    {
        map_p = &(fd->rec_map);
    }
    else if(region_id < 0)
    {
        /* any other negative id (e.g. the header region, which passes
         * the ordering check above while nothing has been written yet)
         * would index before the start of the module map array
         */
        fprintf(stderr, "Error: invalid log region id.\n");
        return(-1);
    }
    else
    {
        map_p = &(fd->mod_map[region_id]);
    }

    switch(fd->comp_type)
    {
        case DARSHAN_ZLIB_COMP:
            ret = darshan_log_libz_write(fd, map_p, buf, len, flush_strm_flag);
            break;
#ifdef HAVE_LIBBZ2
        case DARSHAN_BZIP2_COMP:
            ret = darshan_log_bzip2_write(fd, map_p, buf, len, flush_strm_flag);
            break;
#endif
        case DARSHAN_NO_COMP:
            fprintf(stderr,
                "Error: uncompressed writing of log files is not supported.\n");
            return(-1);
        default:
            fprintf(stderr, "Error: invalid compression type.\n");
            return(-1);
    }

    /* remember the region so the next call can detect a region change */
    state->dz.prev_reg_id = region_id;
    return(ret);
}