/* * (C) 2009 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */ #define _XOPEN_SOURCE 500 #define _GNU_SOURCE /* for tdestroy() */ #include "darshan-runtime-config.h" #include #ifdef HAVE_MNTENT_H #include #endif #include #include #include #include #include #include #include #include #include #include #include #include "mpi.h" #include "darshan.h" #include "darshan-dynamic.h" extern char* __progname; /* maximum number of memory segments each process will write to the log */ #define CP_MAX_MEM_SEGMENTS 8 /* Some old versions of MPI don't provide all of these COMBINER definitions. * If any are missing then we define them to an arbitrary value just to * prevent compile errors in DATATYPE_INC(). */ #ifndef MPI_COMBINER_NAMED #define MPI_COMBINER_NAMED CP_COMBINER_NAMED #endif #ifndef MPI_COMBINER_DUP #define MPI_COMBINER_DUP CP_COMBINER_DUP #endif #ifndef MPI_COMBINER_CONTIGUOUS #define MPI_COMBINER_CONTIGUOUS CP_COMBINER_CONTIGUOUS #endif #ifndef MPI_COMBINER_VECTOR #define MPI_COMBINER_VECTOR CP_COMBINER_VECTOR #endif #ifndef MPI_COMBINER_HVECTOR_INTEGER #define MPI_COMBINER_HVECTOR_INTEGER CP_COMBINER_HVECTOR_INTEGER #endif #ifndef MPI_COMBINER_HVECTOR #define MPI_COMBINER_HVECTOR CP_COMBINER_HVECTOR #endif #ifndef MPI_COMBINER_INDEXED #define MPI_COMBINER_INDEXED CP_COMBINER_INDEXED #endif #ifndef MPI_COMBINER_HINDEXED_INTEGER #define MPI_COMBINER_HINDEXED_INTEGER CP_COMBINER_HINDEXED_INTEGER #endif #ifndef MPI_COMBINER_HINDEXED #define MPI_COMBINER_HINDEXED CP_COMBINER_HINDEXED #endif #ifndef MPI_COMBINER_INDEXED_BLOCK #define MPI_COMBINER_INDEXED_BLOCK CP_COMBINER_INDEXED_BLOCK #endif #ifndef MPI_COMBINER_STRUCT_INTEGER #define MPI_COMBINER_STRUCT_INTEGER CP_COMBINER_STRUCT_INTEGER #endif #ifndef MPI_COMBINER_STRUCT #define MPI_COMBINER_STRUCT CP_COMBINER_STRUCT #endif #ifndef MPI_COMBINER_SUBARRAY #define MPI_COMBINER_SUBARRAY CP_COMBINER_SUBARRAY #endif #ifndef MPI_COMBINER_DARRAY #define MPI_COMBINER_DARRAY CP_COMBINER_DARRAY #endif #ifndef MPI_COMBINER_F90_REAL #define MPI_COMBINER_F90_REAL CP_COMBINER_F90_REAL #endif #ifndef MPI_COMBINER_F90_COMPLEX #define MPI_COMBINER_F90_COMPLEX CP_COMBINER_F90_COMPLEX #endif #ifndef MPI_COMBINER_F90_INTEGER #define MPI_COMBINER_F90_INTEGER CP_COMBINER_F90_INTEGER #endif #ifndef MPI_COMBINER_RESIZED #define MPI_COMBINER_RESIZED CP_COMBINER_RESIZED #endif #define CP_DATATYPE_INC(__file, __datatype) do {\ int num_integers, num_addresses, num_datatypes, combiner, ret; \ ret = DARSHAN_MPI_CALL(PMPI_Type_get_envelope)(__datatype, &num_integers, \ &num_addresses, &num_datatypes, &combiner); \ if(ret == MPI_SUCCESS) { \ switch(combiner) { \ case MPI_COMBINER_NAMED:\ CP_INC(__file,CP_COMBINER_NAMED,1); break; \ case MPI_COMBINER_DUP:\ CP_INC(__file,CP_COMBINER_DUP,1); break; \ case MPI_COMBINER_CONTIGUOUS:\ CP_INC(__file,CP_COMBINER_CONTIGUOUS,1); break; \ case MPI_COMBINER_VECTOR:\ CP_INC(__file,CP_COMBINER_VECTOR,1); break; \ case MPI_COMBINER_HVECTOR_INTEGER:\ CP_INC(__file,CP_COMBINER_HVECTOR_INTEGER,1); break; \ case MPI_COMBINER_HVECTOR:\ CP_INC(__file,CP_COMBINER_HVECTOR,1); break; \ case MPI_COMBINER_INDEXED:\ CP_INC(__file,CP_COMBINER_INDEXED,1); break; \ case MPI_COMBINER_HINDEXED_INTEGER:\ CP_INC(__file,CP_COMBINER_HINDEXED_INTEGER,1); break; \ case MPI_COMBINER_HINDEXED:\ CP_INC(__file,CP_COMBINER_HINDEXED,1); break; \ case MPI_COMBINER_INDEXED_BLOCK:\ CP_INC(__file,CP_COMBINER_INDEXED_BLOCK,1); break; \ case MPI_COMBINER_STRUCT_INTEGER:\ CP_INC(__file,CP_COMBINER_STRUCT_INTEGER,1); break; \ case MPI_COMBINER_STRUCT:\ CP_INC(__file,CP_COMBINER_STRUCT,1); break; \ case MPI_COMBINER_SUBARRAY:\ CP_INC(__file,CP_COMBINER_SUBARRAY,1); break; \ case MPI_COMBINER_DARRAY:\ CP_INC(__file,CP_COMBINER_DARRAY,1); break; \ case MPI_COMBINER_F90_REAL:\ CP_INC(__file,CP_COMBINER_F90_REAL,1); break; \ case MPI_COMBINER_F90_COMPLEX:\ CP_INC(__file,CP_COMBINER_F90_COMPLEX,1); break; \ case MPI_COMBINER_F90_INTEGER:\ CP_INC(__file,CP_COMBINER_F90_INTEGER,1); break; \ case MPI_COMBINER_RESIZED:\ CP_INC(__file,CP_COMBINER_RESIZED,1); break; \ } \ } \ } while(0) #define CP_RECORD_MPI_WRITE(__ret, __fh, __count, __datatype, __counter, __tm1, __tm2) do { \ struct darshan_file_runtime* file; \ int size = 0; \ MPI_Aint extent = 0; \ if(__ret != MPI_SUCCESS) break; \ file = darshan_file_by_fh(__fh); \ if(!file) break; \ DARSHAN_MPI_CALL(PMPI_Type_size)(__datatype, &size); \ size = size * __count; \ DARSHAN_MPI_CALL(PMPI_Type_extent)(__datatype, &extent); \ CP_BUCKET_INC(file, CP_SIZE_WRITE_AGG_0_100, size); \ CP_BUCKET_INC(file, CP_EXTENT_WRITE_0_100, extent); \ CP_INC(file, __counter, 1); \ CP_DATATYPE_INC(file, __datatype); \ CP_F_INC(file, CP_F_MPI_WRITE_TIME, (__tm2-__tm1)); \ if(CP_F_VALUE(file, CP_F_WRITE_START_TIMESTAMP) == 0) \ CP_F_SET(file, CP_F_WRITE_START_TIMESTAMP, __tm1); \ CP_F_SET(file, CP_F_WRITE_END_TIMESTAMP, __tm2); \ } while(0) #define CP_RECORD_MPI_READ(__ret, __fh, __count, __datatype, __counter, __tm1, __tm2) do { \ struct darshan_file_runtime* file; \ int size = 0; \ MPI_Aint extent = 0; \ if(__ret != MPI_SUCCESS) break; \ file = darshan_file_by_fh(__fh); \ if(!file) break; \ DARSHAN_MPI_CALL(PMPI_Type_size)(__datatype, &size); \ size = size * __count; \ DARSHAN_MPI_CALL(PMPI_Type_extent)(__datatype, &extent); \ CP_BUCKET_INC(file, CP_SIZE_READ_AGG_0_100, size); \ CP_BUCKET_INC(file, CP_EXTENT_READ_0_100, extent); \ CP_INC(file, __counter, 1); \ CP_DATATYPE_INC(file, __datatype); \ CP_F_INC(file, CP_F_MPI_READ_TIME, (__tm2-__tm1)); \ if(CP_F_VALUE(file, CP_F_READ_START_TIMESTAMP) == 0) \ CP_F_SET(file, CP_F_READ_START_TIMESTAMP, __tm1); \ CP_F_SET(file, CP_F_READ_END_TIMESTAMP, __tm2); \ } while(0) static struct darshan_file_runtime* darshan_file_by_fh(MPI_File fh); static void cp_log_construct_indices(struct darshan_job_runtime* final_job, int rank, int* inout_count, int* lengths, void** pointers, char* trailing_data); static int cp_log_write(struct darshan_job_runtime* final_job, int rank, char* logfile_name, int count, int* lengths, void** pointers, double start_log_time); static void cp_log_record_hints(struct darshan_job_runtime* final_job, int rank); static int cp_log_reduction(struct darshan_job_runtime* final_job, int rank, char* logfile_name, MPI_Offset* next_offset); static void darshan_file_reduce(void* infile_v, void* inoutfile_v, int *len, MPI_Datatype *datatype); static int cp_log_compress(struct darshan_job_runtime* final_job, int rank, int* inout_count, int* lengths, void** pointers); static int file_compare(const void* a, const void* b); void darshan_mpi_initialize(int *argc, char ***argv); static char* darshan_get_exe_and_mounts(struct darshan_job_runtime* final_job); static int darshan_file_variance( struct darshan_file *infile_array, struct darshan_file *outfile_array, int count, int rank); static void pairwise_variance_reduce ( void *invec, void *inoutvec, int *len, MPI_Datatype *dt); #define CP_MAX_MNTS 32 uint64_t mnt_hash_array[CP_MAX_MNTS] = {0}; int64_t mnt_id_array[CP_MAX_MNTS] = {0}; uint64_t mnt_hash_array_root[CP_MAX_MNTS] = {0}; int64_t mnt_id_array_root[CP_MAX_MNTS] = {0}; struct { int64_t mnt_id_local; int64_t mnt_id_root; } mnt_mapping[CP_MAX_MNTS]; struct variance_dt { double n; double T; double S; }; void darshan_mpi_initialize(int *argc, char ***argv) { int nprocs; int rank; DARSHAN_MPI_CALL(PMPI_Comm_size)(MPI_COMM_WORLD, &nprocs); DARSHAN_MPI_CALL(PMPI_Comm_rank)(MPI_COMM_WORLD, &rank); if(argc && argv) { darshan_initialize(*argc, *argv, nprocs, rank); } else { /* we don't see argc and argv here in fortran */ darshan_initialize(0, NULL, nprocs, rank); } return; } void darshan_shutdown(int timing_flag) { int rank; char* logfile_name; struct darshan_job_runtime* final_job; double start_log_time = 0; int flags; int all_ret = 0; int local_ret = 0; MPI_Offset next_offset = 0; char* jobid_str; char* envjobid; char* logpath; int jobid; int index_count = 0; int lengths[CP_MAX_MEM_SEGMENTS]; void* pointers[CP_MAX_MEM_SEGMENTS]; int ret; double red1=0, red2=0, gz1=0, gz2=0, write1=0, write2=0, tm_end=0; double bcst1=0, bcst2=0, bcst3=0; int nprocs; char* trailing_data = NULL; int i, j; int map_index = 0; time_t start_time_tmp = 0; uint64_t logmod; char hname[HOST_NAME_MAX]; char* logpath_override = NULL; #ifdef __CP_LOG_ENV char env_check[256]; char* env_tok; #endif CP_LOCK(); if(!darshan_global_job) { CP_UNLOCK(); return; } /* disable further tracing while hanging onto the data so that we can * write it out */ final_job = darshan_global_job; darshan_global_job = NULL; flags = final_job->flags; CP_UNLOCK(); start_log_time = DARSHAN_MPI_CALL(PMPI_Wtime)(); /* figure out which access sizes to log */ darshan_walk_file_accesses(final_job); /* if the records have been condensed, then zero out fields that are no * longer valid for safety */ if(final_job->flags & CP_FLAG_CONDENSED && final_job->file_count) { CP_SET(&final_job->file_runtime_array[0], CP_MODE, 0); CP_SET(&final_job->file_runtime_array[0], CP_CONSEC_READS, 0); CP_SET(&final_job->file_runtime_array[0], CP_CONSEC_WRITES, 0); CP_SET(&final_job->file_runtime_array[0], CP_SEQ_READS, 0); CP_SET(&final_job->file_runtime_array[0], CP_SEQ_WRITES, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE1_STRIDE, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE2_STRIDE, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE3_STRIDE, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE4_STRIDE, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE1_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE2_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE3_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE4_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS1_ACCESS, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS2_ACCESS, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS3_ACCESS, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS4_ACCESS, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS1_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS2_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS3_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS4_COUNT, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_OPEN_TIMESTAMP, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_CLOSE_TIMESTAMP, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_READ_START_TIMESTAMP, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_READ_END_TIMESTAMP, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_WRITE_START_TIMESTAMP, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_WRITE_END_TIMESTAMP, 0); } logfile_name = malloc(PATH_MAX); if(!logfile_name) { darshan_finalize(final_job); return; } DARSHAN_MPI_CALL(PMPI_Comm_rank)(MPI_COMM_WORLD, &rank); /* collect information about command line and * mounted file systems */ trailing_data = darshan_get_exe_and_mounts(final_job); /* broadcast mount point information from root */ if(rank == 0) { memcpy(mnt_hash_array_root, mnt_hash_array, CP_MAX_MNTS*sizeof(uint64_t)); memcpy(mnt_id_array_root, mnt_id_array, CP_MAX_MNTS*sizeof(int64_t)); } bcst1=DARSHAN_MPI_CALL(PMPI_Wtime)(); DARSHAN_MPI_CALL(PMPI_Bcast)(mnt_id_array_root, CP_MAX_MNTS*sizeof(int64_t), MPI_BYTE, 0, MPI_COMM_WORLD); DARSHAN_MPI_CALL(PMPI_Bcast)(mnt_hash_array_root, CP_MAX_MNTS*sizeof(uint64_t), MPI_BYTE, 0, MPI_COMM_WORLD); bcst2=DARSHAN_MPI_CALL(PMPI_Wtime)(); /* identify any common mount points that have different device ids on * non-root processes */ for(i=0; (ifile_count && map_index > 0); i++) { for(j=0; jfile_array[i].counters[CP_DEVICE] == mnt_mapping[j].mnt_id_local) { final_job->file_array[i].counters[CP_DEVICE] = mnt_mapping[j].mnt_id_root; break; } } } /* construct log file name */ if(rank == 0) { char cuser[L_cuserid] = {0}; struct tm* my_tm; /* Use CP_JOBID_OVERRIDE for the env var or CP_JOBID */ envjobid = getenv(CP_JOBID_OVERRIDE); if (!envjobid) { envjobid = CP_JOBID; } /* Use CP_LOG_PATH_OVERRIDE for the value or __CP_LOG_PATH */ logpath = getenv(CP_LOG_PATH_OVERRIDE); if (!logpath) { #ifdef __CP_LOG_PATH logpath = __CP_LOG_PATH; #endif } /* find a job id */ jobid_str = getenv(envjobid); if(jobid_str) { /* in cobalt we can find it in env var */ ret = sscanf(jobid_str, "%d", &jobid); } if(!jobid_str || ret != 1) { /* use pid as fall back */ jobid = getpid(); } /* break out time into something human readable */ start_time_tmp += final_job->log_job.start_time; my_tm = localtime(&start_time_tmp); /* get the username for this job. In order we will try each of the * following until one of them succeeds: * * - cuserid() * - getenv("LOGNAME") * - snprintf(..., geteuid()); * * Note that we do not use getpwuid() because it generally will not * work in statically compiled binaries. */ #ifndef DARSHAN_DISABLE_CUSERID cuserid(cuser); #endif /* if cuserid() didn't work, then check the environment */ if (strcmp(cuser, "") == 0) { char* logname_string; logname_string = getenv("LOGNAME"); if(logname_string) { strncpy(cuser, logname_string, (L_cuserid-1)); } } /* if cuserid() and environment both fail, then fall back to uid */ if (strcmp(cuser, "") == 0) { uid_t uid = geteuid(); snprintf(cuser, sizeof(cuser), "%u", uid); } /* generate a random number to help differentiate the log */ (void) gethostname(hname, sizeof(hname)); logmod = darshan_hash((void*)hname,strlen(hname),0); /* see if darshan was configured using the --with-logpath-by-env * argument, which allows the user to specify an absolute path to * place logs via an env variable. */ #ifdef __CP_LOG_ENV /* just silently skip if the environment variable list is too big */ if(strlen(__CP_LOG_ENV) < 256) { /* copy env variable list to a temporary buffer */ strcpy(env_check, __CP_LOG_ENV); /* tokenize the comma-separated list */ env_tok = strtok(env_check, ","); if(env_tok) { do { /* check each env variable in order */ logpath_override = getenv(env_tok); if(logpath_override) { /* stop as soon as we find a match */ break; } }while((env_tok = strtok(NULL, ","))); } } #endif if(!logpath && !logpath_override) { /* we could not find any location to write the log file */ darshan_finalize(final_job); return; } if(logpath_override) { ret = snprintf(logfile_name, PATH_MAX, "%s/%s_%s_id%d_%d-%d-%d-%" PRIu64 ".darshan_partial", logpath_override, cuser, __progname, jobid, (my_tm->tm_mon+1), my_tm->tm_mday, (my_tm->tm_hour*60*60 + my_tm->tm_min*60 + my_tm->tm_sec), logmod); if(ret == (PATH_MAX-1)) { /* file name was too big; squish it down */ snprintf(logfile_name, PATH_MAX, "%s/id%d.darshan_partial", logpath_override, jobid); } } else { ret = snprintf(logfile_name, PATH_MAX, "%s/%d/%d/%d/%s_%s_id%d_%d-%d-%d-%" PRIu64 ".darshan_partial", logpath, (my_tm->tm_year+1900), (my_tm->tm_mon+1), my_tm->tm_mday, cuser, __progname, jobid, (my_tm->tm_mon+1), my_tm->tm_mday, (my_tm->tm_hour*60*60 + my_tm->tm_min*60 + my_tm->tm_sec), logmod); if(ret == (PATH_MAX-1)) { /* file name was too big; squish it down */ snprintf(logfile_name, PATH_MAX, "%s/id%d.darshan_partial", logpath, jobid); } } /* add jobid */ final_job->log_job.jobid = (int64_t)jobid; } /* broadcast log file name */ bcst3=DARSHAN_MPI_CALL(PMPI_Wtime)(); DARSHAN_MPI_CALL(PMPI_Bcast)(logfile_name, PATH_MAX, MPI_CHAR, 0, MPI_COMM_WORLD); final_job->log_job.end_time = time(NULL); /* reduce records for shared files */ if(timing_flag) red1 = DARSHAN_MPI_CALL(PMPI_Wtime)(); local_ret = cp_log_reduction(final_job, rank, logfile_name, &next_offset); if(timing_flag) red2 = DARSHAN_MPI_CALL(PMPI_Wtime)(); DARSHAN_MPI_CALL(PMPI_Allreduce)(&local_ret, &all_ret, 1, MPI_INT, MPI_LOR, MPI_COMM_WORLD); /* if we are using any hints to write the log file, then record those * hints in the log file header */ cp_log_record_hints(final_job, rank); if(all_ret == 0) { /* collect data to write from local process */ cp_log_construct_indices(final_job, rank, &index_count, lengths, pointers, trailing_data); } if(all_ret == 0) { /* compress data */ if(timing_flag) gz1 = DARSHAN_MPI_CALL(PMPI_Wtime)(); local_ret = cp_log_compress(final_job, rank, &index_count, lengths, pointers); if(timing_flag) gz2 = DARSHAN_MPI_CALL(PMPI_Wtime)(); DARSHAN_MPI_CALL(PMPI_Allreduce)(&local_ret, &all_ret, 1, MPI_INT, MPI_LOR, MPI_COMM_WORLD); } if(all_ret == 0) { /* actually write out log file */ if(timing_flag) write1 = DARSHAN_MPI_CALL(PMPI_Wtime)(); local_ret = cp_log_write(final_job, rank, logfile_name, index_count, lengths, pointers, start_log_time); if(timing_flag) write2 = DARSHAN_MPI_CALL(PMPI_Wtime)(); DARSHAN_MPI_CALL(PMPI_Allreduce)(&local_ret, &all_ret, 1, MPI_INT, MPI_LOR, MPI_COMM_WORLD); } if(rank == 0) { if(all_ret != 0) { fprintf(stderr, "darshan library warning: unable to write log file %s\n", logfile_name); /* if any process failed to write log, then delete the whole * file so we don't leave corrupted results */ unlink(logfile_name); } else { /* rename from *.darshan_partial to *-.darshan.gz, * which indicates that this log file is complete and ready for * analysis */ char* mod_index; double end_log_time; char* new_logfile_name; new_logfile_name = malloc(PATH_MAX); if(new_logfile_name) { new_logfile_name[0] = '\0'; end_log_time = DARSHAN_MPI_CALL(PMPI_Wtime)(); strcat(new_logfile_name, logfile_name); mod_index = strstr(new_logfile_name, ".darshan_partial"); sprintf(mod_index, "_%d.darshan.gz", (int)(end_log_time-start_log_time+1)); rename(logfile_name, new_logfile_name); /* set permissions on log file */ chmod(new_logfile_name, (S_IRUSR)); free(new_logfile_name); } } } if(trailing_data) free(trailing_data); free(logfile_name); darshan_finalize(final_job); if(timing_flag) { double red_tm, red_slowest; double gz_tm, gz_slowest; double write_tm, write_slowest; double all_tm, all_slowest; double bcst_tm, bcst_slowest; tm_end = DARSHAN_MPI_CALL(PMPI_Wtime)(); bcst_tm=(bcst2-bcst1)+(red1-bcst3); red_tm = red2-red1; gz_tm = gz2-gz1; write_tm = write2-write1; all_tm = tm_end-start_log_time; DARSHAN_MPI_CALL(PMPI_Allreduce)(&red_tm, &red_slowest, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); DARSHAN_MPI_CALL(PMPI_Allreduce)(&gz_tm, &gz_slowest, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); DARSHAN_MPI_CALL(PMPI_Allreduce)(&write_tm, &write_slowest, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); DARSHAN_MPI_CALL(PMPI_Allreduce)(&all_tm, &all_slowest, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); DARSHAN_MPI_CALL(PMPI_Allreduce)(&bcst_tm, &bcst_slowest, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); if(rank == 0) { DARSHAN_MPI_CALL(PMPI_Comm_size)(MPI_COMM_WORLD, &nprocs); printf("#\t\t