Commit b2268e7b authored by Shane Snyder

log file format changes

* header is now written raw (uncompressed), rather than in gzip format
* mapping info stored in the header now gives the actual offset and length of each compressed region in the log file (a rough header sketch follows below)
* mapping info is now also stored for the job data
parent 0d7fe693
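
For orientation, the header fields touched throughout this diff (version_string, magic_nr, rec_map.off/len, mod_map[i].off/len) suggest a layout roughly like the sketch below. This is an illustrative guess only; the exact types, field widths, and number of mod_map entries are assumptions, not the definitions from the Darshan log format headers.

struct darshan_log_map
{
    uint64_t off;   /* file offset where a compressed region begins */
    uint64_t len;   /* length in bytes of that compressed region */
};

struct darshan_header
{
    char version_string[8];             /* copied from DARSHAN_LOG_VERSION */
    int64_t magic_nr;                   /* set to DARSHAN_MAGIC_NR */
    struct darshan_log_map rec_map;     /* record name->id hash region */
    struct darshan_log_map mod_map[16]; /* one entry per module (count assumed) */
};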
@@ -88,14 +88,14 @@ static void darshan_get_shared_records(
static int darshan_log_open_all(
char *logfile_name, MPI_File *log_fh);
static int darshan_deflate_buffer(
void **pointers, int *lengths, int count, int nocomp_flag,
char *comp_buf, int *comp_length);
void **pointers, int *lengths, int count, char *comp_buf,
int *comp_buf_length);
static int darshan_log_write_record_hash(
MPI_File log_fh, struct darshan_core_runtime *core,
uint64_t *inout_off);
static int darshan_log_append_all(
MPI_File log_fh, struct darshan_core_runtime *core, void *buf,
int count, uint64_t *inout_off, uint64_t *agg_uncomp_sz);
int count, uint64_t *inout_off);
static void darshan_core_cleanup(
struct darshan_core_runtime* core);
@@ -238,7 +238,6 @@ void darshan_core_shutdown()
double header1, header2;
double tm_end;
uint64_t gz_fp = 0;
uint64_t tmp_off = 0;
MPI_File log_fh;
MPI_Status status;
@@ -380,7 +379,7 @@ void darshan_core_shutdown()
int comp_buf_sz = 0;
/* compress the job info and the trailing mount/exe data */
all_ret = darshan_deflate_buffer(pointers, lengths, 2, 0,
all_ret = darshan_deflate_buffer(pointers, lengths, 2,
final_core->comp_buf, &comp_buf_sz);
if(all_ret)
{
@@ -390,7 +389,7 @@ void darshan_core_shutdown()
else
{
/* write the job information, preallocing space for the log header */
gz_fp += sizeof(struct darshan_header) + 23; /* gzip headers/trailers ... */
gz_fp += sizeof(struct darshan_header);
all_ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, gz_fp,
final_core->comp_buf, comp_buf_sz, MPI_BYTE, &status);
if(all_ret != MPI_SUCCESS)
@@ -400,8 +399,6 @@ void darshan_core_shutdown()
unlink(logfile_name);
}
/* set the beginning offset of record hash, which follows job info just written */
gz_fp += comp_buf_sz;
}
}
@@ -420,8 +417,9 @@ void darshan_core_shutdown()
if(internal_timing_flag)
rec1 = DARSHAN_MPI_CALL(PMPI_Wtime)();
/* write the record name->id hash to the log file */
final_core->log_header.rec_map.off = gz_fp;
ret = darshan_log_write_record_hash(log_fh, final_core, &gz_fp);
tmp_off = final_core->log_header.rec_map.off + final_core->log_header.rec_map.len;
final_core->log_header.rec_map.len = gz_fp - final_core->log_header.rec_map.off;
/* error out if unable to write record hash */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
@@ -496,12 +494,11 @@ void darshan_core_shutdown()
mod_shared_rec_cnt, &mod_buf, &mod_buf_sz);
}
final_core->log_header.mod_map[i].off = tmp_off;
/* append this module's data to the darshan log */
ret = darshan_log_append_all(log_fh, final_core, mod_buf, mod_buf_sz,
&gz_fp, &(final_core->log_header.mod_map[i].len));
tmp_off += final_core->log_header.mod_map[i].len;
final_core->log_header.mod_map[i].off = gz_fp;
ret = darshan_log_append_all(log_fh, final_core, mod_buf, mod_buf_sz, &gz_fp);
final_core->log_header.mod_map[i].len =
gz_fp - final_core->log_header.mod_map[i].off;
/* error out if the log append failed */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
@@ -534,39 +531,18 @@ void darshan_core_shutdown()
/* rank 0 is responsible for writing the log header */
if(my_rank == 0)
{
void *header_buf = &(final_core->log_header);
int header_buf_sz = sizeof(struct darshan_header);
int comp_buf_sz = 0;
/* initialize the remaining header fields */
strcpy(final_core->log_header.version_string, DARSHAN_LOG_VERSION);
final_core->log_header.magic_nr = DARSHAN_MAGIC_NR;
/* deflate the header */
/* NOTE: the header is not actually compressed because space for it must
* be preallocated before writing. i.e., the "compressed" header
* must be constant sized, sizeof(struct darshan_header) + 23.
* it is still necessary to deflate the header or the resulting log
* file will not be a valid gzip file.
*/
all_ret = darshan_deflate_buffer((void **)&header_buf, &header_buf_sz, 1, 1,
final_core->comp_buf, &comp_buf_sz);
if(all_ret)
all_ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, &(final_core->log_header),
sizeof(struct darshan_header), MPI_BYTE, &status);
if(all_ret != MPI_SUCCESS)
{
fprintf(stderr, "darshan library warning: unable to compress header\n");
fprintf(stderr, "darshan library warning: unable to write header to log file %s\n",
logfile_name);
unlink(logfile_name);
}
else
{
all_ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, final_core->comp_buf,
comp_buf_sz, MPI_BYTE, &status);
if(all_ret != MPI_SUCCESS)
{
fprintf(stderr, "darshan library warning: unable to write header to log file %s\n",
logfile_name);
unlink(logfile_name);
}
}
}
/* error out if unable to write log header */
@@ -1199,12 +1175,11 @@ static int darshan_log_open_all(char *logfile_name, MPI_File *log_fh)
}
static int darshan_deflate_buffer(void **pointers, int *lengths, int count,
int nocomp_flag, char *comp_buf, int *comp_length)
char *comp_buf, int *comp_buf_length)
{
int ret = 0;
int i;
int total_target = 0;
int z_comp_level;
z_stream tmp_stream;
/* just return if there is no data */
@@ -1218,7 +1193,7 @@ static int darshan_deflate_buffer(void **pointers, int *lengths, int count,
}
else
{
*comp_length = 0;
*comp_buf_length = 0;
return(0);
}
@@ -1229,10 +1204,9 @@ static int darshan_deflate_buffer(void **pointers, int *lengths, int count,
/* initialize the zlib compression parameters */
/* TODO: check these parameters? */
z_comp_level = nocomp_flag ? Z_NO_COMPRESSION : Z_DEFAULT_COMPRESSION;
ret = deflateInit2(&tmp_stream, z_comp_level, Z_DEFLATED,
15 + 16, 8, Z_DEFAULT_STRATEGY);
// ret = deflateInit(&tmp_stream, z_comp_level);
// ret = deflateInit2(&tmp_stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
// 15 + 16, 8, Z_DEFAULT_STRATEGY);
ret = deflateInit(&tmp_stream, Z_DEFAULT_COMPRESSION);
if(ret != Z_OK)
{
return(-1);
@@ -1282,7 +1256,7 @@ static int darshan_deflate_buffer(void **pointers, int *lengths, int count,
}
deflateEnd(&tmp_stream);
*comp_length = tmp_stream.total_out;
*comp_buf_length = tmp_stream.total_out;
return(0);
}
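
The deflateInit() switch above is what actually drops the gzip framing: with the header written raw and each region indexed by the header map, the per-region blobs no longer need to concatenate into one valid gzip stream. For comparison, a minimal sketch of the two zlib initializations involved (assuming <zlib.h> is included; error handling omitted):

z_stream strm;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;

/* zlib framing, as the new code uses */
if(deflateInit(&strm, Z_DEFAULT_COMPRESSION) == Z_OK)
    deflateEnd(&strm);

/* gzip framing (windowBits = 15 + 16 requests gzip headers/trailers),
 * which the old code needed so the whole log file read as a single
 * valid .gz stream:
 *
 *     deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
 *                  15 + 16, 8, Z_DEFAULT_STRATEGY);
 */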
@@ -1352,15 +1326,10 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_ru
memcpy(hash_buf_off, ref->rec.name, name_len);
hash_buf_off += name_len;
}
/* store uncompressed offset of the record hash */
core->log_header.rec_map.off = sizeof(struct darshan_header) +
DARSHAN_JOB_RECORD_SIZE;
hash_buf_sz = hash_buf_off - hash_buf;
/* collectively write out the record hash to the darshan log */
hash_buf_sz = hash_buf_off - hash_buf;
ret = darshan_log_append_all(log_fh, core, hash_buf, hash_buf_sz,
inout_off, &(core->log_header.rec_map.len));
ret = darshan_log_append_all(log_fh, core, hash_buf, hash_buf_sz, inout_off);
free(hash_buf);
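
Because each map entry now records where its separately deflated region sits in the file, a reader can seek directly to a region and inflate only that blob. A hypothetical reader-side sketch (not the actual darshan-util code), assuming <zlib.h>, <stdlib.h>, <string.h>, and <unistd.h> are included and the output buffer is large enough for the uncompressed region:

/* Hypothetical helper: read and inflate one mapped region of the log.
 * 'map' would be header.rec_map or header.mod_map[i]. */
static int read_log_region(int fd, struct darshan_log_map map,
    void *out, size_t out_sz)
{
    char *comp;
    z_stream strm;
    int ret;

    comp = malloc(map.len);
    if(!comp)
        return(-1);
    if(pread(fd, comp, map.len, map.off) != (ssize_t)map.len)
    {
        free(comp);
        return(-1);
    }

    memset(&strm, 0, sizeof(strm));
    strm.next_in = (unsigned char *)comp;
    strm.avail_in = (uInt)map.len;
    strm.next_out = (unsigned char *)out;
    strm.avail_out = (uInt)out_sz;

    ret = inflateInit(&strm);
    if(ret == Z_OK)
    {
        ret = inflate(&strm, Z_FINISH);
        inflateEnd(&strm);
    }
    free(comp);

    return((ret == Z_STREAM_END) ? 0 : -1);
}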
@@ -1369,22 +1338,18 @@ static int darshan_log_append_all(MPI_File log_fh, struct darshan_core_runtime *
/* NOTE: inout_off contains the starting offset of this append at the beginning
* of the call, and contains the ending offset at the end of the call.
* total_uncomp_sz will store the collective size of the uncompressed
* buffer being written. This data is necessary to properly index data
* when reading a darshan log. Also, these variables should only be valid
* on the root rank (rank 0).
* This variable is only valid on the root rank (rank 0).
*/
static int darshan_log_append_all(MPI_File log_fh, struct darshan_core_runtime *core,
void *buf, int count, uint64_t *inout_off, uint64_t *agg_uncomp_sz)
void *buf, int count, uint64_t *inout_off)
{
MPI_Offset send_off, my_off;
MPI_Status status;
uint64_t uncomp_buf_sz = count;
int comp_buf_sz = 0;
int ret;
/* compress the input buffer */
ret = darshan_deflate_buffer((void **)&buf, &count, 1, 0,
ret = darshan_deflate_buffer((void **)&buf, &count, 1,
core->comp_buf, &comp_buf_sz);
if(ret < 0)
comp_buf_sz = 0;
@@ -1416,16 +1381,16 @@ static int darshan_log_append_all(MPI_File log_fh, struct darshan_core_runtime *
core->comp_buf, comp_buf_sz, MPI_BYTE, &status);
}
/* send the ending offset from rank (n-1) to rank 0 */
if(nprocs > 1)
{
/* send the ending offset from rank (n-1) to rank 0 */
if(my_rank == (nprocs-1))
{
my_off += comp_buf_sz;
DARSHAN_MPI_CALL(PMPI_Send)(&my_off, 1, MPI_OFFSET, 0, 0,
MPI_COMM_WORLD);
}
else if(my_rank == 0)
if(my_rank == 0)
{
DARSHAN_MPI_CALL(PMPI_Recv)(&my_off, 1, MPI_OFFSET, (nprocs-1), 0,
MPI_COMM_WORLD, &status);
@@ -1438,10 +1403,6 @@ static int darshan_log_append_all(MPI_File log_fh, struct darshan_core_runtime *
*inout_off = my_off + comp_buf_sz;
}
/* pass back the aggregate uncompressed size of this blob */
DARSHAN_MPI_CALL(PMPI_Reduce)(&uncomp_buf_sz, agg_uncomp_sz, 1,
MPI_UINT64_T, MPI_SUM, 0, MPI_COMM_WORLD);
if(ret != 0)
return(-1);
return(0);
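
For context on the shared write above: each rank needs its own file offset for its compressed blob before the collective append. One common way to derive my_off is an exclusive prefix sum of the per-rank compressed sizes; the sketch below is illustrative only, and the unshown portion of darshan_log_append_all may compute it differently.

/* Illustrative only: derive each rank's write offset from per-rank sizes */
MPI_Offset send_off, my_off;

send_off = comp_buf_sz;
MPI_Scan(&send_off, &my_off, 1, MPI_OFFSET, MPI_SUM, MPI_COMM_WORLD);
my_off -= comp_buf_sz;    /* convert inclusive scan result to exclusive */
my_off += *inout_off;     /* regions are appended after the previous one */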