1# SPDX-License-Identifier: GPL-2.0
2import re
3import csv
4import json
5import argparse
6from pathlib import Path
7import subprocess
8
9
10class TestError:
11    def __init__(self, metric: list[str], wl: str, value: list[float], low: float, up=float('nan'), description=str()):
12        self.metric: list = metric  # multiple metrics in relationship type tests
13        self.workloads = [wl]  # multiple workloads possible
14        self.collectedValue: list = value
15        self.valueLowBound = low
16        self.valueUpBound = up
17        self.description = description
18
19    def __repr__(self) -> str:
20        if len(self.metric) > 1:
21            return "\nMetric Relationship Error: \tThe collected value of metric {0}\n\
22                \tis {1} in workload(s): {2} \n\
23                \tbut expected value range is [{3}, {4}]\n\
24                \tRelationship rule description: \'{5}\'".format(self.metric, self.collectedValue, self.workloads,
25                                                                 self.valueLowBound, self.valueUpBound, self.description)
26        elif len(self.collectedValue) == 0:
27            return "\nNo Metric Value Error: \tMetric {0} returns with no value \n\
28                    \tworkload(s): {1}".format(self.metric, self.workloads)
29        else:
30            return "\nWrong Metric Value Error: \tThe collected value of metric {0}\n\
31                    \tis {1} in workload(s): {2}\n\
32                    \tbut expected value range is [{3}, {4}]"\
33                        .format(self.metric, self.collectedValue, self.workloads,
34                                self.valueLowBound, self.valueUpBound)
35
36
37class Validator:
38    def __init__(self, rulefname, reportfname='', t=5, debug=False, datafname='', fullrulefname='', workload='true', metrics=''):
39        self.rulefname = rulefname
40        self.reportfname = reportfname
41        self.rules = None
42        self.collectlist: str = metrics
43        self.metrics = self.__set_metrics(metrics)
44        self.skiplist = set()
45        self.tolerance = t
46
47        self.workloads = [x for x in workload.split(",") if x]
48        self.wlidx = 0  # idx of current workloads
49        self.allresults = dict()  # metric results of all workload
50        self.alltotalcnt = dict()
51        self.allpassedcnt = dict()
52
53        self.results = dict()  # metric results of current workload
54        # vars for test pass/failure statistics
55        # metrics with no results or negative results, neg result counts failed tests
56        self.ignoremetrics = set()
57        self.totalcnt = 0
58        self.passedcnt = 0
59        # vars for errors
60        self.errlist = list()
61
62        # vars for Rule Generator
63        self.pctgmetrics = set()  # Percentage rule
64
65        # vars for debug
66        self.datafname = datafname
67        self.debug = debug
68        self.fullrulefname = fullrulefname
69
70    def __set_metrics(self, metrics=''):
71        if metrics != '':
72            return set(metrics.split(","))
73        else:
74            return set()
75
76    def read_json(self, filename: str) -> dict:
77        try:
78            with open(Path(filename).resolve(), "r") as f:
79                data = json.loads(f.read())
80        except OSError as e:
81            print(f"Error when reading file {e}")
82            sys.exit()
83
84        return data
85
86    def json_dump(self, data, output_file):
87        parent = Path(output_file).parent
88        if not parent.exists():
89            parent.mkdir(parents=True)
90
91        with open(output_file, "w+") as output_file:
92            json.dump(data,
93                      output_file,
94                      ensure_ascii=True,
95                      indent=4)
96
97    def get_results(self, idx: int = 0):
98        return self.results[idx]
99
100    def get_bounds(self, lb, ub, error, alias={}, ridx: int = 0) -> list:
101        """
102        Get bounds and tolerance from lb, ub, and error.
103        If missing lb, use 0.0; missing ub, use float('inf); missing error, use self.tolerance.
104
105        @param lb: str/float, lower bound
106        @param ub: str/float, upper bound
107        @param error: float/str, error tolerance
108        @returns: lower bound, return inf if the lower bound is a metric value and is not collected
109                  upper bound, return -1 if the upper bound is a metric value and is not collected
110                  tolerance, denormalized base on upper bound value
111        """
112        # init ubv and lbv to invalid values
113        def get_bound_value(bound, initval, ridx):
114            val = initval
115            if isinstance(bound, int) or isinstance(bound, float):
116                val = bound
117            elif isinstance(bound, str):
118                if bound == '':
119                    val = float("inf")
120                elif bound in alias:
121                    vall = self.get_value(alias[ub], ridx)
122                    if vall:
123                        val = vall[0]
124                elif bound.replace('.', '1').isdigit():
125                    val = float(bound)
126                else:
127                    print("Wrong bound: {0}".format(bound))
128            else:
129                print("Wrong bound: {0}".format(bound))
130            return val
131
132        ubv = get_bound_value(ub, -1, ridx)
133        lbv = get_bound_value(lb, float('inf'), ridx)
134        t = get_bound_value(error, self.tolerance, ridx)
135
136        # denormalize error threshold
137        denormerr = t * ubv / 100 if ubv != 100 and ubv > 0 else t
138
139        return lbv, ubv, denormerr
140
141    def get_value(self, name: str, ridx: int = 0) -> list:
142        """
143        Get value of the metric from self.results.
144        If result of this metric is not provided, the metric name will be added into self.ignoremetics.
145        All future test(s) on this metric will fail.
146
147        @param name: name of the metric
148        @returns: list with value found in self.results; list is empty when value is not found.
149        """
150        results = []
151        data = self.results[ridx] if ridx in self.results else self.results[0]
152        if name not in self.ignoremetrics:
153            if name in data:
154                results.append(data[name])
155            elif name.replace('.', '1').isdigit():
156                results.append(float(name))
157            else:
158                self.ignoremetrics.add(name)
159        return results
160
161    def check_bound(self, val, lb, ub, err):
162        return True if val <= ub + err and val >= lb - err else False
163
164    # Positive Value Sanity check
165    def pos_val_test(self):
166        """
167        Check if metrics value are non-negative.
168        One metric is counted as one test.
169        Failure: when metric value is negative or not provided.
170        Metrics with negative value will be added into self.ignoremetrics.
171        """
172        negmetric = dict()
173        pcnt = 0
174        tcnt = 0
175        rerun = list()
176        for name, val in self.get_results().items():
177            if val < 0:
178                negmetric[name] = val
179                rerun.append(name)
180            else:
181                pcnt += 1
182            tcnt += 1
183        # The first round collect_perf() run these metrics with simple workload
184        # "true". We give metrics a second chance with a longer workload if less
185        # than 20 metrics failed positive test.
186        if len(rerun) > 0 and len(rerun) < 20:
187            second_results = dict()
188            self.second_test(rerun, second_results)
189            for name, val in second_results.items():
190                if name not in negmetric:
191                    continue
192                if val >= 0:
193                    del negmetric[name]
194                    pcnt += 1
195
196        if len(negmetric.keys()):
197            self.ignoremetrics.update(negmetric.keys())
198            self.errlist.extend(
199                [TestError([m], self.workloads[self.wlidx], negmetric[m], 0) for m in negmetric.keys()])
200
201        return
202
203    def evaluate_formula(self, formula: str, alias: dict, ridx: int = 0):
204        """
205        Evaluate the value of formula.
206
207        @param formula: the formula to be evaluated
208        @param alias: the dict has alias to metric name mapping
209        @returns: value of the formula is success; -1 if the one or more metric value not provided
210        """
211        stack = []
212        b = 0
213        errs = []
214        sign = "+"
215        f = str()
216
217        # TODO: support parenthesis?
218        for i in range(len(formula)):
219            if i+1 == len(formula) or formula[i] in ('+', '-', '*', '/'):
220                s = alias[formula[b:i]] if i + \
221                    1 < len(formula) else alias[formula[b:]]
222                v = self.get_value(s, ridx)
223                if not v:
224                    errs.append(s)
225                else:
226                    f = f + "{0}(={1:.4f})".format(s, v[0])
227                    if sign == "*":
228                        stack[-1] = stack[-1] * v
229                    elif sign == "/":
230                        stack[-1] = stack[-1] / v
231                    elif sign == '-':
232                        stack.append(-v[0])
233                    else:
234                        stack.append(v[0])
235                if i + 1 < len(formula):
236                    sign = formula[i]
237                    f += sign
238                    b = i + 1
239
240        if len(errs) > 0:
241            return -1, "Metric value missing: "+','.join(errs)
242
243        val = sum(stack)
244        return val, f
245
246    # Relationships Tests
247    def relationship_test(self, rule: dict):
248        """
249        Validate if the metrics follow the required relationship in the rule.
250        eg. lower_bound <= eval(formula)<= upper_bound
251        One rule is counted as ont test.
252        Failure: when one or more metric result(s) not provided, or when formula evaluated outside of upper/lower bounds.
253
254        @param rule: dict with metric name(+alias), formula, and required upper and lower bounds.
255        """
256        alias = dict()
257        for m in rule['Metrics']:
258            alias[m['Alias']] = m['Name']
259        lbv, ubv, t = self.get_bounds(
260            rule['RangeLower'], rule['RangeUpper'], rule['ErrorThreshold'], alias, ridx=rule['RuleIndex'])
261        val, f = self.evaluate_formula(
262            rule['Formula'], alias, ridx=rule['RuleIndex'])
263
264        lb = rule['RangeLower']
265        ub = rule['RangeUpper']
266        if isinstance(lb, str):
267            if lb in alias:
268                lb = alias[lb]
269        if isinstance(ub, str):
270            if ub in alias:
271                ub = alias[ub]
272
273        if val == -1:
274            self.errlist.append(TestError([m['Name'] for m in rule['Metrics']], self.workloads[self.wlidx], [],
275                                lb, ub, rule['Description']))
276        elif not self.check_bound(val, lbv, ubv, t):
277            self.errlist.append(TestError([m['Name'] for m in rule['Metrics']], self.workloads[self.wlidx], [val],
278                                lb, ub, rule['Description']))
279        else:
280            self.passedcnt += 1
281        self.totalcnt += 1
282
283        return
284
285    # Single Metric Test
286    def single_test(self, rule: dict):
287        """
288        Validate if the metrics are in the required value range.
289        eg. lower_bound <= metrics_value <= upper_bound
290        One metric is counted as one test in this type of test.
291        One rule may include one or more metrics.
292        Failure: when the metric value not provided or the value is outside the bounds.
293        This test updates self.total_cnt.
294
295        @param rule: dict with metrics to validate and the value range requirement
296        """
297        lbv, ubv, t = self.get_bounds(
298            rule['RangeLower'], rule['RangeUpper'], rule['ErrorThreshold'])
299        metrics = rule['Metrics']
300        passcnt = 0
301        totalcnt = 0
302        failures = dict()
303        rerun = list()
304        for m in metrics:
305            totalcnt += 1
306            result = self.get_value(m['Name'])
307            if len(result) > 0 and self.check_bound(result[0], lbv, ubv, t) or m['Name'] in self.skiplist:
308                passcnt += 1
309            else:
310                failures[m['Name']] = result
311                rerun.append(m['Name'])
312
313        if len(rerun) > 0 and len(rerun) < 20:
314            second_results = dict()
315            self.second_test(rerun, second_results)
316            for name, val in second_results.items():
317                if name not in failures:
318                    continue
319                if self.check_bound(val, lbv, ubv, t):
320                    passcnt += 1
321                    del failures[name]
322                else:
323                    failures[name] = [val]
324                    self.results[0][name] = val
325
326        self.totalcnt += totalcnt
327        self.passedcnt += passcnt
328        if len(failures.keys()) != 0:
329            self.errlist.extend([TestError([name], self.workloads[self.wlidx], val,
330                                rule['RangeLower'], rule['RangeUpper']) for name, val in failures.items()])
331
332        return
333
334    def create_report(self):
335        """
336        Create final report and write into a JSON file.
337        """
338        print(self.errlist)
339
340        if self.debug:
341            allres = [{"Workload": self.workloads[i], "Results": self.allresults[i]}
342                      for i in range(0, len(self.workloads))]
343            self.json_dump(allres, self.datafname)
344
345    def check_rule(self, testtype, metric_list):
346        """
347        Check if the rule uses metric(s) that not exist in current platform.
348
349        @param metric_list: list of metrics from the rule.
350        @return: False when find one metric out in Metric file. (This rule should not skipped.)
351                 True when all metrics used in the rule are found in Metric file.
352        """
353        if testtype == "RelationshipTest":
354            for m in metric_list:
355                if m['Name'] not in self.metrics:
356                    return False
357        return True
358
359    # Start of Collector and Converter
360    def convert(self, data: list, metricvalues: dict):
361        """
362        Convert collected metric data from the -j output to dict of {metric_name:value}.
363        """
364        for json_string in data:
365            try:
366                result = json.loads(json_string)
367                if "metric-unit" in result and result["metric-unit"] != "(null)" and result["metric-unit"] != "":
368                    name = result["metric-unit"].split("  ")[1] if len(result["metric-unit"].split("  ")) > 1 \
369                        else result["metric-unit"]
370                    metricvalues[name.lower()] = float(result["metric-value"])
371            except ValueError as error:
372                continue
373        return
374
375    def _run_perf(self, metric, workload: str):
376        tool = 'perf'
377        command = [tool, 'stat', '-j', '-M', f"{metric}", "-a"]
378        wl = workload.split()
379        command.extend(wl)
380        print(" ".join(command))
381        cmd = subprocess.run(command, stderr=subprocess.PIPE, encoding='utf-8')
382        data = [x+'}' for x in cmd.stderr.split('}\n') if x]
383        if data[0][0] != '{':
384            data[0] = data[0][data[0].find('{'):]
385        return data
386
387    def collect_perf(self, workload: str):
388        """
389        Collect metric data with "perf stat -M" on given workload with -a and -j.
390        """
391        self.results = dict()
392        print(f"Starting perf collection")
393        print(f"Long workload: {workload}")
394        collectlist = dict()
395        if self.collectlist != "":
396            collectlist[0] = {x for x in self.collectlist.split(",")}
397        else:
398            collectlist[0] = set(list(self.metrics))
399        # Create metric set for relationship rules
400        for rule in self.rules:
401            if rule["TestType"] == "RelationshipTest":
402                metrics = [m["Name"] for m in rule["Metrics"]]
403                if not any(m not in collectlist[0] for m in metrics):
404                    collectlist[rule["RuleIndex"]] = [
405                        ",".join(list(set(metrics)))]
406
407        for idx, metrics in collectlist.items():
408            if idx == 0:
409                wl = "true"
410            else:
411                wl = workload
412            for metric in metrics:
413                data = self._run_perf(metric, wl)
414                if idx not in self.results:
415                    self.results[idx] = dict()
416                self.convert(data, self.results[idx])
417        return
418
419    def second_test(self, collectlist, second_results):
420        workload = self.workloads[self.wlidx]
421        for metric in collectlist:
422            data = self._run_perf(metric, workload)
423            self.convert(data, second_results)
424
425    # End of Collector and Converter
426
427    # Start of Rule Generator
428    def parse_perf_metrics(self):
429        """
430        Read and parse perf metric file:
431        1) find metrics with '1%' or '100%' as ScaleUnit for Percent check
432        2) create metric name list
433        """
434        command = ['perf', 'list', '-j', '--details', 'metrics']
435        cmd = subprocess.run(command, stdout=subprocess.PIPE,
436                             stderr=subprocess.PIPE, encoding='utf-8')
437        try:
438            data = json.loads(cmd.stdout)
439            for m in data:
440                if 'MetricName' not in m:
441                    print("Warning: no metric name")
442                    continue
443                name = m['MetricName'].lower()
444                self.metrics.add(name)
445                if 'ScaleUnit' in m and (m['ScaleUnit'] == '1%' or m['ScaleUnit'] == '100%'):
446                    self.pctgmetrics.add(name.lower())
447        except ValueError as error:
448            print(f"Error when parsing metric data")
449            sys.exit()
450
451        return
452
453    def remove_unsupported_rules(self, rules):
454        new_rules = []
455        for rule in rules:
456            add_rule = True
457            for m in rule["Metrics"]:
458                if m["Name"] in self.skiplist or m["Name"] not in self.metrics:
459                    add_rule = False
460                    break
461            if add_rule:
462                new_rules.append(rule)
463        return new_rules
464
465    def create_rules(self):
466        """
467        Create full rules which includes:
468        1) All the rules from the "relationshi_rules" file
469        2) SingleMetric rule for all the 'percent' metrics
470
471        Reindex all the rules to avoid repeated RuleIndex
472        """
473        data = self.read_json(self.rulefname)
474        rules = data['RelationshipRules']
475        self.skiplist = set([name.lower() for name in data['SkipList']])
476        self.rules = self.remove_unsupported_rules(rules)
477        pctgrule = {'RuleIndex': 0,
478                    'TestType': 'SingleMetricTest',
479                    'RangeLower': '0',
480                    'RangeUpper': '100',
481                    'ErrorThreshold': self.tolerance,
482                    'Description': 'Metrics in percent unit have value with in [0, 100]',
483                    'Metrics': [{'Name': m.lower()} for m in self.pctgmetrics]}
484        self.rules.append(pctgrule)
485
486        # Re-index all rules to avoid repeated RuleIndex
487        idx = 1
488        for r in self.rules:
489            r['RuleIndex'] = idx
490            idx += 1
491
492        if self.debug:
493            # TODO: need to test and generate file name correctly
494            data = {'RelationshipRules': self.rules, 'SupportedMetrics': [
495                {"MetricName": name} for name in self.metrics]}
496            self.json_dump(data, self.fullrulefname)
497
498        return
499    # End of Rule Generator
500
501    def _storewldata(self, key):
502        '''
503        Store all the data of one workload into the corresponding data structure for all workloads.
504        @param key: key to the dictionaries (index of self.workloads).
505        '''
506        self.allresults[key] = self.results
507        self.alltotalcnt[key] = self.totalcnt
508        self.allpassedcnt[key] = self.passedcnt
509
510    # Initialize data structures before data validation of each workload
511    def _init_data(self):
512
513        testtypes = ['PositiveValueTest',
514                     'RelationshipTest', 'SingleMetricTest']
515        self.results = dict()
516        self.ignoremetrics = set()
517        self.errlist = list()
518        self.totalcnt = 0
519        self.passedcnt = 0
520
521    def test(self):
522        '''
523        The real entry point of the test framework.
524        This function loads the validation rule JSON file and Standard Metric file to create rules for
525        testing and namemap dictionaries.
526        It also reads in result JSON file for testing.
527
528        In the test process, it passes through each rule and launch correct test function bases on the
529        'TestType' field of the rule.
530
531        The final report is written into a JSON file.
532        '''
533        if not self.collectlist:
534            self.parse_perf_metrics()
535        self.create_rules()
536        for i in range(0, len(self.workloads)):
537            self.wlidx = i
538            self._init_data()
539            self.collect_perf(self.workloads[i])
540            # Run positive value test
541            self.pos_val_test()
542            for r in self.rules:
543                # skip rules that uses metrics not exist in this platform
544                testtype = r['TestType']
545                if not self.check_rule(testtype, r['Metrics']):
546                    continue
547                if testtype == 'RelationshipTest':
548                    self.relationship_test(r)
549                elif testtype == 'SingleMetricTest':
550                    self.single_test(r)
551                else:
552                    print("Unsupported Test Type: ", testtype)
553            print("Workload: ", self.workloads[i])
554            print("Total Test Count: ", self.totalcnt)
555            print("Passed Test Count: ", self.passedcnt)
556            self._storewldata(i)
557        self.create_report()
558        return len(self.errlist) > 0
559# End of Class Validator
560
561
562def main() -> None:
563    parser = argparse.ArgumentParser(
564        description="Launch metric value validation")
565
566    parser.add_argument(
567        "-rule", help="Base validation rule file", required=True)
568    parser.add_argument(
569        "-output_dir", help="Path for validator output file, report file", required=True)
570    parser.add_argument("-debug", help="Debug run, save intermediate data to files",
571                        action="store_true", default=False)
572    parser.add_argument(
573        "-wl", help="Workload to run while data collection", default="true")
574    parser.add_argument("-m", help="Metric list to validate", default="")
575    args = parser.parse_args()
576    outpath = Path(args.output_dir)
577    reportf = Path.joinpath(outpath, 'perf_report.json')
578    fullrule = Path.joinpath(outpath, 'full_rule.json')
579    datafile = Path.joinpath(outpath, 'perf_data.json')
580
581    validator = Validator(args.rule, reportf, debug=args.debug,
582                          datafname=datafile, fullrulefname=fullrule, workload=args.wl,
583                          metrics=args.m)
584    ret = validator.test()
585
586    return ret
587
588
589if __name__ == "__main__":
590    import sys
591    sys.exit(main())
592