Commit 093b9f22 authored by Shane Snyder's avatar Shane Snyder
Browse files

update log put functions for new log format

parent d8b6e0dc
...@@ -31,21 +31,30 @@ ...@@ -31,21 +31,30 @@
/* max length of exe string within job record (not counting '\0') */ /* max length of exe string within job record (not counting '\0') */
#define DARSHAN_EXE_LEN (DARSHAN_JOB_RECORD_SIZE - sizeof(struct darshan_job) - 1) #define DARSHAN_EXE_LEN (DARSHAN_JOB_RECORD_SIZE - sizeof(struct darshan_job) - 1)
typedef uint64_t darshan_record_id;
#define DARSHAN_MAX_MODS 16 #define DARSHAN_MAX_MODS 16
/* TODO: do we want the logutil defs here ? */
/* X-macro for keeping module ordering consistent */
/* NOTE: first val used to define module enum values,
* second val used to define module name strings, and
* third val is used to provide the name of a
* corresponding logutils structure for parsing module
* data out of the log file (only used in darshan-util,
* just pass NULL (no quotes) if no log parsing
* functions are required).
*/
#define DARSHAN_MODULE_IDS \ #define DARSHAN_MODULE_IDS \
X(DARSHAN_NULL_MOD, "NULL") \ X(DARSHAN_NULL_MOD, "NULL", NULL) \
X(DARSHAN_POSIX_MOD, "POSIX") \ X(DARSHAN_POSIX_MOD, "POSIX", posix_logutils) \
X(DARSHAN_MPIIO_MOD, "MPI-IO") \ X(DARSHAN_MPIIO_MOD, "MPI-IO", mpiio_logutils) \
X(DARSHAN_HDF5_MOD, "HDF5") \ X(DARSHAN_HDF5_MOD, "HDF5", hdf5_logutils) \
X(DARSHAN_PNETCDF_MOD, "PNETCDF") X(DARSHAN_PNETCDF_MOD, "PNETCDF", pnetcdf_logutils)
/* unique identifiers to distinguish between available darshan modules */ /* unique identifiers to distinguish between available darshan modules */
/* NOTES: - valid ids range from [0...DARSHAN_MAX_MODS-1] /* NOTES: - valid ids range from [0...DARSHAN_MAX_MODS-1]
* - order of ids control module shutdown order (and consequently, order in log file) * - order of ids control module shutdown order (and consequently, order in log file)
*/ */
#define X(a, b) a, #define X(a, b, c) a,
typedef enum typedef enum
{ {
DARSHAN_MODULE_IDS DARSHAN_MODULE_IDS
...@@ -53,7 +62,7 @@ typedef enum ...@@ -53,7 +62,7 @@ typedef enum
#undef X #undef X
/* module name strings */ /* module name strings */
#define X(a, b) b, #define X(a, b, c) b,
static char * const darshan_module_names[] = static char * const darshan_module_names[] =
{ {
DARSHAN_MODULE_IDS DARSHAN_MODULE_IDS
...@@ -67,6 +76,8 @@ enum darshan_comp_type ...@@ -67,6 +76,8 @@ enum darshan_comp_type
DARSHAN_BZIP2_COMP, DARSHAN_BZIP2_COMP,
}; };
typedef uint64_t darshan_record_id;
/* the darshan_log_map structure is used to indicate the location of /* the darshan_log_map structure is used to indicate the location of
* specific module data in a Darshan log. Note that 'off' and 'len' are * specific module data in a Darshan log. Note that 'off' and 'len' are
* the respective offset and length of the data in the file, in *uncompressed* * the respective offset and length of the data in the file, in *uncompressed*
......
...@@ -26,6 +26,8 @@ static int darshan_log_read(darshan_fd fd, void *buf, int len); ...@@ -26,6 +26,8 @@ static int darshan_log_read(darshan_fd fd, void *buf, int len);
static int darshan_log_write(darshan_fd fd, void *buf, int len); static int darshan_log_write(darshan_fd fd, void *buf, int len);
static int darshan_decompress_buf(char* comp_buf, int comp_buf_sz, static int darshan_decompress_buf(char* comp_buf, int comp_buf_sz,
char* decomp_buf, int* inout_decomp_buf_sz); char* decomp_buf, int* inout_decomp_buf_sz);
static int darshan_compress_buf(char* decomp_buf, int decomp_buf_sz,
char* comp_buf, int* inout_comp_buf_sz);
/* TODO: can we make this s.t. we don't care about ordering (i.e., X macro it ) */ /* TODO: can we make this s.t. we don't care about ordering (i.e., X macro it ) */
struct darshan_mod_logutil_funcs *mod_logutils[DARSHAN_MAX_MODS] = struct darshan_mod_logutil_funcs *mod_logutils[DARSHAN_MAX_MODS] =
...@@ -56,31 +58,30 @@ struct darshan_mod_logutil_funcs *mod_logutils[DARSHAN_MAX_MODS] = ...@@ -56,31 +58,30 @@ struct darshan_mod_logutil_funcs *mod_logutils[DARSHAN_MAX_MODS] =
*/ */
darshan_fd darshan_log_open(const char *name, const char *mode) darshan_fd darshan_log_open(const char *name, const char *mode)
{ {
int o_flags;
darshan_fd tmp_fd; darshan_fd tmp_fd;
/* we only allow "w" or "r" modes, nothing fancy */ /* we only allow "w" or "r" modes, nothing fancy */
assert(strlen(mode) == 1); assert(strlen(mode) == 1);
assert(mode[0] == 'r' || mode[0] == 'w'); assert(mode[0] == 'r' || mode[0] == 'w');
tmp_fd = malloc(sizeof(*tmp_fd));
if(!tmp_fd)
return(NULL);
memset(tmp_fd, 0, sizeof(*tmp_fd));
if(mode[0] == 'r') if(mode[0] == 'r')
{ {
o_flags = O_RDONLY; tmp_fd->fildes = open(name, O_RDONLY);
} }
else if (mode[0] == 'w') else if (mode[0] == 'w')
{ {
/* TODO: permissions when creating? umask */
/* when writing, we create the log file making sure not to overwrite /* when writing, we create the log file making sure not to overwrite
* an existing log * an existing log
*/ */
o_flags = O_WRONLY | O_CREAT | O_EXCL; tmp_fd->fildes = open(name, O_WRONLY | O_CREAT | O_EXCL, 0400);
} }
tmp_fd = malloc(sizeof(*tmp_fd));
if(!tmp_fd)
return(NULL);
memset(tmp_fd, 0, sizeof(*tmp_fd));
tmp_fd->fildes = open(name, o_flags);
if(tmp_fd->fildes < 0) if(tmp_fd->fildes < 0)
{ {
perror("darshan_log_open: "); perror("darshan_log_open: ");
...@@ -161,15 +162,17 @@ int darshan_log_getheader(darshan_fd fd, struct darshan_header *header) ...@@ -161,15 +162,17 @@ int darshan_log_getheader(darshan_fd fd, struct darshan_header *header)
return(0); return(0);
} }
#if 0
/* darshan_log_putheader() /* darshan_log_putheader()
* *
* write a darshan header to log file * write a darshan header to log file
* NOTE: the header is not passed in, and is instead built using
* contents of the given file descriptor
* *
* returns 0 on success, -1 on failure * returns 0 on success, -1 on failure
*/ */
int darshan_log_putheader(darshan_fd fd, struct darshan_header *header) int darshan_log_putheader(darshan_fd fd)
{ {
struct darshan_header header;
int ret; int ret;
ret = darshan_log_seek(fd, 0); ret = darshan_log_seek(fd, 0);
...@@ -179,21 +182,24 @@ int darshan_log_putheader(darshan_fd fd, struct darshan_header *header) ...@@ -179,21 +182,24 @@ int darshan_log_putheader(darshan_fd fd, struct darshan_header *header)
return(-1); return(-1);
} }
memset(&header, 0, sizeof(header));
strcpy(header.version_string, DARSHAN_LOG_VERSION);
header.magic_nr = DARSHAN_MAGIC_NR;
/* copy the mapping information to the header */
memcpy(&header.rec_map, &fd->rec_map, sizeof(struct darshan_log_map));
memcpy(&header.mod_map, &fd->mod_map, DARSHAN_MAX_MODS * sizeof(struct darshan_log_map));
/* write header to file */ /* write header to file */
ret = darshan_log_write(fd, header, sizeof(*header)); ret = darshan_log_write(fd, &header, sizeof(header));
if(ret != sizeof(*header)) if(ret != sizeof(header))
{ {
fprintf(stderr, "Error: failed to write Darshan log file header.\n"); fprintf(stderr, "Error: failed to write Darshan log file header.\n");
return(-1); return(-1);
} }
/* copy the mapping information to the file descriptor */
memcpy(&fd->rec_map, &header->rec_map, sizeof(struct darshan_log_map));
memcpy(&fd->mod_map, &header->mod_map, DARSHAN_MAX_MODS * sizeof(struct darshan_log_map));
return(0); return(0);
} }
#endif
/* darshan_log_getjob() /* darshan_log_getjob()
* *
...@@ -238,7 +244,6 @@ int darshan_log_getjob(darshan_fd fd, struct darshan_job *job) ...@@ -238,7 +244,6 @@ int darshan_log_getjob(darshan_fd fd, struct darshan_job *job)
free(comp_buf); free(comp_buf);
return(-1); return(-1);
} }
assert(job_buf_sz == DARSHAN_JOB_RECORD_SIZE);
free(comp_buf); free(comp_buf);
memcpy(job, job_buf, sizeof(*job)); memcpy(job, job_buf, sizeof(*job));
...@@ -263,7 +268,6 @@ int darshan_log_getjob(darshan_fd fd, struct darshan_job *job) ...@@ -263,7 +268,6 @@ int darshan_log_getjob(darshan_fd fd, struct darshan_job *job)
return(0); return(0);
} }
#if 0
/* darshan_log_putjob() /* darshan_log_putjob()
* *
* write job level metadat to darshan log file * write job level metadat to darshan log file
...@@ -273,6 +277,8 @@ int darshan_log_getjob(darshan_fd fd, struct darshan_job *job) ...@@ -273,6 +277,8 @@ int darshan_log_getjob(darshan_fd fd, struct darshan_job *job)
int darshan_log_putjob(darshan_fd fd, struct darshan_job *job) int darshan_log_putjob(darshan_fd fd, struct darshan_job *job)
{ {
struct darshan_job job_copy; struct darshan_job job_copy;
char *comp_buf;
int comp_buf_sz;
int len; int len;
int ret; int ret;
...@@ -297,9 +303,23 @@ int darshan_log_putjob(darshan_fd fd, struct darshan_job *job) ...@@ -297,9 +303,23 @@ int darshan_log_putjob(darshan_fd fd, struct darshan_job *job)
} }
} }
comp_buf = malloc(sizeof(*job));
if(!comp_buf)
return(-1);
comp_buf_sz = sizeof(*job);
/* compress the job data */
ret = darshan_compress_buf((char*)&job_copy, sizeof(*job), comp_buf, &comp_buf_sz);
if(ret < 0)
{
fprintf(stderr, "Error: failed to decompress darshan job data.\n");
free(comp_buf);
return(-1);
}
/* write job data to file */ /* write job data to file */
ret = darshan_log_write(fd, job, sizeof(*job)); ret = darshan_log_write(fd, comp_buf, comp_buf_sz);
if(ret != sizeof(*job)) if(ret != comp_buf_sz)
{ {
fprintf(stderr, "Error: failed to write darshan log file job data.\n"); fprintf(stderr, "Error: failed to write darshan log file job data.\n");
return(-1); return(-1);
...@@ -307,7 +327,6 @@ int darshan_log_putjob(darshan_fd fd, struct darshan_job *job) ...@@ -307,7 +327,6 @@ int darshan_log_putjob(darshan_fd fd, struct darshan_job *job)
return(0); return(0);
} }
#endif
/* darshan_log_getexe() /* darshan_log_getexe()
* *
...@@ -340,30 +359,34 @@ int darshan_log_getexe(darshan_fd fd, char *buf) ...@@ -340,30 +359,34 @@ int darshan_log_getexe(darshan_fd fd, char *buf)
return (0); return (0);
} }
#if 0
/* darshan_log_putexe() /* darshan_log_putexe()
* *
* wrties the application exe name to darshan log file * wrties the application exe name to darshan log file
* NOTE: this needs to be called immediately following put_job as it
* expects the final pointer to be positioned immediately following
* the darshan job information
* *
* returns 0 on success, -1 on failure * returns 0 on success, -1 on failure
*/ */
int darshan_log_putexe(darshan_fd fd, char *buf) int darshan_log_putexe(darshan_fd fd, char *buf)
{ {
int tmp_off = sizeof(struct darshan_header) + sizeof(struct darshan_job);
int len; int len;
int ret; int ret;
char comp_buf[DARSHAN_EXE_LEN] = {0};
int comp_buf_sz = DARSHAN_EXE_LEN;
len = strlen(buf);
ret = darshan_log_seek(fd, tmp_off); /* compress the input exe string */
ret = darshan_compress_buf(buf, len, comp_buf, &comp_buf_sz);
if(ret < 0) if(ret < 0)
{ {
fprintf(stderr, "Error: unable to seek in darshan log file.\n"); fprintf(stderr, "Error: unable to compress exe string.\n");
return(-1); return(-1);
} }
len = strlen(buf); ret = darshan_log_write(fd, comp_buf, comp_buf_sz);
if(ret != comp_buf_sz)
ret = darshan_log_write(fd, buf, len);
if(ret != len)
{ {
fprintf(stderr, "Error: failed to write exe string to darshan log file.\n"); fprintf(stderr, "Error: failed to write exe string to darshan log file.\n");
return(-1); return(-1);
...@@ -371,7 +394,6 @@ int darshan_log_putexe(darshan_fd fd, char *buf) ...@@ -371,7 +394,6 @@ int darshan_log_putexe(darshan_fd fd, char *buf)
return(0); return(0);
} }
#endif
/* darshan_log_getmounts() /* darshan_log_getmounts()
* *
...@@ -445,7 +467,6 @@ int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts, ...@@ -445,7 +467,6 @@ int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts,
return(0); return(0);
} }
#if 0
/* darshan_log_putmounts() /* darshan_log_putmounts()
* *
* writes mount information to the darshan log file * writes mount information to the darshan log file
...@@ -459,31 +480,40 @@ int darshan_log_putmounts(darshan_fd fd, char** mnt_pts, char** fs_types, int co ...@@ -459,31 +480,40 @@ int darshan_log_putmounts(darshan_fd fd, char** mnt_pts, char** fs_types, int co
{ {
int i; int i;
char line[1024]; char line[1024];
char mnt_dat[DARSHAN_EXE_LEN] = {0};
int mnt_dat_sz = 0;
char comp_buf[DARSHAN_EXE_LEN] = {0};
int comp_buf_sz = DARSHAN_EXE_LEN;
char *tmp;
int ret; int ret;
/* write each mount entry to file */ /* write each mount entry to file */
tmp = mnt_dat;
for(i=count-1; i>=0; i--) for(i=count-1; i>=0; i--)
{ {
sprintf(line, "\n%s\t%s", fs_types[i], mnt_pts[i]); sprintf(line, "\n%s\t%s", fs_types[i], mnt_pts[i]);
ret = darshan_log_write(fd, line, strlen(line));
if (ret != strlen(line)) memcpy(tmp, line, strlen(line));
{ tmp += strlen(line);
fprintf(stderr, "Error: failed to write darshan log mount entry.\n"); mnt_dat_sz += strlen(line);
return(-1); }
}
ret = darshan_compress_buf(mnt_dat, mnt_dat_sz, comp_buf, &comp_buf_sz);
if(ret < 0)
{
fprintf(stderr, "Error: unable to compress mount data.\n");
return(-1);
} }
/* seek ahead to end of exe region, will be zero filled */ ret = darshan_log_write(fd, comp_buf, comp_buf_sz);
ret = darshan_log_seek(fd, DARSHAN_JOB_RECORD_SIZE); if (ret != comp_buf_sz)
if (ret < 0)
{ {
fprintf(stderr, "Error: unable to seek forward in darshan log file.\n"); fprintf(stderr, "Error: failed to write darshan log mount data.\n");
return(-1); return(-1);
} }
return(0); return(0);
} }
#endif
/* darshan_log_gethash() /* darshan_log_gethash()
* *
...@@ -591,10 +621,12 @@ int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash) ...@@ -591,10 +621,12 @@ int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
return(0); return(0);
} }
#if 0
/* darshan_log_puthash() /* darshan_log_puthash()
* *
* writes the hash table of records to the darshan log file * writes the hash table of records to the darshan log file
* NOTE: this function call should follow immediately after the call
* to darshan_log_putmounts(), as it assumes the darshan log file pointer
* is pointing to the offset immediately following the mount information
* *
* returns 0 on success, -1 on failure * returns 0 on success, -1 on failure
*/ */
...@@ -606,15 +638,10 @@ int darshan_log_puthash(darshan_fd fd, struct darshan_record_ref *hash) ...@@ -606,15 +638,10 @@ int darshan_log_puthash(darshan_fd fd, struct darshan_record_ref *hash)
struct darshan_record_ref *ref, *tmp; struct darshan_record_ref *ref, *tmp;
uint32_t name_len; uint32_t name_len;
size_t record_sz; size_t record_sz;
char *comp_buf;
char comp_buf_sz;
int ret; int ret;
ret = darshan_log_seek(fd, fd->rec_map.off);
if(ret < 0)
{
fprintf(stderr, "Error: unable to seek in darshan log file.\n");
return(-1);
}
/* allocate a buffer to store 2 MiB worth of record data */ /* allocate a buffer to store 2 MiB worth of record data */
/* NOTE: this buffer may be reallocated if estimate is too small */ /* NOTE: this buffer may be reallocated if estimate is too small */
hash_buf_sz = 2 * 1024 * 1024; hash_buf_sz = 2 * 1024 * 1024;
...@@ -663,13 +690,28 @@ int darshan_log_puthash(darshan_fd fd, struct darshan_record_ref *hash) ...@@ -663,13 +690,28 @@ int darshan_log_puthash(darshan_fd fd, struct darshan_record_ref *hash)
memcpy(hash_buf_off, ref->rec.name, name_len); memcpy(hash_buf_off, ref->rec.name, name_len);
hash_buf_off += name_len; hash_buf_off += name_len;
} }
/* collectively write out the record hash to the darshan log */
hash_buf_sz = hash_buf_off - hash_buf; hash_buf_sz = hash_buf_off - hash_buf;
comp_buf = malloc(hash_buf_sz);
if(!comp_buf)
return(-1);
comp_buf_sz = hash_buf_sz;
/* compress the record hash */
ret = darshan_compress_buf(hash_buf, hash_buf_sz, comp_buf, &comp_buf_sz);
if(ret < 0)
{
fprintf(stderr, "Error: unable to compress darshan record hash.\n");
return(-1);
}
/* set the appropriate mapping info for the record hash in the file descriptor */
fd->rec_map.off = fd->pos;
fd->rec_map.len = comp_buf_sz;
/* write the record hash to file */ /* write the record hash to file */
ret = darshan_log_write(fd, hash_buf, hash_buf_sz); ret = darshan_log_write(fd, comp_buf, comp_buf_sz);
if(ret != hash_buf_sz) if(ret != comp_buf_sz)
{ {
fprintf(stderr, "Error: failed to write record hash to darshan log file.\n"); fprintf(stderr, "Error: failed to write record hash to darshan log file.\n");
return(-1); return(-1);
...@@ -679,7 +721,6 @@ int darshan_log_puthash(darshan_fd fd, struct darshan_record_ref *hash) ...@@ -679,7 +721,6 @@ int darshan_log_puthash(darshan_fd fd, struct darshan_record_ref *hash)
return(0); return(0);
} }
#endif
/* darshan_log_getmod() /* darshan_log_getmod()
* *
...@@ -738,7 +779,7 @@ int darshan_log_getmod(darshan_fd fd, darshan_module_id mod_id, ...@@ -738,7 +779,7 @@ int darshan_log_getmod(darshan_fd fd, darshan_module_id mod_id,
return(1); return(1);
} }
#if 0 /* XXX */
/* darshan_log_putmod() /* darshan_log_putmod()
* *
* write a chunk of module data to the darshan log file * write a chunk of module data to the darshan log file
...@@ -748,15 +789,8 @@ int darshan_log_getmod(darshan_fd fd, darshan_module_id mod_id, ...@@ -748,15 +789,8 @@ int darshan_log_getmod(darshan_fd fd, darshan_module_id mod_id,
int darshan_log_putmod(darshan_fd fd, darshan_module_id mod_id, int darshan_log_putmod(darshan_fd fd, darshan_module_id mod_id,
void *mod_buf, int mod_buf_sz) void *mod_buf, int mod_buf_sz)
{ {
int mod_buf_end = fd->mod_map[mod_id].off + fd->mod_map[mod_id].len;
int ret; int ret;
if(!fd->mod_map[mod_id].len)
{
fprintf(stderr, "Error: attempting to write data for empty module.\n");
return(-1);
}
/* only seek to start of module data if current log file position /* only seek to start of module data if current log file position
* is not within the given mod_id's range. This allows one to * is not within the given mod_id's range. This allows one to
* repeatedly call this function and put chunks of a module's * repeatedly call this function and put chunks of a module's
...@@ -791,7 +825,6 @@ int darshan_log_putmod(darshan_fd fd, darshan_module_id mod_id, ...@@ -791,7 +825,6 @@ int darshan_log_putmod(darshan_fd fd, darshan_module_id mod_id,
return(0); return(0);
} }
#endif
/* darshan_log_close() /* darshan_log_close()
* *
...@@ -892,7 +925,7 @@ static int darshan_decompress_buf(char* comp_buf, int comp_buf_sz, ...@@ -892,7 +925,7 @@ static int darshan_decompress_buf(char* comp_buf, int comp_buf_sz,
{ {
if(tmp_stream.avail_out == 0) if(tmp_stream.avail_out == 0)
{ {
/* We ran out of buffer space for compression. In theory, /* We ran out of buffer space for decompression. In theory,
* we could just alloc more space, but probably just easier * we could just alloc more space, but probably just easier
* to bump up the default size of the output buffer. * to bump up the default size of the output buffer.
*/ */
...@@ -921,7 +954,53 @@ static int darshan_decompress_buf(char* comp_buf, int comp_buf_sz, ...@@ -921,7 +954,53 @@ static int darshan_decompress_buf(char* comp_buf, int comp_buf_sz,
static int darshan_compress_buf(char* decomp_buf, int decomp_buf_sz, static int darshan_compress_buf(char* decomp_buf, int decomp_buf_sz,
char* comp_buf, int* inout_comp_buf_sz) char* comp_buf, int* inout_comp_buf_sz)
{ {
int ret;
int total_out = 0;
z_stream tmp_stream;
memset(&tmp_stream, 0, sizeof(tmp_stream));
tmp_stream.zalloc = Z_NULL;
tmp_stream.zfree = Z_NULL;
tmp_stream.opaque = Z_NULL;
tmp_stream.next_in = (unsigned char*)decomp_buf;
tmp_stream.avail_in = decomp_buf_sz;
tmp_stream.next_out = (unsigned char*)comp_buf;
tmp_stream.avail_out = *inout_comp_buf_sz;
ret = deflateInit(&tmp_stream, Z_DEFAULT_COMPRESSION);
if(ret != Z_OK)
{
return(-1);
}
/* while we have not finished consuming all of the uncompressed input data */
while(tmp_stream.avail_in)
{
if(tmp_stream.avail_out == 0)