Commit cb808ec2 authored by Shane Snyder's avatar Shane Snyder

add bzip2 implementation to darshan-logutils

parent 8659e6a3
...@@ -41,6 +41,11 @@ static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len) ...@@ -41,6 +41,11 @@ static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len)
static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int len); static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int len);
static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int len); static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int len);
static int darshan_log_libz_flush(darshan_fd fd, int region_id); static int darshan_log_libz_flush(darshan_fd fd, int region_id);
#ifdef HAVE_LIBBZ2
static int darshan_log_bzip2_read(darshan_fd fd, int region_id, void *buf, int len);
static int darshan_log_bzip2_write(darshan_fd fd, int region_id, void *buf, int len);
static int darshan_log_bzip2_flush(darshan_fd fd, int region_id);
#endif
static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map); static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map);
static int darshan_log_dzunload(darshan_fd fd, struct darshan_log_map *map_p); static int darshan_log_dzunload(darshan_fd fd, struct darshan_log_map *map_p);
...@@ -697,6 +702,12 @@ void darshan_log_close(darshan_fd fd) ...@@ -697,6 +702,12 @@ void darshan_log_close(darshan_fd fd)
ret = darshan_log_libz_flush(fd, fd->dz_prev_reg_id); ret = darshan_log_libz_flush(fd, fd->dz_prev_reg_id);
if(ret == 0) if(ret == 0)
break; break;
#ifdef HAVE_LIBBZ2
case DARSHAN_BZIP2_COMP:
ret = darshan_log_bzip2_flush(fd, fd->dz_prev_reg_id);
if(ret == 0)
break;
#endif
default: default:
/* if flush fails, remove the output log file */ /* if flush fails, remove the output log file */
fd->err = -1; fd->err = -1;
...@@ -934,6 +945,43 @@ static int darshan_log_dzinit(darshan_fd fd) ...@@ -934,6 +945,43 @@ static int darshan_log_dzinit(darshan_fd fd)
fd->dz_strm = tmp_zstrm; fd->dz_strm = tmp_zstrm;
break; break;
} }
#ifdef HAVE_LIBBZ2
case DARSHAN_BZIP2_COMP:
{
bz_stream *tmp_bzstrm = malloc(sizeof(*tmp_bzstrm));
if(!tmp_bzstrm)
{
free(fd->dz_buf);
return(-1);
}
tmp_bzstrm->bzalloc = NULL;
tmp_bzstrm->bzfree = NULL;
tmp_bzstrm->opaque = NULL;
tmp_bzstrm->avail_in = 0;
tmp_bzstrm->next_in = Z_NULL;
if(fd->o_flags == O_RDONLY)
{
/* read only file, init decompress algorithm */
ret = BZ2_bzDecompressInit(tmp_bzstrm, 1, 0);
}
else
{
/* write only file, init compress algorithm */
ret = BZ2_bzCompressInit(tmp_bzstrm, 9, 1, 30);
tmp_bzstrm->avail_out = DARSHAN_DEF_COMP_BUF_SZ;
tmp_bzstrm->next_out = (char *)fd->dz_buf;
}
if(ret != BZ_OK)
{
free(tmp_bzstrm);
free(fd->dz_buf);
return(-1);
}
fd->dz_strm = tmp_bzstrm;
break;
}
#endif
default: default:
fprintf(stderr, "Error: invalid compression type.\n"); fprintf(stderr, "Error: invalid compression type.\n");
return(-1); return(-1);
...@@ -948,15 +996,20 @@ static void darshan_log_dzdestroy(darshan_fd fd) ...@@ -948,15 +996,20 @@ static void darshan_log_dzdestroy(darshan_fd fd)
{ {
case DARSHAN_ZLIB_COMP: case DARSHAN_ZLIB_COMP:
if(fd->o_flags == O_RDONLY) if(fd->o_flags == O_RDONLY)
{
inflateEnd(fd->dz_strm); inflateEnd(fd->dz_strm);
}
else else
{
deflateEnd(fd->dz_strm); deflateEnd(fd->dz_strm);
}
free(fd->dz_strm); free(fd->dz_strm);
break; break;
#ifdef HAVE_LIBBZ2
case DARSHAN_BZIP2_COMP:
if(fd->o_flags == O_RDONLY)
BZ2_bzDecompressEnd(fd->dz_strm);
else
BZ2_bzCompressEnd(fd->dz_strm);
free(fd->dz_strm);
break;
#endif
default: default:
fprintf(stderr, "Error: invalid compression type.\n"); fprintf(stderr, "Error: invalid compression type.\n");
} }
...@@ -974,6 +1027,11 @@ static int darshan_log_dzread(darshan_fd fd, int region_id, void *buf, int len) ...@@ -974,6 +1027,11 @@ static int darshan_log_dzread(darshan_fd fd, int region_id, void *buf, int len)
case DARSHAN_ZLIB_COMP: case DARSHAN_ZLIB_COMP:
ret = darshan_log_libz_read(fd, region_id, buf, len); ret = darshan_log_libz_read(fd, region_id, buf, len);
break; break;
#ifdef HAVE_LIBBZ2
case DARSHAN_BZIP2_COMP:
ret = darshan_log_bzip2_read(fd, region_id, buf, len);
break;
#endif
default: default:
fprintf(stderr, "Error: invalid compression type.\n"); fprintf(stderr, "Error: invalid compression type.\n");
return(-1); return(-1);
...@@ -991,6 +1049,11 @@ static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len) ...@@ -991,6 +1049,11 @@ static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len)
case DARSHAN_ZLIB_COMP: case DARSHAN_ZLIB_COMP:
ret = darshan_log_libz_write(fd, region_id, buf, len); ret = darshan_log_libz_write(fd, region_id, buf, len);
break; break;
#ifdef HAVE_LIBBZ2
case DARSHAN_BZIP2_COMP:
ret = darshan_log_bzip2_write(fd, region_id, buf, len);
break;
#endif
default: default:
fprintf(stderr, "Error: invalid compression type.\n"); fprintf(stderr, "Error: invalid compression type.\n");
return(-1); return(-1);
...@@ -1007,7 +1070,7 @@ static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int le ...@@ -1007,7 +1070,7 @@ static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int le
struct darshan_log_map map; struct darshan_log_map map;
z_stream *z_strmp = (z_stream *)fd->dz_strm; z_stream *z_strmp = (z_stream *)fd->dz_strm;
assert(fd->dz_strm); assert(z_strmp);
/* if new log region, we reload buffers and clear eor flag */ /* if new log region, we reload buffers and clear eor flag */
if(region_id != fd->dz_prev_reg_id) if(region_id != fd->dz_prev_reg_id)
...@@ -1080,7 +1143,7 @@ static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int l ...@@ -1080,7 +1143,7 @@ static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int l
struct darshan_log_map *map_p; struct darshan_log_map *map_p;
z_stream *z_strmp = (z_stream *)fd->dz_strm; z_stream *z_strmp = (z_stream *)fd->dz_strm;
assert(fd->dz_strm); assert(z_strmp);
/* if new log region, finish prev region's zstream and flush to log file */ /* if new log region, finish prev region's zstream and flush to log file */
if(region_id != fd->dz_prev_reg_id) if(region_id != fd->dz_prev_reg_id)
...@@ -1149,6 +1212,8 @@ static int darshan_log_libz_flush(darshan_fd fd, int region_id) ...@@ -1149,6 +1212,8 @@ static int darshan_log_libz_flush(darshan_fd fd, int region_id)
struct darshan_log_map *map_p; struct darshan_log_map *map_p;
z_stream *z_strmp = (z_stream *)fd->dz_strm; z_stream *z_strmp = (z_stream *)fd->dz_strm;
assert(z_strmp);
if(region_id == DARSHAN_JOB_REGION_ID) if(region_id == DARSHAN_JOB_REGION_ID)
map_p = &(fd->job_map); map_p = &(fd->job_map);
else if(region_id == DARSHAN_REC_MAP_REGION_ID) else if(region_id == DARSHAN_REC_MAP_REGION_ID)
...@@ -1185,110 +1250,201 @@ static int darshan_log_libz_flush(darshan_fd fd, int region_id) ...@@ -1185,110 +1250,201 @@ static int darshan_log_libz_flush(darshan_fd fd, int region_id)
return(0); return(0);
} }
#if 0
#ifdef HAVE_LIBBZ2 #ifdef HAVE_LIBBZ2
static int darshan_bzip2_decomp(char* comp_buf, int comp_buf_sz,
char* decomp_buf, int* inout_decomp_buf_sz) static int darshan_log_bzip2_read(darshan_fd fd, int region_id, void *buf, int len)
{ {
int ret; int ret;
int total_out = 0; int total_bytes = 0;
bz_stream tmp_stream; int tmp_out_bytes;
struct darshan_log_map map;
memset(&tmp_stream, 0, sizeof(tmp_stream)); bz_stream *bz_strmp = (bz_stream *)fd->dz_strm;
tmp_stream.bzalloc = NULL;
tmp_stream.bzfree = NULL; assert(bz_strmp);
tmp_stream.opaque = NULL;
tmp_stream.next_in = comp_buf; /* if new log region, we reload buffers and clear eor flag */
tmp_stream.avail_in = comp_buf_sz; if(region_id != fd->dz_prev_reg_id)
tmp_stream.next_out = decomp_buf;
tmp_stream.avail_out = *inout_decomp_buf_sz;
ret = BZ2_bzDecompressInit(&tmp_stream, 1, 0);
if(ret != BZ_OK)
{ {
return(-1); bz_strmp->avail_in = 0;
fd->dz_eor = 0;
fd->dz_prev_reg_id = region_id;
} }
/* while we have not finished consuming all of the uncompressed input data */ if(region_id == DARSHAN_JOB_REGION_ID)
while(tmp_stream.avail_in) map = fd->job_map;
else if(region_id == DARSHAN_REC_MAP_REGION_ID)
map = fd->rec_map;
else
map = fd->mod_map[region_id];
bz_strmp->avail_out = len;
bz_strmp->next_out = buf;
/* we just decompress until the output buffer is full, assuming there
* is enough compressed data in file to satisfy the request size.
*/
while(bz_strmp->avail_out)
{ {
if(tmp_stream.avail_out == 0) /* check if we need more compressed data */
if(bz_strmp->avail_in == 0)
{ {
/* We ran out of buffer space for decompression. In theory, /* if the eor flag is set, clear it and return -- future
* we could just alloc more space, but probably just easier * reads of this log region will restart at the beginning
* to bump up the default size of the output buffer.
*/ */
BZ2_bzDecompressEnd(&tmp_stream); if(fd->dz_eor)
return(-1); {
fd->dz_eor = 0;
break;
}
/* read more data from input file */
ret = darshan_log_dzload(fd, map);
if(ret < 0)
return(-1);
assert(fd->dz_size > 0);
bz_strmp->avail_in = fd->dz_size;
bz_strmp->next_in = (char *)fd->dz_buf;
} }
/* decompress data */ tmp_out_bytes = bz_strmp->total_out_lo32;
ret = BZ2_bzDecompress(&tmp_stream); ret = BZ2_bzDecompress(bz_strmp);
if(ret != BZ_STREAM_END) if(ret != BZ_OK && ret != BZ_STREAM_END)
{ {
BZ2_bzDecompressEnd(&tmp_stream); fprintf(stderr, "Error: unable to decompress darshan log data.\n");
return(-1); return(-1);
} }
total_bytes += (bz_strmp->total_out_lo32 - tmp_out_bytes);
assert(tmp_stream.total_out_hi32 == 0); /* reset the decompression if we encountered end of stream */
total_out += tmp_stream.total_out_lo32; if(ret == BZ_STREAM_END)
if(tmp_stream.avail_in)
{ {
/* reinitialize bzip2 stream, we have more data to BZ2_bzDecompressEnd(bz_strmp);
* decompress BZ2_bzDecompressInit(bz_strmp, 1, 0);
*/
BZ2_bzDecompressEnd(&tmp_stream);
ret = BZ2_bzDecompressInit(&tmp_stream, 1, 0);
if(ret != BZ_OK)
{
return(-1);
}
} }
} }
BZ2_bzDecompressEnd(&tmp_stream);
*inout_decomp_buf_sz = total_out; return(total_bytes);
return(0);
} }
static int darshan_bzip2_comp(char* decomp_buf, int decomp_buf_sz, static int darshan_log_bzip2_write(darshan_fd fd, int region_id, void *buf, int len)
char* comp_buf, int* inout_comp_buf_sz)
{ {
int ret; int ret;
bz_stream tmp_stream; int total_bytes = 0;
int tmp_in_bytes;
memset(&tmp_stream, 0, sizeof(tmp_stream)); int tmp_out_bytes;
tmp_stream.bzalloc = NULL; struct darshan_log_map *map_p;
tmp_stream.bzfree = NULL; bz_stream *bz_strmp = (bz_stream *)fd->dz_strm;
tmp_stream.opaque = NULL;
tmp_stream.next_in = decomp_buf; assert(bz_strmp);
tmp_stream.avail_in = decomp_buf_sz;
tmp_stream.next_out = comp_buf; /* if new log region, finish prev region's zstream and flush to log file */
tmp_stream.avail_out = *inout_comp_buf_sz; if(region_id != fd->dz_prev_reg_id)
ret = BZ2_bzCompressInit(&tmp_stream, 9, 1, 30);
if(ret != BZ_OK)
{ {
return(-1); /* error out if the region we are writing to precedes the previous
* region we wrote -- we shouldn't be moving backwards in the log
*/
if(region_id < fd->dz_prev_reg_id)
return(-1);
if(fd->dz_prev_reg_id != DARSHAN_HEADER_REGION_ID)
{
ret = darshan_log_bzip2_flush(fd, fd->dz_prev_reg_id);
if(ret < 0)
return(-1);
}
fd->dz_prev_reg_id = region_id;
} }
/* compress data */ if(region_id == DARSHAN_JOB_REGION_ID)
map_p = &(fd->job_map);
else if(region_id == DARSHAN_REC_MAP_REGION_ID)
map_p = &(fd->rec_map);
else
map_p = &(fd->mod_map[region_id]);
bz_strmp->avail_in = len;
bz_strmp->next_in = buf;
/* compress input data until none left */
while(bz_strmp->avail_in)
{
/* if we are out of output, flush to log file */
if(bz_strmp->avail_out == 0)
{
assert(fd->dz_size == DARSHAN_DEF_COMP_BUF_SZ);
ret = darshan_log_dzunload(fd, map_p);
if(ret < 0)
return(-1);
bz_strmp->avail_out = DARSHAN_DEF_COMP_BUF_SZ;
bz_strmp->next_out = (char *)fd->dz_buf;
}
tmp_in_bytes = bz_strmp->total_in_lo32;
tmp_out_bytes = bz_strmp->total_out_lo32;
ret = BZ2_bzCompress(bz_strmp, BZ_RUN);
if(ret != BZ_RUN_OK)
{
fprintf(stderr, "Error: unable to compress darshan log data.\n");
return(-1);
}
total_bytes += (bz_strmp->total_in_lo32 - tmp_in_bytes);
fd->dz_size += (bz_strmp->total_out_lo32 - tmp_out_bytes);
}
return(total_bytes);
}
static int darshan_log_bzip2_flush(darshan_fd fd, int region_id)
{
int ret;
int tmp_out_bytes;
struct darshan_log_map *map_p;
bz_stream *bz_strmp = (bz_stream *)fd->dz_strm;
assert(bz_strmp);
if(region_id == DARSHAN_JOB_REGION_ID)
map_p = &(fd->job_map);
else if(region_id == DARSHAN_REC_MAP_REGION_ID)
map_p = &(fd->rec_map);
else
map_p = &(fd->mod_map[region_id]);
/* make sure deflate finishes this stream */
bz_strmp->avail_in = 0;
bz_strmp->next_in = NULL;
do do
{ {
ret = BZ2_bzCompress(&tmp_stream, BZ_FINISH); tmp_out_bytes = bz_strmp->total_out_lo32;
ret = BZ2_bzCompress(bz_strmp, BZ_FINISH);
if(ret < 0) if(ret < 0)
{ {
BZ2_bzCompressEnd(&tmp_stream); fprintf(stderr, "Error: unable to compress darshan log data.\n");
return(-1); return(-1);
} }
fd->dz_size += (bz_strmp->total_out_lo32 - tmp_out_bytes);
if(fd->dz_size)
{
/* flush to file */
if(darshan_log_dzunload(fd, map_p) < 0)
return(-1);
bz_strmp->avail_out = DARSHAN_DEF_COMP_BUF_SZ;
bz_strmp->next_out = (char *)fd->dz_buf;
}
} while (ret != BZ_STREAM_END); } while (ret != BZ_STREAM_END);
BZ2_bzCompressEnd(&tmp_stream);
assert(tmp_stream.total_out_hi32 == 0);
*inout_comp_buf_sz = tmp_stream.total_out_lo32; BZ2_bzCompressEnd(bz_strmp);
BZ2_bzCompressInit(bz_strmp, 9, 1, 30);
return(0); return(0);
} }
#endif
#endif #endif
static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map) static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment