Commit 054de670 authored by Shane Snyder

Merge branch 'pydarshan-RecordCollection' into 'master'

PyDarshan: Record Collections

See merge request !86
parents 755e6853 92501e87
@@ -33,16 +33,17 @@ def enable_experimental(verbose=False):
     import importlib
     import darshan
 
-    paths = glob.glob(darshan.__path__[0] + "/experimental/aggregators/*.py")
-    for path in paths:
-        base = os.path.basename(path)
-        name = os.path.splitext(base)[0]
+    for subdir in ['aggregators', 'operations']:
+        paths = glob.glob(darshan.__path__[0] + f"/experimental/{subdir}/*.py")
+        for path in paths:
+            base = os.path.basename(path)
+            name = os.path.splitext(base)[0]
 
-        if name == "__init__":
-            continue
+            if name == "__init__":
+                continue
 
-        mod = importlib.import_module('darshan.experimental.aggregators.{0}'.format(name))
-        setattr(DarshanReport, name, getattr(mod, name))
+            mod = importlib.import_module(f"darshan.experimental.{subdir}.{name}")
+            setattr(DarshanReport, name, getattr(mod, name))
 
-        if verbose:
-            print("Added method {} to DarshanReport.".format(name))
+            if verbose:
+                print(f"Added method {mod.__name__} to DarshanReport.")
from darshan.report import *

import datetime


def create_dxttimeline(self, group_by='rank', mode="append"):
    """
    Generate/update a timeline from dxt tracing records of current report.

    Args:
        group_by (str): By which factor to group entries (default: rank)
                        Allowed Parameters: rank, filename
    """

    self.mod_read_all_dxt_records("DXT_POSIX")
    self.mod_read_all_dxt_records("DXT_MPIIO")

    ctx = {'groups': [], 'items': []}

    groups = ctx['groups']
    items = ctx['items']

    start_time = datetime.datetime.fromtimestamp(self.data['metadata']['job']['start_time'])

    def groupify(rec, mod):
        for seg in rec['write_segments']:
            seg.update({'type': 'w'})

        for seg in rec['read_segments']:
            seg.update({'type': 'r'})

        segments = rec['write_segments'] + rec['read_segments']
        segments = sorted(segments, key=lambda k: k['start_time'])

        start = float('inf')
        end = float('-inf')

        trace = []
        minsize = 0
        for seg in segments:
            trace += [seg['type'], seg['offset'], seg['length'], seg['start_time'], seg['end_time']]

            seg_minsize = seg['offset'] + seg['length']
            if minsize < seg_minsize:
                minsize = seg_minsize

            if start > seg['start_time']:
                start = seg['start_time']

            if end < seg['end_time']:
                end = seg['end_time']

        # reconstruct timestamps
        start = start_time + datetime.timedelta(seconds=start)
        end = start_time + datetime.timedelta(seconds=end)

        rid = "%s:%d:%d" % (mod, rec['id'], rec['rank'])

        item = {
            "id": rid,
            "rank": rec['rank'],
            "hostname": rec['hostname'],
            #"filename": rec['filename'],
            "filename": "FIXME: NEED FILENAME",

            "group": rid,
            "start": start.isoformat(),
            "end": end.isoformat(),
            "limitSize": False,     # required to prevent rendering glitches
            "data": {
                "duration": (end - start).total_seconds(),
                "start": segments[0]['start_time'],
                "size": minsize,    # minimal estimated filesize
                "trace": trace,
            }
        }
        items.append(item)

        group = {
            "id": rid,
            #"content": "[%s] " % (mod) + rec['filename'][-84:],
            "content": "[%s] " % (mod) + "NEED FILENAME",
            "order": seg['start_time']   # note: seg still refers to the last (latest-starting) segment here
        }
        groups.append(group)

    supported = ['DXT_POSIX', 'DXT_MPIIO']
    for mod in supported:
        if mod in self.data['records']:
            for rec in self.data['records'][mod]:
                groupify(rec, mod)

    # overwrite existing summary entry
    if mode == "append":
        self.summary['timeline'] = ctx

    return ctx
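A usage sketch for the new operation, assuming a log captured with DXT tracing enabled (the filename is a placeholder):

    import darshan
    darshan.enable_experimental()

    report = darshan.DarshanReport("app_with_dxt.darshan")  # hypothetical DXT-enabled log
    timeline = report.create_dxttimeline(group_by='rank')
    print(len(timeline['items']), "items in", len(timeline['groups']), "groups")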
@@ -8,7 +8,7 @@ def name_records_summary(self):
         mod_name (str):
 
     Return:
-        None
+        dict with counts nested <nrec-id>/<mod>
     """
 
     counts = {}
......
@@ -90,9 +90,9 @@ def filter(self, mods=None, name_records=None, pattern=None, regex=None):
                 if nrec in name_records:
                     if mod not in ctx:
-                        ctx[mod] = []
+                        ctx[mod] = DarshanRecordCollection(mod=mod, report=r)
 
-                    ctx[mod].append(rec)
+                    ctx[mod].append(rec._records[0])
 
     r.records = ctx
......
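With this change, filter() buckets matching records into per-module DarshanRecordCollections instead of plain lists. A sketch of the call pattern, assuming filter() returns the filtered report r referenced above:

    sub = report.filter(mods=['POSIX'])   # keep only POSIX records
    recs = sub.records['POSIX']           # now a DarshanRecordCollection, not a list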
@@ -16,7 +16,6 @@ def merge(self, other, reduce_first=False):
         None
     """
 
     # new report
-
     nr = DarshanReport()
@@ -59,7 +58,7 @@ def merge(self, other, reduce_first=False):
             if key not in nr.records:
                 nr.records[key] = copy.copy(records)
             else:
-                nr.records[key] += copy.copy(records)
+                nr.records[key]._records = nr.records[key]._records + copy.copy(records._records)
 
         for key, mod in report.modules.items():
             if key not in nr.modules:
@@ -69,8 +68,6 @@
         for key, counter in report.counters.items():
             if key not in nr.counters:
                 nr.counters[key] = copy.copy(counter)
-
-        # TODO: invalidate len/counters
 
         for key, nrec in report.name_records.items():
             if key not in nr.counters:
......
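The merge path above now concatenates the internal _records lists when a module already exists in the target report. A sketch, assuming merge() returns the combined report that nr = DarshanReport() is building (filenames are placeholders):

    r1 = darshan.DarshanReport("phase1.darshan")  # hypothetical
    r2 = darshan.DarshanReport("phase2.darshan")  # hypothetical
    combined = r1.merge(r2)   # shared modules: record collections are concatenated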
#def np: read_csv -> vs open?
#def copy() -> np: create deep copy
#def view() -> np: creates a view?! with the same data..
#def info()
#def size()
#def astype(typestring/object, numpy/pandas?)

# comparisons: ==, <
# comparison: np: array_equal (array-wise comparison)

#add
#multiply
#exp
#sum (np: array-wise)
#cumsum (np: all elems)
#min
#max
#median
#mean
#std
#corrcoef ?? interpretation? seems to make more sense for two different reports? that can work i think
# special treatment dxt? -> corr over time?
@@ -78,7 +78,7 @@ def reduce(self, operation="sum", mods=None, name_records=None, mode='append', d
             if mod not in mods:
                 continue
 
-            for rec in recs:
+            for i, rec in enumerate(recs):
                 nrec = rec['id']
 
                 if nrec in name_records:
@@ -95,7 +95,7 @@ def reduce(self, operation="sum", mods=None, name_records=None, mode='append', d
                         if nrec_pattern not in ctx[mod]:
                             ctx[mod][nrec_pattern] = {}
 
-                        if counters not in rec:
+                        if counters not in rec._records[0]:
                             continue
 
                         if counters not in ctx[mod][nrec_pattern]:
@@ -108,7 +108,7 @@ def reduce(self, operation="sum", mods=None, name_records=None, mode='append', d
     result = {}
     for mod, name_records in ctx.items():
         if mod not in result:
-            result[mod] = []
+            result[mod] = DarshanRecordCollection(mod=mod, report=r)
 
         for name_record, val in name_records.items():
             rec = {"id": name_record, "rank": -1}
......
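reduce() likewise collects its per-module output into DarshanRecordCollections now. A sketch using the defaults visible in the signature above (assumes POSIX data is present in the log):

    report.reduce(operation="sum", mods=['POSIX'])   # sum counters across matching name records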
@@ -24,7 +24,7 @@ def plot_access_histogram(self, mod, filter=None, data=None):
             print("Summarizing... iohist", mod)
             self.mod_agg_iohist(mod)
         else:
-            print("Can not create summary, mod_agg_iohist aggregator is not registered with the report clase.")
+            print("Can not create summary, mod_agg_iohist aggregator is not registered with the report class.")
......
@@ -18,6 +18,8 @@ import sys
 
 import numpy as np
 import pandas as pd
 
 import collections.abc
+import logging
+logger = logging.getLogger(__name__)
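With a module-level logger in place, library messages can be surfaced from user code through the standard logging configuration:

    import logging
    logging.basicConfig(level=logging.DEBUG)   # show pydarshan log output, e.g., to_dict() warnings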
@@ -39,6 +41,248 @@ class DarshanReportJSONEncoder(json.JSONEncoder):

class DarshanRecordCollection(collections.abc.MutableSequence):
    """
    Darshan log records may nest various properties (e.g., DXT, Lustre).
    As such they cannot be faithfully represented using only a single
    NumPy array or a pandas dataframe.

    The DarshanRecordCollection is used as a wrapper to offer
    users a stable API to DarshanReports and contained records
    in various popular formats while allowing memory use and internal
    representations to be optimized as necessary.
    """

    def __init__(self, mod=None, report=None):
        super(DarshanRecordCollection, self).__init__()
        self.mod = mod              # collections should be homogeneous in module type
        self.report = report        # reference the report offering lookup for, e.g., counter names

        self.rank = None            # if all records in collection share rank, save memory
        self.id = None              # if all records in collection share id/nrec, save memory

        self.timebase = None        # allow fast time rebase without touching every record
        self.start_time = None
        self.end_time = None

        self._type = "collection"   # collection => list(), single => [record], nested => [[], ... ,[]]
        self._records = list()      # internal format before user conversion

    def __len__(self):
        return len(self._records)

    def __setitem__(self, key, val):
        self._records[key] = val

    def __getitem__(self, key):
        if self._type == "record":
            if isinstance(key, collections.abc.Hashable):
                # TODO: might extend this style of access to collection/nested types as well,
                # but do not want to offer an access pattern which might not be feasible to maintain
                return self._records[0][key]
            else:
                return self._records[0]

        # Wrap single record in a RecordCollection to attach conversions: to_json, to_dict, to_df, ...
        # This way conversion logic can be shared.
        record = DarshanRecordCollection(mod=self.mod, report=self.report)

        if isinstance(key, slice):
            record._type = "collection"
            record._records = self._records[key]
        else:
            record._type = "record"
            record.append(self._records[key])
        return record
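The indexing rules above give three access styles: an integer wraps a single record in a one-element collection, a slice yields a sub-collection, and a hashable key on a single-record collection forwards to the wrapped record dict. A minimal sketch, assuming a report with POSIX records:

    recs = report.records['POSIX']   # DarshanRecordCollection, _type == "collection"
    one = recs[0]                    # one-element collection, _type == "record"
    sub = recs[0:2]                  # sub-collection via slice
    vals = one['counters']           # key access forwards to the underlying record dict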
    def __delitem__(self, key):
        del self._records[key]

    def insert(self, key, val):
        self._records.insert(key, val)

    def append(self, val):
        self.insert(len(self._records), val)

    def __repr__(self):
        if self._type == "record":
            return self._records[0].__repr__()
        return object.__repr__(self)

    #def __repr__(self):
    #    print("DarshanRecordCollection.__repr__")
    #    repr = ""
    #    for rec in self._records:
    #        repr += f"{rec}\n"
    #    return repr
    def info(self, describe=False, plot=False):
        """
        Print information about the record for inspection.

        Args:
            describe (bool): show detailed summary and statistics (default: False)
            plot (bool): show plots for quick value overview for counters and fcounters (default: False)

        Return:
            None
        """
        mod = self.mod
        records = self._records

        print("Module:       ", mod, sep="")
        print("Records:      ", len(self), sep="")
        print("Coll. Type:   ", self._type, sep="")

        if mod in ['LUSTRE']:
            for i, rec in enumerate(records):
                pass
        elif mod in ['DXT_POSIX', 'DXT_MPIIO']:
            ids = set()
            ranks = set()
            hostnames = set()
            reads = 0
            writes = 0

            for i, rec in enumerate(records):
                ids.add(rec['id'])
                ranks.add(rec['rank'])
                hostnames.add(rec['hostname'])
                reads += rec['read_count']
                writes += rec['write_count']

            print("Ranks:        ", str(ranks), sep="")
            print("Name Records: ", str(ids), sep="")
            print("Hostnames:    ", str(hostnames), sep="")
            print("Read Events:  ", str(reads), sep="")
            print("Write Events: ", str(writes), sep="")

            if describe or plot:
                logger.warning("No plots/descriptions defined for DXT records info.")
        else:
            ids = set()
            ranks = set()
            for i, rec in enumerate(records):
                ids.add(rec['id'])
                ranks.add(rec['rank'])
            print("Ranks:        ", str(ranks), sep="")
            print("Name Records: ", str(ids), sep="")

            if describe or plot:
                df = self.to_df(attach=None)

                pd_max_rows = pd.get_option('display.max_rows')
                pd_max_columns = pd.get_option('display.max_columns')
                pd.set_option('display.max_rows', None)

                if plot:
                    figw = 7
                    lh = 0.3    # lineheight
                    # get number of counters for plot height adjustment
                    nc = self[0]['counters'].size
                    nfc = self[0]['fcounters'].size

                    # display() assumes an IPython/Jupyter environment
                    display(df['counters'].plot.box(vert=False, figsize=(figw, nc*lh)))
                    display(df['fcounters'].plot.box(vert=False, figsize=(figw, nfc*lh)))

                if describe:
                    display(df['counters'].describe().transpose())
                    display(df['fcounters'].describe().transpose())

                # restore the original pandas display configuration
                pd.set_option('display.max_rows', pd_max_rows)
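An inspection sketch for info(); describe/plot assume an IPython/Jupyter environment since the method calls display():

    recs = report.records['POSIX']   # assumes a log with POSIX records
    recs.info()                      # prints module, record count, ranks, name record ids
    recs.info(describe=True)         # adds pandas describe() tables for counters/fcounters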
    ###########################################################################
    # Export Conversions (following the pandas naming conventions)
    ###########################################################################
    def to_numpy(self):
        records = copy.deepcopy(self._records)
        return records

    def to_list(self):
        mod = self.mod
        records = copy.deepcopy(self._records)

        if mod in ['LUSTRE']:
            raise NotImplementedError
        elif mod in ['DXT_POSIX', 'DXT_MPIIO']:
            raise NotImplementedError
        else:
            for i, rec in enumerate(records):
                rec['counters'] = rec['counters'].tolist()
                rec['fcounters'] = rec['fcounters'].tolist()
        return records

    def to_dict(self):
        mod = self.mod
        records = copy.deepcopy(self._records)
        counters = self.report.counters[self.mod]

        if mod in ['LUSTRE']:
            raise NotImplementedError
        elif mod in ['DXT_POSIX', 'DXT_MPIIO']:
            # records already are in a dict format, but may offer switches for expansion
            logger.warning("The output of DarshanRecordCollection.to_dict() may change in the future.")
        else:
            for i, rec in enumerate(records):
                rec['counters'] = dict(zip(counters['counters'], rec['counters']))
                rec['fcounters'] = dict(zip(counters['fcounters'], rec['fcounters']))
        return records

    def to_json(self):
        records = self.to_list()
        return json.dumps(records, cls=DarshanReportJSONEncoder)
    def to_df(self, attach="default"):
        if attach == "default":
            attach = ['id', 'rank']

        mod = self.mod
        records = copy.deepcopy(self._records)

        if mod in ['LUSTRE']:
            for i, rec in enumerate(records):
                rec = rec   # no dataframe conversion defined for Lustre records yet
        elif mod in ['DXT_POSIX', 'DXT_MPIIO']:
            for i, rec in enumerate(records):
                rec['read_segments'] = pd.DataFrame(rec['read_segments'])
                rec['write_segments'] = pd.DataFrame(rec['write_segments'])
        else:
            counters = []
            fcounters = []
            ids = []
            ranks = []

            for i, rec in enumerate(records):
                counters.append(rec['counters'])
                fcounters.append(rec['fcounters'])
                ids.append(rec['id'])
                ranks.append(rec['rank'])

            records = {"counters": None, "fcounters": None}
            records['counters'] = pd.DataFrame(counters, columns=self.report.counters[mod]['counters'])
            records['fcounters'] = pd.DataFrame(fcounters, columns=self.report.counters[mod]['fcounters'])

            def flip_column_order(df):
                return df[df.columns[::-1]]

            # attach ids and ranks as leading columns
            if attach is not None:
                for counter_type in ['counters', 'fcounters']:
                    records[counter_type] = flip_column_order(records[counter_type])
                    if 'id' in attach:
                        records[counter_type]['id'] = ids
                    if 'rank' in attach:
                        records[counter_type]['rank'] = ranks
                    records[counter_type] = flip_column_order(records[counter_type])

        return records
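The conversions follow pandas naming conventions; a sketch of what each export returns (assumes POSIX records):

    recs = report.records['POSIX']
    recs.to_list()      # counters/fcounters as plain Python lists
    recs.to_dict()      # counters keyed by counter name
    recs.to_json()      # JSON string via DarshanReportJSONEncoder
    dfs = recs.to_df()  # {'counters': DataFrame, 'fcounters': DataFrame}
    dfs['counters'][['id', 'rank']]   # id/rank attached as leading columns by default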
class DarshanReport(object):
    """
    The DarshanReport class provides a convenient wrapper to access darshan
@@ -46,12 +290,12 @@ class DarshanReport(object):
     a number of common aggregations can be performed.
     """
 
-    # a way to conser memory?
+    # a way to conserve memory?
     #__slots__ = ['attr1', 'attr2']
 
     def __init__(self,
-                 filename=None, dtype='pandas',
+                 filename=None, dtype='numpy',
                  start_time=None, end_time=None,
                  automatic_summary=False,
                  read_all=True, lookup_name_records=True):
@@ -70,9 +314,7 @@ class DarshanReport(object):
         self.filename = filename
 
         # Behavioral Options
-        self.dtype = dtype    # Experimental: preferred internal representation: pandas/numpy useful for aggregations, dict good for export/REST
-                              # might require alternative granularity: e.g., records, vs summaries?
-                              # vs dict/pandas? dict/native?
+        self.dtype = dtype    # default dtype to return when viewing records
         self.automatic_summary = automatic_summary
         self.lookup_name_records = lookup_name_records
@@ -81,16 +323,18 @@ class DarshanReport(object):
         # Report Metadata
         #
         # Start/End + Timebase are
         self.start_time = start_time if start_time else float('inf')
         self.end_time = end_time if end_time else float('-inf')
         self.timebase = self.start_time
 
         # Initialize data namespaces
-        self.metadata = {}
-        self.modules = {}
-        self.counters = {}
+        self._metadata = {}
+        self._modules = {}
+        self._counters = {}
         self.records = {}
-        self.mounts = {}
+        self._mounts = {}
         self.name_records = {}
 
         # initialize report/summary namespace
@@ -101,10 +345,10 @@ class DarshanReport(object):
         # legacy references (deprecate before 1.0?)
         self.data_revision = 0          # counter for consistency checks
         self.data = {'version': 1}
-        self.data['metadata'] = self.metadata
+        self.data['metadata'] = self._metadata
         self.data['records'] = self.records
         self.data['summary'] = self.summary
-        self.data['modules'] = self.modules
+        self.data['modules'] = self._modules
         self.data['counters'] = self.counters
         self.data['name_records'] = self.name_records
@@ -121,6 +365,33 @@ class DarshanReport(object):
             self.open(filename, read_all=read_all)
 
+    @property
+    def metadata(self):
+        return self._metadata
+
+    @property