Commit da477e42 authored by Shane Snyder's avatar Shane Snyder
Browse files

Complete (first) implementation for darshan

-darshan-core:
    - initializes darshan job data structures
    - creates log file
    - writes job info header
    - reduces which modules were used locally/globally
    - shut down and write out individual modules one by one

-darshan-posix:
    - register w/ darshan-core and allocate allowed memory
    - track open counter and timestamp for opened files
    - provide file record buffer at shutdown

Major TODOs:
    - shared file reductions (prefer reusable across modules)
    - register handles with darshan file_ids (prefer reusable)
    - generic compression of module buffers (zlib/bzip2)
    - write out map of filenames to darshan ids
    - modify darshan-util and add darshan-posix parser to read logs
parent 7496c279
......@@ -121,3 +121,5 @@ TODO NOTES:
- would the posix module be the only one to leverage this info?
- it is used to correlate filenames with mount points, but not sure what is needed where
- test fortran and c apps to make sure command line args are handled properly (see email with Phil 11/4)
- how do we abstract shared file reduction so each module doesn't completely reimplement?
- when is DARSHAN_MPI_CALL macro needed? All MPI functions, or just those intercepted by darshan?
......@@ -12,18 +12,6 @@
#include "darshan.h"
/* calculation of compression buffer size (defaults to 50% of the maximum
* memory that Darshan is allowed to consume on a process)
*/
//#define CP_COMP_BUF_SIZE ((CP_MAX_FILES * sizeof(struct darshan_file))/2)
#define CP_COMP_BUF_SIZE 0
/* max length of module name string (not counting \0) */
#define DARSHAN_MOD_NAME_LEN 31
/* flags to indicate properties of file records */
#define CP_FLAG_NOTIMING 1<<1
struct darshan_core_module
{
darshan_module_id id;
......@@ -32,13 +20,12 @@ struct darshan_core_module
};
/* in memory structure to keep up with job level data */
/* TODO: trailing data ? */
/* Per-process runtime state kept by darshan-core for the lifetime of the job. */
struct darshan_core_job_runtime
{
/* job-level record; rank 0 writes this structure at offset 0 of the log file */
struct darshan_job log_job;
/* one slot per module id; a slot is NULL unless a module is present, and
 * non-NULL slots are freed by darshan_core_cleanup()
 */
struct darshan_core_module* mod_array[DARSHAN_MAX_MODS];
/* room for CP_EXE_LEN characters plus a terminating NUL
 * (presumably the executable name / command line -- TODO confirm)
 */
char exe[CP_EXE_LEN+1];
/* NOTE(review): CP_COMP_BUF_SIZE is defined as 0 in the visible header text,
 * which makes this a zero-length array -- confirm this member is still wanted
 */
char comp_buf[CP_COMP_BUF_SIZE];
/* bit flags; CP_FLAG_NOTIMING is OR'ed in when DARSHAN_DISABLE_TIMING is set */
int flags;
/* value subtracted from PMPI_Wtime() by darshan_core_wtime() */
double wtime_offset;
};
......
......@@ -48,8 +48,12 @@ typedef uint64_t darshan_file_id;
/* Callbacks a module hands to darshan-core so that core can drive the
 * module through shutdown.
 */
struct darshan_module_funcs
{
    /* takes no arguments and returns nothing; no call site is visible in
     * this part of the source
     */
    void (*prepare_for_shutdown)(void);
    /* called by darshan-core during shutdown, for modules that were used
     * locally, to obtain the module's final output buffer
     */
    void (*get_output_data)(
        MPI_Comm mod_comm,  /* communicator to use for module shutdown */
        void** buf,         /* output parameter to save module buffer address */
        int* size           /* output parameter to save module buffer size */
    );
    /* called by darshan-core after the module's buffer has been written to
     * the log file
     */
    void (*shutdown)(void);
};
/*********************************************
......
......@@ -30,11 +30,13 @@ extern char* __progname;
/* active job state; NULL when darshan-core has no job to record
 * (darshan_core_shutdown() resets it to NULL while holding darshan_mutex)
 */
static struct darshan_core_job_runtime *darshan_core_job = NULL;
/* mutex behind DARSHAN_LOCK()/DARSHAN_UNLOCK() */
static pthread_mutex_t darshan_mutex = PTHREAD_MUTEX_INITIALIZER;
/* rank of this process; -1 until assigned (used to gate rank-0-only work) */
static int my_rank = -1;
/* number of processes; -1 until assigned (compared against the global
 * per-module use counts at shutdown)
 */
static int nprocs = -1;
/* forward declarations of file-local helpers */
static void darshan_core_initialize(int *argc, char ***argv);
static void darshan_core_shutdown(void);
static void darshan_core_cleanup(struct darshan_core_job_runtime* job);
static void darshan_get_logfile_name(char* logfile_name, int jobid, struct tm* start_tm);
static void darshan_log_record_hints_and_ver(struct darshan_core_job_runtime* job);
/* acquire / release darshan_mutex */
#define DARSHAN_LOCK() pthread_mutex_lock(&darshan_mutex)
#define DARSHAN_UNLOCK() pthread_mutex_unlock(&darshan_mutex)
......@@ -85,7 +87,6 @@ int MPI_Finalize(void)
static void darshan_core_initialize(int *argc, char ***argv)
{
int i;
int nprocs;
int internal_timing_flag = 0;
double init_start, init_time, init_max;
char* truncate_string = "<TRUNCATED>";
......@@ -110,11 +111,6 @@ static void darshan_core_initialize(int *argc, char ***argv)
{
memset(darshan_core_job, 0, sizeof(*darshan_core_job));
if(getenv("DARSHAN_DISABLE_TIMING"))
{
darshan_core_job->flags |= CP_FLAG_NOTIMING;
}
strcpy(darshan_core_job->log_job.version_string, CP_VERSION);
darshan_core_job->log_job.magic_nr = CP_MAGIC_NR;
darshan_core_job->log_job.uid = getuid();
......@@ -181,12 +177,27 @@ static void darshan_core_shutdown()
int jobid;
struct tm* start_tm;
time_t start_time_tmp;
int ret;
int ret = 0;
int all_ret = 0;
int64_t first_start_time;
int64_t last_end_time;
int local_mod_use[DARSHAN_MAX_MODS] = {0};
int global_mod_use_count[DARSHAN_MAX_MODS] = {0};
int i;
char* key;
char* value;
char* hints;
char* tok_str;
char* orig_tok_str;
char* saveptr = NULL;
char* mod_index;
char* new_logfile_name;
double start_log_time;
double end_log_time;
long offset;
MPI_File log_fh;
MPI_Info info;
MPI_Status status;
if(getenv("DARSHAN_INTERNAL_TIMING"))
internal_timing_flag = 1;
......@@ -204,6 +215,8 @@ static void darshan_core_shutdown()
darshan_core_job = NULL;
DARSHAN_UNLOCK();
start_log_time = DARSHAN_MPI_CALL(PMPI_Wtime)();
logfile_name = malloc(PATH_MAX);
if(!logfile_name)
{
......@@ -211,7 +224,7 @@ static void darshan_core_shutdown()
return;
}
/* set jobid and logfile name on rank 0 */
/* set darshan job id/metadata and construct log file name on rank 0 */
if(my_rank == 0)
{
/* Use CP_JOBID_OVERRIDE for the env var or CP_JOBID */
......@@ -233,9 +246,13 @@ static void darshan_core_shutdown()
jobid = getpid();
}
/* add to darshan core job */
final_job->log_job.jobid = (int64_t)jobid;
/* if we are using any hints to write the log file, then record those
* hints in the log file header
*/
darshan_log_record_hints_and_ver(final_job);
/* use human readable start time format in log filename */
start_time_tmp = final_job->log_job.start_time;
start_tm = localtime(&start_time_tmp);
......@@ -251,6 +268,7 @@ static void darshan_core_shutdown()
if(strlen(logfile_name) == 0)
{
/* failed to generate log file name */
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
......@@ -278,9 +296,224 @@ static void darshan_core_shutdown()
/* reduce the number of times a module was opened globally and bcast to everyone */
DARSHAN_MPI_CALL(PMPI_Allreduce)(local_mod_use, global_mod_use_count, DARSHAN_MAX_MODS, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
MPI_Info_create(&info);
/* check environment variable to see if the default MPI file hints have
* been overridden
*/
hints = getenv(CP_LOG_HINTS_OVERRIDE);
if(!hints)
{
hints = __CP_LOG_HINTS;
}
if(hints && strlen(hints) > 0)
{
tok_str = strdup(hints);
if(tok_str)
{
orig_tok_str = tok_str;
do
{
/* split string on semicolon */
key = strtok_r(tok_str, ";", &saveptr);
if(key)
{
tok_str = NULL;
/* look for = sign splitting key/value pairs */
value = index(key, '=');
if(value)
{
/* break key and value into separate null terminated strings */
value[0] = '\0';
value++;
if(strlen(key) > 0)
MPI_Info_set(info, key, value);
}
}
}while(key != NULL);
free(orig_tok_str);
}
}
/* open the darshan log file for writing */
ret = DARSHAN_MPI_CALL(PMPI_File_open)(MPI_COMM_WORLD, logfile_name,
MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_EXCL, info, &log_fh);
MPI_Info_free(&info);
/* error out if unable to open log file */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
if(my_rank == 0)
{
int msg_len;
char msg[MPI_MAX_ERROR_STRING] = {0};
MPI_Error_string(ret, msg, &msg_len);
fprintf(stderr, "darshan library warning: unable to open log file %s: %s\n",
logfile_name, msg);
unlink(logfile_name);
}
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
/* TODO: is there another header, or is job info first data ? */
/* TODO: are MPI data types necessary or can we just write buffers of MPI_BYTEs? */
/* write the job info on rank 0 */
if(my_rank == 0)
{
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, &(final_job->log_job),
sizeof(struct darshan_job), MPI_BYTE, &status);
if(ret != MPI_SUCCESS)
{
int msg_len;
char msg[MPI_MAX_ERROR_STRING] = {0};
MPI_Error_string(ret, msg, &msg_len);
fprintf(stderr, "darshan library warning: unable to write job data to log file %s: %s\n",
logfile_name, msg);
unlink(logfile_name);
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
}
/* TODO: id->file name map write */
/* loop over globally used darshan modules and:
* - get final output buffer
* - compress (zlib/bzip2) provided output buffer
* - write compressed buffer to log file
* - shutdown the module
*/
for(i = 0; i < DARSHAN_MAX_MODS; i++)
{
struct darshan_core_module* this_mod = final_job->mod_array[i];
MPI_Comm mod_comm;
void* mod_buf = NULL;
int mod_buf_size = 0;
void* comp_buf = NULL;
long comp_buf_size = 0;
long scan_offset = 0;
if(!global_mod_use_count[i])
continue;
/* create a communicator to use for shutting down the module */
if(global_mod_use_count[i] == nprocs)
{
MPI_Comm_dup(MPI_COMM_WORLD, &mod_comm);
}
else
{
MPI_Comm_split(MPI_COMM_WORLD, local_mod_use[i], 0, &mod_comm);
}
/* if module is registered locally, get the corresponding output buffer */
if(local_mod_use[i])
{
/* get output buffer from module */
this_mod->mod_funcs.get_output_data(mod_comm, &mod_buf, &mod_buf_size);
}
if(mod_buf_size > 0)
{
/* TODO generic compression */
comp_buf = mod_buf;
comp_buf_size = mod_buf_size;
}
/* get current file size on rank 0 so we can calculate offset correctly */
scan_offset = comp_buf_size;
if(my_rank == 0)
{
MPI_Offset tmp_off;
ret = MPI_File_get_size(log_fh, &tmp_off);
if(ret != MPI_SUCCESS)
{
int msg_len;
char msg[MPI_MAX_ERROR_STRING] = {0};
MPI_Error_string(ret, msg, &msg_len);
fprintf(stderr, "darshan library warning: unable to write module data to log file %s: %s\n",
logfile_name, msg);
DARSHAN_MPI_CALL(PMPI_File_close)(&log_fh);
unlink(logfile_name);
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
scan_offset += tmp_off;
}
/* figure out everyone's offset using scan */
DARSHAN_MPI_CALL(PMPI_Scan)(&scan_offset, &offset, 1, MPI_LONG, MPI_SUM, MPI_COMM_WORLD);
offset -= comp_buf_size;
/* collectively write out each rank's contributing data (maybe nothing) */
ret = DARSHAN_MPI_CALL(PMPI_File_write_at_all)(log_fh, offset, comp_buf,
comp_buf_size, MPI_BYTE, &status);
/* error out if unable to write */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
DARSHAN_MPI_CALL(PMPI_File_close)(&log_fh);
if(my_rank == 0)
{
int msg_len;
char msg[MPI_MAX_ERROR_STRING] = {0};
MPI_Error_string(ret, msg, &msg_len);
fprintf(stderr, "darshan library warning: unable to write module data to log file %s: %s\n",
logfile_name, msg);
unlink(logfile_name);
}
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
/* shutdown module if registered locally */
if(local_mod_use[i])
{
this_mod->mod_funcs.shutdown();
this_mod = NULL;
}
MPI_Comm_free(&mod_comm);
}
DARSHAN_MPI_CALL(PMPI_File_close)(&log_fh);
/* if we got this far, there are no errors, so rename from *.darshan_partial
* to *-<logwritetime>.darshan.gz, which indicates that this log file is
* complete and ready for analysis
*/
new_logfile_name = malloc(PATH_MAX);
if(new_logfile_name)
{
new_logfile_name[0] = '\0';
end_log_time = DARSHAN_MPI_CALL(PMPI_Wtime)();
strcat(new_logfile_name, logfile_name);
mod_index = strstr(new_logfile_name, ".darshan_partial");
sprintf(mod_index, "_%d.darshan.gz", (int)(end_log_time-start_log_time+1));
rename(logfile_name, new_logfile_name);
/* set permissions on log file */
#ifdef __CP_GROUP_READABLE_LOGS
chmod(new_logfile_name, (S_IRUSR|S_IRGRP));
#else
chmod(new_logfile_name, (S_IRUSR));
#endif
free(new_logfile_name);
}
free(logfile_name);
......@@ -294,13 +527,18 @@ static void darshan_core_shutdown()
return;
}
/* free darshan core data structures to shutdown */
static void darshan_core_cleanup(struct darshan_core_job_runtime* job)
{
int i;
for(i = 0; i < DARSHAN_MAX_MODS; i++)
{
if(job->mod_array[i])
{
free(job->mod_array[i]);
job->mod_array[i] = NULL;
}
}
free(job);
......@@ -308,6 +546,7 @@ static void darshan_core_cleanup(struct darshan_core_job_runtime* job)
return;
}
/* construct the darshan log file name */
static void darshan_get_logfile_name(char* logfile_name, int jobid, struct tm* start_tm)
{
char* logpath;
......@@ -443,6 +682,52 @@ static void darshan_get_logfile_name(char* logfile_name, int jobid, struct tm* s
return;
}
/* Record the library version and the hints used to write the darshan log
 * in the metadata string of the job header.
 *
 * job: runtime job structure; entries are appended to job->log_job.metadata.
 *
 * The hints string is taken from the CP_LOG_HINTS_OVERRIDE environment
 * variable, falling back to __CP_LOG_HINTS. If no non-empty hints string is
 * available, nothing (including the version) is recorded. Each entry is
 * written only if it fits completely in the space that remains; an entry
 * that does not fit is skipped rather than truncated.
 */
static void darshan_log_record_hints_and_ver(struct darshan_core_job_runtime* job)
{
    const char* hints;
    char* m;
    size_t capacity;
    size_t used;
    size_t meta_remain;
    size_t need;

    /* check environment variable to see if the default MPI file hints have
     * been overridden
     */
    hints = getenv(CP_LOG_HINTS_OVERRIDE);
    if(!hints)
    {
        hints = __CP_LOG_HINTS;
    }

    if(!hints || strlen(hints) < 1)
        return;

    /* Space accounting is done in size_t: with a signed remainder, a
     * negative value would be converted to a huge unsigned number when
     * compared against strlen() and wrongly pass the "fits" checks.
     */
    capacity = (size_t)DARSHAN_JOB_METADATA_LEN - 1; /* keep one byte for the NUL */
    used = strlen(job->log_job.metadata);
    if(used >= capacity)
        return;
    meta_remain = capacity - used;

    /* append after whatever is already stored instead of overwriting it */
    m = job->log_job.metadata + used;

    /* "lib_ver=" (8 characters) + version + "\n" (1 character) */
    need = strlen(PACKAGE_VERSION) + 9;
    if(meta_remain >= need)
    {
        snprintf(m, meta_remain + 1, "lib_ver=%s\n", PACKAGE_VERSION);
        m += need;
        meta_remain -= need;
    }

    /* "h=" (2 characters) + hints + "\n" (1 character) */
    need = strlen(hints) + 3;
    if(meta_remain >= need)
    {
        /* We have room to store the hints in the metadata portion of
         * the job header.  We just prepend an h= to the hints list.  The
         * metadata parser will ignore = characters that appear in the value
         * portion of the metadata key/value pair.
         */
        snprintf(m, meta_remain + 1, "h=%s\n", hints);
    }

    return;
}
/* ********************************************************* */
void darshan_core_register_module(
......@@ -519,13 +804,12 @@ void darshan_core_lookup_id(
/* Return the current wall-clock time as seen by darshan.
 *
 * Returns 0 when darshan-core has no active job structure; otherwise
 * returns PMPI_Wtime() minus the job's wtime_offset.
 */
double darshan_core_wtime()
{
    if(!darshan_core_job)
    {
        return(0);
    }

    /* single exit for the timed case: the offset must be applied here,
     * an earlier unadjusted return would make this value unreachable
     */
    return(DARSHAN_MPI_CALL(PMPI_Wtime)() - darshan_core_job->wtime_offset);
}
/*
......
......@@ -167,7 +167,7 @@ struct darshan_posix_file
/* Runtime bookkeeping for one file tracked by the posix module. */
struct posix_runtime_file
{
    /* points at this file's record; posix_file_by_name() aims it at a slot
     * of the module's file_record_array
     */
    struct darshan_posix_file* file_record;
    /* uthash handle used to link this entry into the module's file_hash */
    UT_hash_handle hlink;
};
......@@ -180,9 +180,10 @@ struct posix_runtime_file_ref
/* Module-wide runtime state of the posix module. */
struct posix_runtime
{
/* NOTE(review): file_array and file_count look like predecessors of
 * file_runtime_array / file_array_ndx, yet both are still referenced in the
 * visible code -- confirm which set is meant to remain
 */
struct posix_runtime_file* file_array;
/* array of file_array_size runtime entries, allocated and zeroed in
 * posix_runtime_initialize()
 */
struct posix_runtime_file* file_runtime_array;
/* array of file_array_size file records, allocated and zeroed alongside
 * file_runtime_array; runtime entry i points at record i
 */
struct darshan_posix_file* file_record_array;
/* number of slots in the arrays: memory limit divided by
 * sizeof(struct darshan_posix_file); reset to 0 if allocation fails
 */
int file_array_size;
/* incremented each time a new file record is handed out */
int file_count;
/* index of the next unused array slot; starts at 0 */
int file_array_ndx;
/* uthash table of runtime entries, keyed by the record's f_id */
struct posix_runtime_file* file_hash;
/* hash of posix_runtime_file_ref entries (presumably keyed by file
 * descriptor -- TODO confirm against posix_runtime_file_ref)
 */
struct posix_runtime_file_ref* fd_hash;
};
......@@ -216,26 +217,26 @@ static struct posix_runtime_file* posix_file_by_name(const char *name);
static struct posix_runtime_file* posix_file_by_name_setfd(const char* name, int fd);
static void posix_file_close_fd(int fd);
static void posix_prepare_for_shutdown(void);
static void posix_get_output_data(void **buffer, int size);
static void posix_get_output_data(MPI_Comm comm, void **buffer, int *size);
static void posix_shutdown(void);
#define POSIX_LOCK() pthread_mutex_lock(&posix_runtime_mutex)
#define POSIX_UNLOCK() pthread_mutex_unlock(&posix_runtime_mutex)
/* Counter update helpers.
 *
 * __file must be a pointer to a structure whose file_record member points
 * at a record with counters[] and fcounters[] arrays. Each macro performs
 * exactly one store/update; __value is parenthesized so that compound
 * expressions are applied as a whole.
 */

/* counters[__counter] = __value */
#define POSIX_SET(__file, __counter, __value) do {\
    (__file)->file_record->counters[__counter] = (__value); \
} while(0)

/* fcounters[__counter] = __value */
#define POSIX_F_SET(__file, __counter, __value) do {\
    (__file)->file_record->fcounters[__counter] = (__value); \
} while(0)

/* counters[__counter] += __value */
#define POSIX_INC(__file, __counter, __value) do {\
    (__file)->file_record->counters[__counter] += (__value); \
} while(0)

/* fcounters[__counter] += __value */
#define POSIX_F_INC(__file, __counter, __value) do {\
    (__file)->file_record->fcounters[__counter] += (__value); \
} while(0)
#define POSIX_F_INC_NO_OVERLAP(__file, __tm1, __tm2, __last, __counter) do { \
......@@ -248,15 +249,15 @@ static void posix_get_output_data(void **buffer, int size);
} while(0)
/* Counter read helpers; __file must be a pointer to a structure whose
 * file_record member points at a record with counters[] and fcounters[].
 */

/* current value of counters[__counter] (usable as an lvalue) */
#define POSIX_VALUE(__file, __counter) \
    ((__file)->file_record->counters[__counter])

/* current value of fcounters[__counter] (usable as an lvalue) */
#define POSIX_F_VALUE(__file, __counter) \
    ((__file)->file_record->fcounters[__counter])

/* raise counters[__counter] to __value if it is currently smaller;
 * note that __value is evaluated twice, so pass an expression without
 * side effects
 */
#define POSIX_MAX(__file, __counter, __value) do {\
    if((__file)->file_record->counters[__counter] < (__value)) \
    { \
        (__file)->file_record->counters[__counter] = (__value); \
    } \
} while(0)
......@@ -273,7 +274,7 @@ static void posix_get_output_data(void **buffer, int size);
if(exclude) break; \
file = posix_file_by_name_setfd(__path, __ret); \
if(!file) break; \
file->file_record.rank = my_rank; \
file->file_record->rank = my_rank; \
if(__mode) \
POSIX_SET(file, CP_MODE, __mode); \
if(__stream_flag)\
......@@ -353,8 +354,8 @@ static void posix_runtime_initialize()
int mem_limit;
struct darshan_module_funcs posix_mod_fns =
{
.prepare_for_shutdown = &posix_prepare_for_shutdown,
.get_output_data = &posix_get_output_data,
.shutdown = &posix_shutdown
};
if(posix_runtime)
......@@ -379,17 +380,22 @@ static void posix_runtime_initialize()
/* set maximum number of file records according to max memory limit */
/* NOTE: maximum number of records is based on the size of a posix file record */
posix_runtime->file_array_size = mem_limit / sizeof(struct darshan_posix_file);
posix_runtime->file_array_ndx = 0;
/* allocate array of runtime file records */
posix_runtime->file_array = malloc(sizeof(struct posix_runtime_file) *
posix_runtime->file_array_size);
if(!posix_runtime->file_array)
posix_runtime->file_runtime_array = malloc(posix_runtime->file_array_size *
sizeof(struct posix_runtime_file));
posix_runtime->file_record_array = malloc(posix_runtime->file_array_size *
sizeof(struct darshan_posix_file));
if(!posix_runtime->file_runtime_array || !posix_runtime->file_record_array)
{
posix_runtime->file_array_size = 0;
return;
}
memset(posix_runtime->file_array, 0, sizeof(struct posix_runtime_file) *
posix_runtime->file_array_size);
memset(posix_runtime->file_runtime_array, 0, posix_runtime->file_array_size *
sizeof(struct posix_runtime_file));
memset(posix_runtime->file_record_array, 0, posix_runtime->file_array_size *
sizeof(struct darshan_posix_file));
DARSHAN_MPI_CALL(PMPI_Comm_rank)(MPI_COMM_WORLD, &my_rank);
......@@ -452,14 +458,18 @@ static struct posix_runtime_file* posix_file_by_name(const char *name)
return(file);
}
if(posix_runtime->file_array_ndx < posix_runtime->file_array_size);
{
/* no existing record, assign a new file record from the global array */
file = &posix_runtime->file_array[posix_runtime->file_count];
file->file_record.f_id = file_id;
file = &(posix_runtime->file_runtime_array[posix_runtime->file_array_ndx]);
file->file_record = &(posix_runtime->file_record_array[posix_runtime->file_array_ndx]);
file->file_record->f_id = file_id;
/* add new record to file hash table */
HASH_ADD(hlink, posix_runtime->file_hash, file_record.f_id, sizeof(darshan_file_id), file);
HASH_ADD(hlink, posix_runtime->file_hash, file_record->f_id, sizeof(darshan_file_id), file);
posix_runtime->file_count++;
posix_runtime->file_array_ndx++;
}
if(newname != name)
free(newname);
......@@ -527,16 +537,36 @@ static void posix_file_close_fd(int fd)
/* ***************************************************** */
<