Commit 92501e87
authored Mar 25, 2021 by Jakob Luettgau
committed by Shane Snyder, Mar 25, 2021

PyDarshan: Record Collections

parent 755e6853
Changes: 18 files
darshan-util/pydarshan/darshan/__init__.py
@@ -33,16 +33,17 @@ def enable_experimental(verbose=False):
     import importlib
     import darshan

-    paths = glob.glob(darshan.__path__[0] + "/experimental/aggregators/*.py")
-    for path in paths:
-        base = os.path.basename(path)
-        name = os.path.splitext(base)[0]
-
-        if name == "__init__":
-            continue
-
-        mod = importlib.import_module('darshan.experimental.aggregators.{0}'.format(name))
-        setattr(DarshanReport, name, getattr(mod, name))
-
-        if verbose:
-            print("Added method {} to DarshanReport.".format(name))
+    for subdir in ['aggregators', 'operations']:
+        paths = glob.glob(darshan.__path__[0] + f"/experimental/{subdir}/*.py")
+        for path in paths:
+            base = os.path.basename(path)
+            name = os.path.splitext(base)[0]
+
+            if name == "__init__":
+                continue
+
+            mod = importlib.import_module(f"darshan.experimental.{subdir}.{name}")
+            setattr(DarshanReport, name, getattr(mod, name))
+
+            if verbose:
+                print(f"Added method {mod.__name__} to DarshanReport.")
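With this change, enable_experimental() walks both the aggregators and the new operations subpackages and attaches each discovered function as a method on DarshanReport. A minimal usage sketch; the log filename is a placeholder, not part of this commit:

    import darshan

    # registers, e.g., the aggregators and filter/merge/reduce on DarshanReport
    darshan.enable_experimental(verbose=True)

    report = darshan.DarshanReport("example.darshan")  # hypothetical log file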
darshan-util/pydarshan/darshan/experimental/aggregators/create_dxttimeline.py
new file (0 → 100644)
from darshan.report import *


def create_dxttimeline(self, group_by='rank', mode="append"):
    """
    Generate/update a timeline from DXT tracing records of the current report.

    Args:
        group_by (str): By which factor to group entries (default: rank)
                        Allowed parameters: rank, filename
    """

    self.mod_read_all_dxt_records("DXT_POSIX")
    self.mod_read_all_dxt_records("DXT_MPIIO")

    ctx = {'groups': [], 'items': []}

    groups = ctx['groups']
    items = ctx['items']

    start_time = datetime.datetime.fromtimestamp(self.data['metadata']['job']['start_time'])

    def groupify(rec, mod):
        for seg in rec['write_segments']:
            seg.update({'type': 'w'})

        for seg in rec['read_segments']:
            seg.update({'type': 'r'})

        segments = rec['write_segments'] + rec['read_segments']
        segments = sorted(segments, key=lambda k: k['start_time'])

        start = float('inf')
        end = float('-inf')

        trace = []
        minsize = 0
        for seg in segments:
            trace += [seg['type'], seg['offset'], seg['length'], seg['start_time'], seg['end_time']]

            seg_minsize = seg['offset'] + seg['length']
            if minsize < seg_minsize:
                minsize = seg_minsize

            if start > seg['start_time']:
                start = seg['start_time']

            if end < seg['end_time']:
                end = seg['end_time']

        # reconstruct timestamps
        start = start_time + datetime.timedelta(seconds=start)
        end = start_time + datetime.timedelta(seconds=end)

        rid = "%s:%d:%d" % (mod, rec['id'], rec['rank'])

        item = {
            "id": rid,
            "rank": rec['rank'],
            "hostname": rec['hostname'],
            #"filename": rec['filename'],
            "filename": "FIXME: NEED FILENAME",
            "group": rid,
            "start": start.isoformat(),
            "end": end.isoformat(),
            "limitSize": False,     # required to prevent rendering glitches
            "data": {
                "duration": (end - start).total_seconds(),
                "start": segments[0]['start_time'],
                "size": minsize,    # minimal estimated filesize
                "trace": trace,
            }
        }
        items.append(item)

        group = {
            "id": rid,
            #"content": "[%s] " % (mod) + rec['filename'][-84:],
            "content": "[%s] " % (mod) + "NEED FILENAME",
            "order": seg['start_time']    # note: seg is the last segment from the loop above
        }
        groups.append(group)

    supported = ['DXT_POSIX', 'DXT_MPIIO']
    for mod in supported:
        if mod in self.data['records']:
            for rec in self.data['records'][mod]:
                groupify(rec, mod)

    # overwrite existing summary entry
    if mode == "append":
        self.summary['timeline'] = ctx

    return ctx
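Assuming experimental methods are enabled and the log was collected with DXT tracing, the aggregator might be invoked as sketched below; the filename is hypothetical:

    import darshan

    darshan.enable_experimental()
    report = darshan.DarshanReport("app_dxt.darshan")  # hypothetical DXT-enabled log

    timeline = report.create_dxttimeline()
    print(len(timeline['groups']), len(timeline['items']))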
darshan-util/pydarshan/darshan/experimental/aggregators/name_records_summary.py
@@ -8,7 +8,7 @@ def name_records_summary(self):
         mod_name (str):

     Return:
-        None
+        dict with counts nested <nrec-id>/<mod>
     """

     counts = {}
darshan-util/pydarshan/darshan/experimental/aggregators/filter.py → darshan-util/pydarshan/darshan/experimental/operations/filter.py
@@ -90,9 +90,9 @@ def filter(self, mods=None, name_records=None, pattern=None, regex=None):
                 if nrec in name_records:
                     if mod not in ctx:
-                        ctx[mod] = []
+                        ctx[mod] = DarshanRecordCollection(mod=mod, report=r)

-                    ctx[mod].append(rec)
+                    ctx[mod].append(rec._records[0])

     r.records = ctx
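filter() now buckets matching records into a DarshanRecordCollection per module instead of a plain list, so a filtered report keeps the collection API. A sketch of the intended use, assuming filter() returns the new report; pattern and log name are placeholders:

    import darshan

    darshan.enable_experimental()
    report = darshan.DarshanReport("example.darshan")  # hypothetical log

    filtered = report.filter(pattern="*.hdf5")         # hypothetical name-record pattern
    posix = filtered.records['POSIX']                  # a DarshanRecordCollection
    print(len(posix))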
darshan-util/pydarshan/darshan/experimental/aggregators/merge.py → darshan-util/pydarshan/darshan/experimental/operations/merge.py
@@ -16,7 +16,6 @@ def merge(self, other, reduce_first=False):
         None
     """

     # new report
     nr = DarshanReport()
@@ -59,7 +58,7 @@ def merge(self, other, reduce_first=False):
             if key not in nr.records:
                 nr.records[key] = copy.copy(records)
             else:
-                nr.records[key] += copy.copy(records)
+                nr.records[key]._records = nr.records[key]._records + copy.copy(records._records)

         for key, mod in report.modules.items():
             if key not in nr.modules:
@@ -69,8 +68,6 @@ def merge(self, other, reduce_first=False):
         for key, counter in report.counters.items():
             if key not in nr.counters:
                 nr.counters[key] = copy.copy(counter)

         # TODO: invalidate len/counters

         for key, nrec in report.name_records.items():
             if key not in nr.counters:
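Merged record lists are now concatenated through the collections' internal _records lists rather than list +=. A sketch of merging two reports, assuming merge() returns the combined DarshanReport; the filenames are hypothetical:

    import darshan

    darshan.enable_experimental()
    r1 = darshan.DarshanReport("rank0.darshan")  # hypothetical logs
    r2 = darshan.DarshanReport("rank1.darshan")

    merged = r1.merge(r2)
    print(merged.records.keys())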
darshan-util/pydarshan/darshan/experimental/operations/potential_functions.py.txt
new file (0 → 100644)
#def np: read_csv -> vs open?
#def copy() -> np: create deep copy
#def view() -> np: creates a view?! with the same data..
#def info()
#def size()
#def astype(typestring/object, numpy/pandas?)
# comparisons: ==, <
# comparison: np: array_equal (array-wise comparison)
#add
#multiply
#exp
#sum (np: array-wise)
#cumsum (np: all elems)
#min
#max
#median
#mean
#std
#corrcoef ?? interpretation? seems to make more sense for two different reports? that can work i think
# special treatment dxt? -> corr over time?
darshan-util/pydarshan/darshan/experimental/aggregators/reduce.py → darshan-util/pydarshan/darshan/experimental/operations/reduce.py
@@ -78,7 +78,7 @@ def reduce(self, operation="sum", mods=None, name_records=None, mode='append', d
         if mod not in mods:
             continue

-        for rec in recs:
+        for i, rec in enumerate(recs):
             nrec = rec['id']

             if nrec in name_records:
@@ -95,7 +95,7 @@ def reduce(self, operation="sum", mods=None, name_records=None, mode='append', d
                 if nrec_pattern not in ctx[mod]:
                     ctx[mod][nrec_pattern] = {}

-                if counters not in rec:
+                if counters not in rec._records[0]:
                     continue

                 if counters not in ctx[mod][nrec_pattern]:
@@ -108,7 +108,7 @@ def reduce(self, operation="sum", mods=None, name_records=None, mode='append', d
     result = {}
     for mod, name_records in ctx.items():
         if mod not in result:
-            result[mod] = []
+            result[mod] = DarshanRecordCollection(mod=mod, report=r)

         for name_record, val in name_records.items():
             rec = {"id": name_record, "rank": -1}
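Like filter(), reduce() now gathers its per-module results in a DarshanRecordCollection. A sketch of a reduction call, assuming reduce() returns a report-like result as it did before this change; the log name is hypothetical:

    import darshan

    darshan.enable_experimental()
    report = darshan.DarshanReport("example.darshan")  # hypothetical log

    reduced = report.reduce(operation="sum", mods=['POSIX'])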
darshan-util/pydarshan/darshan/experimental/plots/matplotlib.py
@@ -24,7 +24,7 @@ def plot_access_histogram(self, mod, filter=None, data=None):
             print("Summarizing... iohist", mod)
             self.mod_agg_iohist(mod)
         else:
-            print("Can not create summary, mod_agg_iohist aggregator is not registered with the report clase.")
+            print("Can not create summary, mod_agg_iohist aggregator is not registered with the report class.")
darshan-util/pydarshan/darshan/report.py
@@ -18,6 +18,8 @@ import sys
 import numpy as np
 import pandas as pd

 import collections.abc

+import logging
+logger = logging.getLogger(__name__)
@@ -39,6 +41,248 @@ class DarshanReportJSONEncoder(json.JSONEncoder):
class DarshanRecordCollection(collections.abc.MutableSequence):
    """
    Darshan log records may nest various properties (e.g., DXT, Lustre).
    As such they can not be faithfully represented using only a single
    NumPy array or a Pandas dataframe.

    The DarshanRecordCollection is used as a wrapper to offer users a
    stable API to DarshanReports and the contained records in various
    popular formats, while allowing memory use and internal
    representations to be optimized as necessary.
    """
    def __init__(self, mod=None, report=None):
        super(DarshanRecordCollection, self).__init__()
        self.mod = mod              # collections should be homogeneous in module type
        self.report = report        # reference the report offering lookup for, e.g., counter names

        self.rank = None            # if all records in collection share rank, save memory
        self.id = None              # if all records in collection share id/nrec, save memory

        self.timebase = None        # allow fast time rebase without touching every record
        self.start_time = None
        self.end_time = None

        self._type = "collection"   # collection => list(), single => [record], nested => [[], ..., []]
        self._records = list()      # internal format before user conversion
    def __len__(self):
        return len(self._records)

    def __setitem__(self, key, val):
        self._records[key] = val
    def __getitem__(self, key):
        if self._type == "record":
            if isinstance(key, collections.abc.Hashable):
                # TODO: might extend this style of access to collection/nested types as well,
                # but do not want to offer an access which might not be feasible to maintain
                return self._records[0][key]
            else:
                return self._records[0]

        # Wrap a single record in a RecordCollection to attach conversions:
        # to_json, to_dict, to_df, ... This way conversion logic can be shared.
        record = DarshanRecordCollection(mod=self.mod, report=self.report)

        if isinstance(key, slice):
            record._type = "collection"
            record._records = self._records[key]
        else:
            record._type = "record"
            record.append(self._records[key])
        return record
    def __delitem__(self, key):
        del self._records[key]
    def insert(self, key, val):
        self._records.insert(key, val)

    def append(self, val):
        self.insert(len(self._records), val)
    def __repr__(self):
        if self._type == "record":
            return self._records[0].__repr__()
        return object.__repr__(self)

    #def __repr__(self):
    #    print("DarshanRecordCollection.__repr__")
    #    repr = ""
    #    for rec in self._records:
    #        repr += f"{rec}\n"
    #    return repr
    def info(self, describe=False, plot=False):
        """
        Print information about the record for inspection.

        Args:
            describe (bool): show detailed summary and statistics (default: False)
            plot (bool): show plots for quick value overview for counters and fcounters (default: False)

        Return:
            None
        """
        mod = self.mod
        records = self._records

        print("Module:       ", mod, sep="")
        print("Records:      ", len(self), sep="")
        print("Coll. Type:   ", self._type, sep="")

        if mod in ['LUSTRE']:
            for i, rec in enumerate(records):
                pass
        elif mod in ['DXT_POSIX', 'DXT_MPIIO']:
            ids = set()
            ranks = set()
            hostnames = set()
            reads = 0
            writes = 0
            for i, rec in enumerate(records):
                ids.add(rec['id'])
                ranks.add(rec['rank'])
                hostnames.add(rec['hostname'])
                reads += rec['read_count']
                writes += rec['write_count']

            print("Ranks:        ", str(ranks), sep="")
            print("Name Records: ", str(ids), sep="")
            print("Hostnames:    ", str(hostnames), sep="")
            print("Read Events:  ", str(reads), sep="")
            print("Write Events: ", str(writes), sep="")

            if describe or plot:
                logger.warn("No plots/descriptions defined for DXT records info.")
        else:
            ids = set()
            ranks = set()
            for i, rec in enumerate(records):
                ids.add(rec['id'])
                ranks.add(rec['rank'])
            print("Ranks:        ", str(ranks), sep="")
            print("Name Records: ", str(ids), sep="")

            if describe or plot:
                df = self.to_df(attach=None)

                pd_max_rows = pd.get_option('display.max_rows')
                pd_max_columns = pd.get_option('display.max_columns')
                pd.set_option('display.max_rows', None)

                if plot:
                    figw = 7
                    lh = 0.3    # lineheight
                    # get number of counters for plot height adjustment
                    nc = self[0]['counters'].size
                    nfc = self[0]['fcounters'].size

                    display(df['counters'].plot.box(vert=False, figsize=(figw, nc * lh)))
                    display(df['fcounters'].plot.box(vert=False, figsize=(figw, nfc * lh)))

                if describe:
                    display(df['counters'].describe().transpose())
                    display(df['fcounters'].describe().transpose())

                pd.set_option('display.max_rows', pd_max_rows)
    ###########################################################################
    # Export Conversions (following the pandas naming conventions)
    ###########################################################################
    def to_numpy(self):
        records = copy.deepcopy(self._records)
        return records
    def to_list(self):
        mod = self.mod
        records = copy.deepcopy(self._records)

        if mod in ['LUSTRE']:
            raise NotImplementedError
        elif mod in ['DXT_POSIX', 'DXT_MPIIO']:
            raise NotImplementedError
        else:
            for i, rec in enumerate(records):
                rec['counters'] = rec['counters'].tolist()
                rec['fcounters'] = rec['fcounters'].tolist()
        return records
    def to_dict(self):
        mod = self.mod
        records = copy.deepcopy(self._records)
        counters = self.report.counters[self.mod]

        if mod in ['LUSTRE']:
            raise NotImplementedError
        elif mod in ['DXT_POSIX', 'DXT_MPIIO']:
            # format already in a dict format, but may offer switches for expansion
            logger.warn("WARNING: The output of DarshanRecordCollection.to_dict() may change in the future.")
            pass
        else:
            for i, rec in enumerate(records):
                rec['counters'] = dict(zip(counters['counters'], rec['counters']))
                rec['fcounters'] = dict(zip(counters['fcounters'], rec['fcounters']))
        return records
    def to_json(self):
        records = self.to_list()
        return json.dumps(records, cls=DarshanReportJSONEncoder)
    def to_df(self, attach="default"):
        if attach == "default":
            attach = ['id', 'rank']

        mod = self.mod
        records = copy.deepcopy(self._records)

        if mod in ['LUSTRE']:
            for i, rec in enumerate(records):
                rec = rec
        elif mod in ['DXT_POSIX', 'DXT_MPIIO']:
            for i, rec in enumerate(records):
                rec['read_segments'] = pd.DataFrame(rec['read_segments'])
                rec['write_segments'] = pd.DataFrame(rec['write_segments'])
        else:
            counters = []
            fcounters = []
            ids = []
            ranks = []

            for i, rec in enumerate(records):
                counters.append(rec['counters'])
                fcounters.append(rec['fcounters'])
                ids.append(rec['id'])
                ranks.append(rec['rank'])

            records = {"counters": None, "fcounters": None}
            records['counters'] = pd.DataFrame(counters, columns=self.report.counters[mod]['counters'])
            records['fcounters'] = pd.DataFrame(fcounters, columns=self.report.counters[mod]['fcounters'])

            def flip_column_order(df):
                return df[df.columns[::-1]]

            # attach ids and ranks
            if attach is not None:
                for counter_type in ['counters', 'fcounters']:
                    records[counter_type] = flip_column_order(records[counter_type])
                    if 'id' in attach:
                        records[counter_type]['id'] = ids
                    if 'rank' in attach:
                        records[counter_type]['rank'] = ranks
                    records[counter_type] = flip_column_order(records[counter_type])

        return records
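Taken together, the collection exposes single records and slices through the same wrapper, with the export conversions shared between both. A sketch of the intended access patterns, assuming the report stores its records as collections (as the filter and reduce changes in this commit do); the log name is hypothetical:

    import darshan

    report = darshan.DarshanReport("example.darshan")  # hypothetical log

    posix = report.records['POSIX']   # DarshanRecordCollection
    posix.info()                      # module, record count, ranks, name records

    rec = posix[0]                    # single-record ("record"-type) collection
    print(rec['rank'])                # hashable-key access on a single record

    dfs = posix.to_df()               # {'counters': DataFrame, 'fcounters': DataFrame}
    print(dfs['counters'].head())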
class DarshanReport(object):
    """
    The DarshanReport class provides a convenient wrapper to access darshan
@@ -46,12 +290,12 @@ class DarshanReport(object):
     a number of common aggregations can be performed.
     """

-    # a way to conser memory?
+    # a way to conserve memory?
     #__slots__ = ['attr1', 'attr2']

-    def __init__(self, filename=None, dtype='pandas',
+    def __init__(self, filename=None, dtype='numpy',
                  start_time=None, end_time=None,
                  automatic_summary=False, read_all=True, lookup_name_records=True):
@@ -70,9 +314,7 @@ class DarshanReport(object):
         self.filename = filename

         # Behavioral Options
-        self.dtype = dtype    # Experimental: preferred internal representation: pandas/numpy useful for aggregations, dict good for export/REST
-                              # might require alternative granularity: e.g., records vs. summaries?
-                              # vs dict/pandas? dict/native?
+        self.dtype = dtype    # default dtype to return when viewing records
         self.automatic_summary = automatic_summary
         self.lookup_name_records = lookup_name_records
@@ -81,16 +323,18 @@ class DarshanReport(object):
         # Report Metadata
         #
         # Start/End + Timebase are
         self.start_time = start_time if start_time else float('inf')
         self.end_time = end_time if end_time else float('-inf')
         self.timebase = self.start_time

         # Initialize data namespaces
-        self.metadata = {}
-        self.modules = {}
-        self.counters = {}
+        self._metadata = {}
+        self._modules = {}
+        self._counters = {}
         self.records = {}
-        self.mounts = {}
+        self._mounts = {}
         self.name_records = {}

         # initialize report/summary namespace
@@ -101,10 +345,10 @@ class DarshanReport(object):
         # legacy references (deprecate before 1.0?)
         self.data_revision = 0          # counter for consistency checks
         self.data = {'version': 1}
-        self.data['metadata'] = self.metadata
+        self.data['metadata'] = self._metadata
         self.data['records'] = self.records
         self.data['summary'] = self.summary
-        self.data['modules'] = self.modules
+        self.data['modules'] = self._modules
         self.data['counters'] = self.counters
         self.data['name_records'] = self.name_records
@@ -121,6 +365,33 @@ class DarshanReport(object):
             self.open(filename, read_all=read_all)

+    @property
+    def metadata(self):
+        return self._metadata
+
+    @property
+    def modules(self):
+        return self._modules
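The new read-only properties hide the underscore-prefixed internals while the legacy self.data references keep working. A sketch of the resulting access, assuming the remainder of this hunk (elided above) follows the same pattern; the log name is hypothetical:

    import darshan

    report = darshan.DarshanReport("example.darshan")  # hypothetical log

    print(report.metadata['job'])   # served by the metadata property -> self._metadata
    print(report.modules.keys())    # served by the modules property  -> self._modules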