#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
The darshan.report module provides the DarshanReport class for convenient
interaction with and aggregation of Darshan logs using Python.
"""


import darshan.backend.cffi_backend as backend

import json
import re
import copy
import datetime
import sys

import numpy as np
import pandas as pd


import logging



class DarshanReportJSONEncoder(json.JSONEncoder):
    """
    Helper class for JSON serialization when a report contains, for example,
    numpy arrays or datetime objects, which are not handled by the default
    JSON encoder.
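
    Example (illustrative; shows how numpy arrays and datetimes serialize):

        >>> json.dumps({"ts": datetime.datetime.now(), "arr": np.zeros(2)},
        ...            cls=DarshanReportJSONEncoder)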
    """
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
    
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()

        return json.JSONEncoder.default(self, obj)



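# Maps module identifiers to the CFFI struct type strings handed to the
# backend when fetching that module's records.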
_structdefs = {
    "BG/Q": "struct darshan_bgq_record **",
    "DXT_MPIIO": "struct dxt_file_record **",
    "DXT_POSIX": "struct dxt_file_record **",
    "H5F": "struct darshan_hdf5_file **",
    "H5D": "struct darshan_hdf5_dataset **",
    "LUSTRE": "struct darshan_lustre_record **",
    "MPI-IO": "struct darshan_mpiio_file **",
    "PNETCDF": "struct darshan_pnetcdf_file **",
    "POSIX": "struct darshan_posix_file **",
    "STDIO": "struct darshan_stdio_file **",
}



class DarshanReport(object):
    """
    The DarshanReport class provides a convenient wrapper for accessing Darshan
    logs, which also caches already fetched information. In addition, a number
    of common aggregations can be performed.
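
    Example (an illustrative sketch; assumes a Darshan log file
    "example.darshan" exists on disk):

        >>> report = DarshanReport("example.darshan", read_all=True)
        >>> report.info()
        >>> report.records['POSIX']   # records grouped by module, if present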
    """

    def __init__(self, filename=None, data_format='pandas', automatic_summary=False,
            read_all=True, lookup_name_records=True):
        self.filename = filename

        # options
        self.data_format = data_format  # Experimental: preferred internal representation: pandas/numpy useful for aggregations, dict good for export/REST
                                        # might require alternative granularity: e.g., records vs. summaries?
                                        # dict/pandas? dict/native?
        self.automatic_summary = automatic_summary
        self.lookup_name_records = lookup_name_records


        # state dependent book-keeping
        self.converted_records = False  # true if convert_records() was called (unnumpyfy)

        # time range covered by the log (set from job metadata in read_metadata)
        self.start_time = float('inf')
        self.end_time = float('-inf')

        # initialize data namespaces
        self.metadata = {}
        self.modules = {}
        self.counters = {}
        self.records = {}
        self.mounts = {}
        self.name_records = {}

        # initialize report/summary namespace
        self.summary_revision = 0       # counter to check if summary needs update
        self.summary = {}


        # legacy references (deprecate before 1.0?)
        self.data_revision = 0          # counter for consistency checks
        self.data = {'version': 1}
        self.data['metadata'] = self.metadata
        self.data['records'] = self.records
        self.data['summary'] = self.summary
        self.data['modules'] = self.modules
        self.data['counters'] = self.counters
        self.data['name_records'] = self.name_records



        # when using report algebra, this log makes it possible to untangle
        # potentially unfair aggregations (e.g., double accounting)
        self.provenance_enabled = True
        self.provenance_log = []
        self.provenance_reports = {}


        if filename:
            self.open(filename, read_all=read_all)    


    def open(self, filename, read_all=False):
        """
        Open log file via CFFI backend.

        Args:
            filename (str): filename to open
            read_all (bool): whether to read all records for log

        Return:
            None
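
        Example (illustrative; assumes "example.darshan" exists on disk):

            >>> report = DarshanReport()
            >>> report.open("example.darshan", read_all=True)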

        """

        self.filename = filename

        if filename:
            self.log = backend.log_open(self.filename)
            if not bool(self.log['handle']):
                raise RuntimeError("Failed to open file.")

            self.read_metadata(read_all=read_all)

            if read_all:
                self.read_all()



    def __add__(self, other):
        """
        Allow reports to be merged using the addition operator.
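
        Example (illustrative; assumes merge() is implemented and both
        reports were loaded elsewhere):

            >>> combined = report_a + report_b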
        """

        return self.merge(other)


    def __deepcopy__(self, memo):
        """
        Creates a deepcopy of report.

        .. note::
            Needed to purge the reference to self.log, as CData objects cannot be pickled:
            TypeError: can't pickle _cffi_backend.CData objects
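
        Example (illustrative; the copy omits the self.log handle):

            >>> report_copy = copy.deepcopy(report)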
        """

        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            if k in ["log"]:
                # blacklist of members not to copy
                continue
            setattr(result, k, copy.deepcopy(v, memo))
        # TODO: might consider treating self.log as list of open logs to not deactivate load functions?
        return result


    def read_metadata(self, read_all=False):
        """
        Read metadata such as the job, the executables and available modules.

        Args:
            None

        Return:
            None

        """
        self.metadata['job'] = backend.log_get_job(self.log)
        self.metadata['exe'] = backend.log_get_exe(self.log)

        self.start_time = datetime.datetime.fromtimestamp(self.metadata['job']['start_time'])
        self.end_time = datetime.datetime.fromtimestamp(self.metadata['job']['end_time'])

        self.data['mounts'] = backend.log_get_mounts(self.log)
        self.mounts = self.data['mounts']

        self.data['modules'] = backend.log_get_modules(self.log)
        self.modules = self.data['modules']

        if read_all:
            self.data["name_records"] = backend.log_get_name_records(self.log)
            self.name_records = self.data['name_records']


    def update_name_records(self, mod=None):
        """
        Update (and prune unused) name records from resolve table.

        First reindexes all name record identifiers in use and then queries the
        darshan-utils library to compile a filtered list of name records.

        Args:
            None

        Return:
            None

        """
        # sanitize inputs
        if mod is None:
            mods = self.records
        else:
            mods = [mod]

        
        # state
        ids = set()

        for mod in mods:
            for rec in self.records[mod]:
                ids.add(rec['id'])


        self.name_records = backend.log_lookup_name_records(self.log, ids)
        

    def read_all(self):
        """
        Read all available records from the Darshan log and store them in self.records.

        Args:
            None

        Return:
            None
        """
        self.read_all_generic_records()
        self.read_all_dxt_records()
        return


    def read_all_generic_records(self, counters=True, fcounters=True):
        """
        Read all generic records from the Darshan log and store them in self.records.

        Args:
            None

        Return:
            None
        """
        for mod in self.data['modules']:
            self.mod_read_all_records(mod, warnings=False)



    def read_all_dxt_records(self, reads=True, writes=True, dtype=None):
        """
        Read all DXT records from the Darshan log and store them in self.records.

        Args:
            None

        Return:
            None
        """
        for mod in self.data['modules']:
            self.mod_read_all_dxt_records(mod, warnings=False, reads=reads, writes=writes, dtype=dtype)


    def mod_read_all_records(self, mod, dtype='numpy', warnings=True):
        """
        Reads all generic records for the given module.

        Args:
            mod (str): Identifier of module to fetch all records
            dtype (str): 'numpy' for ndarray (default), 'dict' for python dictionary, 'pandas'

        Return:
            None
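
        Example (illustrative; assumes the log contains POSIX records):

            >>> report.mod_read_all_records('POSIX', dtype='pandas')
            >>> report.records['POSIX'][0]['counters']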

        """
        unsupported = ['DXT_POSIX', 'DXT_MPIIO', 'LUSTRE']

        if mod in unsupported:
            if warnings:
                print("Skipping. Currently unsupported:", mod, "in mod_read_all_records().", file=sys.stderr)
            # skip mod
            return 


        self.data['records'][mod] = []

        cn = backend.counter_names(mod)
        fcn = backend.fcounter_names(mod)


        self.modules[mod]['num_records'] = 0

        
        if mod not in self.counters:
            self.counters[mod] = {}
        self.counters[mod]['counters'] = cn 
        self.counters[mod]['fcounters'] = fcn


        rec = backend.log_get_generic_record(self.log, mod, _structdefs[mod], dtype=dtype)
        while rec is not None:
            # append the record once regardless of dtype ('numpy', 'dict', 'pandas')
            self.records[mod].append(rec)

            self.modules[mod]['num_records'] += 1

            # fetch next
            rec = backend.log_get_generic_record(self.log, mod, _structdefs[mod], dtype=dtype)


        if self.lookup_name_records:
            self.update_name_records()


        # process/combine records if the format dtype allows for this
        if dtype == 'pandas':
            combined_c = None
            combined_fc = None

            for rec in self.records[mod]:
                if combined_c is None:
                    combined_c = rec['counters']
                else:
                    combined_c = pd.concat([combined_c, rec['counters']])

                if combined_fc is None:
                    combined_fc = rec['fcounters']
                else:
                    combined_fc = pd.concat([combined_fc, rec['fcounters']])

            self.records[mod] = [{
                'rank': -1,
                'id': -1,
                'counters': combined_c,
                'fcounters': combined_fc
                }]



    def mod_read_all_dxt_records(self, mod, dtype='numpy', warnings=True, reads=True, writes=True):
        """
        Reads all DXT records for the provided module.

        Args:
            mod (str): Identifier of module to fetch all records
            dtype (str): 'numpy' for ndarray (default), 'dict' for python dictionary

        Return:
            None
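
        Example (illustrative; assumes the log contains DXT_POSIX data):

            >>> report.mod_read_all_dxt_records('DXT_POSIX', dtype='dict')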

        """
        if mod not in self.data['modules']:
            if warnings:
                print("Skipping. Log does not contain data for mod:", mod, file=sys.stderr)
            return


        supported = ['DXT_POSIX', 'DXT_MPIIO']

        if mod not in supported:
            if warnings:
                print("Skipping. Currently unsupported:", mod, 'in mod_read_all_dxt_records().', file=sys.stderr)
            # skip mod
            return 


        self.records[mod] = []
        self.modules[mod]['num_records'] = 0


        if mod not in self.counters:
            self.counters[mod] = {}


        rec = backend.log_get_dxt_record(self.log, mod, _structdefs[mod], reads=reads, writes=writes, dtype=dtype)
        while rec is not None:
            # append the record once regardless of dtype
            self.records[mod].append(rec)


            self.data['modules'][mod]['num_records'] += 1

            # fetch next
            rec = backend.log_get_dxt_record(self.log, mod, _structdefs[mod], reads=reads, writes=writes, dtype=dtype)


    def mod_records(self, mod, dtype='numpy', warnings=True):
        """
        Return generator for lazy record loading and traversal.

        .. warning::
            Can't be used for now when alternating between different modules.
            A temporary workaround is to open the same log multiple times, as
            this way buffers are not shared between get_record invocations in
            the lower-level library.


        Args:
            mod (str): Identifier of module to fetch records for
            dtype (str): 'numpy' for ndarray (default), 'dict' for python dictionary

        Return:
            Generator that yields one record at a time
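
        Example (illustrative; assumes the log contains POSIX records):

            >>> for rec in report.mod_records('POSIX', dtype='dict'):
            ...     print(rec['id'], rec['rank'])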

        """
        cn = backend.counter_names(mod)
        fcn = backend.fcounter_names(mod)

        if mod not in self.counters:
            self.counters[mod] = {}
        self.counters[mod]['counters'] = cn 
        self.counters[mod]['fcounters'] = fcn

        rec = backend.log_get_generic_record(self.log, mod, _structdefs[mod], dtype=dtype)
        while rec is not None:
            yield rec

            # fetch next
            rec = backend.log_get_generic_record(self.log, mod, _structdefs[mod], dtype=dtype)


    def info(self, metadata=False):
        """
        Print information about the report for inspection.

        Args:
            metadata (bool): show detailed metadata (default: False)

        Return:
            None
        """
        print("Filename:       ", self.filename, sep="")

        tdelta = self.end_time - self.start_time
        print("Times:          ", self.start_time, " to ", self.end_time, " (Duration ", tdelta, ")", sep="")

        if 'exe' in self.metadata:
            print("Executable:     ", self.metadata['exe'], sep="")

        if 'job' in self.metadata:
            print("Processes:      ", self.metadata['job']['nprocs'], sep="")
            print("JobID:          ", self.metadata['job']['jobid'], sep="")
            print("UID:            ", self.metadata['job']['uid'], sep="")
            print("Modules in Log: ", list(self.modules.keys()), sep="")

        loaded = {}
        for mod in self.records:
            loaded[mod] = len(self.records[mod])
        print("Loaded Records: ", loaded, sep="")

        print("Name Records:   ", len(self.name_records), sep="")
        
        if 'job' in self.metadata:
            print("Darshan/Hints:  ", self.metadata['job']['metadata'], sep="")
        print("DarshanReport:  id(", id(self), ") (tmp)", sep="")


        if metadata:
            for key, val in self.metadata.items():
                if key == "job":
                    for key2, val2 in self.metadata[key].items():
                        print("metadata['", key ,"']['", key2, "'] = ", val2, sep="")
                else:
                    print("metadata['", key, "'] = ", val, sep="")
    
    
        #def get_size(obj, seen=None):
        #    """Recursively finds size of objects"""
        #    size = sys.getsizeof(obj)
        #    if seen is None:
        #        seen = set()
        #    obj_id = id(obj)
        #    if obj_id in seen:
        #        return 0
        #    # Important mark as seen *before* entering recursion to gracefully handle
        #    # self-referential objects
        #    seen.add(obj_id)
        #    if isinstance(obj, dict):
        #        size += sum([get_size(v, seen) for v in obj.values()])
        #        size += sum([get_size(k, seen) for k in obj.keys()])
        #    elif hasattr(obj, '__dict__'):
        #        size += get_size(obj.__dict__, seen)
        #    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
        #        size += sum([get_size(i, seen) for i in obj])
        #    return size

        #print("Memory:", get_size(self), 'bytes')


    def to_dict(self):
        """
        Return dictionary representation of report data.

        Args:
            None

        Return:
            dict
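
        Example (illustrative):

            >>> d = report.to_dict()
            >>> list(d['records'].keys())   # module names with loaded records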
        """
        data = copy.deepcopy(self.data)

        recs = data['records']
        for mod in recs:
            for i, rec in enumerate(data['records'][mod]):
                recs[mod][i]['counters'] = rec['counters'].tolist()
                recs[mod][i]['fcounters'] = rec['fcounters'].tolist()

        return data


    def to_json(self):
        """
        Return JSON representation of report data as string.

        Args:
            None

        Return:
            JSON String
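
        Example (illustrative):

            >>> with open("report.json", "w") as f:
            ...     f.write(report.to_json())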
        """
        data = copy.deepcopy(self.data)

        recs = data['records']
        for mod in recs:
            for i, rec in enumerate(data['records'][mod]):
                recs[mod][i]['counters'] = rec['counters'].tolist()
                recs[mod][i]['fcounters'] = rec['fcounters'].tolist()

        return json.dumps(data, cls=DarshanReportJSONEncoder)