Commit e0773049 authored by Jakob Luettgau

Refactoring to move aggregations to experimental for the time being....

Refactoring to move aggregations to experimental for the time being. Conveniently re-enable them by calling darshan.enable_experimental() after importing darshan.
parent e587da8c
@@ -5,8 +5,27 @@
__version__ = '0.1.0'
options = {}
#from darshan.backend.cffi_backend import *
from darshan.report import DarshanReport
def enable_experimental(verbose=True):
    import os
    import glob
    import importlib
    import darshan

    # Discover every aggregator shipped in darshan/experimental/aggregators
    # and attach it to DarshanReport as a method of the same name.
    paths = glob.glob(darshan.__path__[0] + "/experimental/aggregators/*.py")
    for path in paths:
        base = os.path.basename(path)
        name = os.path.splitext(base)[0]
        if name == "__init__":
            continue

        mod = importlib.import_module('darshan.experimental.aggregators.{0}'.format(name))
        setattr(DarshanReport, name, getattr(mod, name))

        if verbose:
            print("Added method {} to DarshanReport.".format(name))
# -*- coding: utf-8 -*-
"""
The darshan.parser_cffi module makes use of the darshan-utils C library
for high performance and a low memory footprint when accessing darshan
log files.
"""
import cffi
import numpy
@@ -28,6 +22,7 @@ libdutil = ffi.dlopen("libdarshan-util.so")
def log_open(filename):
    """
    Opens a darshan logfile.
......
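For orientation, a hedged sketch of driving the backend directly; log_get_modules and log_get_name_records appear later in this diff, log_close is assumed to exist as the counterpart of log_open, and the path is hypothetical:

from darshan.backend.cffi_backend import *

log = log_open("example.darshan")   # hypothetical log path
modules = log_get_modules(log)      # modules present in the log
names = log_get_name_records(log)   # record id -> file name mapping
log_close(log)                      # assumed counterpart of log_open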
from darshan.report import *


def agg_ioops(self, mode='append'):
    """
    Compile the I/O operations summary for the current report.

    Args:
        mode (str): Whether to 'append' (default) or to 'return' the aggregation.

    Return:
        None or dict: Depending on mode.
    """

    # chart series skeleton (currently unused by the aggregation below)
    series = [
        {'name': 'POSIX', 'type': 'bar', 'data': [0, 0, 0, 0, 0, 0, 0]},
        {'name': 'MPI-IO Indep.', 'type': 'bar', 'data': [0, 0, 0, 0, 0, 0, 0]},
        {'name': 'MPI-IO Coll.', 'type': 'bar', 'data': [0, 0, 0, 0, 0, 0, 0]},
        {'name': 'STDIO', 'type': 'bar', 'data': [0, 0, 0, 0, 0, 0, 0]}
    ]

    # convenience
    recs = self.report['records']
    ctx = {}

    # aggregate
    mods = ['MPI-IO', 'POSIX', 'STDIO']
    for mod in mods:
        # check records for module are present
        if mod not in recs:
            continue

        # sum the counter arrays of all records of this module
        agg = None
        for rec in recs[mod]:
            if agg is not None:
                agg = np.add(agg, rec['counters'])
            else:
                agg = rec['counters']

        # map counter values onto their names
        cn = backend.counter_names(mod)
        agg = dict(zip(cn, agg.tolist()))

        # append aggregated statistics for module to report
        if mod == 'MPI-IO':
            ctx[mod + ' Indep.'] = agg
            #agg_indep = {
            #    'Read': agg['MPIIO_'],
            #    'Write': agg['MPIIO_'],
            #    'Open': agg['MPIIO_'],
            #    'Stat': agg['MPIIO_'],
            #    'Seek': agg['MPIIO_'],
            #    'Mmap': agg['MPIIO_'],
            #    'Fsync': agg['MPIIO_']
            #}

            #ctx[mod + ' Coll.'] = agg
            #agg_coll = {
            #    'Read': agg['MPIIO_'],
            #    'Write': agg['MPIIO_'],
            #    'Open': agg['MPIIO_'],
            #    'Stat': agg['MPIIO_'],
            #    'Seek': agg['MPIIO_'],
            #    'Mmap': agg['MPIIO_'],
            #    'Fsync': agg['MPIIO_']
            #}
        else:
            # POSIX and STDIO share most counter names and are handled
            # together for this reason, except for metadata/sync counters
            tmp = {
                'Read': agg[mod + '_READS'],
                'Write': agg[mod + '_WRITES'],
                'Open': agg[mod + '_OPENS'],
                'Stat': 0,
                'Seek': agg[mod + '_SEEKS'],
                'Mmap': 0,
                'Fsync': 0
            }
            if mod == 'POSIX':
                # The original left these as no-op expressions; filling them
                # from the POSIX metadata counters is an assumption.
                tmp['Stat'] = agg[mod + '_STATS']
                tmp['Mmap'] = agg[mod + '_MMAPS']
                tmp['Fsync'] = agg[mod + '_FSYNCS']
            elif mod == 'STDIO':
                # STDIO exposes no stat/mmap/fsync counters; keep the zeros.
                pass
            ctx[mod] = agg
            ctx[mod + '_simple'] = tmp

    # round-trip through JSON to turn numpy types into plain Python values
    tmp = json.dumps(ctx, cls=NumpyEncoder)
    ctx = json.loads(tmp)

    if mode == 'append':
        self.report['agg_ioops'] = ctx
    else:
        return ctx
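A hedged usage sketch showing both modes (hypothetical log path; assumes enable_experimental() has attached the aggregators and that DarshanReport accepts the log path in its constructor, as the diff below suggests):

import darshan
darshan.enable_experimental()

r = darshan.DarshanReport("example.darshan")  # hypothetical log path
r.mod_read_all_records("POSIX")
r.agg_ioops()                       # mode='append': stored in r.report['agg_ioops']
ioops = r.agg_ioops(mode='return')  # or hand the aggregated dict back directly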
from darshan.report import *


def create_sankey(self):
    """
    Generate a summary that shows the dataflow between ranks, files and
    their mountpoints.
    """

    # convenience
    recs = self.report['records']
    nrecs = self.report['name_records']
    ctx = {}

    # check records for module are present
    if 'POSIX' not in recs:
        self.report['sankey'] = None
        return

    ranks = {}
    files = {}
    mounts = {}

    nodes = {}
    edges = {}

    # build mount-point list
    mnts = []
    for mnt in self.report['mounts']:
        mnts.append(mnt[0])

    # collect records
    for rec in recs['POSIX']:
        rnk = "r_%d" % (rec['rank'])

        # resolve record id to a file name
        fnr = rec['id']
        fnr = nrecs[fnr]

        # determine mount point (note: the mount path is matched as a regex)
        mnt = None
        for curr in mnts:
            if re.search(curr, fnr):
                mnt = curr
                break
        mnt = "m_%s" % mnt

        nodes[rnk] = {'name': rnk}
        nodes[fnr] = {'name': fnr}
        nodes[mnt] = {'name': mnt}

        #rnk2fnr += 1
        #fnr2mnt += 1

        rnk2fnr = "%s->%s" % (rnk, fnr)
        fnr2mnt = "%s->%s" % (fnr, mnt)

        if rnk2fnr not in edges:
            edges[rnk2fnr] = {"value": 0, "source": rnk, "target": fnr}
        edges[rnk2fnr]["value"] += 1

        if fnr2mnt not in edges:
            edges[fnr2mnt] = {"value": 0, "source": fnr, "target": mnt}
        edges[fnr2mnt]["value"] += 1

    ctx = {
        "nodes": list(nodes.values()),
        "links": list(edges.values())
    }

    tmp = json.dumps(ctx, cls=NumpyEncoder)
    tmp = json.loads(tmp)
    self.report['sankey'] = tmp
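The resulting {nodes, links} structure matches the shape commonly consumed by sankey renderers such as d3-sankey (an assumption about the intended front end), so serializing it is enough to hand off:

import json

r.create_sankey()                    # assumes `r` is a DarshanReport with POSIX records loaded
with open("sankey.json", "w") as f:  # hypothetical output path
    json.dump(r.report['sankey'], f)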
from darshan.report import *


def create_time_summary(self):
    """
    TODO: port to new object report
    """

    raise NotImplementedError("create_time_summary is not ported to the report object yet.")

    # Legacy implementation kept below for reference; `logdata` refers to the
    # old report structure and is undefined here. Original target format:
    ## <type>, <app time>, <read>, <write>, <meta>
    #POSIX, 98.837925, 0.150075, 0.5991, 0.4129
    #MPI-IO, 97.293875, 0.051575, 0.126525, 2.528025
    #STDIO, 99.261425, 0, 0.738575, 0

    # convenience links
    summary = logdata['summary']
    time_summary = logdata['time-summary.dat']

    runtime = float(logdata['runtime'])
    nprocs = int(logdata['nprocs'])

    for layer in ['POSIX', 'MPIIO', 'STDIO']:
        if (layer + '_OPENS') in summary or (layer + '_INDEP_OPENS') in summary:
            entry = {
                'type': layer, 'app_time': None, 'read': None, 'write': None, 'meta': None
            }

            io_time = 0.0
            for op in ['READ', 'WRITE', 'META']:
                val = float(summary[layer + '_F_' + op + '_TIME'])
                io_time += val
                entry[op.lower()] = (val / (runtime * nprocs)) * 100

            entry['app_time'] = ((runtime * nprocs - io_time) / (runtime * nprocs)) * 100
            time_summary[layer] = entry
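For orientation: the legacy formula normalizes cumulative per-operation time by total core-seconds, runtime * nprocs. With runtime = 100 s, nprocs = 4 and 6 s of cumulative read time across all ranks, read = 6 / (100 * 4) * 100 = 1.5%, and app_time receives whatever share of core-seconds is not attributed to I/O.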
from darshan.report import *


def create_timeline(self, group_by='rank'):
    """
    Generate/update a timeline from DXT tracing records of the current report.

    Args:
        group_by (str): By which factor to group entries (default: rank)
                        Allowed parameters: rank, filename
    """

    self.mod_read_all_dxt_records("DXT_POSIX")
    self.mod_read_all_dxt_records("DXT_MPIIO")

    self.report['timeline'] = {'groups': [], 'items': []}

    groups = self.report['timeline']['groups']
    items = self.report['timeline']['items']

    start_time = datetime.datetime.fromtimestamp(self.report['job']['start_time'])

    def groupify(rec, mod):
        # tag each segment with its operation type, then merge and sort by start
        for seg in rec['write_segments']:
            seg.update({'type': 'w'})

        for seg in rec['read_segments']:
            seg.update({'type': 'r'})

        segments = rec['write_segments'] + rec['read_segments']
        segments = sorted(segments, key=lambda k: k['start_time'])

        start = float('inf')
        end = float('-inf')

        trace = []
        minsize = 0
        for seg in segments:
            trace += [seg['type'], seg['offset'], seg['length'], seg['start_time'], seg['end_time']]

            seg_minsize = seg['offset'] + seg['length']
            if minsize < seg_minsize:
                minsize = seg_minsize

            if start > seg['start_time']:
                start = seg['start_time']

            if end < seg['end_time']:
                end = seg['end_time']

        # reconstruct absolute timestamps from offsets relative to job start
        start = start_time + datetime.timedelta(seconds=start)
        end = start_time + datetime.timedelta(seconds=end)

        rid = "%s:%d:%d" % (mod, rec['id'], rec['rank'])

        item = {
            "id": rid,
            "rank": rec['rank'],
            "hostname": rec['hostname'],
            "filename": rec['filename'],

            "group": rid,
            "start": start.isoformat(),
            "end": end.isoformat(),
            "limitSize": False,  # required to prevent rendering glitches

            "data": {
                "duration": (end - start).total_seconds(),
                "start": segments[0]['start_time'],
                "size": minsize,  # minimal estimated filesize
                "trace": trace,
            }
        }
        items.append(item)

        group = {
            "id": rid,
            "content": "[%s] " % (mod) + rec['filename'][-84:],
            "order": seg['start_time']  # note: `seg` is the last segment from the loop above
        }
        groups.append(group)

    supported = ['DXT_POSIX', 'DXT_MPIIO']
    for mod in supported:
        if mod in self.report['records']:
            for rec in self.report['records'][mod]:
                groupify(rec, mod)
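A hedged usage sketch (requires a log recorded with DXT tracing enabled; the path is hypothetical):

import darshan
darshan.enable_experimental()

r = darshan.DarshanReport("example_dxt.darshan")  # hypothetical DXT-enabled log
r.read_metadata()     # create_timeline reads job start_time from the metadata
r.create_timeline()   # loads DXT_POSIX/DXT_MPIIO records itself
items = r.report['timeline']['items']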
from darshan.report import *


def mod_agg_iohist(self, mod, mode='append'):
    """
    Generate an aggregated I/O histogram for a module.

    Args:
        mod (str): Module to aggregate ("POSIX" or "MPI-IO").

    Return:
        None or dict: Depending on mode.
    """

    # convenience
    recs = self.report['records']
    ctx = {}

    supported = ["POSIX", "MPI-IO"]
    if mod not in supported:
        raise Exception("Unsupported mod_name for aggregated iohist.")

    # check records for module are present
    if mod not in recs:
        return

    # aggregate
    for rec in recs[mod]:
        if mod not in ctx:
            ctx[mod] = rec['counters']
        else:
            ctx[mod] = np.add(ctx[mod], rec['counters'])

    # cleanup and prepare for JSON serialization
    def fix_name(name):
        # reduce bucket counter names, e.g. *_SIZE_READ_0_100 -> READ_0_100
        name = name.split("_")
        typ = "UNKNOWN"
        if "READ" in name:
            typ = "READ"
        elif "WRITE" in name:
            typ = "WRITE"

        name = "%s_%s_%s" % (typ, name[-2], name[-1])
        return name

    tmp = json.dumps(ctx[mod], cls=NumpyEncoder)
    tmp = json.loads(tmp)

    cn = backend.counter_names(mod)
    c = dict(zip(cn, tmp))
    c = {k: c[k] for k in c if re.match('.*?_SIZE_.*?', k)}
    c = {fix_name(k): v for k, v in c.items()}
    ctx = c

    if mode == 'append':
        if 'agg_iohist' not in self.report:
            self.report['agg_iohist'] = {}
        self.report['agg_iohist'][mod] = ctx
    else:
        return ctx
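For orientation, the local fix_name helper reduces the size-bucket counters selected by the _SIZE_ filter to operation plus bucket bounds; shown standalone for illustration, with counter names taken from darshan's POSIX and MPI-IO modules:

fix_name("POSIX_SIZE_READ_0_100")         # -> "READ_0_100"
fix_name("MPIIO_SIZE_WRITE_AGG_100K_1M")  # -> "WRITE_100K_1M"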
from darshan.report import *


def summarize(self):
    """
    Compiles a report summary of the records present in the report object.

    Args:
        None

    Return:
        None
    """

    if self.converted_records:
        raise Exception('convert_records() was called earlier on this report. '
                        'Can not aggregate non-numpy arrays. '
                        '(TODO: Consider back-conversion.)')

    self.mod_agg_iohist("MPI-IO")
    self.mod_agg_iohist("POSIX")

    self.agg_ioops()
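A hedged end-to-end sketch tying the aggregators together (hypothetical log path):

import darshan
darshan.enable_experimental()

r = darshan.DarshanReport("example.darshan")  # hypothetical log path
r.mod_read_all_records("POSIX")
r.mod_read_all_records("MPI-IO")
r.summarize()  # fills r.report['agg_iohist'] and r.report['agg_ioops']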
@@ -16,12 +16,10 @@ import datetime
class NumpyEncoder(json.JSONEncoder):
    """
    Helper class for JSON serialization if the report contains numpy
-   log records, which are not handled by the default json encoder.
+   log records, which are not handled by the default JSON encoder.
    """

    def default(self, obj):
        if isinstance(obj, np.ndarray):
@@ -30,8 +28,6 @@ class NumpyEncoder(json.JSONEncoder):
class DarshanReport(object):
    """
    The DarshanReport class provides a convenient wrapper to access darshan
@@ -64,7 +60,6 @@ class DarshanReport(object):
self.report["name_records"] = backend.log_get_name_records(self.log)
def __add__(self, other):
new_report = self.copy()
#new_report = copy.deepcopy(self)
@@ -113,8 +108,6 @@ class DarshanReport(object):
        pass

    def read_metadata(self):
        """
        Read metadata such as the job, the executables and available modules.
@@ -132,8 +125,6 @@ class DarshanReport(object):
self.report["modules"] = backend.log_get_modules(self.log)
def mod_read_all_records(self, mod, mode='numpy'):
"""
Reads all generic records for module
@@ -178,9 +169,6 @@ class DarshanReport(object):
        rec = backend.log_get_generic_record(self.log, mod, structdefs[mod])
        while rec is not None:
            # TODO: performance hog and hacky ;)
@@ -203,9 +191,6 @@ class DarshanReport(object):
        pass

    def mod_read_all_dxt_records(self, mod, mode='numpy'):
        """
        Reads all DXT records for the provided module.
@@ -243,9 +228,6 @@ class DarshanReport(object):
        rec = backend.log_get_dxt_record(self.log, mod, structdefs[mod])
        while rec is not None:
            # TODO: performance hog and hacky ;)
@@ -272,9 +254,6 @@ class DarshanReport(object):
        pass

    def mod_agg(self, mod, ranks=None, files=None, preserve_rank=False, preserve_file=False):
        """
        Aggregate counters for a given module name and return the updated dictionary.
@@ -308,512 +287,6 @@ class DarshanReport(object):
        return {'counters': c, 'fcounter': fc}

    def mod_agg_iohist(self, mod, mode='append'):
        """
        Generate aggregated histogram for mod_name.

        Args:
            mod_name (str):