Commit aaacffa0 authored by Jakob Luettgau's avatar Jakob Luettgau
Browse files

Add experimental filter and reduce methods.

parent 41212c28
......@@ -49,10 +49,11 @@ def log_get_job(log):
job = {}
jobrec = ffi.new("struct darshan_job *")
libdutil.darshan_log_get_job(log['handle'], jobrec)
job['jobid'] = jobrec[0].jobid
job['uid'] = jobrec[0].uid
job['start_time'] = jobrec[0].start_time
job['end_time'] = jobrec[0].end_time
job['nprocs'] = jobrec[0].nprocs
job['jobid'] = jobrec[0].jobid
mstr = ffi.string(jobrec[0].metadata).decode("utf-8")
md = {}
for kv in mstr.split('\n')[:-1]:
......
from darshan.report import *
import sys
def filter(self, mods=None, name_records=None, data_format='numpy', mode='append'):
    """
    Return filtered list of records.

    Args:
        mods: name(s) of modules to preserve ('all', '*' or None keeps every module)
        name_records: id(s)/name(s) of name_records to preserve
            ('all', '*' or None keeps every name record)
        data_format: output format (currently only 'numpy' records pass through)
        mode: 'append' additionally stores the result under self.data['filter']

    Return:
        dict: mapping of module name -> list of records that survive the filter
    """
    ctx = {}

    # Wildcard selections: None/'all'/'*' mean "keep everything".
    mods_wildcard = mods in ['all', '*', None]
    if mods_wildcard:
        mods = None
    name_records_wildcard = name_records in ['all', '*', None]
    if name_records_wildcard:
        name_records = None

    # Normalize both selections into explicit whitelists.
    if mods is None:
        mods = self.records.keys()

    if name_records is None:
        name_records = list(self.name_records.keys())
    else:
        # Callers may pass either record ids or record names: build a table
        # that resolves both forms to the canonical id.
        resolve_table = {}
        for key, value in self.name_records.items():
            resolve_table[key] = key
            resolve_table[value] = key

        # Sanitize user input down to known ids; unknown entries are dropped.
        name_records = [resolve_table[nrec] for nrec in name_records
                        if nrec in resolve_table]

    # Collect the records that pass both whitelists.
    for mod, mod_recs in self.records.items():
        if mod not in mods:
            continue
        for rec in mod_recs:
            if rec['id'] in name_records:
                ctx.setdefault(mod, []).append(rec)

    if mode == 'append':
        name = 'filter'
        if name not in self.summary:
            self.summary[name] = {}
        self.data[name] = ctx

    return ctx
......@@ -11,16 +11,18 @@ def mod_agg_iohist(self, mod, mode='append'):
None
"""
# convienience
recs = self.data['records']
ctx = {}
# sanitation and guards
supported = ["POSIX", "MPI-IO"]
if mod not in supported:
raise Exception("Unsupported mod_name for aggregated iohist.")
# convienience
recs = self.records
ctx = {}
# check records for module are present
if mod not in recs:
return
......@@ -59,8 +61,7 @@ def mod_agg_iohist(self, mod, mode='append'):
if 'agg_iohist' not in self.data:
self.data['agg_iohist'] = {}
self.data['agg_iohist'][mod] = ctx
else:
return ctx
return ctx
from darshan.report import *
def name_records_summary(self):
    """
    Count records for every name record.

    Args:
        None

    Return:
        dict: name-record id -> {module name -> number of records}
    """
    counts = {}
    for mod, mod_records in self.records.items():
        for record in mod_records:
            # One counter bucket per name-record id, one entry per module.
            per_mod = counts.setdefault(record['id'], {})
            per_mod[mod] = per_mod.get(mod, 0) + 1
    return counts
from darshan.report import *
import sys
def reduce(self, operation="sum", mods=None, name_records=None, mode='append', data_format="numpy"):
    """
    Reduce records by summing their counters.

    Args:
        operation: reduction operation (only "sum" is implemented)
        mods: name(s) of modules to reduce; 'all'/'*'/None selects all modules,
            'distinct'/'unique' also selects all modules (kept separate)
        name_records: id(s)/name(s) of name_records to reduce;
            'all'/'*'/None collapses all name records into a single '*' entry,
            'distinct'/'unique' keeps one entry per name record
        mode: 'append' additionally stores the result under self.data['reduction']
        data_format: output format (currently only 'numpy' counters)

    Return:
        dict: module name -> {name-record pattern -> reduced counter array}
    """
    ctx = {}

    # Interpret the module selection.
    mods_wildcard = False
    if mods in ['distinct', 'unique']:
        mods = None
    elif mods in ['all', '*', None]:
        mods_wildcard = True
        mods = None

    # Interpret the name-record selection.
    name_records_wildcard = False
    if name_records in ['distinct', 'unique']:
        name_records = None
    elif name_records in ['all', '*', None]:
        name_records_wildcard = True
        name_records = None

    # Normalize both selections into explicit whitelists.
    if mods is None:
        mods = self.records.keys()

    if name_records is None:
        name_records = list(self.name_records.keys())
    else:
        # Callers may pass either record ids or record names: build a table
        # that resolves both forms to the canonical id.
        resolve_table = {}
        for key, value in self.name_records.items():
            resolve_table[key] = key
            resolve_table[value] = key

        # Sanitize user input down to known ids; unknown entries are dropped.
        name_records = [resolve_table[nrec] for nrec in name_records
                        if nrec in resolve_table]

    # Aggregate counters per module.
    for mod, mod_recs in self.records.items():
        if mod not in mods:
            continue
        for rec in mod_recs:
            nrec = rec['id']
            if nrec not in name_records:
                continue
            if mod not in ctx:
                ctx[mod] = {}

            # Wildcard selections collapse all name records into one '*' bucket.
            nrec_pattern = '*' if name_records_wildcard else nrec

            # BUGFIX: membership must be tested with the pattern key; testing
            # with the raw id made wildcard reductions overwrite the running
            # sum with each record instead of accumulating.
            if nrec_pattern not in ctx[mod]:
                ctx[mod][nrec_pattern] = rec['counters']
            else:
                ctx[mod][nrec_pattern] = np.add(ctx[mod][nrec_pattern], rec['counters'])

    if mode == 'append':
        name = 'reduction'
        if name not in self.summary:
            self.summary[name] = {}
        self.data[name] = ctx

    return ctx
......@@ -33,9 +33,7 @@ def plot_access_histogram(self, mod, filter=None, data=None):
read_vals = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
write_vals = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
posix = self.data['agg_iohist'][mod]
......
......@@ -13,7 +13,7 @@ import numpy as np
import re
import copy
import datetime
import sys
class NumpyEncoder(json.JSONEncoder):
......@@ -35,7 +35,7 @@ class DarshanReport(object):
a number of common aggregations can be performed.
"""
def __init__(self, filename=None, data_format='numpy', automatic_summary=False):
def __init__(self, filename=None, data_format='numpy', automatic_summary=False, read_all=False):
self.filename = filename
# options
......@@ -50,18 +50,25 @@ class DarshanReport(object):
# initialize data namespace
self.data_revision = 0 # counter for consistency checks
self.data = {'version': 1}
self.data['metadata'] = {}
self.data['metadata'] = {'start_time': float('inf'), 'end_time': float('-inf')}
self.data['records'] = {}
self.data['summary'] = {}
self.data['modules'] = {}
self.data['counters'] = {}
self.data['name_records'] = {}
self.metadata = self.data['metadata']
self.modules = self.data['modules']
self.counters = self.data['counters']
self.records = self.data['records']
self.name_records = self.data['name_records']
# initialize report/summary namespace
self.summary_revision = 0 # counter to check if summary needs update
self.summary = self.data['summary']
# when using report algebra this log allows to untangle potentially
# unfair aggregations (e.g., double accounting)
self.provenance_enabled = True
......@@ -72,8 +79,9 @@ class DarshanReport(object):
if filename:
self.log = backend.log_open(self.filename)
self.read_metadata()
self.data["name_records"] = backend.log_get_name_records(self.log)
if read_all:
self.read_all()
......@@ -92,6 +100,18 @@ class DarshanReport(object):
nr.provenance_log.append(("add", self, other, datetime.datetime.now()))
# update metadata
def update_metadata(report):
if report.metadata['start_time'] < nr.metadata['start_time']:
nr.metadata['start_time'] = report.metadata['start_time']
if report.metadata['end_time'] > nr.metadata['end_time']:
nr.metadata['end_time'] = report.metadata['end_time']
update_metadata(self)
update_metadata(other)
# copy over records (references, under assumption single records are not altered)
for report in [self, other]:
for key, records in report.data['records'].items():
......@@ -105,6 +125,32 @@ class DarshanReport(object):
return nr
def read_metadata(self):
"""
Read metadata such as the job, the executables and available modules.
Args:
None
Return:
None
"""
self.data['metadata']['job'] = backend.log_get_job(self.log)
self.data['metadata']['exe'] = backend.log_get_exe(self.log)
self.metadata['start_time'] = self.metadata['job']['start_time']
self.metadata['end_time'] = self.metadata['job']['end_time']
self.data['mounts'] = backend.log_get_mounts(self.log)
self.data['modules'] = backend.log_get_modules(self.log)
self.modules = self.data['modules']
self.data["name_records"] = backend.log_get_name_records(self.log)
self.name_records = self.data['name_records']
def read_all(self):
"""
Read all available records from darshan log and return as dictionary.
......@@ -132,7 +178,7 @@ class DarshanReport(object):
"""
for mod in self.data['modules']:
self.mod_read_all_records(mod)
self.mod_read_all_records(mod, warnings=False)
pass
......@@ -149,30 +195,12 @@ class DarshanReport(object):
"""
for mod in self.data['modules']:
self.mod_read_all_dxt_records(mod)
self.mod_read_all_dxt_records(mod, warnings=False)
pass
def read_metadata(self):
"""
Read metadata such as the job, the executables and available modules.
Args:
None
Return:
None
"""
self.data['metadata']['job'] = backend.log_get_job(self.log)
self.data['metadata']['exe'] = backend.log_get_exe(self.log)
self.data['mounts'] = backend.log_get_mounts(self.log)
self.data['modules'] = backend.log_get_modules(self.log)
def mod_read_all_records(self, mod, mode='numpy'):
def mod_read_all_records(self, mod, mode='numpy', warnings=True):
"""
Reads all generic records for module
......@@ -187,7 +215,8 @@ class DarshanReport(object):
unsupported = ['DXT_POSIX', 'DXT_MPIIO', 'LUSTRE']
if mod in unsupported:
print("Skipping. Currently unsupported:", mod, "in mod_read_all_records().")
if warnings:
print("Skipping. Currently unsupported:", mod, "in mod_read_all_records().", file=sys.stderr)
# skip mod
return
......@@ -209,9 +238,14 @@ class DarshanReport(object):
cn = backend.counter_names(mod)
fcn = backend.fcounter_names(mod)
self.data['modules'][mod]['counters'] = cn
self.data['modules'][mod]['fcounters'] = fcn
self.data['modules'][mod]['num_records'] = 0
self.modules[mod]['num_records'] = 0
if mod not in self.counters:
self.counters[mod] = {}
self.counters[mod]['counters'] = cn
self.counters[mod]['fcounters'] = fcn
......@@ -222,14 +256,14 @@ class DarshanReport(object):
#rec = json.loads(recs)
if mode == 'numpy':
self.data['records'][mod].append(rec)
self.records[mod].append(rec)
else:
c = dict(zip(cn, rec['counters']))
fc = dict(zip(fcn, rec['fcounters']))
self.data['records'][mod].append([c, fc])
self.records[mod].append([c, fc])
self.data['modules'][mod]['num_records'] += 1
self.modules[mod]['num_records'] += 1
# fetch next
rec = backend.log_get_generic_record(self.log, mod, structdefs[mod])
......@@ -237,7 +271,7 @@ class DarshanReport(object):
pass
def mod_read_all_dxt_records(self, mod, mode='numpy'):
def mod_read_all_dxt_records(self, mod, mode='numpy', warnings=True):
"""
Reads all dxt records for provided module.
......@@ -251,14 +285,16 @@ class DarshanReport(object):
"""
if mod not in self.data['modules']:
print("Skipping. Log does not contain data for mod:", mod)
if warnings:
print("Skipping. Log does not contain data for mod:", mod, file=sys.stderr)
return
supported = ['DXT_POSIX', 'DXT_MPIIO']
if mod not in supported:
print("Skipping. Currently unsupported:", mod, 'in mod_read_all_dxt_records().')
if warnings:
print("Skipping. Currently unsupported:", mod, 'in mod_read_all_dxt_records().', file=sys.stderr)
# skip mod
return
......@@ -269,9 +305,12 @@ class DarshanReport(object):
}
self.data['records'][mod] = []
self.data['modules'][mod]['num_records'] = 0
self.records[mod] = []
self.modules[mod]['num_records'] = 0
if mod not in self.counters:
self.counters[mod] = {}
rec = backend.log_get_dxt_record(self.log, mod, structdefs[mod])
......@@ -281,7 +320,7 @@ class DarshanReport(object):
#rec = json.loads(recs)
if mode == 'numpy':
self.data['records'][mod].append(rec)
self.records[mod].append(rec)
else:
print("Not implemented.")
exit(1)
......
%% Cell type:code id: tags:
``` python
import darshan
import pprint
```
%% Cell type:code id: tags:
``` python
report = darshan.DarshanReport("example.darshan")
```
%% Cell type:markdown id: tags:
By default only metadata, available modules and the name records are loaded:
%% Cell type:code id: tags:
``` python
report.data
```
%%%% Output: execute_result
{'version': 1,
'metadata': {'job': {'jobid': 4478544,
'uid': 69615,
'metadata': {'start_time': 1490000867,
'end_time': 1490000983,
'job': {'uid': 69615,
'start_time': 1490000867,
'end_time': 1490000983,
'nprocs': 2048,
'jobid': 4478544,
'metadata': {'lib_ver': '3.1.3', 'h': 'romio_no_indep_rw=true;cb_nodes=4'}},
'exe': '/global/project/projectdirs/m888/glock/tokio-abc-results/bin.edison/vpicio_uni /scratch2/scratchdirs/glock/tokioabc-s.4478544/vpicio/vpicio.hdf5 32'},
'records': {},
'summary': {},
'modules': {'POSIX': {'len': 186, 'ver': 3, 'idx': 1},
'MPI-IO': {'len': 154, 'ver': 2, 'idx': 2},
'LUSTRE': {'len': 87, 'ver': 1, 'idx': 6},
'STDIO': {'len': 3234, 'ver': 1, 'idx': 7}},
'name_records': {14734109647742566553: '<STDIN>',
15920181672442173319: '<STDOUT>',
7238257241479193519: '<STDERR>',
6301063301082038805: '/scratch2/scratchdirs/glock/tokioabc-s.4478544/vpicio/vpicio.hdf5'},
'mounts': [('/.shared/base/default/etc/dat.conf', 'dvs'),
('/usr/lib64/libibverbs.so.1.0.0', 'dvs'),
('/usr/lib64/libibumad.so.3.0.2', 'dvs'),
('/usr/lib64/librdmacm.so.1.0.0', 'dvs'),
('/usr/lib64/libibgni.so.1.0.0', 'dvs'),
('/global/cscratch1', 'lustre'),
('/global/projectb', 'dvs'),
('/global/projecta', 'dvs'),
('/usr/sbin/ibstat', 'dvs'),
('/global/project', 'dvs'),
('/global/common', 'dvs'),
('/global/syscom', 'dvs'),
('/global/dna', 'dvs'),
('/opt/slurm', 'dvs'),
('/global/u1', 'dvs'),
('/global/u2', 'dvs'),
('/scratch1', 'lustre'),
('/scratch2', 'lustre'),
('/scratch3', 'lustre'),
('/etc', 'dvs'),
('/', 'rootfs'),
('/', 'dvs')],
'modules': {'POSIX': {'len': 186, 'ver': 3, 'idx': 1},