GitLab maintenance scheduled for Today, 2019-12-05, from 17:00 to 18:00 CT - Services will be unavailable during this time.

Commit f9e5a24e authored by Shane Snyder's avatar Shane Snyder

updated code for darshan modularization

parent 08438176
......@@ -10,33 +10,41 @@
#include <sys/types.h>
#include <stdint.h>
#include "darshan.h"
#define DARSHAN_MPI_CALL(func) func
typedef uint64_t darshan_file_id;
/* calculation of compression buffer size (defaults to 50% of the maximum
* memory that Darshan is allowed to consume on a process)
*/
//#define CP_COMP_BUF_SIZE ((CP_MAX_FILES * sizeof(struct darshan_file))/2)
#define CP_COMP_BUF_SIZE 0
/*
 * Table of callbacks that a module hands to darshan-core when it registers.
 * darshan_core_register_module() copies the table by value into its own
 * per-module record.
 */
struct darshan_module_funcs
{
    /* callback taking no arguments and returning nothing */
    void (*prepare_for_shutdown)(void);

    /* callback receiving a pointer-to-pointer and an int.
     * NOTE(review): presumably an output buffer and its size -- confirm
     * against the module implementations.
     */
    void (*get_output_data)(void **buffer, int size);
};
/* max length of module name string (not counting \0) */
#define DARSHAN_MOD_NAME_LEN 31
/* flags to indicate properties of file records
 *
 * Each replacement list is parenthesized so that a flag behaves as a single
 * operand wherever it is used (e.g. CP_FLAG_NOTIMING + 1 is 3, not 1<<2).
 */
#define CP_FLAG_CONDENSED (1<<0)
#define CP_FLAG_NOTIMING (1<<1)
struct darshan_module
struct darshan_core_module
{
char *name;
char name[DARSHAN_MOD_NAME_LEN+1];
struct darshan_module_funcs mod_funcs;
struct darshan_core_module *next_mod;
};
/* darshan_core_register_module()
 *
 * Registers a module with darshan-core under `name`.
 *
 * As implemented in darshan-core.c: the call returns without doing anything
 * when the core job structure has not been set up, when a module with the
 * same name is already on the module list, or when allocating the module
 * record fails.  Otherwise at most DARSHAN_MOD_NAME_LEN characters of `name`
 * and a copy of `*funcs` are stored in a new record that is pushed onto the
 * head of the module list, and `*runtime_mem_limit` is set to 2 MiB.
 * `*runtime_mem_limit` is NOT written on the early-return paths.
 */
void darshan_core_register_module(
char *name,
struct darshan_module_funcs *funcs,
int *runtime_mem_limit);
/* darshan_core_lookup_id()
 *
 * NOTE(review): implementation not visible here; presumably derives an
 * identifier from the `len` bytes at `name` and stores it in `*id` -- confirm.
 */
void darshan_core_lookup_id(
void *name,
int len,
int printable_flag,
darshan_file_id *id);
/* darshan_core_wtime()
 *
 * NOTE(review): implementation not visible here; presumably returns a
 * wall-clock timestamp -- confirm units and reference point.
 */
double darshan_core_wtime(void);
/* in-memory structure darshan-core uses to keep up with job level data */
struct darshan_core_job_runtime
{
    /* job record; start_time and nprocs are filled in during initialization */
    struct darshan_job log_job;

    /* NOTE(review): presumably the executable name and arguments -- confirm */
    char exe[CP_EXE_LEN+1];

    /* head of the singly linked list of registered modules; set to NULL at
     * initialization, new modules are pushed onto the head
     */
    struct darshan_core_module *mod_list_head;

    /* buffer whose size is given by CP_COMP_BUF_SIZE */
    char comp_buf[CP_COMP_BUF_SIZE];

    /* NOTE(review): presumably a mask of CP_FLAG_* values -- confirm */
    int flags;
    int file_count;

    /* value of PMPI_Wtime() captured during initialization */
    double wtime_offset;

    /* NOTE(review): presumably the exe/mount-point text appended to the
     * log -- confirm
     */
    char *trailing_data;
};
#endif /* __DARSHAN_CORE_H */
......@@ -12,245 +12,25 @@
#include <mpi.h>
#include "darshan-log-format.h"
/* Environment variable to override CP_JOBID */
#define CP_JOBID_OVERRIDE "DARSHAN_JOBID"
typedef uint64_t darshan_file_id;
/* Environment variable to override __CP_LOG_PATH */
#define CP_LOG_PATH_OVERRIDE "DARSHAN_LOGPATH"
/* Environment variable to override __CP_LOG_HINTS */
#define CP_LOG_HINTS_OVERRIDE "DARSHAN_LOGHINTS"
/* Environment variable to override __CP_MEM_ALIGNMENT */
#define CP_MEM_ALIGNMENT_OVERRIDE "DARSHAN_MEMALIGN"
/* maximum number of files per process we will track */
#define CP_MAX_FILES 1024
/* TODO: figure out how to pick good value here */
/* hash table size parameters */
#define CP_HASH_BITS 8
#define CP_HASH_SIZE (1 << CP_HASH_BITS)
#define CP_HASH_MASK (CP_HASH_SIZE - 1)
/* maximum number of access sizes and stride sizes that darshan will track
* per file at runtime; at log time they will be reduced into the 4 most
* frequently occurring ones
*/
#define CP_MAX_ACCESS_COUNT_RUNTIME 32
/* flags to indicate properties of file records
 *
 * Each replacement list is parenthesized so that a flag behaves as a single
 * operand wherever it is used (e.g. CP_FLAG_NOTIMING + 1 is 3, not 1<<2).
 */
#define CP_FLAG_CONDENSED (1<<0)
#define CP_FLAG_NOTIMING (1<<1)
/* calculation of compression buffer size (defaults to 50% of the maximum
* memory that Darshan is allowed to consume on a process)
*/
#define CP_COMP_BUF_SIZE ((CP_MAX_FILES * sizeof(struct darshan_file))/2)
/* kind of I/O operation; numbering starts at 1, so no enumerator is 0 */
enum cp_io_type
{
    CP_READ = 1,
    CP_WRITE        /* == 2 */
};
/* One tracked value and how often it has been seen.  cp_access_counter()
 * keeps these in a tsearch() tree ordered by `size`, one tree for access
 * sizes and one for strides.
 */
struct cp_access_counter
{
/* the access size or stride being counted (the tree key) */
int64_t size;
/* number of times this size has been recorded */
int freq;
};
struct darshan_io_tracker;
/* in-memory structure to keep up with file level data */
struct darshan_file_runtime
{
    /* record whose counters[] / fcounters[] the CP_* macros read and update */
    struct darshan_file *log_file;

    /* NOTE(review): presumably links in the by-name hash chain -- confirm */
    struct darshan_file_runtime *name_next;
    struct darshan_file_runtime *name_prev;

    /* tsearch() tree of access sizes seen, and how many distinct entries
     * it holds (capped at CP_MAX_ACCESS_COUNT_RUNTIME)
     */
    void *access_root;
    int access_count;

    /* same pair for strides */
    void *stride_root;
    int stride_count;

    int64_t last_byte_read;
    int64_t last_byte_written;
    int64_t offset;
    enum cp_io_type last_io_type;

    /* per-category timestamps; last_posix_meta_end is passed as the "last"
     * argument of CP_F_INC_NO_OVERLAP -- the others presumably play the
     * same role for their category (confirm)
     */
    double last_posix_write_end;
    double last_mpi_write_end;
    double last_posix_read_end;
    double last_mpi_read_end;
    double last_posix_meta_end;
    double last_mpi_meta_end;

    /* singly linked list of aio trackers: new entries are appended at the
     * tail, lookups walk from the head
     */
    struct darshan_aio_tracker *aio_list_head;
    struct darshan_aio_tracker *aio_list_tail;
};
/* handles used by various APIs to refer to files
 *
 * NOTE(review): presumably FD = file descriptor, FH = MPI file handle,
 * NCID = netCDF id; confirm what HID stands for.
 */
enum darshan_handle_type
{
    DARSHAN_FD   = 1,
    DARSHAN_FH   = 2,
    DARSHAN_NCID = 3,
    DARSHAN_HID  = 4
};
/* size in bytes of the handle storage in struct darshan_file_ref */
#define DARSHAN_FILE_HANDLE_MAX (sizeof(MPI_File))

/* Tracks one reference to a file made through some handle: a file
 * descriptor, an MPI file handle, a ncdf id, etc.
 */
struct darshan_file_ref
{
    /* the file this handle refers to */
    struct darshan_file_runtime *file;

    /* raw handle bytes; only the first handle_sz bytes are meaningful
     * (NOTE(review): inferred from the field names -- confirm)
     */
    char handle[DARSHAN_FILE_HANDLE_MAX];
    int handle_sz;

    /* which API the handle came from */
    enum darshan_handle_type handle_type;

    /* doubly linked list neighbours */
    struct darshan_file_ref *next;
    struct darshan_file_ref *prev;
};
/* in memory structure to keep up with job level data */
struct darshan_job_runtime
struct darshan_module_funcs
{
struct darshan_job log_job;
char exe[CP_EXE_LEN+1];
struct darshan_file file_array[CP_MAX_FILES];
struct darshan_file_runtime file_runtime_array[CP_MAX_FILES];
char comp_buf[CP_COMP_BUF_SIZE];
int flags;
int file_count;
struct darshan_file_runtime* name_table[CP_HASH_SIZE];
struct darshan_file_ref* handle_table[CP_HASH_SIZE];
double wtime_offset;
char* trailing_data;
void (*prepare_for_shutdown)(void);
void (*get_output_data)(void **, int);
};
extern pthread_mutex_t cp_mutex;
#define CP_LOCK() pthread_mutex_lock(&cp_mutex)
#define CP_UNLOCK() pthread_mutex_unlock(&cp_mutex)
/* Counter helper macros.
 *
 * In all of them __file is a pointer to a structure whose log_file member
 * points at a record holding an integer counters[] array and a floating
 * point fcounters[] array; __counter is the index into that array.
 *
 * Every value argument is parenthesized in the expansion so that compound
 * expressions (for example a conditional expression) are used as a whole.
 * Arguments may still be evaluated more than once, so do not pass
 * expressions with side effects.
 */

/* counters[__counter] = __value */
#define CP_SET(__file, __counter, __value) do {\
    (__file)->log_file->counters[__counter] = (__value); \
} while(0)

/* fcounters[__counter] = __value */
#define CP_F_SET(__file, __counter, __value) do {\
    (__file)->log_file->fcounters[__counter] = (__value); \
} while(0)

/* counters[__counter] += __value */
#define CP_INC(__file, __counter, __value) do {\
    (__file)->log_file->counters[__counter] += (__value); \
} while(0)

/* fcounters[__counter] += __value */
#define CP_F_INC(__file, __counter, __value) do {\
    (__file)->log_file->fcounters[__counter] += (__value); \
} while(0)

/* Adds the part of the interval [__tm1, __tm2] that lies after __last to
 * fcounters[__counter], then advances __last (an lvalue) to __tm2.
 *
 * An interval that ends at or before __last contributes nothing: it is
 * already fully accounted for, and adding (__tm2 - __last) in that case
 * would make the cumulative time go backwards.
 */
#define CP_F_INC_NO_OVERLAP(__file, __tm1, __tm2, __last, __counter) do { \
    if((__tm2) > (__last)) { \
        if((__tm1) > (__last)) { \
            CP_F_INC(__file, __counter, ((__tm2) - (__tm1))); \
        } else { \
            CP_F_INC(__file, __counter, ((__tm2) - (__last))); \
        } \
        (__last) = (__tm2); \
    } \
} while(0)

/* current value of counters[__counter] (usable as an lvalue) */
#define CP_VALUE(__file, __counter) \
    ((__file)->log_file->counters[__counter])

/* current value of fcounters[__counter] (usable as an lvalue) */
#define CP_F_VALUE(__file, __counter) \
    ((__file)->log_file->fcounters[__counter])

/* raises counters[__counter] to __value if it is currently smaller */
#define CP_MAX(__file, __counter, __value) do {\
    if((__file)->log_file->counters[__counter] < (__value)) \
    { \
        (__file)->log_file->counters[__counter] = (__value); \
    } \
} while(0)
/* CP_COUNTER_INC()
 *
 * Maintains a 4-entry table of values and their counts, stored in the
 * counters at __validx..__validx+3 (values) and __cntidx..__cntidx+3
 * (counts).
 *
 *  - __value == 0 is ignored.
 *  - If __value is already in the table, its count is increased by __count.
 *  - Otherwise the entry with the smallest count is chosen; it is taken over
 *    by __value when __maxflag is zero, or when __maxflag is non-zero and
 *    __count is greater than that smallest count.  The slot's count is
 *    increased by __count (not reset).
 *
 * The macro's own locals use a cp_ci_ prefix so that caller variables passed
 * as arguments (for example ones named i or min) are not captured by them.
 * Arguments may be evaluated more than once; avoid side effects.
 */
#define CP_COUNTER_INC(__file, __value, __count, __maxflag, __validx, __cntidx) do {\
    int cp_ci_i_; \
    int cp_ci_set_ = 0; \
    int64_t cp_ci_min_ = CP_VALUE(__file, __cntidx); \
    int cp_ci_min_index_ = 0; \
    if((__value) == 0) break; \
    for(cp_ci_i_=0; cp_ci_i_<4; cp_ci_i_++) { \
        /* increment bucket if already exists */ \
        if(CP_VALUE(__file, (__validx) + cp_ci_i_) == (__value)) { \
            CP_INC(__file, (__cntidx) + cp_ci_i_, (__count)); \
            cp_ci_set_ = 1; \
            break; \
        } \
        /* otherwise find the least frequently used bucket */ \
        else if(CP_VALUE(__file, (__cntidx) + cp_ci_i_) < cp_ci_min_) { \
            cp_ci_min_ = CP_VALUE(__file, (__cntidx) + cp_ci_i_); \
            cp_ci_min_index_ = cp_ci_i_; \
        } \
    } \
    if(!cp_ci_set_ && (!(__maxflag) || (__count) > cp_ci_min_)) { \
        CP_INC(__file, (__cntidx) + cp_ci_min_index_, (__count)); \
        CP_SET(__file, (__validx) + cp_ci_min_index_, (__value)); \
    } \
} while(0)
/* CP_BUCKET_INC()
 *
 * Adds 1 to one of ten consecutive counters starting at __counter_base,
 * chosen by the magnitude of __value:
 *
 *   +0: <= 100          +5: <= 4194304   (4 MiB)
 *   +1: <= 1024         +6: <= 10485760  (10 MiB)
 *   +2: <= 10240        +7: <= 104857600 (100 MiB)
 *   +3: <= 102400       +8: <= 1073741824 (1 GiB)
 *   +4: <= 1048576      +9: anything larger
 *
 * __value and __counter_base are parenthesized in the expansion so compound
 * expressions are used as a whole.  __value may be evaluated several times;
 * avoid side effects.
 */
#define CP_BUCKET_INC(__file, __counter_base, __value) do {\
    if((__value) < 101) \
        (__file)->log_file->counters[(__counter_base)] += 1; \
    else if((__value) < 1025) \
        (__file)->log_file->counters[(__counter_base)+1] += 1; \
    else if((__value) < 10241) \
        (__file)->log_file->counters[(__counter_base)+2] += 1; \
    else if((__value) < 102401) \
        (__file)->log_file->counters[(__counter_base)+3] += 1; \
    else if((__value) < 1048577) \
        (__file)->log_file->counters[(__counter_base)+4] += 1; \
    else if((__value) < 4194305) \
        (__file)->log_file->counters[(__counter_base)+5] += 1; \
    else if((__value) < 10485761) \
        (__file)->log_file->counters[(__counter_base)+6] += 1; \
    else if((__value) < 104857601) \
        (__file)->log_file->counters[(__counter_base)+7] += 1; \
    else if((__value) < 1073741825) \
        (__file)->log_file->counters[(__counter_base)+8] += 1; \
    else \
        (__file)->log_file->counters[(__counter_base)+9] += 1; \
} while(0)
/* selects which per-file tree cp_access_counter() updates:
 * the access-size tree or the stride tree
 */
enum cp_counter_type
{
    CP_COUNTER_ACCESS = 0,
    CP_COUNTER_STRIDE = 1
};
/* checking alignment according to this document:
* http://publib.boulder.ibm.com/infocenter/compbgpl/v9v111/index.jsp?topic=/com.ibm.bg9111.doc/bgusing/data_alignment.htm
*/
void darshan_finalize(struct darshan_job_runtime* job);
void darshan_condense(void);
void darshan_walk_file_accesses(struct darshan_job_runtime* final_job);
double darshan_wtime(void);
void darshan_mnt_id_from_path(const char* path, int64_t* device_id, int64_t* block_size);
void darshan_mpi_initialize(int *argc, char ***argv);
uint32_t darshan_hashlittle(const void *key, size_t length, uint32_t initval);
uint64_t darshan_hash(const register unsigned char *k, register uint64_t length, register uint64_t level);
struct darshan_file_runtime* darshan_file_by_name(const char* name);
struct darshan_file_runtime* darshan_file_by_name_sethandle(
const char* name,
const void* handle,
int handle_sz,
enum darshan_handle_type handle_type);
void darshan_core_register_module(
char *name,
struct darshan_module_funcs *funcs,
int *runtime_mem_limit);
struct darshan_file_runtime* darshan_file_by_handle(
const void* handle,
int handle_sz,
enum darshan_handle_type handle_type);
void darshan_core_lookup_id(
void *name,
int len,
int printable_flag,
darshan_file_id *id);
void darshan_file_closehandle(
const void* handle,
int handle_sz,
enum darshan_handle_type handle_type);
double darshan_core_wtime(void);
#endif /* __DARSHAN_H */
......@@ -14,32 +14,38 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/vfs.h>
#include <pthread.h>
#include <mpi.h>
#include "darshan.h"
#include "darshan-core.h"
extern char* __progname_full;
static void darshan_initialize(int *argc, char ***argv);
static void darshan_shutdown(void);
static void darshan_core_initialize(int *argc, char ***argv);
static void darshan_core_shutdown(void);
static char *darshan_get_exe_and_mounts(int rank);
static void darshan_get_exe_and_mounts_root(char* trailing_data, int space_left);
/* internal variables */
static struct darshan_job_runtime *darshan_global_job = NULL;
static struct darshan_core_job_runtime *darshan_global_job = NULL;
static pthread_mutex_t darshan_mutex = PTHREAD_MUTEX_INITIALIZER;
#define CP_MAX_MNTS 64
#define CP_MAX_MNT_PATH 256
#define CP_MAX_MNT_TYPE 32
#define DARSHAN_LOCK() pthread_mutex_lock(&darshan_mutex)
#define DARSHAN_UNLOCK() pthread_mutex_unlock(&darshan_mutex)
#define DARSHAN_MAX_MNTS 64
#define DARSHAN_MAX_MNT_PATH 256
#define DARSHAN_MAX_MNT_TYPE 32
struct mnt_data
{
int64_t hash;
int64_t block_size;
char path[CP_MAX_MNT_PATH];
char type[CP_MAX_MNT_TYPE];
char path[DARSHAN_MAX_MNT_PATH];
char type[DARSHAN_MAX_MNT_TYPE];
};
static struct mnt_data mnt_data_array[CP_MAX_MNTS];
static struct mnt_data mnt_data_array[DARSHAN_MAX_MNTS];
static int mnt_data_count = 0;
/* intercept MPI initialize and finalize to initialize darshan */
......@@ -53,12 +59,12 @@ int MPI_Init(int *argc, char ***argv)
return(ret);
}
darshan_initialize(argc, argv);
darshan_core_initialize(argc, argv);
return(ret);
}
int MPI_Init_thread (int *argc, char ***argv, int required, int *provided)
int MPI_Init_thread(int *argc, char ***argv, int required, int *provided)
{
int ret;
......@@ -68,7 +74,7 @@ int MPI_Init_thread (int *argc, char ***argv, int required, int *provided)
return(ret);
}
darshan_initialize(argc, argv);
darshan_core_initialize(argc, argv);
return(ret);
}
......@@ -77,13 +83,13 @@ int MPI_Finalize(void)
{
int ret;
darshan_shutdown();
darshan_core_shutdown();
ret = DARSHAN_MPI_CALL(PMPI_Finalize)();
return(ret);
}
static void darshan_initialize(int *argc, char ***argv)
static void darshan_core_initialize(int *argc, char ***argv)
{
int i;
int nprocs;
......@@ -123,6 +129,7 @@ static void darshan_initialize(int *argc, char ***argv)
darshan_global_job->log_job.start_time = time(NULL);
darshan_global_job->log_job.nprocs = nprocs;
darshan_global_job->wtime_offset = DARSHAN_MPI_CALL(PMPI_Wtime)();
darshan_global_job->mod_list_head = NULL;
/* record exe and arguments */
for(i=0; i<(*argc); i++)
......@@ -177,7 +184,7 @@ static void darshan_initialize(int *argc, char ***argv)
return;
}
static void darshan_shutdown()
static void darshan_core_shutdown()
{
int internal_timing_flag = 0;
......@@ -210,9 +217,9 @@ static void add_entry(char* trailing_data, int* space_left, struct mntent *entry
struct statfs statfsbuf;
strncpy(mnt_data_array[mnt_data_count].path, entry->mnt_dir,
CP_MAX_MNT_PATH-1);
DARSHAN_MAX_MNT_PATH-1);
strncpy(mnt_data_array[mnt_data_count].type, entry->mnt_type,
CP_MAX_MNT_TYPE-1);
DARSHAN_MAX_MNT_TYPE-1);
mnt_data_array[mnt_data_count].hash =
darshan_hash((void*)mnt_data_array[mnt_data_count].path,
strlen(mnt_data_array[mnt_data_count].path), 0);
......@@ -289,7 +296,7 @@ static void darshan_get_exe_and_mounts_root(char* trailing_data, int space_left)
if(!tab)
return;
/* loop through list of mounted file systems */
while(mnt_data_count<CP_MAX_MNTS && (entry = getmntent(tab)) != NULL)
while(mnt_data_count<DARSHAN_MAX_MNTS && (entry = getmntent(tab)) != NULL)
{
/* filter out excluded fs types */
tmp_index = 0;
......@@ -315,7 +322,7 @@ static void darshan_get_exe_and_mounts_root(char* trailing_data, int space_left)
if(!tab)
return;
/* loop through list of mounted file systems */
while(mnt_data_count<CP_MAX_MNTS && (entry = getmntent(tab)) != NULL)
while(mnt_data_count<DARSHAN_MAX_MNTS && (entry = getmntent(tab)) != NULL)
{
if(strcmp(entry->mnt_type, "nfs") != 0)
continue;
......@@ -373,9 +380,42 @@ void darshan_core_register_module(
struct darshan_module_funcs *funcs,
int *runtime_mem_limit)
{
struct darshan_module mod;
struct darshan_core_module *tmp;
struct darshan_core_module *new_mod;
if (!darshan_global_job)
return;
DARSHAN_LOCK();
tmp = darshan_global_job->mod_list_head;
while(tmp)
{
/* silently return if this module is already registered */
if (strcmp(tmp->name, name) == 0)
{
DARSHAN_UNLOCK();
return;
}
tmp = tmp->next_mod;
}
/* allocate new module and add to the head of the linked list of darshan modules */
new_mod = malloc(sizeof(*new_mod));
if (!new_mod)
{
DARSHAN_UNLOCK();
return;
}
memset(new_mod, 0, sizeof(*new_mod));
strncpy(new_mod->name, name, DARSHAN_MOD_NAME_LEN);
new_mod->mod_funcs = *funcs;
new_mod->next_mod = darshan_global_job->mod_list_head;
darshan_global_job->mod_list_head = new_mod;
DARSHAN_UNLOCK();
printf("%s MODULE REGISTERED\n", name);
/* TODO: something smarter than just 2 MiB per module */
*runtime_mem_limit = 2 * 1024 * 1024;
return;
}
......
......@@ -25,7 +25,6 @@
#include <pthread.h>
#include "darshan.h"
#include "darshan-core.h"
#ifndef HAVE_OFF64_T
typedef int64_t off64_t;
......@@ -109,22 +108,22 @@ NULL
static int darshan_mem_alignment = 1;
static int posix_mod_initialized = 0;
static int posix_mod_mem_limit = 0;
static pthread_mutex_t posix_mod_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static struct darshan_module_funcs posix_mod_funcs =
{
};
static void posix_runtime_initialize(void);
static double posix_wtime(void);
static void posix_prepare_for_shutdown(void);
static void posix_get_output_data(void **buffer, int size);
#define POSIX_LOCK() pthread_mutex_lock(&posix_mod_mutex)
#define POSIX_UNLOCK() pthread_mutex_unlock(&posix_mod_mutex)
#if 0
static void cp_access_counter(struct darshan_file_runtime* file, ssize_t size, enum cp_counter_type type);
static void darshan_aio_tracker_add(int fd, void *aiocbp);
static struct darshan_aio_tracker* darshan_aio_tracker_del(int fd, void *aiocbp);
#endif
#if 0
#define CP_RECORD_WRITE(__ret, __fd, __count, __pwrite_flag, __pwrite_offset, __aligned, __stream_flag, __tm1, __tm2) do{ \
......@@ -267,7 +266,6 @@ static struct darshan_aio_tracker* darshan_aio_tracker_del(int fd, void *aiocbp)
#else
#define CP_STAT_FILE(_f, _p, _r) do { }while(0)
#endif
#endif
#define CP_RECORD_OPEN(__ret, __path, __mode, __stream_flag, __tm1, __tm2) do { \
struct darshan_file_runtime* file; \
......@@ -297,6 +295,7 @@ static struct darshan_aio_tracker* darshan_aio_tracker_del(int fd, void *aiocbp)
CP_F_SET(file, CP_F_OPEN_TIMESTAMP, __tm1); \
CP_F_INC_NO_OVERLAP(file, __tm1, __tm2, file->last_posix_meta_end, CP_F_POSIX_META_TIME); \
} while (0)
#endif
int DARSHAN_DECL(close)(int fd)
{
......@@ -1206,6 +1205,12 @@ static void posix_runtime_initialize()
char *alignstr;
int tmpval;
int ret;
int posix_mod_mem_limit = 0;
struct darshan_module_funcs posix_mod_funcs =
{
.prepare_for_shutdown = &posix_prepare_for_shutdown,
.get_output_data = &posix_get_output_data
};
if (posix_mod_initialized)
return;
......@@ -1238,93 +1243,27 @@ static void posix_runtime_initialize()
/* register the posix module with darshan core */
darshan_core_register_module("POSIX", &posix_mod_funcs, &posix_mod_mem_limit);
/* TODO: allocate memory for saving i/o stats */
posix_mod_initialized = 1;
return;
}
static int access_comparison(const void* a_p, const void* b_p)
static double posix_wtime()
{
const struct cp_access_counter* a = a_p;
const struct cp_access_counter* b = b_p;
if(a->size < b->size)
return(-1);
if(a->size > b->size)
return(1);
return(0);
return DARSHAN_MPI_CALL(PMPI_Wtime)();
}
/* cp_access_counter()
*
* records the occurrence of a particular access size for a file,
* current implementation uses glibc red black tree
*/
static void cp_access_counter(struct darshan_file_runtime* file, ssize_t size, enum cp_counter_type type)
static void posix_prepare_for_shutdown()
{
struct cp_access_counter* counter;
struct cp_access_counter* found;
void* tmp;
void** root;
int* count;
struct cp_access_counter tmp_counter;
/* don't count sizes or strides of 0 */
if(size == 0)
return;
switch(type)
{
case CP_COUNTER_ACCESS:
root = &file->access_root;
count = &file->access_count;
break;
case CP_COUNTER_STRIDE:
root = &file->stride_root;
count = &file->stride_count;
break;
default:
return;
}
/* check to see if this size is already recorded */
tmp_counter.size = size;
tmp_counter.freq = 1;
tmp = tfind(&tmp_counter, root, access_comparison);
if(tmp)
{
found = *(struct cp_access_counter**)tmp;
found->freq++;
return;
}
/* we can add a new one as long as we haven't hit the limit */
if(*count < CP_MAX_ACCESS_COUNT_RUNTIME)
{
counter = malloc(sizeof(*counter));
if(!counter)
{
return;
}
counter->size = size;
counter->freq = 1;
tmp = tsearch(counter, root, access_comparison);
found = *(struct cp_access_counter**)tmp;
/* if we get a new answer out here we are in trouble; this was
* already checked with the tfind()
*/
assert(found == counter);
(*count)++;
}
return;
}
static double posix_wtime(void)
static void posix_get_output_data(void **buffer, int size)
{
return DARSHAN_MPI_CALL(PMPI_Wtime)();
return;
}
#if 0
......@@ -1570,85 +1509,6 @@ void darshan_search_bench(int argc, char** argv, int iters)
}
#endif
#if 0
/* darshan_aio_tracker_add()
 *
 * Appends a tracker for the aio operation `aiocbp` to the aio list of the
 * file record found for `fd`.  The tracker is malloc'ed here, stamped with
 * darshan_core_wtime() and linked at the tail of the list.
 *
 * Nothing is recorded when no file record is found for `fd` or when the
 * allocation fails.  The whole operation runs between CP_LOCK() and
 * CP_UNLOCK().
 */
static void darshan_aio_tracker_add(int fd, void *aiocbp)
{
    struct darshan_aio_tracker *node;
    struct darshan_file_runtime *rec;

    CP_LOCK();

    rec = darshan_file_by_fd(fd);
    /* only allocate when there is a file record to attach the tracker to */
    node = rec ? malloc(sizeof(*node)) : NULL;
    if(node)
    {
        node->tm1 = darshan_core_wtime();
        node->aiocbp = aiocbp;
        node->next = NULL;

        /* empty list: the new node is also the head */
        if(!rec->aio_list_tail)
            rec->aio_list_head = node;
        else
            rec->aio_list_tail->next = node;
        rec->aio_list_tail = node;
    }

    CP_UNLOCK();
}
/* darshan_aio_tracker_del()
 *
 * Looks up the tracker for the aio operation `aiocbp` on the file record
 * found for `fd`, unlinks it from that record's aio list (fixing up head
 * and tail as needed) and returns it.  The tracker is not freed here.
 *
 * Returns NULL when no file record is found for `fd` or when the operation
 * is not on the list.  The whole operation runs between CP_LOCK() and
 * CP_UNLOCK().
 */
static struct darshan_aio_tracker* darshan_aio_tracker_del(int fd, void *aiocbp)
{
    struct darshan_aio_tracker *match = NULL;
    struct darshan_aio_tracker *before = NULL;
    struct darshan_file_runtime *rec;

    CP_LOCK();

    rec = darshan_file_by_fd(fd);
    if(rec)
    {
        /* walk the list until the matching tracker (or the end) is reached,
         * remembering the node in front of it
         */
        for(match = rec->aio_list_head;
            match && match->aiocbp != aiocbp;
            match = match->next)
        {
            before = match;
        }

        if(match)
        {
            if(before)
                before->next = match->next;
            else
                rec->aio_list_head = match->next;

            if(rec->aio_list_tail == match)
                rec->aio_list_tail = before;
        }
    }

    CP_UNLOCK();

    return(match);
}
#endif
/*
* Local variables:
* c-indent-level: 4
......