From cb808ec2ab449f156045621f0e3998a544387adc Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Fri, 18 Sep 2015 17:26:08 -0500 Subject: [PATCH] add bzip2 implementation to darshan-logutils --- darshan-util/darshan-logutils.c | 304 ++++++++++++++++++++++++-------- 1 file changed, 230 insertions(+), 74 deletions(-) diff --git a/darshan-util/darshan-logutils.c b/darshan-util/darshan-logutils.c index e3560c3..25725ac 100644 --- a/darshan-util/darshan-logutils.c +++ b/darshan-util/darshan-logutils.c @@ -41,6 +41,11 @@ static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len) static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int len); static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int len); static int darshan_log_libz_flush(darshan_fd fd, int region_id); +#ifdef HAVE_LIBBZ2 +static int darshan_log_bzip2_read(darshan_fd fd, int region_id, void *buf, int len); +static int darshan_log_bzip2_write(darshan_fd fd, int region_id, void *buf, int len); +static int darshan_log_bzip2_flush(darshan_fd fd, int region_id); +#endif static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map); static int darshan_log_dzunload(darshan_fd fd, struct darshan_log_map *map_p); @@ -697,6 +702,12 @@ void darshan_log_close(darshan_fd fd) ret = darshan_log_libz_flush(fd, fd->dz_prev_reg_id); if(ret == 0) break; +#ifdef HAVE_LIBBZ2 + case DARSHAN_BZIP2_COMP: + ret = darshan_log_bzip2_flush(fd, fd->dz_prev_reg_id); + if(ret == 0) + break; +#endif default: /* if flush fails, remove the output log file */ fd->err = -1; @@ -934,6 +945,43 @@ static int darshan_log_dzinit(darshan_fd fd) fd->dz_strm = tmp_zstrm; break; } +#ifdef HAVE_LIBBZ2 + case DARSHAN_BZIP2_COMP: + { + bz_stream *tmp_bzstrm = malloc(sizeof(*tmp_bzstrm)); + if(!tmp_bzstrm) + { + free(fd->dz_buf); + return(-1); + } + tmp_bzstrm->bzalloc = NULL; + tmp_bzstrm->bzfree = NULL; + tmp_bzstrm->opaque = NULL; + tmp_bzstrm->avail_in = 0; + 
tmp_bzstrm->next_in = NULL; + + if(fd->o_flags == O_RDONLY) + { + /* read only file, init decompress algorithm */ + ret = BZ2_bzDecompressInit(tmp_bzstrm, 1, 0); + } + else + { + /* write only file, init compress algorithm */ + ret = BZ2_bzCompressInit(tmp_bzstrm, 9, 1, 30); + tmp_bzstrm->avail_out = DARSHAN_DEF_COMP_BUF_SZ; + tmp_bzstrm->next_out = (char *)fd->dz_buf; + } + if(ret != BZ_OK) + { + free(tmp_bzstrm); + free(fd->dz_buf); + return(-1); + } + fd->dz_strm = tmp_bzstrm; + break; + } +#endif default: fprintf(stderr, "Error: invalid compression type.\n"); return(-1); @@ -948,15 +996,20 @@ static void darshan_log_dzdestroy(darshan_fd fd) { case DARSHAN_ZLIB_COMP: if(fd->o_flags == O_RDONLY) - { inflateEnd(fd->dz_strm); - } else - { deflateEnd(fd->dz_strm); - } free(fd->dz_strm); break; +#ifdef HAVE_LIBBZ2 + case DARSHAN_BZIP2_COMP: + if(fd->o_flags == O_RDONLY) + BZ2_bzDecompressEnd(fd->dz_strm); + else + BZ2_bzCompressEnd(fd->dz_strm); + free(fd->dz_strm); + break; +#endif default: fprintf(stderr, "Error: invalid compression type.\n"); } @@ -974,6 +1027,11 @@ static int darshan_log_dzread(darshan_fd fd, int region_id, void *buf, int len) case DARSHAN_ZLIB_COMP: ret = darshan_log_libz_read(fd, region_id, buf, len); break; +#ifdef HAVE_LIBBZ2 + case DARSHAN_BZIP2_COMP: + ret = darshan_log_bzip2_read(fd, region_id, buf, len); + break; +#endif default: fprintf(stderr, "Error: invalid compression type.\n"); return(-1); @@ -991,6 +1049,11 @@ static int darshan_log_dzwrite(darshan_fd fd, int region_id, void *buf, int len) case DARSHAN_ZLIB_COMP: ret = darshan_log_libz_write(fd, region_id, buf, len); break; +#ifdef HAVE_LIBBZ2 + case DARSHAN_BZIP2_COMP: + ret = darshan_log_bzip2_write(fd, region_id, buf, len); + break; +#endif default: fprintf(stderr, "Error: invalid compression type.\n"); return(-1); @@ -1007,7 +1070,7 @@ static int darshan_log_libz_read(darshan_fd fd, int region_id, void *buf, int le struct darshan_log_map map; z_stream *z_strmp = (z_stream 
*)fd->dz_strm; - assert(fd->dz_strm); + assert(z_strmp); /* if new log region, we reload buffers and clear eor flag */ if(region_id != fd->dz_prev_reg_id) @@ -1080,7 +1143,7 @@ static int darshan_log_libz_write(darshan_fd fd, int region_id, void *buf, int l struct darshan_log_map *map_p; z_stream *z_strmp = (z_stream *)fd->dz_strm; - assert(fd->dz_strm); + assert(z_strmp); /* if new log region, finish prev region's zstream and flush to log file */ if(region_id != fd->dz_prev_reg_id) @@ -1149,6 +1212,8 @@ static int darshan_log_libz_flush(darshan_fd fd, int region_id) struct darshan_log_map *map_p; z_stream *z_strmp = (z_stream *)fd->dz_strm; + assert(z_strmp); + if(region_id == DARSHAN_JOB_REGION_ID) map_p = &(fd->job_map); else if(region_id == DARSHAN_REC_MAP_REGION_ID) @@ -1185,110 +1250,201 @@ static int darshan_log_libz_flush(darshan_fd fd, int region_id) return(0); } -#if 0 #ifdef HAVE_LIBBZ2 -static int darshan_bzip2_decomp(char* comp_buf, int comp_buf_sz, - char* decomp_buf, int* inout_decomp_buf_sz) + +static int darshan_log_bzip2_read(darshan_fd fd, int region_id, void *buf, int len) { int ret; - int total_out = 0; - bz_stream tmp_stream; - - memset(&tmp_stream, 0, sizeof(tmp_stream)); - tmp_stream.bzalloc = NULL; - tmp_stream.bzfree = NULL; - tmp_stream.opaque = NULL; - tmp_stream.next_in = comp_buf; - tmp_stream.avail_in = comp_buf_sz; - tmp_stream.next_out = decomp_buf; - tmp_stream.avail_out = *inout_decomp_buf_sz; - - ret = BZ2_bzDecompressInit(&tmp_stream, 1, 0); - if(ret != BZ_OK) + int total_bytes = 0; + int tmp_out_bytes; + struct darshan_log_map map; + bz_stream *bz_strmp = (bz_stream *)fd->dz_strm; + + assert(bz_strmp); + + /* if new log region, we reload buffers and clear eor flag */ + if(region_id != fd->dz_prev_reg_id) { - return(-1); + bz_strmp->avail_in = 0; + fd->dz_eor = 0; + fd->dz_prev_reg_id = region_id; } - /* while we have not finished consuming all of the uncompressed input data */ - while(tmp_stream.avail_in) + if(region_id == 
DARSHAN_JOB_REGION_ID) + map = fd->job_map; + else if(region_id == DARSHAN_REC_MAP_REGION_ID) + map = fd->rec_map; + else + map = fd->mod_map[region_id]; + + bz_strmp->avail_out = len; + bz_strmp->next_out = buf; + + /* we just decompress until the output buffer is full, assuming there + * is enough compressed data in file to satisfy the request size. + */ + while(bz_strmp->avail_out) { - if(tmp_stream.avail_out == 0) + /* check if we need more compressed data */ + if(bz_strmp->avail_in == 0) { - /* We ran out of buffer space for decompression. In theory, - * we could just alloc more space, but probably just easier - * to bump up the default size of the output buffer. + /* if the eor flag is set, clear it and return -- future + * reads of this log region will restart at the beginning */ - BZ2_bzDecompressEnd(&tmp_stream); - return(-1); + if(fd->dz_eor) + { + fd->dz_eor = 0; + break; + } + + /* read more data from input file */ + ret = darshan_log_dzload(fd, map); + if(ret < 0) + return(-1); + assert(fd->dz_size > 0); + + bz_strmp->avail_in = fd->dz_size; + bz_strmp->next_in = (char *)fd->dz_buf; } - /* decompress data */ - ret = BZ2_bzDecompress(&tmp_stream); - if(ret != BZ_STREAM_END) + tmp_out_bytes = bz_strmp->total_out_lo32; + ret = BZ2_bzDecompress(bz_strmp); + if(ret != BZ_OK && ret != BZ_STREAM_END) { - BZ2_bzDecompressEnd(&tmp_stream); + fprintf(stderr, "Error: unable to decompress darshan log data.\n"); return(-1); } + total_bytes += (bz_strmp->total_out_lo32 - tmp_out_bytes); - assert(tmp_stream.total_out_hi32 == 0); - total_out += tmp_stream.total_out_lo32; - if(tmp_stream.avail_in) + /* reset the decompression if we encountered end of stream */ + if(ret == BZ_STREAM_END) { - /* reinitialize bzip2 stream, we have more data to - * decompress - */ - BZ2_bzDecompressEnd(&tmp_stream); - ret = BZ2_bzDecompressInit(&tmp_stream, 1, 0); - if(ret != BZ_OK) - { - return(-1); - } + BZ2_bzDecompressEnd(bz_strmp); + BZ2_bzDecompressInit(bz_strmp, 1, 0); } } - 
BZ2_bzDecompressEnd(&tmp_stream); - *inout_decomp_buf_sz = total_out; - return(0); + return(total_bytes); } -static int darshan_bzip2_comp(char* decomp_buf, int decomp_buf_sz, - char* comp_buf, int* inout_comp_buf_sz) +static int darshan_log_bzip2_write(darshan_fd fd, int region_id, void *buf, int len) { int ret; - bz_stream tmp_stream; - - memset(&tmp_stream, 0, sizeof(tmp_stream)); - tmp_stream.bzalloc = NULL; - tmp_stream.bzfree = NULL; - tmp_stream.opaque = NULL; - tmp_stream.next_in = decomp_buf; - tmp_stream.avail_in = decomp_buf_sz; - tmp_stream.next_out = comp_buf; - tmp_stream.avail_out = *inout_comp_buf_sz; - - ret = BZ2_bzCompressInit(&tmp_stream, 9, 1, 30); - if(ret != BZ_OK) + int total_bytes = 0; + int tmp_in_bytes; + int tmp_out_bytes; + struct darshan_log_map *map_p; + bz_stream *bz_strmp = (bz_stream *)fd->dz_strm; + + assert(bz_strmp); + + /* if new log region, finish prev region's zstream and flush to log file */ + if(region_id != fd->dz_prev_reg_id) { - return(-1); + /* error out if the region we are writing to precedes the previous + * region we wrote -- we shouldn't be moving backwards in the log + */ + if(region_id < fd->dz_prev_reg_id) + return(-1); + + if(fd->dz_prev_reg_id != DARSHAN_HEADER_REGION_ID) + { + ret = darshan_log_bzip2_flush(fd, fd->dz_prev_reg_id); + if(ret < 0) + return(-1); + } + + fd->dz_prev_reg_id = region_id; } - /* compress data */ + if(region_id == DARSHAN_JOB_REGION_ID) + map_p = &(fd->job_map); + else if(region_id == DARSHAN_REC_MAP_REGION_ID) + map_p = &(fd->rec_map); + else + map_p = &(fd->mod_map[region_id]); + + bz_strmp->avail_in = len; + bz_strmp->next_in = buf; + + /* compress input data until none left */ + while(bz_strmp->avail_in) + { + /* if we are out of output, flush to log file */ + if(bz_strmp->avail_out == 0) + { + assert(fd->dz_size == DARSHAN_DEF_COMP_BUF_SZ); + + ret = darshan_log_dzunload(fd, map_p); + if(ret < 0) + return(-1); + + bz_strmp->avail_out = DARSHAN_DEF_COMP_BUF_SZ; + 
bz_strmp->next_out = (char *)fd->dz_buf; + } + + tmp_in_bytes = bz_strmp->total_in_lo32; + tmp_out_bytes = bz_strmp->total_out_lo32; + ret = BZ2_bzCompress(bz_strmp, BZ_RUN); + if(ret != BZ_RUN_OK) + { + fprintf(stderr, "Error: unable to compress darshan log data.\n"); + return(-1); + } + total_bytes += (bz_strmp->total_in_lo32 - tmp_in_bytes); + fd->dz_size += (bz_strmp->total_out_lo32 - tmp_out_bytes); + } + + return(total_bytes); +} + +static int darshan_log_bzip2_flush(darshan_fd fd, int region_id) +{ + int ret; + int tmp_out_bytes; + struct darshan_log_map *map_p; + bz_stream *bz_strmp = (bz_stream *)fd->dz_strm; + + assert(bz_strmp); + + if(region_id == DARSHAN_JOB_REGION_ID) + map_p = &(fd->job_map); + else if(region_id == DARSHAN_REC_MAP_REGION_ID) + map_p = &(fd->rec_map); + else + map_p = &(fd->mod_map[region_id]); + + /* make sure deflate finishes this stream */ + bz_strmp->avail_in = 0; + bz_strmp->next_in = NULL; do { - ret = BZ2_bzCompress(&tmp_stream, BZ_FINISH); + tmp_out_bytes = bz_strmp->total_out_lo32; + ret = BZ2_bzCompress(bz_strmp, BZ_FINISH); if(ret < 0) { - BZ2_bzCompressEnd(&tmp_stream); + fprintf(stderr, "Error: unable to compress darshan log data.\n"); return(-1); } + fd->dz_size += (bz_strmp->total_out_lo32 - tmp_out_bytes); + + if(fd->dz_size) + { + /* flush to file */ + if(darshan_log_dzunload(fd, map_p) < 0) + return(-1); + + bz_strmp->avail_out = DARSHAN_DEF_COMP_BUF_SZ; + bz_strmp->next_out = (char *)fd->dz_buf; + } } while (ret != BZ_STREAM_END); - BZ2_bzCompressEnd(&tmp_stream); - assert(tmp_stream.total_out_hi32 == 0); - *inout_comp_buf_sz = tmp_stream.total_out_lo32; + + BZ2_bzCompressEnd(bz_strmp); + BZ2_bzCompressInit(bz_strmp, 9, 1, 30); return(0); } -#endif + #endif static int darshan_log_dzload(darshan_fd fd, struct darshan_log_map map) -- 2.26.2