Commit 88bd7814 authored by Jakob Luettgau's avatar Jakob Luettgau
Browse files

Homogenize aggregators, use summary namespace to store results.

parent aaacffa0
# -*- coding: utf-8 -*-
import cffi
import numpy
import numpy as np
import pandas as pd
from darshan.discover_darshan import discover_darshan
from darshan.api_def_c import load_darshan_header
......@@ -167,7 +169,7 @@ def log_get_name_records(log):
def log_get_dxt_record(log, mod_name, mod_type, mode='dict'):
def log_get_dxt_record(log, mod_name, mod_type, reads=True, writes=True, mode='pandas'):
"""
Returns a dictionary holding a dxt darshan log record.
......@@ -239,6 +241,10 @@ def log_get_dxt_record(log, mod_name, mod_type, mode='dict'):
}
rec['read_segments'].append(seg)
#pd.DataFrame([rec])
return rec
......@@ -279,12 +285,12 @@ def log_get_generic_record(log, mod_name, mod_type, mode='numpy'):
for i in range(0, len(rbuf[0].counters)):
clst.append(rbuf[0].counters[i])
rec['counters'] = numpy.array(clst, dtype=numpy.uint64)
rec['counters'] = np.array(clst, dtype=np.uint64)
flst = []
for i in range(0, len(rbuf[0].fcounters)):
flst.append(rbuf[0].fcounters[i])
rec['fcounters'] = numpy.array(clst, dtype=numpy.float64)
rec['fcounters'] = np.array(clst, dtype=np.float64)
return rec
......@@ -455,5 +461,5 @@ def log_get_apxc_record(log):
clst = []
for i in range(0, len(prf[0].counters)):
clst.append(prf[0].counters[i])
rec['counters'] = numpy.array(clst, dtype=numpy.uint64)
rec['counters'] = np.array(clst, dtype=np.uint64)
return rec
......@@ -20,7 +20,7 @@ def agg_ioops(self, mode='append'):
# convenience
recs = self.data['records']
recs = self.records
ctx = {}
# aggregate
......@@ -106,10 +106,13 @@ def agg_ioops(self, mode='append'):
tmp = json.dumps(ctx, cls=NumpyEncoder)
ctx = json.loads(tmp)
# reset summary target
# overwrite existing summary entry
if mode == 'append':
self.data['agg_ioops'] = ctx
else:
return ctx
self.summary['agg_ioops'] = ctx
return ctx
from darshan.report import *
def create_sankey(self):
def create_sankey(self, mode="append"):
"""
Generate a summary that shows the dataflow between ranks, files and
their mountpoints.
......@@ -78,9 +78,11 @@ def create_sankey(self):
tmp = json.dumps(ctx, cls=NumpyEncoder)
tmp = json.loads(tmp)
self.data['sankey'] = tmp
# overwrite existing summary entry
if mode == "append":
self.summary['sankey'] = tmp
return ctx
from darshan.report import *
def create_time_summary(self):
def create_time_summary(self, mode="append"):
"""
TODO: port to new object report
......@@ -15,6 +15,8 @@ def create_time_summary(self):
#MPI-IO, 97.293875, 0.051575, 0.126525, 2.528025
#STDIO, 99.261425, 0, 0.738575, 0
ctx = {}
# convenience links
summary = logdata['summary']
time_summary = logdata['time-summary.dat']
......@@ -39,3 +41,14 @@ def create_time_summary(self):
time_summary[layer] = entry
# overwrite existing summary entry
if mode == "append":
self.summary['time_summary'] = ctx
return ctx
from darshan.report import *
def create_timeline(self, group_by='rank'):
def create_timeline(self, group_by='rank', mode="append"):
"""
Generate/update a timeline from dxt tracing records of current report.
......@@ -14,11 +14,12 @@ def create_timeline(self, group_by='rank'):
self.mod_read_all_dxt_records("DXT_MPIIO")
self.data['timeline'] = {'groups': [], 'items': []}
groups = self.data['timeline']['groups']
items = self.data['timeline']['items']
ctx = {'groups': [], 'items': []}
groups = ctx['groups']
items = ctx['items']
start_time = datetime.datetime.fromtimestamp( self.data['metadata']['job']['start_time'] )
......@@ -101,3 +102,13 @@ def create_timeline(self, group_by='rank'):
# overwrite existing summary entry
if mode == "append":
self.summary['timeline'] = ctx
return ctx
......@@ -15,6 +15,9 @@ def filter(self, mods=None, name_records=None, data_format='numpy', mode='append
None
"""
# convenience
recs = self.records
ctx = {}
......
......@@ -59,8 +59,8 @@ def mod_agg_iohist(self, mod, mode='append'):
if mode == 'append':
if 'agg_iohist' not in self.data:
self.data['agg_iohist'] = {}
self.data['agg_iohist'][mod] = ctx
self.summary['agg_iohist'] = {}
self.summary['agg_iohist'][mod] = ctx
return ctx
......
......@@ -16,12 +16,11 @@ def name_records_summary(self):
for mod, records in self.records.items():
for rec in records:
if rec['id'] not in counts:
counts[rec['id']] = {}
ctx = counts[rec['id']]
if mod not in ctx:
ctx[mod] = 1
counts[rec['id']] = {'name': self.name_records[rec['id']], 'counts': {}}
if mod not in counts[rec['id']]['counts']:
counts[rec['id']]['counts'][mod] = 1
else:
ctx[mod] += 1
counts[rec['id']]['counts'][mod] += 1
return counts
......@@ -65,8 +65,8 @@ def reduce(self, operation="sum", mods=None, name_records=None, mode='append', d
print(mods)
print(name_records)
#print(mods)
#print(name_records)
if name_records != None:
......@@ -91,10 +91,35 @@ def reduce(self, operation="sum", mods=None, name_records=None, mode='append', d
nrec_pattern = nrec
if nrec not in ctx[mod]:
ctx[mod][nrec_pattern] = rec['counters']
else:
ctx[mod][nrec_pattern] = np.add(ctx[mod][nrec_pattern], rec['counters'])
for counters in ['counters', 'fcounters']:
if nrec_pattern not in ctx[mod]:
ctx[mod][nrec_pattern] = {}
if counters not in rec:
continue
if counters not in ctx[mod][nrec_pattern]:
ctx[mod][nrec_pattern][counters] = rec[counters]
else:
ctx[mod][nrec_pattern][counters] = np.add(ctx[mod][nrec_pattern][counters], rec[counters])
# convert records back to list
result = {}
for mod, name_records in ctx.items():
if mod not in result:
result[mod] = []
for name_record, val in name_records.items():
rec = {"id": name_record, "rank": -1}
rec.update({"id": name_record, "rank": -1})
rec.update(val)
result[mod].append(rec)
......@@ -103,5 +128,5 @@ def reduce(self, operation="sum", mods=None, name_records=None, mode='append', d
if name not in self.summary:
self.summary[name] = {}
self.data[name] = ctx
return ctx
return result
......@@ -97,8 +97,9 @@ def plot_access_histogram(self, mod, filter=None, data=None):
fig.tight_layout()
plt.show()
pass
#plt.show()
return plt
def plot_time_summary(self, filter=None, data=None):
......@@ -114,7 +115,7 @@ def plot_time_summary(self, filter=None, data=None):
def plot_opcounts(self, filter=None, data=None):
def plot_opcounts(self, filter=None, data=None, return_csv=False):
"""
Generates a bar chart summary for operation counts.
......@@ -156,7 +157,7 @@ def plot_opcounts(self, filter=None, data=None):
posix['POSIX_OPENS'],
posix['POSIX_STATS'],
posix['POSIX_SEEKS'],
posix['POSIX_MMAPS'],
0, # faulty? posix['POSIX_MMAPS'],
posix['POSIX_FSYNCS'] + posix['POSIX_FDSYNCS']
]
......@@ -206,6 +207,21 @@ def plot_opcounts(self, filter=None, data=None):
def as_csv():
    # Render the per-layer operation counts as CSV text: one header row
    # of operation labels plus a trailing 'Layer' column, then one data
    # row per I/O layer (POSIX, MPI independent, MPI collective, STDIO).
    # NOTE(review): relies on labels/posix_vals/mpiind_vals/mpicol_vals/
    # stdio_vals from the enclosing plot_opcounts() scope.
    text = ""
    text += ','.join(labels) + ',Layer' + "\n"
    text += ','.join(str(x) for x in posix_vals) + ',POSIX' + "\n"
    text += ','.join(str(x) for x in mpiind_vals) + ',MPIIND' + "\n"
    text += ','.join(str(x) for x in mpicol_vals) + ',MPICOL' + "\n"
    text += ','.join(str(x) for x in stdio_vals) + ',STDIO' + "\n"
    return text
print(as_csv())
x = np.arange(len(labels)) # the label locations
width = 0.15 # the width of the bars
......@@ -243,8 +259,13 @@ def plot_opcounts(self, filter=None, data=None):
fig.tight_layout()
plt.show()
pass
#plt.show()
if return_csv:
return plt, as_csv()
else:
return plt
......
......@@ -8,13 +8,16 @@ interaction and aggregation of Darshan logs using Python.
import darshan.backend.cffi_backend as backend
import json
import numpy as np
import re
import copy
import datetime
import sys
import numpy as np
import pandas as pd
class NumpyEncoder(json.JSONEncoder):
"""
......@@ -76,6 +79,16 @@ class DarshanReport(object):
self.provenance_reports = {}
if filename:
self.open(filename, read_all=read_all)
def open(self, filename, read_all=False):
    """
    Bind this report to a darshan log file and load its metadata.

    Args:
        filename (str): path of the darshan log to open
        read_all (bool): NOTE(review) accepted but not used in the
            visible body -- presumably triggers a full record read
            further down; confirm against the complete source.

    Return:
        None
    """
    self.filename = filename
    if filename:
        self.log = backend.log_open(self.filename)
        self.read_metadata()
......@@ -87,6 +100,9 @@ class DarshanReport(object):
def __add__(self, other):
"""
Allow reports to be combined/merged overloading the addition operation.
"""
# new report
nr = DarshanReport()
......@@ -95,20 +111,29 @@ class DarshanReport(object):
# Currently, assume logs remain in memory to create prov. tree on demand
# Alternative: maintain a tree with simpler refs? (modified reports would not work then)
nr.provenance_reports[self.filename] = copy.copy(self)
nr.provenance_reports[other.filename] = copy.copy(other)
#nr.provenance_reports[self.filename] = copy.copy(self)
#nr.provenance_reports[other.filename] = copy.copy(other)
nr.provenance_reports[self.filename] = None
nr.provenance_reports[other.filename] = None
nr.provenance_log.append(("add", self, other, datetime.datetime.now()))
# update metadata
def update_metadata(report):
def update_metadata(report, force=False):
    # Fold *report*'s job time window into the merged report `nr`
    # (closure variable from __add__). With force=True the window is
    # copied verbatim -- used to seed `nr` from the first operand,
    # since a fresh report has no meaningful start/end times yet.
    if force:
        nr.metadata['start_time'] = report.metadata['start_time']
        nr.metadata['end_time'] = report.metadata['end_time']
        return
    # Otherwise widen the window: keep the earliest start and the
    # latest end across all merged reports.
    if report.metadata['start_time'] < nr.metadata['start_time']:
        nr.metadata['start_time'] = report.metadata['start_time']
    if report.metadata['end_time'] > nr.metadata['end_time']:
        nr.metadata['end_time'] = report.metadata['end_time']
update_metadata(self)
update_metadata(self, force=True)
update_metadata(other)
......@@ -121,6 +146,21 @@ class DarshanReport(object):
else:
nr.records[key] += copy.copy(records)
for key, mod in report.modules.items():
if key not in nr.modules:
nr.modules[key] = copy.copy(mod)
# TODO: invalidate len/counters
for key, counter in report.counters.items():
if key not in nr.counters:
nr.counters[key] = copy.copy(counter)
# TODO: invalidate len/counters
for key, nrec in report.name_records.items():
if key not in nr.counters:
nr.name_records[key] = copy.copy(nrec)
# TODO: verify colliding name_records?
return nr
......@@ -136,11 +176,11 @@ class DarshanReport(object):
None
"""
self.data['metadata']['job'] = backend.log_get_job(self.log)
self.data['metadata']['exe'] = backend.log_get_exe(self.log)
self.metadata['job'] = backend.log_get_job(self.log)
self.metadata['exe'] = backend.log_get_exe(self.log)
self.metadata['start_time'] = self.metadata['job']['start_time']
self.metadata['end_time'] = self.metadata['job']['end_time']
self.metadata['start_time'] = datetime.datetime.fromtimestamp(self.metadata['job']['start_time'])
self.metadata['end_time'] = datetime.datetime.fromtimestamp(self.metadata['job']['end_time'])
self.data['mounts'] = backend.log_get_mounts(self.log)
......@@ -183,7 +223,7 @@ class DarshanReport(object):
pass
def read_all_dxt_records(self):
def read_all_dxt_records(self, reads=True, writes=True):
"""
Read all dxt records from darshan log and return as dictionary.
......@@ -195,7 +235,7 @@ class DarshanReport(object):
"""
for mod in self.data['modules']:
self.mod_read_all_dxt_records(mod, warnings=False)
self.mod_read_all_dxt_records(mod, warnings=False, reads=reads, writes=writes)
pass
......@@ -271,7 +311,7 @@ class DarshanReport(object):
pass
def mod_read_all_dxt_records(self, mod, mode='numpy', warnings=True):
def mod_read_all_dxt_records(self, mod, mode='numpy', warnings=True, reads=True, writes=True):
"""
Reads all dxt records for provided module.
......@@ -334,7 +374,7 @@ class DarshanReport(object):
self.data['modules'][mod]['num_records'] += 1
# fetch next
rec = backend.log_get_dxt_record(self.log, mod, structdefs[mod])
rec = backend.log_get_dxt_record(self.log, mod, structdefs[mod], reads=reads, writes=writes)
pass
......@@ -383,7 +423,7 @@ class DarshanReport(object):
None
"""
recs = self.data['records']
recs = self.records
for mod in recs:
for i, rec in enumerate(self.data['records'][mod]):
......@@ -393,6 +433,51 @@ class DarshanReport(object):
self.converted_records = True
def info(self):
    """
    Print information about the record for inspection.

    Args:
        None

    Return:
        None
    """
    print("DarshanReport: ", id(self))
    print("Modules in logfile:", [self.modules.keys()])
    # One line per module that actually has records loaded in memory.
    for mod in self.records:
        print("Loaded Records:", mod, ", Entries:", len(self.records[mod]))
    print("Name Records:", len(self.name_records))
def get_size(obj, seen=None):
    """Recursively compute the total in-memory size of *obj* in bytes.

    Follows dict keys and values, instance ``__dict__`` attributes, and
    generic iterables (strings/bytes excluded so characters are not
    recursed into). A shared set of object ids prevents double-counting
    and terminates cycles in self-referential structures.
    """
    if seen is None:
        seen = set()
    marker = id(obj)
    if marker in seen:
        # Already accounted for elsewhere in this traversal.
        return 0
    # Mark *before* recursing so self-referential objects terminate.
    seen.add(marker)
    total = sys.getsizeof(obj)
    if isinstance(obj, dict):
        total += sum(get_size(k, seen) + get_size(v, seen)
                     for k, v in obj.items())
    elif hasattr(obj, '__dict__'):
        total += get_size(obj.__dict__, seen)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
        total += sum(get_size(item, seen) for item in obj)
    return total
#print("Memory:", get_size(self), 'bytes')
def as_json(self):
"""
Return a JSON representation of the report data as a string.
......@@ -404,7 +489,13 @@ class DarshanReport(object):
JSON String
"""
# TODO: decide how to best issue conversion
data = self.data
data = copy.deepcopy(self.data)
recs = data['records']
for mod in recs:
for i, rec in enumerate(data['records'][mod]):
recs[mod][i]['counters'] = rec['counters'].tolist()
recs[mod][i]['fcounters'] = rec['fcounters'].tolist()
return json.dumps(data)
%% Cell type:code id: tags:
``` python
import darshan
import pprint
```
%% Cell type:code id: tags:
``` python
report = darshan.DarshanReport("example.darshan")
report = darshan.DarshanReport("example.darshan", read_all=True)
```
%% Cell type:code id: tags:
``` python
report.info()
```
%%%% Output: stream
DarshanReport: 139885578056984
Modules in logfile: [dict_keys(['POSIX', 'MPI-IO', 'LUSTRE', 'STDIO'])]
Loaded Records: POSIX , Entries: 1
Loaded Records: MPI-IO , Entries: 1
Loaded Records: STDIO , Entries: 129
Name Records: 4
%% Cell type:code id: tags:
``` python
```