Commit 66a83b03 authored by Shane Snyder's avatar Shane Snyder
Browse files

first cut at the stitch logs utility

Currently, this tool can take a set of temporary, uncompressed log
files on a given node, and combine into a single darshan log.
Next step would be to modify this code to perform shared file
reduction logic to combine per-node log files into per-job log
files.
parent 4ac75905
#include <stdio.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <glob.h>
#include <string.h>
#include <assert.h>
#include "darshan-logutils.h"
#define DEF_MOD_BUF_SIZE 1024 /* 1 KiB is enough for all current mod records ... */
int logfile_path_comp(const void *a, const void *b)
{
char *pathA = *(char **)a;
......@@ -30,30 +31,30 @@ int logfile_path_comp(const void *a, const void *b)
int main(int argc, char *argv[])
{
darshan_fd out_fd;
char *out_logname; /* XXX default + configurable? */
int ret;
int i, j;
char *tmplog_dir;
int job_id;
glob_t globbuf;
char glob_pstr[512];
int tmp_fd;
struct darshan_job in_job, out_job;
char in_exe_mnt[DARSHAN_EXE_LEN+1];
char *mnt_s, *pos;
char out_exe[DARSHAN_EXE_LEN+1];
char **out_mnt_pts;
char **out_fs_types;
int out_mnt_count = 0;
char *stitch_logname = "/tmp/test123.darshan"; /* XXX default + configurable? */
darshan_fd in_fd, stitch_fd;
struct darshan_job in_job, stitch_job;
char stitch_exe[DARSHAN_EXE_LEN+1];
char **stitch_mnt_pts;
char **stitch_fs_types;
int stitch_mnt_count = 0;
struct darshan_record_ref *in_hash = NULL;
struct darshan_record_ref *stitch_hash = NULL;
struct darshan_record_ref *ref, *tmp, *found;
darshan_record_id rec_id;
char *mod_buf;
int i, j;
int ret;
/* TODO: are there any checks we should do to ensure tmp logs belong to the same job */
/* we can't specifically check the job id, since the pid is used if no job scheduler */
/* TODO: how do we set the output logfile name to be unique, and have necessary semantic info contained */
/* TODO: more thorough way of cleaning up (free, close, etc.) */
if(argc != 3)
{
fprintf(stderr, "Usage: ./darshan-stitch-tmplogs <tmp_dir> <job_id>\n"
......@@ -83,235 +84,241 @@ int main(int argc, char *argv[])
*/
qsort(globbuf.gl_pathv, globbuf.gl_pathc, sizeof(char *), logfile_path_comp);
memset(&out_job, 0, sizeof(struct darshan_job));
memset(&stitch_job, 0, sizeof(struct darshan_job));
/* first pass at stitching together logs:
* - compose a final job-level metadata structure
* - compose output job-level metadata structure (including exe & mount data)
* - compose output record_id->file_name mapping
*/
for(i = 0; i < globbuf.gl_pathc; i++)
{
memset(&in_job, 0, sizeof(struct darshan_job));
tmp_fd = open(globbuf.gl_pathv[i], O_RDONLY);
if(tmp_fd < 0)
in_fd = darshan_log_open(globbuf.gl_pathv[i]);
if(in_fd == NULL)
{
fprintf(stderr,
"Error: unable to open Darshan log file %s.\n",
"Error: unable to open input Darshan log file %s.\n",
globbuf.gl_pathv[i]);
globfree(&globbuf);
return(-1);
}
/* read job-level metadata from the input file */
ret = pread(tmp_fd, &in_job, sizeof(struct darshan_job), 0);
if(ret < sizeof(struct darshan_job))
ret = darshan_log_getjob(in_fd, &in_job);
if(ret < 0)
{
fprintf(stderr,
"Error: unable to read job data from Darshan log file %s\n",
"Error: unable to read job data from input Darshan log file %s.\n",
globbuf.gl_pathv[i]);
darshan_log_close(in_fd);
globfree(&globbuf);
return(-1);
}
if(i == 0)
{
/* get all job data from the first input log */
memcpy(&out_job, &in_job, sizeof(struct darshan_job));
/* get job data, exe, & mounts directly from the first input log */
memcpy(&stitch_job, &in_job, sizeof(struct darshan_job));
ret = pread(tmp_fd, in_exe_mnt, DARSHAN_EXE_LEN+1, sizeof(struct darshan_job));
if(ret < DARSHAN_EXE_LEN+1)
ret = darshan_log_getexe(in_fd, stitch_exe);
if(ret < 0)
{
fprintf(stderr,
"Error: unable to read exe & mount data from Darshan log file %s\n",
"Error: unable to read exe string from Darshan log file %s.\n",
globbuf.gl_pathv[i]);
darshan_log_close(in_fd);
globfree(&globbuf);
return(-1);
}
/* get the exe string */
mnt_s = strchr(in_exe_mnt, '\n');
memcpy(out_exe, in_exe_mnt, mnt_s-in_exe_mnt);
out_exe[mnt_s-in_exe_mnt] = '\0';
/* build mount structures */
pos = mnt_s;
while((pos = strchr(pos, '\n')) != NULL)
ret = darshan_log_getmounts(in_fd, &stitch_mnt_pts,
&stitch_fs_types, &stitch_mnt_count);
if(ret < 0)
{
pos++;
out_mnt_count++;
fprintf(stderr,
"Error: unable to read mount info from Darshan log file %s.\n",
globbuf.gl_pathv[i]);
darshan_log_close(in_fd);
globfree(&globbuf);
return(-1);
}
if(out_mnt_count)
}
else
{
out_mnt_pts = malloc(out_mnt_count * sizeof(char *));
assert(out_mnt_pts);
out_fs_types = malloc(out_mnt_count * sizeof(char *));
assert(out_fs_types);
/* potentially update job timestamps using remaining logs */
if(in_job.start_time < stitch_job.start_time)
stitch_job.start_time = in_job.start_time;
if(in_job.end_time > stitch_job.end_time)
stitch_job.end_time = in_job.end_time;
}
/* work backwards through the table and parse each line (except for
* first, which holds command line information)
*/
j = 0;
while((pos = strrchr(in_exe_mnt, '\n')) != NULL)
{
/* overestimate string lengths */
out_mnt_pts[j] = malloc(DARSHAN_EXE_LEN);
assert(out_mnt_pts[j]);
out_fs_types[j] = malloc(DARSHAN_EXE_LEN);
assert(out_fs_types[j]);
ret = sscanf(++pos, "%s\t%s", out_fs_types[j], out_mnt_pts[j]);
if(ret != 2)
ret = darshan_log_gethash(in_fd, &in_hash);
if(ret < 0)
{
fprintf(stderr,
"Error: poorly formatted mount table in darshan log file %s.\n",
"Error: unable to read job data from input Darshan log file %s.\n",
globbuf.gl_pathv[i]);
darshan_log_close(in_fd);
globfree(&globbuf);
return(-1);
}
pos--;
*pos = '\0';
j++;
}
}
/* iterate the input hash, copying over record_id->file_name mappings
* that have not already been copied to the output hash
*/
HASH_ITER(hlink, in_hash, ref, tmp)
{
HASH_FIND(hlink, stitch_hash, &(ref->rec.id),
sizeof(darshan_record_id), found);
if(!found)
{
HASH_ADD(hlink, stitch_hash, rec.id,
sizeof(darshan_record_id), ref);
}
else
else if(strcmp(ref->rec.name, found->rec.name))
{
/* potentially update job timestamps */
if(in_job.start_time < out_job.start_time)
out_job.start_time = in_job.start_time;
if(in_job.end_time > out_job.end_time)
out_job.end_time = in_job.end_time;
fprintf(stderr,
"Error: invalid Darshan record table entry.\n");
darshan_log_close(in_fd);
globfree(&globbuf);
return(-1);
}
}
close(tmp_fd);
darshan_log_close(in_fd);
}
/* create the output "stitched together" log */
out_fd = darshan_log_create("/tmp/test123.darshan", DARSHAN_ZLIB_COMP, 1);
if(out_fd == NULL)
stitch_fd = darshan_log_create(stitch_logname, DARSHAN_ZLIB_COMP, 1);
if(stitch_fd == NULL)
{
fprintf(stderr, "Error: unable to create output darshan log.\n");
globfree(&globbuf);
return(-1);
}
/* write the darshan job info, exe string, and mount data to output file */
ret = darshan_log_putjob(out_fd, &out_job);
ret = darshan_log_putjob(stitch_fd, &stitch_job);
if(ret < 0)
{
fprintf(stderr, "Error: unable to write job data to output darshan log.\n");
globfree(&globbuf);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
ret = darshan_log_putexe(out_fd, out_exe);
ret = darshan_log_putexe(stitch_fd, stitch_exe);
if(ret < 0)
{
fprintf(stderr, "Error: unable to write exe string to output darshan log.\n");
globfree(&globbuf);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
ret = darshan_log_putmounts(out_fd, out_mnt_pts, out_fs_types, out_mnt_count);
ret = darshan_log_putmounts(stitch_fd, stitch_mnt_pts, stitch_fs_types, stitch_mnt_count);
if(ret < 0)
{
fprintf(stderr, "Error: unable to write mount data to output darshan log.\n");
globfree(&globbuf);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
void *mod_buf = malloc(2*1024*1024);
assert(mod_buf);
/* second pass at stitching together logs:
* - append module data to output log file
*/
for(i = 0; i < globbuf.gl_pathc; i++)
{
tmp_fd = open(globbuf.gl_pathv[i], O_RDONLY);
if(tmp_fd < 0)
/* write the stitched together table of records to output file */
ret = darshan_log_puthash(stitch_fd, stitch_hash);
if(ret < 0)
{
fprintf(stderr,
"Error: unable to open Darshan log file %s.\n",
globbuf.gl_pathv[i]);
fprintf(stderr, "Error: unable to write record table to output darshan log.\n");
globfree(&globbuf);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
ret = pread(tmp_fd, mod_buf, 2*1024*1024, 4*1024);
if(ret < (2*1024*1024))
mod_buf = malloc(DEF_MOD_BUF_SIZE);
if(!mod_buf)
{
fprintf(stderr,
"Error: unable to read module data from Darshan log file %s\n",
globbuf.gl_pathv[i]);
globfree(&globbuf);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
memset(mod_buf, 0, DEF_MOD_BUF_SIZE);
/* iterate and write file records */
void *mod_buf_p = mod_buf;
while(1)
for(i = 0; i < DARSHAN_MPIIO_MOD; i++)
{
if(!mod_logutils[i]) continue;
#if 0
ret = darshan_log_put_posix_file(out_fd, VOID*);
if(ret < 0)
/* XXX first build shared record list? */
for(j = 0; j < globbuf.gl_pathc; j++)
{
fprintf(stderr,
"Error: unable to write module record to output darshan log.\n");
return(-1);
}
#endif
}
close(tmp_fd);
}
/* XXX second aggregate shared records ? */
for(j = 0; j < globbuf.gl_pathc; j++)
{
darshan_log_close(out_fd);
}
#endif
/* XXX third write each rank's blobs, with rank 0 writing the shared ones ? */
for(j = 0; j < globbuf.gl_pathc; j++)
{
in_fd = darshan_log_open(globbuf.gl_pathv[j]);
if(in_fd == NULL)
{
fprintf(stderr,
"Error: unable to open input Darshan log file %s.\n",
globbuf.gl_pathv[j]);
free(mod_buf);
globfree(&globbuf);
darshan_log_close(in_fd);
unlink(stitch_logname);
return(-1);
}
return(0);
}
#if 0
/* print out job info to sanity check */
time_t tmp_time = 0;
char *token;
char *save;
char buffer[DARSHAN_JOB_METADATA_LEN];
#include <time.h>
printf("# exe: %s\n", out_exe);
printf("# uid: %" PRId64 "\n", out_job.uid);
printf("# jobid: %" PRId64 "\n", out_job.jobid);
printf("# start_time: %" PRId64 "\n", out_job.start_time);
tmp_time += out_job.start_time;
printf("# start_time_asci: %s", ctime(&tmp_time));
printf("# end_time: %" PRId64 "\n", out_job.end_time);
tmp_time = 0;
tmp_time += out_job.end_time;
printf("# end_time_asci: %s", ctime(&tmp_time));
printf("# nprocs: %" PRId64 "\n", out_job.nprocs);
printf("# run time: %" PRId64 "\n", out_job.end_time - out_job.start_time + 1);
for(token=strtok_r(out_job.metadata, "\n", &save);
token != NULL;
token=strtok_r(NULL, "\n", &save))
/* loop over module records and write them to output file */
while((ret = mod_logutils[i]->log_get_record(in_fd, mod_buf, &rec_id)) == 1)
{
char *key;
char *value;
/* NOTE: we intentionally only split on the first = character.
* There may be additional = characters in the value portion
* (for example, when storing mpi-io hints).
*/
strcpy(buffer, token);
key = buffer;
value = index(buffer, '=');
if(!value)
continue;
/* convert = to a null terminator to split key and value */
value[0] = '\0';
value++;
printf("# metadata: %s = %s\n", key, value);
ret = mod_logutils[i]->log_put_record(stitch_fd, mod_buf);
if(ret < 0)
{
fprintf(stderr,
"Error: unable to write %s module record to output log file %s.\n",
darshan_module_names[i], globbuf.gl_pathv[j]);
free(mod_buf);
darshan_log_close(in_fd);
unlink(stitch_logname);
return(-1);
}
/* print table of mounted file systems */
printf("\n# mounted file systems (mount point and fs type)\n");
printf("# -------------------------------------------------------\n");
for(j=0; j<out_mnt_count; j++)
memset(mod_buf, 0, DEF_MOD_BUF_SIZE);
}
if(ret < 0)
{
printf("# mount entry:\t%s\t%s\n", out_mnt_pts[j], out_fs_types[j]);
fprintf(stderr,
"Error: unable to read %s module record from input log file %s.\n",
darshan_module_names[i], globbuf.gl_pathv[j]);
free(mod_buf);
darshan_log_close(in_fd);
unlink(stitch_logname);
return(-1);
}
#endif
darshan_log_close(in_fd);
}
}
darshan_log_close(stitch_fd);
globfree(&globbuf);
free(mod_buf);
return(0);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment