2# Copyright (c) 2009-2011, ETH Zurich.
3# All rights reserved.
5# This file is distributed under the terms in the attached LICENSE file.
6# If you do not find this file, copies can be found by writing to:
7# ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
10import os
11import types
12import string
13import datetime
14import debug
15import re
class Harness:
    """Run a test, record its raw output and post-process the results.

    All files are created inside the ``path`` directory handed to each
    method: the unmodified output (``RAW_FILE_NAME``), a debugging copy of
    the menu.lst (``MENU_LST_FILE_NAME``), a boot log with leading garbage
    removed (``BOOT_FILE_NAME``) and one ``<name>.dat`` file per result.
    """

    RAW_FILE_NAME = 'raw.txt'
    MENU_LST_FILE_NAME = 'menu.lst'
    BOOT_FILE_NAME = 'bootlog.txt'
    # Matches what is left of a terminal colour sequence (e.g. "[32m") once
    # the non-printable escape character in front of it has been removed.
    TERM_FILTER = re.compile(r"\[\d\d?m")
    # Set of printable characters, built once for O(1) membership tests.
    _PRINTABLE = frozenset(string.printable)

    @staticmethod
    def _find_first(lines, is_marker):
        """Return the index of the first line for which ``is_marker(line)``
        is true, or None if there is no such line."""
        for idx, line in enumerate(lines):
            if is_marker(line):
                return idx
        return None

    @staticmethod
    def _is_output_start(line):
        """Tell whether ``line`` is one of the markers used by
        process_output() to locate the start of the useful output."""
        stripped = line.strip()
        return (stripped == "root (nd)" or
                stripped.startswith("Kernel starting at address") or
                "ARMv8-A: Barrelfish CPU driver starting on ARMv8" in line)

    @staticmethod
    def _is_bootlog_start(line):
        """Tell whether ``line`` is one of the markers used by
        process_results() to locate the start of the boot log."""
        stripped = line.strip()
        return (stripped == "root (nd)" or
                "Barrelfish CPU driver starting" in stripped)

    @staticmethod
    def _to_text(line):
        """Return ``line`` as text, decoding byte strings as ASCII and
        replacing anything that cannot be decoded."""
        # On Python 2 `bytes` is `str`, so this yields a unicode object just
        # like unicode(line, errors='replace'); text passes through as is.
        if isinstance(line, bytes):
            return line.decode('ascii', 'replace')
        return line

    def _clean_line(self, line):
        """Return ``line`` without trailing whitespace, non-printable
        characters and terminal colour codes."""
        printable = self._PRINTABLE
        # join() returns a string on every Python version, unlike filter()
        filtered_out = ''.join(c for c in line.rstrip() if c in printable)
        # Delete terminal color codes from output
        return self.TERM_FILTER.sub('', filtered_out)

    def _write_menu_lst_debug(self, test, build, machine, path):
        """Write the menu data of ``test`` to MENU_LST_FILE_NAME in ``path``.

        Does nothing for tests that do not implement get_modules().
        """
        if not hasattr(test, "get_modules"):
            return
        menu_lst_file_name = os.path.join(path, self.MENU_LST_FILE_NAME)
        debug.verbose("harness: writing menu.lst to %s" % menu_lst_file_name)
        with open(menu_lst_file_name, "w") as menu:
            menu.write(test.get_modules(build, machine).get_menu_data("/"))

    def run_test(self, build, machine, test, path):
        """Set up and run ``test``, writing everything it produces to
        RAW_FILE_NAME in ``path``.

        test.cleanup(machine) is always called once the raw file could be
        opened, even if the run fails or is interrupted. A
        KeyboardInterrupt is reported and re-raised.
        """
        # Open file for raw output from the victim
        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)
        debug.verbose('open %s for raw output' % raw_file_name)
        raw_file = open(raw_file_name, 'w')

        # run the test, dumping the output to the raw file as we go
        try:
            # the with-block closes the raw file before cleanup starts, and
            # a failing close() can no longer prevent the cleanup
            with raw_file:
                debug.verbose('harness: setup test')
                test.setup(build, machine, path)
                self._write_menu_lst_debug(test, build, machine, path)
                debug.verbose('harness: run test')
                starttime = datetime.datetime.now()
                for out in test.run(build, machine, path):
                    # timedelta for the time this line was emitted from the
                    # start of the run
                    timestamp = datetime.datetime.now() - starttime
                    # format as string, discarding sub-second precision
                    timestr = str(timestamp).split('.', 1)[0]
                    debug.debug('[%s] %s' % (timestr, self._clean_line(out)))
                    # log full raw line (without timestamp) to output file
                    raw_file.write(out)
                debug.verbose('harness: output complete')
        except KeyboardInterrupt:
            # let the user know that we are on our way out
            debug.error('Interrupted! Performing cleanup...')
            raise
        finally:
            debug.verbose('harness: cleanup test')
            test.cleanup(machine)

    def process_output(self, test, path):
        """Process raw.txt and return a list of cleaned output lines.

        The list begins at the first marker line (GRUB's "root (nd)" or one
        of the kernel start messages); if no marker is present, all lines
        are returned. Every line is passed through _clean_line(), which
        avoids encoding issues when generating other report files. If
        raw.txt does not exist, a single-element list with an explanatory
        message is returned instead.
        """
        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)

        if not os.path.exists(raw_file_name):
            return ["could not open %s to process test output" % raw_file_name]

        with open(raw_file_name, 'r') as rf:
            lines = rf.readlines()

        # None means "no marker at all"; a marker on the very last line is a
        # genuine match and must not be mistaken for that case.
        start = self._find_first(lines, self._is_output_start)
        if start is None:
            debug.verbose('magic string "root (nd)" or "Kernel starting at address" not found, assuming no garbage in output')
            start = 0

        return [self._to_text(self._clean_line(l)) for l in lines[start:]]

    def extract_errors(self, test, path):
        """Return a list holding, for every result of ``test``, its reason()
        followed by the entries of its ``errors`` attribute (if any).

        Like process_results(), this accepts a single result or a list of
        results from test.process_data(); no results give an empty list.
        """
        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)
        debug.verbose('open %s for raw input' % raw_file_name)
        with open(raw_file_name, 'r') as raw_file:
            results = test.process_data(path, raw_file)

        if not results:
            return []
        if not isinstance(results, list):
            results = [results]

        errors = []
        for result in results:
            errors.append(result.reason())
            try:
                errors.extend(result.errors)
            except (AttributeError, TypeError):
                # result has no (iterable) list of errors; that is optional
                pass
        return errors

    def _write_bootlog(self, raw_file_name, boot_file_name):
        """Copy the raw output, starting at the first boot marker line, to
        ``boot_file_name``; write nothing if there is no marker."""
        if not os.path.exists(raw_file_name):
            debug.verbose('No file named %s exists. Do not create bootlog.txt.' % raw_file_name)
            return

        with open(raw_file_name, 'r') as rf:
            lines = rf.readlines()

        start = self._find_first(lines, self._is_bootlog_start)
        if start is None:
            debug.verbose('Magic string root (nd) not found, do not write bootlog.txt')
            return

        with open(boot_file_name, 'w') as wf:
            wf.writelines(lines[start:])

    def _write_result_file(self, result, path):
        """Write ``result`` to '<name>.dat' in ``path``; the file is removed
        again if the result has no processed output."""
        name = result.name or 'results'
        data_file_name = os.path.join(path, name + '.dat')
        debug.verbose('create %s for processed output' % data_file_name)
        try:
            # the with-block closes the file whatever to_file() raises
            with open(data_file_name, 'w') as data_file:
                result.to_file(data_file)
        except NotImplementedError:
            debug.verbose('no processed output, remove %s' % data_file_name)
            os.remove(data_file_name)

    def process_results(self, test, path):
        """Evaluate the raw output of ``test`` and write the derived files.

        Writes bootlog.txt (when a boot marker is found) and one .dat file
        per result. Returns False if any result reports passed() is False,
        and True otherwise, including when there are no results at all.
        """
        # open raw file for input processing
        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)
        debug.verbose('open %s for raw input' % raw_file_name)
        with open(raw_file_name, 'r') as raw_file:
            results = test.process_data(path, raw_file)

        if not results:
            debug.verbose('no results')
            return True  # no results, assume success

        # Process raw.txt and make a bootlog.txt that begins with GRUB's or
        # Barrelfish's output, avoids having encoding issues when viewing
        # logfiles
        boot_file_name = os.path.join(path, self.BOOT_FILE_NAME)
        self._write_bootlog(raw_file_name, boot_file_name)

        # if a single result, turn it into a list
        if not isinstance(results, list):
            results = [results]

        retval = True  # everything OK
        for result in results:
            # see if it passed; None means the result cannot tell
            try:
                passed = result.passed()
            except NotImplementedError:
                passed = None
            if passed is False:
                debug.log('Test %s FAILED (%s)' % (test.name, result.reason()))
                retval = False
            elif passed:
                debug.verbose('Test %s PASSED' % test.name)

            # write it to a file
            self._write_result_file(result, path)

        return retval