Commit 8f85fd08 authored by Shane Snyder's avatar Shane Snyder
Browse files

log compression good to go, revamped logutils

parent efd500f0
......@@ -15,6 +15,10 @@
/* TODO: enforce this when handing out ids */
#define DARSHAN_CORE_MAX_RECORDS 1024
/* default compression buffer size of 2 MiB */
/* TODO: revisit this default size if we change memory per module */
#define DARSHAN_COMP_BUF_SIZE (2 * 1024 * 1024)
struct darshan_core_module
{
darshan_module_id id;
......@@ -28,6 +32,7 @@ struct darshan_core_runtime
char exe[DARSHAN_EXE_LEN+1];
struct darshan_core_record_ref *rec_hash;
struct darshan_core_module* mod_array[DARSHAN_MAX_MODS];
char comp_buf[DARSHAN_COMP_BUF_SIZE];
double wtime_offset;
char *trailing_data;
};
......
......@@ -69,8 +69,10 @@ static void darshan_get_shared_record_ids(
struct darshan_core_runtime *core, darshan_record_id *shared_recs);
static int darshan_log_coll_open(
char *logfile_name, MPI_File *log_fh);
static int darshan_compress_buffer(void **pointers, int *lengths,
int count, char *comp_buf, int *comp_length);
static int darshan_log_write_record_hash(
MPI_File log_fh, struct darshan_core_record_ref *rec_hash,
MPI_File log_fh, struct darshan_core_runtime *core,
darshan_record_id *shared_recs, struct darshan_log_map *map);
static int darshan_log_coll_write(
MPI_File log_fh, void *buf, int count, struct darshan_log_map *map);
......@@ -139,7 +141,6 @@ static void darshan_core_initialize(int *argc, char ***argv)
if(internal_timing_flag)
init_start = DARSHAN_MPI_CALL(PMPI_Wtime)();
DARSHAN_CORE_LOCK();
/* setup darshan runtime if darshan is enabled and hasn't been initialized already */
if(!getenv("DARSHAN_DISABLE") && !darshan_core)
{
......@@ -191,7 +192,6 @@ static void darshan_core_initialize(int *argc, char ***argv)
darshan_core->trailing_data = darshan_get_exe_and_mounts(darshan_core);
}
}
DARSHAN_CORE_UNLOCK();
if(internal_timing_flag)
{
......@@ -230,7 +230,7 @@ static void darshan_core_shutdown()
long offset;
struct darshan_header log_header;
MPI_File log_fh;
MPI_Offset tmp_off;
MPI_Offset tmp_off = 0;
MPI_Status status;
if(getenv("DARSHAN_INTERNAL_TIMING"))
......@@ -356,21 +356,27 @@ static void darshan_core_shutdown()
return;
}
/* rank 0 is responsible for writing the darshan job information */
/* rank 0 is responsible for writing the compressed darshan job information */
if(my_rank == 0)
{
unsigned char tmp_buf[DARSHAN_JOB_RECORD_SIZE];
unsigned char *tmp_ptr;
void *pointers[2] = {&final_core->log_job, final_core->trailing_data};
int lengths[2] = {sizeof(struct darshan_job), DARSHAN_EXE_LEN+1};
int comp_buf_sz = 0;
/* pack the job info and exe/mount info into a buffer for writing */
tmp_ptr = tmp_buf;
memcpy(tmp_ptr, &final_core->log_job, sizeof(struct darshan_job));
tmp_ptr += sizeof(struct darshan_job);
memcpy(tmp_ptr, final_core->trailing_data, DARSHAN_EXE_LEN+1);
/* write the job information, making sure to prealloc space for the log header */
all_ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, sizeof(struct darshan_header),
tmp_buf, DARSHAN_JOB_RECORD_SIZE, MPI_BYTE, &status);
/* compress the job info and the trailing mount/exe data */
all_ret = darshan_compress_buffer(pointers, lengths, 2,
final_core->comp_buf, &comp_buf_sz);
if(all_ret)
{
fprintf(stderr, "darshan library warning: unable to compress job data\n");
unlink(logfile_name);
}
else
{
/* write the job information, preallocing space for the log header */
all_ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(
log_fh, sizeof(struct darshan_header), final_core->comp_buf,
comp_buf_sz, MPI_BYTE, &status);
if(all_ret != MPI_SUCCESS)
{
fprintf(stderr, "darshan library warning: unable to write job data to log file %s\n",
......@@ -378,8 +384,9 @@ static void darshan_core_shutdown()
unlink(logfile_name);
}
/* TODO: after compression is added, this should be fixed */
log_header.rec_map.off = sizeof(struct darshan_header) + DARSHAN_JOB_RECORD_SIZE;
/* set the beginning offset of record hash, which precedes job info just written */
log_header.rec_map.off = sizeof(struct darshan_header) + comp_buf_sz;
}
}
/* error out if unable to write job information */
......@@ -392,8 +399,7 @@ static void darshan_core_shutdown()
}
/* write the record name->id hash to the log file */
ret = darshan_log_write_record_hash(log_fh, final_core->rec_hash,
shared_recs, &log_header.rec_map);
ret = darshan_log_write_record_hash(log_fh, final_core, shared_recs, &log_header.rec_map);
/* error out if unable to write record hash */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
......@@ -402,7 +408,7 @@ static void darshan_core_shutdown()
{
if(my_rank == 0)
{
fprintf(stderr, "darshan library warning: unable to write record map to log file %s\n",
fprintf(stderr, "darshan library warning: unable to write record hash to log file %s\n",
logfile_name);
unlink(logfile_name);
}
......@@ -423,7 +429,8 @@ static void darshan_core_shutdown()
struct darshan_core_module* this_mod = final_core->mod_array[i];
MPI_Comm mod_comm;
void* mod_buf = NULL;
int mod_buf_size = 0;
int mod_buf_sz = 0;
int comp_buf_sz = 0;
if(!global_mod_use_count[i])
{
......@@ -447,19 +454,33 @@ static void darshan_core_shutdown()
if(local_mod_use[i])
{
/* get output buffer from module */
this_mod->mod_funcs.get_output_data(mod_comm, &mod_buf, &mod_buf_size);
this_mod->mod_funcs.get_output_data(mod_comm, &mod_buf, &mod_buf_sz);
}
/* compress the module buffer */
ret = darshan_compress_buffer((void**)&mod_buf, &mod_buf_sz, 1,
final_core->comp_buf, &comp_buf_sz);
if(ret == 0)
{
/* set the starting offset of this module */
if(tmp_off == 0)
tmp_off = log_header.rec_map.off + log_header.rec_map.len;
log_header.mod_map[i].off = tmp_off;
/* write module data buffer to the darshan log file */
ret = darshan_log_coll_write(log_fh, mod_buf, mod_buf_size, &log_header.mod_map[i]);
/* write (compressed) module data buffer to the darshan log file */
ret = darshan_log_coll_write(log_fh, final_core->comp_buf, comp_buf_sz,
&log_header.mod_map[i]);
}
else
{
/* error in compression, participate in collective write to avoid
* deadlock, but preserve return value to terminate darshan_shutdown.
*/
(void)darshan_log_coll_write(log_fh, NULL, 0, &log_header.mod_map[i]);
}
/* error out if unable to write this module's data */
/* error out if the compression or collective write failed */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
......@@ -1050,10 +1071,81 @@ static int darshan_log_coll_open(char *logfile_name, MPI_File *log_fh)
return(0);
}
static int darshan_compress_buffer(void **pointers, int *lengths, int count,
char *comp_buf, int *comp_length)
{
int ret = 0;
int i;
int total_target = 0;
z_stream tmp_stream;
memset(&tmp_stream, 0, sizeof(tmp_stream));
tmp_stream.zalloc = Z_NULL;
tmp_stream.zfree = Z_NULL;
tmp_stream.opaque = Z_NULL;
/* initialize the zlib compression parameters */
/* TODO: check these parameters? */
// ret = deflateInit2(&tmp_stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
// 31, 8, Z_DEFAULT_STRATEGY);
ret = deflateInit(&tmp_stream, Z_DEFAULT_COMPRESSION);
if(ret != Z_OK)
{
return(-1);
}
tmp_stream.next_out = comp_buf;
tmp_stream.avail_out = DARSHAN_COMP_BUF_SIZE;
/* loop over the input pointers */
for(i = 0; i < count; i++)
{
total_target += lengths[i];
tmp_stream.next_in = pointers[i];
tmp_stream.avail_in = lengths[i];
/* while we have not finished consuming all of the data available to
* this point in the loop
*/
while(tmp_stream.total_in < total_target)
{
if(tmp_stream.avail_out == 0)
{
/* We ran out of buffer space for compression. In theory,
* we could start using some of the file_array buffer space
* without having to malloc again. In practice, this case
* is going to be practically impossible to hit.
*/
deflateEnd(&tmp_stream);
return(-1);
}
/* compress data */
ret = deflate(&tmp_stream, Z_NO_FLUSH);
if(ret != Z_OK)
{
deflateEnd(&tmp_stream);
return(-1);
}
}
}
/* flush compression and end */
ret = deflate(&tmp_stream, Z_FINISH);
if(ret != Z_STREAM_END)
{
deflateEnd(&tmp_stream);
return(-1);
}
deflateEnd(&tmp_stream);
*comp_length = tmp_stream.total_out;
return(0);
}
/* NOTE: the map written to file may contain duplicate id->name entries if a
* record is opened by multiple ranks, but not all ranks
*/
static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_record_ref *rec_hash,
static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_runtime *core,
darshan_record_id *shared_recs, struct darshan_log_map *map)
{
int i;
......@@ -1062,20 +1154,20 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_re
uint32_t name_len;
size_t record_sz;
size_t hash_buf_sz = 0;
unsigned char *hash_buf;
unsigned char *hash_buf_off;
char *hash_buf;
char *hash_buf_off;
MPI_Status status;
/* non-root ranks (rank > 0) remove shared records from their map --
/* non-root ranks (rank > 0) remove shared records from their hash --
* these records will be written by rank 0
*/
if(my_rank > 0)
{
for(i=0; (i<DARSHAN_CORE_MAX_RECORDS && shared_recs[i]); i++)
{
HASH_FIND(hlink, rec_hash, &shared_recs[i], sizeof(darshan_record_id), ref);
HASH_FIND(hlink, core->rec_hash, &shared_recs[i], sizeof(darshan_record_id), ref);
assert(ref); /* this id had better be in the hash ... */
HASH_DELETE(hlink, rec_hash, ref);
HASH_DELETE(hlink, core->rec_hash, ref);
if(ref->rec.name) free(ref->rec.name);
free(ref);
}
......@@ -1090,19 +1182,19 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_re
return(-1);
}
/* serialize the record map into a buffer for writing */
/* serialize the record hash into a buffer for writing */
hash_buf_off = hash_buf;
HASH_ITER(hlink, rec_hash, ref, tmp)
HASH_ITER(hlink, core->rec_hash, ref, tmp)
{
name_len = strlen(ref->rec.name);
record_sz = sizeof(darshan_record_id) + sizeof(int) + name_len;
/* make sure there is room in the buffer for this record */
if((hash_buf_off + record_sz) > (hash_buf + hash_buf_sz))
{
unsigned char *tmp_buf;
char *tmp_buf;
size_t old_buf_sz;
/* if no room, reallocate the map buffer at twice the current size */
/* if no room, reallocate the hash buffer at twice the current size */
old_buf_sz = hash_buf_off - hash_buf;
hash_buf_sz *= 2;
tmp_buf = malloc(hash_buf_sz);
......@@ -1118,8 +1210,8 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_re
hash_buf_off = hash_buf + old_buf_sz;
}
/* now serialize the record into the map buffer.
* NOTE: darshan record map serialization method:
/* now serialize the record into the hash buffer.
* NOTE: darshan record hash serialization method:
* ... darshan_record_id | (uint32_t) path_len | path ...
*/
*((darshan_record_id *)hash_buf_off) = ref->rec.id;
......@@ -1130,11 +1222,27 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_re
hash_buf_off += name_len;
}
/* collectively write out the record map to the darshan log */
/* collectively write out the record hash to the darshan log */
if(hash_buf_off > hash_buf)
{
/* we have records to contribute to the collective write of the record map */
ret = darshan_log_coll_write(log_fh, hash_buf, (hash_buf_off-hash_buf), map);
int hash_buf_sz = hash_buf_off - hash_buf;
int comp_buf_sz = 0;
/* compress the record hash buffer */
ret = darshan_compress_buffer((void **)&hash_buf, &hash_buf_sz, 1,
core->comp_buf, &comp_buf_sz);
if(ret < 0)
{
/* participate in collective write to avoid deadlock, but preserve
* the compression error to pass back to the caller.
*/
(void)darshan_log_coll_write(log_fh, NULL, 0, map);
}
else
{
/* we have records to contribute to the write of the record hash */
ret = darshan_log_coll_write(log_fh, core->comp_buf, comp_buf_sz, map);
}
}
else
{
......@@ -1217,14 +1325,11 @@ void darshan_core_register_module(
struct darshan_core_module* mod;
*runtime_mem_limit = 0;
DARSHAN_CORE_LOCK();
if(!darshan_core || (id >= DARSHAN_MAX_MODS))
{
DARSHAN_CORE_UNLOCK();
return;
}
/* see if this module is already registered */
DARSHAN_CORE_LOCK();
if(darshan_core->mod_array[id])
{
/* if module is already registered just return */
......@@ -1273,8 +1378,8 @@ void darshan_core_lookup_record_id(
/* hash the input name to get a unique id for this record */
tmp_id = darshan_hash(name, len, 0);
DARSHAN_CORE_LOCK();
/* check to see if we've already stored the id->name mapping for this record */
DARSHAN_CORE_LOCK();
HASH_FIND(hlink, darshan_core->rec_hash, &tmp_id, sizeof(darshan_record_id), ref);
if(!ref)
{
......
......@@ -23,6 +23,9 @@
#include "darshan-logutils.h"
/* default to a compression buffer size of 4 MiB */
#define DARSHAN_DEF_DECOMP_BUF_SZ (4*1024*1024)
struct darshan_fd_s
{
int pf;
......@@ -38,7 +41,8 @@ struct darshan_fd_s
static int darshan_log_seek(darshan_fd fd, off_t offset);
static int darshan_log_read(darshan_fd fd, void *buf, int len);
static int darshan_log_write(darshan_fd fd, void *buf, int len);
static int darshan_decompress_buffer(char *comp_buf, int comp_buf_sz,
char *decomp_buf, int *inout_decomp_buf_sz);
/* darshan_log_open()
*
......@@ -150,23 +154,44 @@ int darshan_log_getheader(darshan_fd fd, struct darshan_header *header)
*/
int darshan_log_getjob(darshan_fd fd, struct darshan_job *job)
{
char *comp_buf;
char job_buf[DARSHAN_JOB_RECORD_SIZE] = {0};
int job_buf_sz = DARSHAN_JOB_RECORD_SIZE;
int ret;
/* allocate a buffer to store the (compressed) darshan job info */
comp_buf = malloc(fd->job_map.len);
if(!comp_buf)
return(-1);
ret = darshan_log_seek(fd, fd->job_map.off);
if(ret < 0)
{
fprintf(stderr, "Error: unable to seek in darshan log file.\n");
free(comp_buf);
return(ret);
}
/* read the job data from the log file */
ret = darshan_log_read(fd, job_buf, fd->job_map.len);
ret = darshan_log_read(fd, comp_buf, fd->job_map.len);
if(ret < fd->job_map.len)
{
fprintf(stderr, "Error: invalid darshan log file (failed to read job data).\n");
free(comp_buf);
return(-1);
}
/* decompress the job data */
ret = darshan_decompress_buffer(comp_buf, fd->job_map.len,
job_buf, &job_buf_sz);
if(ret < 0)
{
fprintf(stderr, "Error: unable to decompress darshan job data.\n");
free(comp_buf);
return(-1);
}
assert(job_buf_sz == DARSHAN_JOB_RECORD_SIZE);
free(comp_buf);
memcpy(job, job_buf, sizeof(*job));
......@@ -181,6 +206,7 @@ int darshan_log_getjob(darshan_fd fd, struct darshan_job *job)
}
/* save trailing job data, so exe and mount information can be retrieved later */
if(!fd->exe_mnt_data)
fd->exe_mnt_data = malloc(DARSHAN_EXE_LEN+1);
if(!fd->exe_mnt_data)
return(-1);
......@@ -193,9 +219,17 @@ int darshan_log_getexe(darshan_fd fd, char *buf)
{
char *newline;
/* TODO: try reading log job one more time to set this buffer up */
/* if the exe/mount info has not been saved yet, read in the job
* header to get this data.
*/
if(!fd->exe_mnt_data)
{
struct darshan_job job;
(void)darshan_log_getjob(fd, &job);
if(!fd->exe_mnt_data)
return(-1);
}
newline = strchr(fd->exe_mnt_data, '\n');
......@@ -219,11 +253,18 @@ int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts,
char *pos;
int array_index = 0;
/* TODO: try reading log job one more time to set this buffer up */
/* if the exe/mount info has not been saved yet, read in the job
* header to get this data.
*/
if(!fd->exe_mnt_data)
{
struct darshan_job job;
(void)darshan_log_getjob(fd, &job);
if(!fd->exe_mnt_data)
return(-1);
}
/* count entries */
*count = 0;
pos = fd->exe_mnt_data;
while((pos = strchr(pos, '\n')) != NULL)
......@@ -278,37 +319,51 @@ int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts,
*/
int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
{
unsigned char *hash_buf;
unsigned char *buf_ptr;
char *comp_buf;
char hash_buf[DARSHAN_DEF_DECOMP_BUF_SZ] = {0};
int hash_buf_sz = DARSHAN_DEF_DECOMP_BUF_SZ;
char *buf_ptr;
darshan_record_id *rec_id_ptr;
uint32_t *path_len_ptr;
char *path_ptr;
struct darshan_record_ref *ref;
int ret;
/* allocate a buffer to store the (compressed, serialized) darshan record hash */
comp_buf = malloc(fd->rec_map.len);
if(!comp_buf)
return(-1);
ret = darshan_log_seek(fd, fd->rec_map.off);
if(ret < 0)
{
fprintf(stderr, "Error: unable to seek in darshan log file.\n");
free(comp_buf);
return(ret);
}
/* allocate a buffer to store the (serialized) darshan record hash */
hash_buf = malloc(fd->rec_map.len);
if(!hash_buf)
return(-1);
/* read the record map from the log file */
ret = darshan_log_read(fd, hash_buf, fd->rec_map.len);
/* read the record hash from the log file */
ret = darshan_log_read(fd, comp_buf, fd->rec_map.len);
if(ret < fd->rec_map.len)
{
fprintf(stderr, "Error: invalid darshan log file (failed to read record hash).\n");
free(hash_buf);
free(comp_buf);
return(-1);
}
/* decompress the record hash buffer */
ret = darshan_decompress_buffer(comp_buf, fd->rec_map.len,
hash_buf, &hash_buf_sz);
if(ret < 0)
{
fprintf(stderr, "Error: unable to decompress darshan job data.\n");
free(comp_buf);
return(-1);
}
free(comp_buf);
buf_ptr = hash_buf;
while(buf_ptr < (hash_buf + fd->rec_map.len))
while(buf_ptr < (hash_buf + hash_buf_sz))
{
/* get pointers for each field of this darshan record */
/* NOTE: darshan record hash serialization method:
......@@ -324,13 +379,11 @@ int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
ref = malloc(sizeof(*ref));
if(!ref)
{
free(hash_buf);
return(-1);
}
ref->rec.name = malloc(*path_len_ptr + 1);
if(!ref->rec.name)
{
free(hash_buf);
free(ref);
return(-1);
}
......@@ -351,16 +404,88 @@ int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
HASH_ADD(hlink, *hash, rec.id, sizeof(darshan_record_id), ref);
}
free(hash_buf);
return(0);
}
int darshan_log_getmod(darshan_fd fd, int mod_id, void **mod_buf,
int *mod_buf_sz)
{
char *comp_buf;
char *tmp_buf;
int tmp_buf_sz;
int ret;
*mod_buf = NULL;
*mod_buf_sz = 0;
if(mod_id < 0 || mod_id >= DARSHAN_MAX_MODS)
{
fprintf(stderr, "Error: invalid Darshan module id.\n");
return(-1);
}
if(fd->mod_map[mod_id].len == 0)
{
/* this module has no data in the log */
return(0);
}
comp_buf = malloc(fd->mod_map[mod_id].len);
if(!comp_buf)
return(-1);
ret = darshan_log_seek(fd, fd->mod_map[mod_id].off);
if(ret < 0)
{
fprintf(stderr, "Error: unable to seek in darshan log file.\n");
free(comp_buf);
return(ret);
}
/* read the given module's (compressed) data from the log file */
ret = darshan_log_read(fd, comp_buf, fd->mod_map[mod_id].len);
if(ret < fd->mod_map[mod_id].len)
{
fprintf(stderr, "Error: invalid darshan log file (failed to read module %s data).\n",
darshan_module_names[mod_id]);
free(comp_buf);
return(-1);
}
tmp_buf_sz = DARSHAN_DEF_DECOMP_BUF_SZ;
tmp_buf = malloc(DARSHAN_DEF_DECOMP_BUF_SZ);
if(!tmp_buf)
{
free(comp_buf);
return(-1);
}
/* decompress this module's data */
ret = darshan_decompress_buffer(comp_buf, fd->mod_map[mod_id].len, tmp_buf,