/* * (C) 2009 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */ #define _XOPEN_SOURCE 500 #define _GNU_SOURCE /* for tdestroy() */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "mpi.h" #include "darshan.h" #include "darshan-config.h" extern char* __progname; /* maximum number of memory segments each process will write to the log */ #define CP_MAX_MEM_SEGMENTS 8 #define CP_DATATYPE_INC(__file, __datatype) do {\ int num_integers, num_addresses, num_datatypes, combiner, ret; \ ret = MPI_Type_get_envelope(__datatype, &num_integers, &num_addresses, \ &num_datatypes, &combiner); \ if(ret == MPI_SUCCESS) { \ switch(combiner) { \ case MPI_COMBINER_NAMED:\ CP_INC(__file,CP_COMBINER_NAMED,1); break; \ case MPI_COMBINER_DUP:\ CP_INC(__file,CP_COMBINER_DUP,1); break; \ case MPI_COMBINER_CONTIGUOUS:\ CP_INC(__file,CP_COMBINER_CONTIGUOUS,1); break; \ case MPI_COMBINER_VECTOR:\ CP_INC(__file,CP_COMBINER_VECTOR,1); break; \ case MPI_COMBINER_HVECTOR_INTEGER:\ CP_INC(__file,CP_COMBINER_HVECTOR_INTEGER,1); break; \ case MPI_COMBINER_HVECTOR:\ CP_INC(__file,CP_COMBINER_HVECTOR,1); break; \ case MPI_COMBINER_INDEXED:\ CP_INC(__file,CP_COMBINER_INDEXED,1); break; \ case MPI_COMBINER_HINDEXED_INTEGER:\ CP_INC(__file,CP_COMBINER_HINDEXED_INTEGER,1); break; \ case MPI_COMBINER_HINDEXED:\ CP_INC(__file,CP_COMBINER_HINDEXED,1); break; \ case MPI_COMBINER_INDEXED_BLOCK:\ CP_INC(__file,CP_COMBINER_INDEXED_BLOCK,1); break; \ case MPI_COMBINER_STRUCT_INTEGER:\ CP_INC(__file,CP_COMBINER_STRUCT_INTEGER,1); break; \ case MPI_COMBINER_STRUCT:\ CP_INC(__file,CP_COMBINER_STRUCT,1); break; \ case MPI_COMBINER_SUBARRAY:\ CP_INC(__file,CP_COMBINER_SUBARRAY,1); break; \ case MPI_COMBINER_DARRAY:\ CP_INC(__file,CP_COMBINER_DARRAY,1); break; \ case MPI_COMBINER_F90_REAL:\ CP_INC(__file,CP_COMBINER_F90_REAL,1); break; \ case MPI_COMBINER_F90_COMPLEX:\ CP_INC(__file,CP_COMBINER_F90_COMPLEX,1); break; \ case MPI_COMBINER_F90_INTEGER:\ CP_INC(__file,CP_COMBINER_F90_INTEGER,1); break; \ case MPI_COMBINER_RESIZED:\ CP_INC(__file,CP_COMBINER_RESIZED,1); break; \ } \ } \ } while(0) #define CP_RECORD_MPI_WRITE(__ret, __fh, __count, __datatype, __counter, __tm1, __tm2) do { \ struct darshan_file_runtime* file; \ int size = 0; \ MPI_Aint extent = 0; \ if(__ret != MPI_SUCCESS) break; \ file = darshan_file_by_fh(__fh); \ if(!file) break; \ MPI_Type_size(__datatype, &size); \ size = size * __count; \ MPI_Type_extent(__datatype, &extent); \ CP_BUCKET_INC(file, CP_SIZE_WRITE_AGG_0_100, size); \ CP_BUCKET_INC(file, CP_EXTENT_WRITE_0_100, extent); \ CP_INC(file, __counter, 1); \ CP_DATATYPE_INC(file, __datatype); \ CP_F_INC(file, CP_F_MPI_WRITE_TIME, (__tm2-__tm1)); \ if(CP_F_VALUE(file, CP_F_WRITE_START_TIMESTAMP) == 0) \ CP_F_SET(file, CP_F_WRITE_START_TIMESTAMP, __tm1); \ CP_F_SET(file, CP_F_WRITE_END_TIMESTAMP, __tm2); \ } while(0) #define CP_RECORD_MPI_READ(__ret, __fh, __count, __datatype, __counter, __tm1, __tm2) do { \ struct darshan_file_runtime* file; \ int size = 0; \ MPI_Aint extent = 0; \ if(__ret != MPI_SUCCESS) break; \ file = darshan_file_by_fh(__fh); \ if(!file) break; \ MPI_Type_size(__datatype, &size); \ size = size * __count; \ MPI_Type_extent(__datatype, &extent); \ CP_BUCKET_INC(file, CP_SIZE_READ_AGG_0_100, size); \ CP_BUCKET_INC(file, CP_EXTENT_READ_0_100, extent); \ CP_INC(file, __counter, 1); \ CP_DATATYPE_INC(file, __datatype); \ CP_F_INC(file, CP_F_MPI_READ_TIME, (__tm2-__tm1)); \ if(CP_F_VALUE(file, CP_F_READ_START_TIMESTAMP) == 0) \ CP_F_SET(file, CP_F_READ_START_TIMESTAMP, __tm1); \ CP_F_SET(file, CP_F_READ_END_TIMESTAMP, __tm2); \ } while(0) static struct darshan_file_runtime* darshan_file_by_fh(MPI_File fh); static void cp_log_construct_indices(struct darshan_job_runtime* final_job, int rank, int* inout_count, int* lengths, void** pointers); static int cp_log_write(struct darshan_job_runtime* final_job, int rank, char* logfile_name, char* logdir_name, int count, int* lengths, void** pointers, double start_log_time); static int cp_log_reduction(struct darshan_job_runtime* final_job, int rank, MPI_Offset* next_offset); static void darshan_file_reduce(void* infile_v, void* inoutfile_v, int *len, MPI_Datatype *datatype); static int cp_log_compress(struct darshan_job_runtime* final_job, int rank, int* inout_count, int* lengths, void** pointers); static int file_compare(const void* a, const void* b); int MPI_Init(int *argc, char ***argv) { int ret; int nprocs; int rank; ret = PMPI_Init(argc, argv); if(ret != MPI_SUCCESS) { return(ret); } MPI_Comm_size(MPI_COMM_WORLD, &nprocs); MPI_Comm_rank(MPI_COMM_WORLD, &rank); CP_LOCK(); if(argc && argv) { darshan_initialize(*argc, *argv, nprocs, rank); } else { /* we don't see argc and argv here in fortran */ darshan_initialize(0, NULL, nprocs, rank); } CP_UNLOCK(); return(MPI_SUCCESS); } void darshan_shutdown(int timing_flag) { int rank; char* logfile_name; char* logdir_name; struct darshan_job_runtime* final_job; double start_log_time = 0; int flags; int all_ret = 0; int local_ret = 0; MPI_Offset next_offset = 0; char* jobid_str; int jobid; int index_count = 0; int lengths[CP_MAX_MEM_SEGMENTS]; void* pointers[CP_MAX_MEM_SEGMENTS]; int ret; double red1=0, red2=0, gz1=0, gz2=0, write1=0, write2=0, tm_end=0; int nprocs; CP_LOCK(); if(!darshan_global_job) { CP_UNLOCK(); } /* disable further tracing while hanging onto the data so that we can * write it out */ final_job = darshan_global_job; darshan_global_job = NULL; flags = final_job->flags; CP_UNLOCK(); start_log_time = PMPI_Wtime(); /* figure out which access sizes to log */ darshan_walk_file_accesses(final_job); /* if the records have been condensed, then zero out fields that are no * longer valid for safety */ if(final_job->flags & CP_FLAG_CONDENSED && final_job->file_count) { CP_SET(&final_job->file_runtime_array[0], CP_MODE, 0); CP_SET(&final_job->file_runtime_array[0], CP_CONSEC_READS, 0); CP_SET(&final_job->file_runtime_array[0], CP_CONSEC_WRITES, 0); CP_SET(&final_job->file_runtime_array[0], CP_SEQ_READS, 0); CP_SET(&final_job->file_runtime_array[0], CP_SEQ_WRITES, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE1_STRIDE, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE2_STRIDE, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE3_STRIDE, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE4_STRIDE, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE1_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE2_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE3_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_STRIDE4_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS1_ACCESS, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS2_ACCESS, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS3_ACCESS, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS4_ACCESS, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS1_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS2_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS3_COUNT, 0); CP_SET(&final_job->file_runtime_array[0], CP_ACCESS4_COUNT, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_OPEN_TIMESTAMP, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_CLOSE_TIMESTAMP, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_READ_START_TIMESTAMP, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_READ_END_TIMESTAMP, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_WRITE_START_TIMESTAMP, 0); CP_F_SET(&final_job->file_runtime_array[0], CP_F_WRITE_END_TIMESTAMP, 0); } logfile_name = malloc(PATH_MAX); if(!logfile_name) { darshan_finalize(final_job); return; } logdir_name = malloc(PATH_MAX); if(!logdir_name) { free(logfile_name); darshan_finalize(final_job); return; } PMPI_Comm_rank(MPI_COMM_WORLD, &rank); /* construct log file name */ if(rank == 0) { char cuser[L_cuserid] = {0}; struct tm* my_tm; /* find a job id */ jobid_str = getenv("COBALT_JOBID"); if(jobid_str) { /* in cobalt we can find it in env var */ ret = sscanf(jobid_str, "%d", &jobid); } if(!jobid_str || ret != 1) { /* use pid as fall back */ jobid = getpid(); } /* break out time into something human readable */ my_tm = localtime(&final_job->log_job.start_time); /* note: getpwuid() causes link errors for static binaries */ cuserid(cuser); ret = snprintf(logfile_name, PATH_MAX, "%s/%d/%d/%s/%s_%s_id%d_%d-%d-%d.darshan_partial", __CP_LOG_PATH, (my_tm->tm_year+1900), (my_tm->tm_mon+1), cuser, cuser, __progname, jobid, (my_tm->tm_mon+1), my_tm->tm_mday, (my_tm->tm_hour*60*60 + my_tm->tm_min*60 + my_tm->tm_sec)); if(ret == (PATH_MAX-1)) { /* file name was too big; squish it down */ snprintf(logfile_name, PATH_MAX, "%s/id%d.darshan_partial", __CP_LOG_PATH, jobid); } snprintf(logdir_name, PATH_MAX, "%s/%d/%d/%s", __CP_LOG_PATH, (my_tm->tm_year+1900), (my_tm->tm_mon+1), cuser); } /* broadcast log file name */ PMPI_Bcast(logfile_name, PATH_MAX, MPI_CHAR, 0, MPI_COMM_WORLD); PMPI_Bcast(logdir_name, PATH_MAX, MPI_CHAR, 0, MPI_COMM_WORLD); final_job->log_job.end_time = time(NULL); /* reduce records for shared files */ if(timing_flag) red1 = PMPI_Wtime(); local_ret = cp_log_reduction(final_job, rank, &next_offset); if(timing_flag) red2 = PMPI_Wtime(); PMPI_Allreduce(&local_ret, &all_ret, 1, MPI_INT, MPI_LOR, MPI_COMM_WORLD); if(all_ret == 0) { /* collect data to write from local process */ cp_log_construct_indices(final_job, rank, &index_count, lengths, pointers); } if(all_ret == 0) { /* compress data */ if(timing_flag) gz1 = PMPI_Wtime(); local_ret = cp_log_compress(final_job, rank, &index_count, lengths, pointers); if(timing_flag) gz2 = PMPI_Wtime(); PMPI_Allreduce(&local_ret, &all_ret, 1, MPI_INT, MPI_LOR, MPI_COMM_WORLD); } if(all_ret == 0) { /* actually write out log file */ if(timing_flag) write1 = PMPI_Wtime(); local_ret = cp_log_write(final_job, rank, logfile_name, logdir_name, index_count, lengths, pointers, start_log_time); if(timing_flag) write2 = PMPI_Wtime(); PMPI_Allreduce(&local_ret, &all_ret, 1, MPI_INT, MPI_LOR, MPI_COMM_WORLD); } /* if any process failed to write log, then delete the whole file so we * don't leave corrupted results */ if(all_ret != 0 && rank == 0) { unlink(logfile_name); } free(logfile_name); free(logdir_name); darshan_finalize(final_job); if(timing_flag) { double red_tm, red_slowest; double gz_tm, gz_slowest; double write_tm, write_slowest; double all_tm, all_slowest; tm_end = PMPI_Wtime(); red_tm = red2-red1; gz_tm = gz2-gz1; write_tm = write2-write1; all_tm = tm_end-start_log_time; PMPI_Allreduce(&red_tm, &red_slowest, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); PMPI_Allreduce(&gz_tm, &gz_slowest, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); PMPI_Allreduce(&write_tm, &write_slowest, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); PMPI_Allreduce(&all_tm, &all_slowest, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); if(rank == 0) { MPI_Comm_size(MPI_COMM_WORLD, &nprocs); printf("#\t\t