Commit 4d763398 authored by Jakob Luettgau's avatar Jakob Luettgau
Browse files

Add experimental filtered namerecord table reconstruction to libdarshan-utils.

parent 05e30606
......@@ -108,6 +108,11 @@ static int darshan_log_dzunload(darshan_fd fd, struct darshan_log_map *map_p);
static int darshan_log_noz_read(darshan_fd fd, struct darshan_log_map map,
void *buf, int len, int reset_strm_flag);
/* filtered namerecs test */
static int darshan_log_get_filtered_namerecs(void *name_rec_buf, int buf_len, int swap_flag, struct darshan_name_record_ref **hash, darshan_record_id *whitelist, int whitelist_count);
/* backwards compatibility functions */
int darshan_log_get_namerecs_3_00(void *name_rec_buf, int buf_len,
int swap_flag, struct darshan_name_record_ref **hash);
......@@ -578,6 +583,85 @@ int darshan_log_get_namehash(darshan_fd fd, struct darshan_name_record_ref **has
return(0);
}
/* darshan_log_get_filtered_namehash()
 *
 * read the set of name records from the darshan log file, keeping only
 * records whose id appears in the given whitelist, and add them to the
 * given hash table
 *
 * returns 0 on success, -1 on failure
 */
int darshan_log_get_filtered_namehash(darshan_fd fd,
    struct darshan_name_record_ref **hash,
    darshan_record_id *whitelist, int whitelist_count)
{
    struct darshan_fd_int_state *state = fd->state;
    char *name_rec_buf;
    int name_rec_buf_sz;
    int read;
    int read_req_sz;
    int buf_len = 0;
    int buf_processed;

    assert(state);

    /* just return if there is no name record mapping data */
    if(fd->name_map.len == 0)
    {
        *hash = NULL;
        return(0);
    }

    /* default to buffer twice as big as default compression buf */
    name_rec_buf_sz = DARSHAN_DEF_COMP_BUF_SZ * 2;
    name_rec_buf = malloc(name_rec_buf_sz);
    if(!name_rec_buf)
        return(-1);
    memset(name_rec_buf, 0, name_rec_buf_sz);

    do
    {
        /* read chunks of the darshan record id -> name mapping from log file,
         * constructing a hash table in the process
         */
        read_req_sz = name_rec_buf_sz - buf_len;
        read = darshan_log_dzread(fd, DARSHAN_NAME_MAP_REGION_ID,
            name_rec_buf + buf_len, read_req_sz);
        if(read < 0)
        {
            fprintf(stderr, "Error: failed to read name hash from darshan log file.\n");
            free(name_rec_buf);
            return(-1);
        }
        buf_len += read;

        /* extract any whitelisted name records in the buffer */
        buf_processed = darshan_log_get_filtered_namerecs(name_rec_buf, buf_len,
            fd->swap_flag, hash, whitelist, whitelist_count);
        if(buf_processed < 0)
        {
            /* allocation failure while building the hash table; without this
             * check a -1 would be used below as a byte count
             */
            free(name_rec_buf);
            return(-1);
        }

        /* copy any leftover data to beginning of buffer to parse next;
         * regions may overlap, so memmove (not memcpy) is required
         */
        memmove(name_rec_buf, name_rec_buf + buf_processed, buf_len - buf_processed);
        buf_len -= buf_processed;

        /* we keep reading until we get a short read informing us we have
         * read all of the record hash
         */
    } while(read == read_req_sz);
    assert(buf_len == 0);

    free(name_rec_buf);
    return(0);
}
/* darshan_log_put_namehash()
*
* writes the hash table of name records to the darshan log file
......@@ -864,6 +948,108 @@ static int darshan_log_get_namerecs(void *name_rec_buf, int buf_len,
return(buf_processed);
}
/* whitelist_filter()
 *
 * A simple filter predicate that tests whether a record id appears in
 * the provided whitelist.
 *
 * returns 1 if val is found in whitelist, 0 otherwise
 */
int whitelist_filter(darshan_record_id val, darshan_record_id *whitelist,
    int whitelist_count)
{
    int i;

    /* a missing whitelist matches nothing */
    if(!whitelist)
        return 0;

    for(i = 0; i < whitelist_count; i++)
    {
        if(whitelist[i] == val)
        {
            return 1;
        }
    }
    return 0;
}
/* darshan_log_get_filtered_namerecs()
 *
 * Buffered reader to reconstruct name records from the log file's name
 * map region, adding only records whose id passes the whitelist filter
 * to the output hash table.
 *
 * returns the number of bytes consumed from name_rec_buf on success,
 * -1 on allocation failure
 *
 * NOTE(review): interface could take a filter callback instead of a
 * whitelist array for more flexibility.
 */
static int darshan_log_get_filtered_namerecs(void *name_rec_buf, int buf_len,
    int swap_flag, struct darshan_name_record_ref **hash,
    darshan_record_id *whitelist, int whitelist_count)
{
    struct darshan_name_record_ref *ref;
    struct darshan_name_record *name_rec;
    char *tmp_p;
    int buf_processed = 0;
    int rec_len;

    /* work through the name record buffer -- deserialize the record data
     * and add to the output hash table
     * NOTE: these mapping pairs are variable in length, so we have to be able
     * to handle incomplete mappings temporarily here
     */
    name_rec = (struct darshan_name_record *)name_rec_buf;
    while(buf_len > sizeof(darshan_record_id) + 1)
    {
        if(strnlen(name_rec->name, buf_len - sizeof(darshan_record_id)) ==
            (buf_len - sizeof(darshan_record_id)))
        {
            /* if this record name's terminating null character is not
             * present, we need to read more of the buffer before continuing
             */
            break;
        }
        rec_len = sizeof(darshan_record_id) + strlen(name_rec->name) + 1;

        if(swap_flag)
        {
            /* we need to sort out endianness issues before deserializing;
             * the id must be in host order before the whitelist comparison
             */
            DARSHAN_BSWAP64(&(name_rec->id));
        }

        /* only keep records whose id is in the whitelist; checking the
         * filter first avoids a hash lookup for every rejected record
         */
        if(whitelist_filter(name_rec->id, whitelist, whitelist_count))
        {
            HASH_FIND(hlink, *hash, &(name_rec->id), sizeof(darshan_record_id), ref);
            if(!ref)
            {
                ref = malloc(sizeof(*ref));
                if(!ref)
                    return(-1);
                ref->name_record = malloc(rec_len);
                if(!ref->name_record)
                {
                    free(ref);
                    return(-1);
                }

                /* copy the name record over from the hash buffer */
                memcpy(ref->name_record, name_rec, rec_len);

                /* add this record to the hash */
                HASH_ADD(hlink, *hash, name_record->id, sizeof(darshan_record_id), ref);
            }
        }

        tmp_p = (char *)name_rec + rec_len;
        name_rec = (struct darshan_name_record *)tmp_p;
        buf_len -= rec_len;
        buf_processed += rec_len;
    }

    return(buf_processed);
}
/* read the header of the darshan log and set internal fd data structures
* NOTE: this is the only portion of the darshan log that is uncompressed
*
......@@ -1895,7 +2081,6 @@ void darshan_log_get_name_records(darshan_fd fd,
struct darshan_name_record_info **name_records,
int* count)
{
int ret;
struct darshan_name_record_ref *name_hash = NULL;
struct darshan_name_record_ref *ref = NULL;
......@@ -1907,7 +2092,7 @@ void darshan_log_get_name_records(darshan_fd fd,
if(ret < 0)
{
darshan_log_close(fd);
return(-1);
//return(-1);
}
int num = HASH_CNT(hlink, name_hash);
......@@ -1923,10 +2108,57 @@ void darshan_log_get_name_records(darshan_fd fd,
}
*count = num;
return;
}
/*
 * darshan_log_get_filtered_name_records
 *
 * Get the list of name records in the log whose id appears in the given
 * whitelist, returning their info through name_records/count.
 *
 * On failure the log is closed, *name_records is set to NULL, and
 * *count is set to 0 (the function returns void, so failure is signaled
 * through the output arguments).
 */
void darshan_log_get_filtered_name_records(darshan_fd fd,
    struct darshan_name_record_info **name_records,
    int* count,
    darshan_record_id *whitelist, int whitelist_count)
{
    int ret;
    struct darshan_name_record_ref *name_hash = NULL;
    struct darshan_name_record_ref *tmp = NULL;
    struct darshan_name_record_ref *curr = NULL;

    /* read filtered hash of darshan records */
    ret = darshan_log_get_filtered_namehash(fd, &name_hash, whitelist, whitelist_count);
    if(ret < 0)
    {
        /* must return here: continuing with a NULL hash would hand back
         * uninitialized outputs to the caller
         */
        darshan_log_close(fd);
        *name_records = NULL;
        *count = 0;
        return;
    }

    int num = HASH_CNT(hlink, name_hash);
    *name_records = malloc(sizeof(**name_records) * num);
    assert(*name_records);

    int i = 0;
    HASH_ITER(hlink, name_hash, curr, tmp)
    {
        (*name_records)[i].id = curr->name_record->id;
        (*name_records)[i].name = curr->name_record->name;
        i++;
    }
    *count = num;
}
/*
* darshan_log_get_record
*
......
......@@ -180,6 +180,13 @@ void darshan_log_get_name_records(darshan_fd fd,
int* count);
int darshan_log_get_record (darshan_fd fd, int mod_idx, void **buf);
void darshan_log_get_filtered_name_records(darshan_fd fd,
struct darshan_name_record_info **mods,
int* count,
darshan_record_id *whitelist, int whitelist_count
);
/* convenience macros for printing Darshan counters */
#define DARSHAN_PRINT_HEADER() \
printf("\n#<module>\t<rank>\t<record id>\t<counter>\t<value>" \
......
......@@ -6,7 +6,7 @@
# Example utility built on top of PyDarshan using a Python wrapper
# add hard-coded darshan utils path and ensure they take precedence
# add hard-coded pydarshan utils path and ensure they take precedence
import sys
sys.path.insert(0, "@prefix@")
......
......@@ -6,7 +6,7 @@
# Example utility built on top of PyDarshan using a shell wrapper
# add hard-coded darshan utils path and ensure they take precedence
# add hard-coded pydarshan utils path and ensure they take precedence
PREFIX="@prefix@";
export PYTHONPATH=$PREFIX/pydarshan:$PYTHONPATH
......
......@@ -2,17 +2,67 @@
"""Auxiliary to discover darshan-util install directory."""
import shutil
import os
def discover_darshan():
def darshanutils_version():
    """
    Queries pkg-config for an installed darshan-util package.

    :return: Exit status of ``pkg-config --modversion darshan-util``
        (0 when darshan-util was found by pkg-config).
    """
    import subprocess

    args = ['pkg-config', '--modversion', 'darshan-util']
    p = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd='.')
    out, err = p.communicate()
    retval = p.wait()
    # NOTE(review): only the exit status is returned; `out` carries the
    # actual version string should a caller ever need it.
    return retval
def discover_darshan_pkgconfig():
    """
    Discovers an existing darshan-util installation via pkg-config and
    returns the appropriate path for use with Python's CFFI.

    :return: Path to a darshan-util installation.
    :raises RuntimeError: if pkg-config cannot locate darshan-util.
    """
    import subprocess

    args = ['pkg-config', '--path', 'darshan-util']
    p = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd='.')
    out, err = p.communicate()
    retval = p.wait()

    # pkg-config --path prints the location of the darshan-util.pc file;
    # previously `darshan_config` was never assigned, so this function
    # always raised NameError.
    darshan_config = out.decode('utf-8').strip()
    if retval == 0 and darshan_config:
        # walk up from .../lib/pkgconfig/darshan-util.pc to the install prefix
        return os.path.realpath(darshan_config + '/../../')
    else:
        raise RuntimeError('Could not discover darshan! Is darshan-util installed?')
def discover_darshan_shutil():
"""
Discovers an existing darshan-util installation and returns the appropriate
path to a shared object for use with Python's CFFI.
:return: Path to a darshan-util installation.
"""
import shutil
darshan_config = shutil.which('darshan-parser')
# alternatively via
......@@ -24,6 +74,17 @@ def discover_darshan():
raise RuntimeError('Could not discover darshan! Is darshan-util installed and set in your PATH?')
def discover_darshan():
    """
    Discovers an existing darshan-util installation and returns the
    appropriate path for use with Python's CFFI.

    :return: Path to a darshan-util installation.
    """
    # delegate to the PATH/shutil-based discovery strategy
    return discover_darshan_shutil()
def load_darshan_header():
"""
Returns a CFFI compatible header for darshan-utlil as a string.
......
......@@ -21,8 +21,8 @@ import pandas as pd
class DarshanReportJSONEncoder(json.JSONEncoder):
"""
Helper class for JSON serialization if the report contains numpy
log records, which are not handled by the default JSON encoder.
Helper class for JSON serialization if the report contains, for example,
numpy or dates records, which are not handled by the default JSON encoder.
"""
def default(self, obj):
if isinstance(obj, np.ndarray):
......@@ -42,14 +42,16 @@ class DarshanReport(object):
a number of common aggregations can be performed.
"""
def __init__(self, filename=None, data_format='numpy', automatic_summary=False, read_all=True):
def __init__(self, filename=None, data_format='pandas', automatic_summary=False,
read_all=True, lookup_name_records=True):
self.filename = filename
# options
self.data_format = data_format # Experimental: preferred internal representation: numpy useful for aggregations, dict good for export/REST
self.data_format = data_format # Experimental: preferred internal representation: pandas/numpy useful for aggregations, dict good for export/REST
# might require alternative granularity: e.g., records, vs summaries?
# vs dict/pandas? dict/native?
self.automatic_summary = automatic_summary
self.lookup_name_records = lookup_name_records
# state dependent book-keeping
......@@ -139,7 +141,7 @@ class DarshanReport(object):
memo[id(self)] = result
for k, v in self.__dict__.items():
if k in ["log"]:
# blacklist of members not copy
# blacklist of members not to copy
continue
setattr(result, k, copy.deepcopy(v, memo))
return result
......@@ -171,8 +173,9 @@ class DarshanReport(object):
self.data['modules'] = backend.log_get_modules(self.log)
self.modules = self.data['modules']
self.data["name_records"] = backend.log_get_name_records(self.log)
self.name_records = self.data['name_records']
if self.read_all == True:
self.data["name_records"] = backend.log_get_name_records(self.log)
self.name_records = self.data['name_records']
def read_all(self):
......@@ -275,6 +278,8 @@ class DarshanReport(object):
rec = backend.log_get_generic_record(self.log, mod, structdefs[mod])
while rec != None:
if mode == 'pandas':
self.records[mod].append(rec)
if mode == 'numpy':
self.records[mod].append(rec)
else:
......@@ -371,11 +376,14 @@ class DarshanReport(object):
tdelta = self.end_time - self.start_time
print("Times: ", self.start_time, " to ", self.end_time, " (Duration ", tdelta, ")", sep="")
print("Executeable: ", self.metadata['exe'], sep="")
print("Processes: ", self.metadata['job']['nprocs'], sep="")
print("JobID: ", self.metadata['job']['jobid'], sep="")
print("UID: ", self.metadata['job']['uid'], sep="")
print("Modules in Log: ", list(self.modules.keys()), sep="")
if 'exe' in self.metadata:
print("Executeable: ", self.metadata['exe'], sep="")
if 'job' in self.metadata:
print("Processes: ", self.metadata['job']['nprocs'], sep="")
print("JobID: ", self.metadata['job']['jobid'], sep="")
print("UID: ", self.metadata['job']['uid'], sep="")
print("Modules in Log: ", list(self.modules.keys()), sep="")
loaded = {}
for mod in self.records:
......@@ -384,7 +392,8 @@ class DarshanReport(object):
print("Name Records: ", len(self.name_records), sep="")
print("Darshan/Hints: ", self.metadata['job']['metadata'], sep="")
if 'job' in self.metadata:
print("Darshan/Hints: ", self.metadata['job']['metadata'], sep="")
print("DarshanReport: id(", id(self), ") (tmp)", sep="")
......
......@@ -7,7 +7,7 @@ with open('README.rst') as readme_file:
readme = readme_file.read()
requirements = ['cffi', 'numpy', 'matplotlib']
requirements = ['cffi', 'numpy', 'pandas', 'matplotlib']
setup_requirements = ['pytest-runner', ]
test_requirements = ['pytest']
......@@ -33,7 +33,7 @@ setup(
include_package_data=True,
keywords='darshan',
name='darshan',
packages=find_packages(include=['darshan']),
packages=find_packages(include=['darshan*']),
setup_requires=setup_requirements,
test_suite='tests',
tests_require=test_requirements,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment