Commit 2e9dddc2 authored by Jakob Luettgau

Add basic DXT support (without segments). Updates to documentation and example notebooks.

parent d6b07f76
......@@ -43,6 +43,8 @@ coverage: ## check code coverage quickly with the default Python
docs: ## generate Sphinx HTML documentation, including API docs
rm -f docs/darshan.rst
rm -f docs/darshan.backend.rst
rm -f docs/darshan.plots.rst
rm -f docs/modules.rst
sphinx-apidoc -o docs/ darshan
$(MAKE) -C docs clean
......
......@@ -5,7 +5,5 @@
__version__ = '0.1.0'
from darshan.parser_cffi import *
from darshan.backend.cffi_backend import *
from darshan.report import *
......@@ -100,6 +100,19 @@ struct darshan_apxc_perf_record
struct dxt_file_record {
struct darshan_base_record base_rec;
int64_t shared_record; /* -1 means it is a shared file record */
char hostname[64]; /* size defined via macro */
int64_t write_count;
int64_t read_count;
};
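/* Note: these fields are consumed from Python by casting the fetched record
   buffer with ffi.cast("struct dxt_file_record **", buf) in
   log_get_dxt_record(); write_count/read_count give the number of I/O
   segments recorded for this file on this rank. */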
typedef uint64_t darshan_record_id;
struct darshan_name_record
......
......@@ -642,7 +642,7 @@ class AsciiDarshanLog(object):
logdata = self.logdata
# Original:
# Original, Target:
## <type>, <app time>, <read>, <write>, <meta>
#POSIX, 98.837925, 0.150075, 0.5991, 0.4129
#MPI-IO, 97.293875, 0.051575, 0.126525, 2.528025
......
......@@ -158,6 +158,57 @@ def log_get_name_records(log):
def log_get_dxt_record(log, mod_name, mod_type, mode='numpy'):
"""
Returns a dictionary holding a dxt darshan log record.
Args:
log: Handle returned by darshan.open
mod_name (str): Name of the Darshan module
mod_type (str): String containing the C type
Return:
dict: generic log record
Example
-------
The typical darshan log record provides two arrays, on for integer counters
and one for floating point counters:
>>> darshan.log_get_dxt_record(log, "POSIX", "struct darshan_posix_file **")
{'rank': 42, 'read_segments': array([...])}
"""
modules = log_get_modules(log)
rec = {}
buf = ffi.new("void **")
r = libdutil.darshan_log_get_record(log, modules[mod_name]['idx'], buf)
if r < 1:
return None
rbuf = ffi.cast(mod_type, buf)
clst = []
rec['id'] = rbuf[0].base_rec.id
rec['rank'] = rbuf[0].base_rec.rank
rec['write_count'] = rbuf[0].write_count
rec['read_count'] = rbuf[0].read_count
rec['write_segments'] = []
rec['read_segments'] = []
return rec
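# Minimal usage sketch (hypothetical log file; assumes it contains DXT_POSIX
# data and was opened with log_open() from this module):
#
#   log = log_open("example.darshan")
#   rec = log_get_dxt_record(log, "DXT_POSIX", "struct dxt_file_record **")
#   while rec is not None:
#       print(rec['rank'], rec['read_count'], rec['write_count'])
#       rec = log_get_dxt_record(log, "DXT_POSIX", "struct dxt_file_record **")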
def log_get_generic_record(log, mod_name, mod_type, mode='numpy'):
"""
Returns a dictionary holding a generic darshan log record.
......@@ -320,6 +371,7 @@ def log_get_posix_record(log):
return log_get_generic_record(log, "POSIX", "struct darshan_posix_file **")
def log_get_stdio_record(log):
"""
Returns a darshan log record for STDIO.
......
""" Darshan Error classes and functions. """
class DarshanBaseError(Exception):
"""
Base exception class for Darshan errors in Python.
"""
pass
class DarshanVersionError(NotImplementedError):
"""
Raised when a feature is used which is not provided by libdarshanutil.
"""
min_version = None
def __init__(self, min_version, msg="Feature"):
self.msg = msg
self.min_version = min_version
self.version = "0.0.0"
def __repr__(self):
return "DarshanVersionError('%s')" % str(sefl)
def __str__(self):
return "%s requires libdarshanutil >= %s, have %s" % (self.msg, self.min_version, self.version)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import darshan.parser_cffi as parser
import darshan.backend.cffi_backend as backend
import json
import numpy as np
import re
import copy
class NumpyEncoder(json.JSONEncoder):
def default(self, obj):
......@@ -15,43 +17,147 @@ class NumpyEncoder(json.JSONEncoder):
class DarshanReport(object):
"""
The DarshanReport class provides a convenient wrapper to access darshan
logs, which caches already fetched information. In addition, a number of
common aggregations can be performed.
"""
def __init__(self, filename):
self.filename = filename
self.report = {'version': 1}
self.report['records'] = {}
self.filehashes = {}
self.log = parser.log_open(self.filename)
self.log = backend.log_open(self.filename)
# state dependent book-keeping
self.converted_records = False # true if convert_records() was called (un-numpyify)
# when using report algebra, this log allows untangling potentially
# unfair aggregations (e.g., double accounting)
self.provenance_log = []
self.read_metadata()
self.report["name_records"] = backend.log_get_name_records(self.log)
def __add__(self, other):
new_report = self.copy()
#new_report = copy.deepcopy(self)
new_report.provenance_log.append(("add", self, other))
return new_report
def read_all(self):
"""
Read all available information from the darshan log.
"""
self.report["job"] = parser.log_get_job(self.log)
self.report["exe"] = parser.log_get_exe(self.log)
self.report["mounts"] = parser.log_get_mounts(self.log)
self.report["modules"] = parser.log_get_modules(self.log)
self.report["name_records"] = parser.log_get_name_records(self.log)
Args:
None
Return:
None
"""
self.report['records'] = {}
for mod in self.report['modules']:
self.mod_read_all_records(mod)
pass
def read_metadata(self):
"""
Read metadata such as the job, the executable, and the available modules.
Args:
None
Return:
None
"""
self.report["job"] = backend.log_get_job(self.log)
self.report["exe"] = backend.log_get_exe(self.log)
self.report["mounts"] = backend.log_get_mounts(self.log)
self.report["modules"] = backend.log_get_modules(self.log)
def mod_read_all_dxt_records(self, mod, mode='numpy'):
"""
Reads all DXT records for the given module.
Args:
mod (str): Identifier of module to fetch all records for
mode (str): 'numpy' for ndarray (default), 'dict' for python dictionary
Return:
None
"""
supported = ['DXT_POSIX', 'DXT_MPIIO']
if mod not in supported:
print("Skipping. Currently unsupported:", mod)
# skip mod
return
structdefs = {
"DXT_POSIX": "struct dxt_file_record **",
"DXT_MPIIO": "struct dxt_file_record **",
}
self.report['records'][mod] = []
self.report['modules'][mod]['num_records'] = 0
self.summarize()
self.convert_records()
rec = backend.log_get_dxt_record(self.log, mod, structdefs[mod])
while rec is not None:
# TODO: performance hog and hacky ;)
#recs = json.dumps(rec, cls=NumpyEncoder)
#rec = json.loads(recs)
if mode == 'numpy':
self.report['records'][mod].append(rec)
else:
print("Not implemented.")
exit(1)
#c = dict(zip(cn, rec['counters']))
#fc = dict(zip(fcn, rec['fcounters']))
#self.report['records'][mod].append([c, fc])
pass
self.report['modules'][mod]['num_records'] += 1
# fetch next
rec = backend.log_get_dxt_record(self.log, mod, structdefs[mod])
pass
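# Sketch: fetching only DXT records into an existing report (assumes the
# log was generated with DXT tracing enabled):
#
#   report = DarshanReport("example.darshan")
#   report.mod_read_all_dxt_records("DXT_POSIX")
#   for rec in report.report['records']['DXT_POSIX']:
#       print(rec['id'], rec['rank'], rec['write_count'], rec['read_count'])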
def mod_read_all_records(self, mod, mode='numpy'):
"""
Reads all records for the given module.
......@@ -64,12 +170,14 @@ class DarshanReport(object):
None
"""
unsupported = ['DXT_POSIX', 'DXT_MPIIO', 'LUSTRE']
unsupported = ['DXT_POSIX', 'DXT_MPIIO']
if mod in unsupported:
print("Skipping. Currently unsupported:", mod)
# skip mod
return
structdefs = {
"BG/Q": "struct darshan_bgq_record **",
"HDF5": "struct darshan_hdf5_file **",
......@@ -77,20 +185,26 @@ class DarshanReport(object):
"PNETCDF": "struct darshan_pnetcdf_file **",
"POSIX": "struct darshan_posix_file **",
"STDIO": "struct darshan_stdio_file **",
"DECAF": "struct darshan_decaf_record **"
"DECAF": "struct darshan_decaf_record **",
"DXT_POSIX": "struct dxt_file_record **",
}
self.report['records'][mod] = []
cn = parser.counter_names(mod)
fcn = parser.fcounter_names(mod)
cn = backend.counter_names(mod)
fcn = backend.fcounter_names(mod)
self.report['modules'][mod]['counters'] = cn
self.report['modules'][mod]['fcounters'] = fcn
self.report['modules'][mod]['num_records'] = 0
rec = parser.log_get_generic_record(self.log, mod, structdefs[mod])
rec = backend.log_get_generic_record(self.log, mod, structdefs[mod])
while rec is not None:
# TODO: performance hog and hacky ;)
#recs = json.dumps(rec, cls=NumpyEncoder)
......@@ -107,12 +221,44 @@ class DarshanReport(object):
self.report['modules'][mod]['num_records'] += 1
# fetch next
rec = parser.log_get_generic_record(self.log, mod, structdefs[mod])
rec = backend.log_get_generic_record(self.log, mod, structdefs[mod])
pass
def mod_agg(self, mod, ranks=None, files=None, preserve_rank=False, preserve_file=False):
"""
Aggregate counters for a given module name and return the updated dictionary.
Args:
mod (str): Name of the module to aggregate.
ranks (int or list): Only aggregate if rank is matched
files (int or list): Only aggregate if file is matched
preserve_rank: do not collapse ranks into a single value
preserve_file: do not collapse files into a single value
Return:
dict: aggregated 'counters' and 'fcounters' arrays
"""
# TODO: assert
recs = self.report['records']
c = None
fc = None
# aggregate
for rec in recs[mod]:
if c is None:
c = rec['counters']
fc = rec['fcounters']
else:
c = np.add(c, rec['counters'])
fc = np.add(fc, rec['fcounters'])
return {'counters': c, 'fcounters': fc}
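# Example (assumes POSIX records were already read into the report):
#
#   agg = report.mod_agg('POSIX')
#   # agg['counters'] and agg['fcounters'] hold element-wise sums over all
#   # POSIX records of the report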
def mod_agg_iohist(self, mod, mode='append'):
"""
......@@ -125,15 +271,19 @@ class DarshanReport(object):
None
"""
# convenience
recs = self.report['records']
ctx = {}
supported = ["POSIX", "MPI-IO"]
if mod not in supported:
raise Exception("Unsupported mod_name for aggregated iohist.")
# convenience
recs = self.report['records']
ctx = {}
# check records for module are present
if mod not in recs:
return
# aggregate
......@@ -158,11 +308,11 @@ class DarshanReport(object):
tmp = json.dumps(ctx[mod], cls=NumpyEncoder)
tmp = json.loads(tmp)
cn = parser.counter_names(mod)
cn = backend.counter_names(mod)
c = dict(zip(cn, tmp))
c = {k:c[k] for k in c if re.match('.*?_SIZE_.*?', k)}
c = {fix_name(k):v for k, v in c.items()}
ctx[mod] = c
ctx = c
if mode == 'append':
......@@ -200,6 +350,11 @@ class DarshanReport(object):
# aggregate
mods = ['MPI-IO', 'POSIX', 'STDIO']
for mod in mods:
# check records for module are present
if mod not in recs:
continue
agg = None
for rec in recs[mod]:
if agg is not None:
......@@ -209,7 +364,7 @@ class DarshanReport(object):
# filter fields
cn = parser.counter_names(mod)
cn = backend.counter_names(mod)
agg = dict(zip(cn, agg.tolist()))
......@@ -277,11 +432,150 @@ class DarshanReport(object):
return ctx
def create_sankey(self):
"""
Generate a summary that shows the dataflow from ranks to files to mount points.
"""
# convenience
recs = self.report['records']
nrecs = self.report['name_records']
ctx = {}
# check records for module are present
if 'POSIX' not in recs:
self.report['sankey'] = None
return
ranks = {}
files = {}
mounts = {}
nodes = {}
edges = {}
# build mnt list
mnts = []
for mnt in self.report['mounts']:
mnts.append(mnt[0])
# collect records
for rec in recs['POSIX']:
rnk = "r_%d" % (rec['rank'])
fnr = rec['id']
fnr = nrecs[fnr]
# determine mount point
mnt = None
for curr in mnts:
if re.search(curr, fnr):
mnt = curr
break
mnt = "m_%s" % mnt
nodes[rnk] = {'name': rnk}
nodes[fnr] = {'name': fnr}
nodes[mnt] = {'name': mnt}
#rnk2fnr += 1
#fnr2mnt += 1
rnk2fnr = "%s->%s" % (rnk, fnr)
fnr2mnt = "%s->%s" % (fnr, mnt)
if rnk2fnr not in edges:
edges[rnk2fnr] = {"value": 0, "source": rnk, "target": fnr}
edges[rnk2fnr]["value"] += 1
if fnr2mnt not in edges:
edges[fnr2mnt] = {"value": 0, "source": fnr, "target": mnt}
edges[fnr2mnt]["value"] += 1
ctx = {
"nodes": list(nodes.values()),
"links": list(edges.values())
}
tmp = json.dumps(ctx, cls=NumpyEncoder)
tmp = json.loads(tmp)
self.report['sankey'] = tmp
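# The stored structure matches the usual d3-sankey input format, e.g.
# (hypothetical values):
#
#   {"nodes": [{"name": "r_0"}, {"name": "/home/user/out.dat"}, {"name": "m_/home"}],
#    "links": [{"value": 3, "source": "r_0", "target": "/home/user/out.dat"}, ...]}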
def create_time_summary(self):
"""
TODO: port to the new report object
"""
raise NotImplementedError("create_time_summary is not yet ported to the new report object.")
# Original, Target:
## <type>, <app time>, <read>, <write>, <meta>
#POSIX, 98.837925, 0.150075, 0.5991, 0.4129
#MPI-IO, 97.293875, 0.051575, 0.126525, 2.528025
#STDIO, 99.261425, 0, 0.738575, 0
# convenience links
summary = logdata['summary']
time_summary = logdata['time-summary.dat']
runtime = float(logdata['runtime'])
nprocs = int(logdata['nprocs'])
for layer in ['POSIX', 'MPIIO', 'STDIO']:
if (layer + '_OPENS') in summary or (layer + '_INDEP_OPENS') in summary:
entry = {
'type': layer, 'app_time': None, 'read': None, 'write': None, 'meta': None
}
io_time = 0.0
for op in ['READ', 'WRITE', 'META']:
val = float(summary[layer + '_F_' + op + '_TIME'])
io_time += val
entry[op.lower()] = (val / (runtime * nprocs)) * 100
entry['app_time'] = ((runtime * nprocs - io_time) / (runtime * nprocs)) * 100
time_summary[layer] = entry
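# Worked example for the percentage formula above (hypothetical values):
# with runtime = 100 s and nprocs = 4, a POSIX_F_READ_TIME of 2.0 s yields
# (2.0 / (100 * 4)) * 100 = 0.5 for the 'read' column.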
def summarize(self):
"""
Compile summary
Compiles a report summary of the records present in the report object.
Args:
None