#!/usr/bin/env python

"""
CmpRuns - A simple tool for comparing two static analyzer runs to determine
which reports have been added, removed, or changed.

This is designed to support automated testing using the static analyzer, from
two perspectives:
  1. To monitor changes in the static analyzer's reports on real code bases,
     for regression testing.

  2. For use by end users who want to integrate regular static analyzer
     testing into a buildbot-like environment.

Usage:

    # Load the results of both runs, to obtain lists of the corresponding
    # AnalysisDiagnostic objects.
    #
    results_old = load_results_from_single_run(single_run_info_old,
                                               delete_empty)
    results_new = load_results_from_single_run(single_run_info_new,
                                               delete_empty)

    # Compare the diagnostics of the two runs to obtain a ComparisonResult
    # that groups issues into common, added, removed, and changed.
    diff = compare_results(results_old, results_new)

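    # A typical consumer then walks the grouped diagnostics, for example
    # (an illustrative sketch; see dump_scan_build_results_diff below):
    for new in diff.present_only_in_new:
        print(f"ADDED: {new.get_readable_name()}")
    for old in diff.present_only_in_old:
        print(f"REMOVED: {old.get_readable_name()}")
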
"""
import json
import os
import plistlib
import re
import sys

from math import log
from collections import defaultdict
from enum import Enum
from typing import (Any, DefaultDict, Dict, List, NamedTuple, Optional,
                    Sequence, Set, TextIO, TypeVar, Tuple, Union)


Number = Union[int, float]
Stats = Dict[str, Dict[str, Number]]
Plist = Dict[str, Any]
JSON = Dict[str, Any]
# Diff in a form: field -> (before, after)
JSONDiff = Dict[str, Tuple[str, str]]
# Type variable for the generic helpers below.
T = TypeVar('T')

STATS_REGEXP = re.compile(r"Statistics: (\{.+\})", re.MULTILINE | re.DOTALL)
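# Matches serialized analyzer statistics lines of the form (illustrative):
#   Statistics: {"NumBlocks": 5, "NumSteps": 1000}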


class Colors:
    """
    Colors for terminal highlighting.
    """
    RED = '\x1b[2;30;41m'
    GREEN = '\x1b[6;30;42m'
    CLEAR = '\x1b[0m'


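# How a path-length change between reports matched across the two runs is
# recorded when a histogram is requested (see compare_results below):
#   relative:     a_path_len / b_path_len
#   log-relative: log(a_path_len / b_path_len)
#   absolute:     a_path_len - b_path_len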
class HistogramType(str, Enum):
    RELATIVE = "relative"
    LOG_RELATIVE = "log-relative"
    ABSOLUTE = "absolute"


class ResultsDirectory(NamedTuple):
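    """
    An analyzer output directory together with the source root that should
    be stripped from report file paths. Typical construction (paths are
    illustrative):

        ResultsDirectory("/tmp/scan-build-old", root="/home/user/project")
    """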
    path: str
    root: str = ""


class SingleRunInfo:
    """
    Information about a single analysis run:
    path - the analysis output directory
    root - the name of the root directory, which will be disregarded when
    determining the source file name
    """
    def __init__(self, results: ResultsDirectory,
                 verbose_log: Optional[str] = None):
        self.path = results.path
        self.root = results.root.rstrip("/\\")
        self.verbose_log = verbose_log


class AnalysisDiagnostic:
    def __init__(self, data: Plist, report: "AnalysisReport",
                 html_report: Optional[str]):
        self._data = data
        self._loc = self._data['location']
        self._report = report
        self._html_report = html_report
        self._report_size = len(self._data['path'])

    def get_file_name(self) -> str:
        root = self._report.run.root
        file_name = self._report.files[self._loc['file']]

        if file_name.startswith(root) and len(root) > 0:
            return file_name[len(root) + 1:]

        return file_name

    def get_root_file_name(self) -> str:
        path = self._data['path']

        if not path:
            return self.get_file_name()

        p = path[0]
        if 'location' in p:
            file_index = p['location']['file']
        else:  # control edge
            file_index = path[0]['edges'][0]['start'][0]['file']

        out = self._report.files[file_index]
        root = self._report.run.root

        if out.startswith(root):
            return out[len(root):]

        return out

    def get_line(self) -> int:
        return self._loc['line']

    def get_column(self) -> int:
        return self._loc['col']

    def get_path_length(self) -> int:
        return self._report_size

    def get_category(self) -> str:
        return self._data['category']

    def get_description(self) -> str:
        return self._data['description']

    def get_location(self) -> str:
        return f"{self.get_file_name()}:{self.get_line()}:{self.get_column()}"

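    # The identifier concatenates the file name, the issue context (usually
    # the enclosing function), and the issue hash, producing something like
    # (hypothetical): "lib/parse.c+parse_expr+a1b2c3d4e5f6".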
    def get_issue_identifier(self) -> str:
        issue_id = self.get_file_name() + "+"

        if "issue_context" in self._data:
            issue_id += self._data["issue_context"] + "+"

        if "issue_hash_content_of_line_in_context" in self._data:
            issue_id += str(
                self._data["issue_hash_content_of_line_in_context"])

        return issue_id

    def get_html_report(self) -> str:
        if self._html_report is None:
            return " "

        return os.path.join(self._report.run.path, self._html_report)

    def get_readable_name(self) -> str:
        if "issue_context" in self._data:
            funcname_postfix = "#" + self._data["issue_context"]
        else:
            funcname_postfix = ""

        root_filename = self.get_root_file_name()
        file_name = self.get_file_name()

        if root_filename != file_name:
            file_prefix = f"[{root_filename}] {file_name}"
        else:
            file_prefix = root_filename

        line = self.get_line()
        col = self.get_column()
        return f"{file_prefix}{funcname_postfix}:{line}:{col}" \
            f", {self.get_category()}: {self.get_description()}"

    KEY_FIELDS = ["check_name", "category", "description"]

    def is_similar_to(self, other: "AnalysisDiagnostic") -> bool:
        # We consider two diagnostics similar only if at least one
        # of the key fields is the same in both diagnostics.
        return len(self.get_diffs(other)) != len(self.KEY_FIELDS)

    def get_diffs(self, other: "AnalysisDiagnostic") -> JSONDiff:
        return {field: (self._data[field], other._data[field])
                for field in self.KEY_FIELDS
                if self._data[field] != other._data[field]}

    # Note, the data format is not an API and may change from one analyzer
    # version to another.
    def get_raw_data(self) -> Plist:
        return self._data

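    # Identity of a diagnostic is defined entirely by its issue identifier:
    # two diagnostics with the same file, context, and issue hash compare
    # equal even if, for example, their descriptions differ.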
    def __eq__(self, other: object) -> bool:
        return hash(self) == hash(other)

    def __ne__(self, other: object) -> bool:
        return hash(self) != hash(other)

    def __hash__(self) -> int:
        return hash(self.get_issue_identifier())


class AnalysisRun:
    def __init__(self, info: SingleRunInfo):
        self.path = info.path
        self.root = info.root
        self.info = info
        self.reports: List[AnalysisReport] = []
        # Cumulative list of all diagnostics from all the reports.
        self.diagnostics: List[AnalysisDiagnostic] = []
        self.clang_version: Optional[str] = None
        self.raw_stats: List[JSON] = []

    def get_clang_version(self) -> Optional[str]:
        return self.clang_version

    def read_single_file(self, path: str, delete_empty: bool):
        with open(path, "rb") as plist_file:
            data = plistlib.load(plist_file)

        if 'statistics' in data:
            self.raw_stats.append(json.loads(data['statistics']))
            data.pop('statistics')

        # We want to retrieve the clang version even if there are no
        # reports. Assume that all reports were created using the same
        # clang version (this is always true and is more efficient).
        if 'clang_version' in data:
            if self.clang_version is None:
                self.clang_version = data.pop('clang_version')
            else:
                data.pop('clang_version')

        # Ignore/delete empty reports.
        if not data['files']:
            if delete_empty:
                os.remove(path)
            return

        # Extract the HTML reports, if they exist.
        html_files = []
        for d in data['diagnostics']:
            if 'HTMLDiagnostics_files' in d:
                # FIXME: Why is this named files, when does it have multiple
                # files?
                assert len(d['HTMLDiagnostics_files']) == 1
                html_files.append(d.pop('HTMLDiagnostics_files')[0])
            else:
                html_files.append(None)

        report = AnalysisReport(self, data.pop('files'))
        # Python 3.10 offers zip(..., strict=True). The following assertion
        # mimics it.
        assert len(data['diagnostics']) == len(html_files)
        diagnostics = [AnalysisDiagnostic(d, report, h)
                       for d, h in zip(data.pop('diagnostics'), html_files)]

        assert not data

        report.diagnostics.extend(diagnostics)
        self.reports.append(report)
        self.diagnostics.extend(diagnostics)


class AnalysisReport:
    def __init__(self, run: AnalysisRun, files: List[str]):
        self.run = run
        self.files = files
        self.diagnostics: List[AnalysisDiagnostic] = []


def load_results(results: ResultsDirectory, delete_empty: bool = True,
                 verbose_log: Optional[str] = None) -> AnalysisRun:
    """
    Backwards compatibility API.
    """
    return load_results_from_single_run(SingleRunInfo(results,
                                                      verbose_log),
                                        delete_empty)


def load_results_from_single_run(info: SingleRunInfo,
                                 delete_empty: bool = True) -> AnalysisRun:
    """
    Load the results of an analysis from the given output folder.

    - info is the SingleRunInfo object
    - delete_empty specifies whether empty plist files should be deleted
    """
    path = info.path
    run = AnalysisRun(info)

    if os.path.isfile(path):
        run.read_single_file(path, delete_empty)
    else:
        for dirpath, dirnames, filenames in os.walk(path):
            for f in filenames:
                if not f.endswith('plist'):
                    continue

                p = os.path.join(dirpath, f)
                run.read_single_file(p, delete_empty)

    return run


def cmp_analysis_diagnostic(d):
    return d.get_issue_identifier()


AnalysisDiagnosticPair = Tuple[AnalysisDiagnostic, AnalysisDiagnostic]


class ComparisonResult:
    def __init__(self):
        self.present_in_both: List[AnalysisDiagnostic] = []
        self.present_only_in_old: List[AnalysisDiagnostic] = []
        self.present_only_in_new: List[AnalysisDiagnostic] = []
        self.changed_between_new_and_old: List[AnalysisDiagnosticPair] = []

    def add_common(self, issue: AnalysisDiagnostic):
        self.present_in_both.append(issue)

    def add_removed(self, issue: AnalysisDiagnostic):
        self.present_only_in_old.append(issue)

    def add_added(self, issue: AnalysisDiagnostic):
        self.present_only_in_new.append(issue)

    def add_changed(self, old_issue: AnalysisDiagnostic,
                    new_issue: AnalysisDiagnostic):
        self.changed_between_new_and_old.append((old_issue, new_issue))


GroupedDiagnostics = DefaultDict[str, List[AnalysisDiagnostic]]


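# Keys are the "file:line:col" strings produced by
# AnalysisDiagnostic.get_location(), e.g. (hypothetical) "lib/parse.c:42:7".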
def get_grouped_diagnostics(diagnostics: List[AnalysisDiagnostic]
                            ) -> GroupedDiagnostics:
    result: GroupedDiagnostics = defaultdict(list)
    for diagnostic in diagnostics:
        result[diagnostic.get_location()].append(diagnostic)
    return result


def compare_results(results_old: AnalysisRun, results_new: AnalysisRun,
                    histogram: Optional[HistogramType] = None
                    ) -> ComparisonResult:
    """
    compare_results - Compare the diagnostics of two analysis runs.

    The result is a ComparisonResult that classifies each diagnostic as
    present in both runs, present only in the old run (removed), present
    only in the new run (added), or changed between the two runs.
    """

    res = ComparisonResult()

    # Path-length differences between matched reports, collected for the
    # optional histogram.
    path_difference_data: List[float] = []

    diags_old = get_grouped_diagnostics(results_old.diagnostics)
    diags_new = get_grouped_diagnostics(results_new.diagnostics)

    locations_old = set(diags_old.keys())
    locations_new = set(diags_new.keys())

    common_locations = locations_old & locations_new

    for location in common_locations:
        old = diags_old[location]
        new = diags_new[location]

        # Quadratic algorithms in this part are fine because 'old' and 'new'
        # are most commonly of size 1.
        common: Set[AnalysisDiagnostic] = set()
        for a in old:
            for b in new:
                if a.get_issue_identifier() == b.get_issue_identifier():
                    a_path_len = a.get_path_length()
                    b_path_len = b.get_path_length()

                    if a_path_len != b_path_len:
                        if histogram == HistogramType.RELATIVE:
                            path_difference_data.append(
                                float(a_path_len) / b_path_len)

                        elif histogram == HistogramType.LOG_RELATIVE:
                            path_difference_data.append(
                                log(float(a_path_len) / b_path_len))

                        elif histogram == HistogramType.ABSOLUTE:
                            path_difference_data.append(
                                a_path_len - b_path_len)

                    res.add_common(b)
                    common.add(a)

        old = filter_issues(old, common)
        new = filter_issues(new, common)
        common = set()

        for a in old:
            for b in new:
                if a.is_similar_to(b):
                    res.add_changed(a, b)
                    common.add(a)
                    common.add(b)

        old = filter_issues(old, common)
        new = filter_issues(new, common)

        # Whatever is left in 'old' doesn't have a corresponding diagnostic
        # in 'new', so we need to mark it as 'removed'.
        for a in old:
            res.add_removed(a)

        # Whatever is left in 'new' doesn't have a corresponding diagnostic
        # in 'old', so we need to mark it as 'added'.
        for b in new:
            res.add_added(b)

    only_old_locations = locations_old - common_locations
    for location in only_old_locations:
        for a in diags_old[location]:
            # These locations have been found only in the old build, so we
            # need to mark all of them as 'removed'.
            res.add_removed(a)

    only_new_locations = locations_new - common_locations
    for location in only_new_locations:
        for b in diags_new[location]:
            # These locations have been found only in the new build, so we
            # need to mark all of them as 'added'.
            res.add_added(b)

    # FIXME: Add fuzzy matching. One simple and possibly effective idea would
    # be to bin the diagnostics, print them in a normalized form (based solely
    # on the structure of the diagnostic), compute the diff, then use that as
    # the basis for matching. This has the nice property that we don't depend
    # in any way on the diagnostic format.

    if histogram:
        from matplotlib import pyplot
        pyplot.hist(path_difference_data, bins=100)
        pyplot.show()

    return res


def filter_issues(origin: List[AnalysisDiagnostic],
                  to_remove: Set[AnalysisDiagnostic]) \
                  -> List[AnalysisDiagnostic]:
    return [diag for diag in origin if diag not in to_remove]


def compute_percentile(values: Sequence[T], percentile: float) -> T:
    """
    Return the given percentile of the values (nearest-rank style).
    """
    return sorted(values)[int(round(percentile * len(values) + 0.5)) - 1]
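

# A couple of sanity checks for the nearest-rank computation above
# (illustrative values, not part of the original module):
#   compute_percentile([1, 2, 3, 4, 5], 0.5) == 3
#   compute_percentile([1, 2, 3, 4, 5], 0.9) == 5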


def derive_stats(results: AnalysisRun) -> Stats:
    # Assume all keys are the same in each statistics bucket.
    combined_data = defaultdict(list)

    # Collect data on path lengths.
    for report in results.reports:
        for diagnostic in report.diagnostics:
            combined_data['PathsLength'].append(diagnostic.get_path_length())

    for stat in results.raw_stats:
        for key, value in stat.items():
            combined_data[str(key)].append(value)

    combined_stats: Stats = {}

    for key, values in combined_data.items():
        combined_stats[key] = {
            "max": max(values),
            "min": min(values),
            "mean": sum(values) / len(values),
            "90th %tile": compute_percentile(values, 0.9),
            "95th %tile": compute_percentile(values, 0.95),
            "median": sorted(values)[len(values) // 2],
            "total": sum(values)
        }

    return combined_stats
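

# The resulting Stats mapping has one bucket of summary values per statistic,
# e.g. (illustrative):
#   {"PathsLength": {"max": 10, "min": 1, "mean": 3.2, "median": 3, ...}}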


# TODO: compare_results decouples comparison from the output, we should
#       do it here as well
def compare_stats(results_old: AnalysisRun, results_new: AnalysisRun,
                  out: TextIO = sys.stdout):
    stats_old = derive_stats(results_old)
    stats_new = derive_stats(results_new)

    old_keys = set(stats_old.keys())
    new_keys = set(stats_new.keys())
    keys = sorted(old_keys & new_keys)

    for key in keys:
        out.write(f"{key}\n")

        nested_keys = sorted(set(stats_old[key]) & set(stats_new[key]))

        for nested_key in nested_keys:
            val_old = float(stats_old[key][nested_key])
            val_new = float(stats_new[key][nested_key])

            report = f"{val_old:.3f} -> {val_new:.3f}"

            # Only apply highlighting when writing to a TTY and not on
            # Windows.
            if out.isatty() and os.name != 'nt':
                if val_new != 0:
                    ratio = (val_new - val_old) / val_new
                    if ratio < -0.2:
                        report = Colors.GREEN + report + Colors.CLEAR
                    elif ratio > 0.2:
                        report = Colors.RED + report + Colors.CLEAR

            out.write(f"\t {nested_key} {report}\n")

    removed_keys = old_keys - new_keys
    if removed_keys:
        out.write(f"REMOVED statistics: {removed_keys}\n")

    added_keys = new_keys - old_keys
    if added_keys:
        out.write(f"ADDED statistics: {added_keys}\n")

    out.write("\n")
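

# Sample compare_stats output (illustrative numbers; entries that moved by
# more than 20% are color-highlighted on a TTY):
#   PathsLength
#        max 10.000 -> 12.000
#        mean 3.200 -> 3.500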


def dump_scan_build_results_diff(dir_old: ResultsDirectory,
                                 dir_new: ResultsDirectory,
                                 delete_empty: bool = True,
                                 out: TextIO = sys.stdout,
                                 show_stats: bool = False,
                                 stats_only: bool = False,
                                 histogram: Optional[HistogramType] = None,
                                 verbose_log: Optional[str] = None):
    """
    Compare directories with analysis results and dump the differences.

    :param delete_empty: delete empty plist files
    :param out: buffer to dump comparison results to.
    :param show_stats: compare execution stats as well.
    :param stats_only: compare ONLY execution stats.
    :param histogram: optional histogram type to plot path differences.
    :param verbose_log: optional path to an additional log file.
    :return: a (found_diffs, old_report_count, new_report_count) tuple,
             or None when stats_only is set.
    """
    results_old = load_results(dir_old, delete_empty, verbose_log)
    results_new = load_results(dir_new, delete_empty, verbose_log)

    if show_stats or stats_only:
        compare_stats(results_old, results_new, out)
    if stats_only:
        return

    # Open the verbose log, if given.
    if verbose_log:
        aux_log: Optional[TextIO] = open(verbose_log, "w")
    else:
        aux_log = None

    diff = compare_results(results_old, results_new, histogram)
    found_diffs = 0
    total_added = 0
    total_removed = 0
    total_modified = 0

    for new in diff.present_only_in_new:
        out.write(f"ADDED: {new.get_readable_name()}\n\n")
        found_diffs += 1
        total_added += 1
        if aux_log:
            aux_log.write(f"('ADDED', {new.get_readable_name()}, "
                          f"{new.get_html_report()})\n")

    for old in diff.present_only_in_old:
        out.write(f"REMOVED: {old.get_readable_name()}\n\n")
        found_diffs += 1
        total_removed += 1
        if aux_log:
            aux_log.write(f"('REMOVED', {old.get_readable_name()}, "
                          f"{old.get_html_report()})\n")

    for old, new in diff.changed_between_new_and_old:
        out.write(f"MODIFIED: {old.get_readable_name()}\n")
        found_diffs += 1
        total_modified += 1
        diffs = old.get_diffs(new)
        str_diffs = [f"          '{key}' changed: "
                     f"'{old_value}' -> '{new_value}'"
                     for key, (old_value, new_value) in diffs.items()]
        out.write(",\n".join(str_diffs) + "\n\n")
        if aux_log:
            aux_log.write(f"('MODIFIED', {old.get_readable_name()}, "
                          f"{old.get_html_report()})\n")

    total_reports = len(results_new.diagnostics)
    out.write(f"TOTAL REPORTS: {total_reports}\n")
    out.write(f"TOTAL ADDED: {total_added}\n")
    out.write(f"TOTAL REMOVED: {total_removed}\n")
    out.write(f"TOTAL MODIFIED: {total_modified}\n")

    if aux_log:
        aux_log.write(f"('TOTAL NEW REPORTS', {total_reports})\n")
        aux_log.write(f"('TOTAL DIFFERENCES', {found_diffs})\n")
        aux_log.close()

    # TODO: change to NamedTuple
    return found_diffs, len(results_old.diagnostics), \
        len(results_new.diagnostics)


if __name__ == "__main__":
    print("CmpRuns.py should not be used on its own.")
    print("Please use 'SATest.py compare' instead")
    sys.exit(1)