Commit 80c07c27 authored by Shane Snyder's avatar Shane Snyder

more darshan-util updates for handling tmp logs

parent a84e8657
......@@ -209,6 +209,7 @@ int main(int argc, char **argv)
char** mnt_pts;
char** fs_types;
time_t tmp_time = 0;
int64_t run_time = 0;
char *token;
char *save;
char buffer[DARSHAN_JOB_METADATA_LEN];
......@@ -287,7 +288,9 @@ int main(int argc, char **argv)
tmp_time += job.end_time;
printf("# end_time_asci: %s", ctime(&tmp_time));
printf("# nprocs: %" PRId64 "\n", job.nprocs);
printf("# run time: %" PRId64 "\n", job.end_time - job.start_time + 1);
if(job.end_time >= job.start_time)
run_time = job.end_time - job.start_time + 1;
printf("# run time: %" PRId64 "\n", run_time);
for(token=strtok_r(job.metadata, "\n", &save);
token != NULL;
token=strtok_r(NULL, "\n", &save))
......
......@@ -188,6 +188,7 @@ static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag)
case POSIX_SIZE_WRITE_4M_10M:
case POSIX_SIZE_WRITE_10M_100M:
case POSIX_SIZE_WRITE_100M_1G:
case POSIX_SIZE_WRITE_1G_PLUS:
/* sum */
agg_psx_rec->counters[i] += psx_rec->counters[i];
break;
......@@ -233,9 +234,10 @@ static void darshan_log_agg_posix_files(void *rec, void *agg_rec, int init_flag)
case POSIX_F_OPEN_TIMESTAMP:
case POSIX_F_READ_START_TIMESTAMP:
case POSIX_F_WRITE_START_TIMESTAMP:
/* minimum */
if(psx_rec->fcounters[i] > 0 &&
(psx_rec->fcounters[i] < agg_psx_rec->fcounters[i]))
/* minimum non-zero */
if((psx_rec->fcounters[i] > 0) &&
((agg_psx_rec->fcounters[i] == 0) ||
(psx_rec->fcounters[i] < agg_psx_rec->fcounters[i])))
{
agg_psx_rec->fcounters[i] = psx_rec->fcounters[i];
}
......
......@@ -5,6 +5,8 @@
#include <getopt.h>
#include <glob.h>
#include "uthash-1.9.2/src/uthash.h"
#include "darshan-logutils.h"
#define DEF_MOD_BUF_SIZE 1024 /* 1 KiB is enough for all current mod records ... */
......@@ -14,6 +16,16 @@
/* TODO: how do we set the output logfile name to be unique, and have necessary semantic info contained */
/* TODO: set job end timestamp? */
struct darshan_shared_record_ref
{
darshan_record_id id;
int ref_cnt;
char agg_rec[DEF_MOD_BUF_SIZE];
UT_hash_handle hlink;
};
void usage(char *exename)
{
fprintf(stderr, "Usage: %s [options] <tmp_dir> <job_id>\n", exename);
......@@ -87,6 +99,94 @@ int logfile_path_comp(const void *a, const void *b)
return(0);
}
int build_mod_shared_rec_hash(glob_t *globbuf, darshan_module_id mod_id,
int nprocs, char *mod_buf, struct darshan_shared_record_ref **shared_rec_hash)
{
darshan_fd in_fd;
struct darshan_base_record *base_rec;
struct darshan_shared_record_ref *ref, *tmp;
int init = 0;
int ret;
int i;
/* loop over each input log file */
for(i = 0; i < globbuf->gl_pathc; i++)
{
in_fd = darshan_log_open(globbuf->gl_pathv[i]);
if(in_fd == NULL)
{
fprintf(stderr,
"Error: unable to open input Darshan log file %s.\n",
globbuf->gl_pathv[i]);
return(-1);
}
while((ret = mod_logutils[mod_id]->log_get_record(in_fd, mod_buf)) == 1)
{
base_rec = (struct darshan_base_record *)mod_buf;
/* initialize the hash with the first rank's records */
if(!init)
{
struct darshan_base_record *agg_base;
/* create a new ref and add to the hash */
ref = malloc(sizeof(*ref));
if(!ref)
{
darshan_log_close(in_fd);
return(-1);
}
/* initialize the aggregate record with this rank's record */
agg_base = (struct darshan_base_record *)ref->agg_rec;
agg_base->id = base_rec->id;
agg_base->rank = -1;
mod_logutils[mod_id]->log_agg_records(mod_buf, ref->agg_rec, 1);
ref->id = base_rec->id;
ref->ref_cnt = 1;
HASH_ADD(hlink, *shared_rec_hash, id, sizeof(darshan_record_id), ref);
init = 1;
}
else
{
/* search for this record in shared record hash */
HASH_FIND(hlink, *shared_rec_hash, &(base_rec->id),
sizeof(darshan_record_id), ref);
if(ref)
{
/* if found, aggregate this rank's record into the shared record */
mod_logutils[mod_id]->log_agg_records(mod_buf, ref->agg_rec, 0);
ref->ref_cnt++;
}
}
}
if(ret < 0)
{
fprintf(stderr,
"Error: unable to read %s module record from input log file %s.\n",
darshan_module_names[mod_id], globbuf->gl_pathv[i]);
darshan_log_close(in_fd);
return(-1);
}
darshan_log_close(in_fd);
}
/* prune any non-shared records from the hash one last time */
HASH_ITER(hlink, *shared_rec_hash, ref, tmp)
{
if(ref->ref_cnt != nprocs)
{
HASH_DELETE(hlink, *shared_rec_hash, ref);
free(ref);
}
}
return(0);
}
int main(int argc, char *argv[])
{
int shared_redux;
......@@ -104,8 +204,10 @@ int main(int argc, char *argv[])
struct darshan_record_ref *in_hash = NULL;
struct darshan_record_ref *stitch_hash = NULL;
struct darshan_record_ref *ref, *tmp, *found;
darshan_record_id rec_id;
char *mod_buf;
struct darshan_shared_record_ref *shared_rec_hash = NULL;
struct darshan_shared_record_ref *sref, *stmp;
struct darshan_base_record *base_rec;
char mod_buf[DEF_MOD_BUF_SIZE];
int i, j;
int ret;
......@@ -161,6 +263,21 @@ int main(int argc, char *argv[])
return(-1);
}
/* if the input darshan log has metadata set indicating the darshan
* shutdown procedure was called on the log, then we error out. if the
* shutdown procedure was started, then it's possible the log has
* incomplete or corrupt data, so we just throw out the data for now.
*/
if(strstr(in_job.metadata, "darshan_shutdown=yes"))
{
fprintf(stderr,
"Error: potentially corrupt data found in input log file %s.\n",
globbuf.gl_pathv[i]);
darshan_log_close(in_fd);
globfree(&globbuf);
return(-1);
}
if(i == 0)
{
/* get job data, exe, & mounts directly from the first input log */
......@@ -198,6 +315,7 @@ int main(int argc, char *argv[])
stitch_job.end_time = in_job.end_time;
}
/* read the hash of ids->names for the input log */
ret = darshan_log_gethash(in_fd, &in_hash);
if(ret < 0)
{
......@@ -283,16 +401,6 @@ int main(int argc, char *argv[])
return(-1);
}
mod_buf = malloc(DEF_MOD_BUF_SIZE);
if(!mod_buf)
{
globfree(&globbuf);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
memset(mod_buf, 0, DEF_MOD_BUF_SIZE);
/* iterate over active darshan modules and gather module data to write
* to the stitched together output log
*/
......@@ -302,21 +410,37 @@ int main(int argc, char *argv[])
if(shared_redux)
{
/* copy all root's file records into an array */
/* compare and updated shared records? */
for(j = 1; j < globbuf.gl_pathc; j++)
/* build the hash of records shared globally by this module */
ret = build_mod_shared_rec_hash(&globbuf, i, stitch_job.nprocs,
mod_buf, &shared_rec_hash);
if(ret < 0)
{
fprintf(stderr,
"Error: unable to build list of %s module's shared records.\n",
darshan_module_names[i]);
globfree(&globbuf);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
/* XXX aggregate shared records? */
for(j = 0; j < globbuf.gl_pathc; j++)
/* write out the shared records first */
HASH_ITER(hlink, shared_rec_hash, sref, stmp)
{
ret = mod_logutils[i]->log_put_record(stitch_fd, sref->agg_rec);
if(ret < 0)
{
fprintf(stderr,
"Error: unable to write %s module record to output darshan log.\n",
darshan_module_names[i]);
globfree(&globbuf);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
}
}
/* XXX third write each rank's blobs, with rank 0 writing the shared ones? */
for(j = 0; j < globbuf.gl_pathc; j++)
{
in_fd = darshan_log_open(globbuf.gl_pathv[j]);
......@@ -325,48 +449,63 @@ int main(int argc, char *argv[])
fprintf(stderr,
"Error: unable to open input Darshan log file %s.\n",
globbuf.gl_pathv[j]);
free(mod_buf);
globfree(&globbuf);
darshan_log_close(in_fd);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
/* loop over module records and write them to output file */
while((ret = mod_logutils[i]->log_get_record(in_fd, mod_buf, &rec_id)) == 1)
while((ret = mod_logutils[i]->log_get_record(in_fd, mod_buf)) == 1)
{
base_rec = (struct darshan_base_record *)mod_buf;
HASH_FIND(hlink, shared_rec_hash, &(base_rec->id), sizeof(darshan_record_id), sref);
if(sref)
continue; /* skip shared records */
ret = mod_logutils[i]->log_put_record(stitch_fd, mod_buf);
if(ret < 0)
{
fprintf(stderr,
"Error: unable to write %s module record to output log file %s.\n",
darshan_module_names[i], globbuf.gl_pathv[j]);
free(mod_buf);
globfree(&globbuf);
darshan_log_close(in_fd);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
memset(mod_buf, 0, DEF_MOD_BUF_SIZE);
}
if(ret < 0)
{
fprintf(stderr,
"Error: unable to read %s module record from input log file %s.\n",
darshan_module_names[i], globbuf.gl_pathv[j]);
free(mod_buf);
globfree(&globbuf);
darshan_log_close(in_fd);
darshan_log_close(stitch_fd);
unlink(stitch_logname);
return(-1);
}
darshan_log_close(in_fd);
}
/* clear the shared record hash for the next module */
if(shared_redux)
{
HASH_ITER(hlink, shared_rec_hash, sref, stmp)
{
HASH_DELETE(hlink, shared_rec_hash, sref);
free(sref);
}
}
}
darshan_log_close(stitch_fd);
globfree(&globbuf);
free(mod_buf);
return(0);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment