1#
2# Copyright (c) 2009-2011, ETH Zurich.
3# All rights reserved.
4#
5# This file is distributed under the terms in the attached LICENSE file.
6# If you do not find this file, copies can be found by writing to:
7# ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
8#
9
10import os
11import types
12import string
13import datetime
14import debug
15import re
16
17class Harness:
18    RAW_FILE_NAME = 'raw.txt'
19    MENU_LST_FILE_NAME = 'menu.lst'
20    BOOT_FILE_NAME = 'bootlog.txt'
21    TERM_FILTER = re.compile("\[\d\d?m")
22
23    def _clean_line(self, line):
24        # filter output line of control characters
25        filtered_out = filter(lambda c: c in string.printable, line.rstrip())
26        # Delete terminal color codes from output
27        filtered_out = self.TERM_FILTER.sub('', filtered_out)
28        return filtered_out
29
30    def _write_menu_lst_debug(self, test, build, machine, path):
31        # Ignore for tests that do not implement get_modules
32        if hasattr(test, "get_modules"):
33            menu_lst_file_name = os.path.join(path, self.MENU_LST_FILE_NAME)
34            debug.verbose("Writing menu.lst to %s" % menu_lst_file_name)
35            with open(menu_lst_file_name, "w") as menu:
36                menu.write( test.get_modules(build, machine).get_menu_data("/") )
37
38    def run_test(self, build, machine, test, path):
39        # Open files for raw output from the victim and log data from the test
40        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)
41        debug.verbose('open %s for raw output' % raw_file_name)
42        raw_file = open(raw_file_name, 'w')
43
44        # run the test, dumping the output to the raw file as we go
45        try:
46            debug.verbose('harness: setup test')
47            test.setup(build, machine, path)
48            self._write_menu_lst_debug(test, build, machine, path)
49            debug.verbose('harness: run test')
50            starttime = datetime.datetime.now()
51            for out in test.run(build, machine, path):
52                # timedelta for the time this line was emitted from the start of the run
53                timestamp = datetime.datetime.now() - starttime
54                # format as string, discarding sub-second precision
55                timestr = str(timestamp).split('.', 1)[0]
56                debug.debug('[%s] %s' % (timestr, self._clean_line(out)))
57                # log full raw line (without timestamp) to output file
58                raw_file.write(out)
59            debug.verbose('harness: output complete')
60        except KeyboardInterrupt:
61            # let the user know that we are on our way out
62            debug.error('Interrupted! Performing cleanup...')
63            raise
64        finally:
65            raw_file.close()
66            debug.verbose('harness: cleanup test')
67            test.cleanup(machine)
68
69    def process_output(self, test, path):
70        """Process raw.txt and return array of output lines that begins with grubs
71        output, avoids having encoding issues when generating other report files"""
72
73        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)
74
75        if os.path.exists(raw_file_name):
76            idx = 0
77            with open(raw_file_name, 'r') as rf:
78                lines = rf.readlines()
79                for idx, line in enumerate(lines):
80                    if line.strip() == "root (nd)" or \
81                       line.strip().startswith("Kernel starting at address"):
82                        break
83                if idx == len(lines)-1:
84                    debug.verbose('magic string "root (nd)" or "Kernel starting at address" not found, assuming no garbage in output')
85                    idx=0
86
87            return [ unicode(self._clean_line(l), errors='replace') for l in lines[idx:] ]
88
89        # file did not exist
90        return ["could not open %s to process test output" % raw_file_name]
91
92    def extract_errors(self, test, path):
93        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)
94        debug.verbose('open %s for raw input' % raw_file_name)
95        raw_file = open(raw_file_name, 'r')
96
97        try:
98            results = test.process_data(path, raw_file)
99        finally:
100            raw_file.close()
101
102        errors = [results.reason()]
103        try:
104            errors += results.errors
105        except:
106            pass
107
108        return errors
109
110
111    def process_results(self, test, path):
112        # open raw file for input processing
113        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)
114        debug.verbose('open %s for raw input' % raw_file_name)
115        raw_file = open(raw_file_name, 'r')
116
117        try:
118            results = test.process_data(path, raw_file)
119        finally:
120            raw_file.close()
121        if not results:
122            debug.verbose('no results')
123            return True  # no results, assume success
124
125        retval = True  # everything OK
126
127        # Process raw.txt and make a bootlog.txt that begins with grubs or
128        # Barrelfish's output, avoids having encoding issues when viewing logfiles
129        boot_file_name = os.path.join(path, self.BOOT_FILE_NAME)
130        if os.path.exists(raw_file_name):
131            idx = 0
132            with open(raw_file_name, 'r') as rf:
133                lines = rf.readlines()
134                for idx, line in enumerate(lines):
135                    if line.strip() == "root (nd)" or \
136                       "Barrelfish CPU driver starting" in line.strip():
137                        break
138            if idx > 0:
139                with open(boot_file_name, 'w') as wf:
140                    wf.writelines(lines[idx:])
141            else:
142                debug.verbose('Magic string root (nd) not found, do not write bootlog.txt')
143        else:
144            debug.verbose('No file named %s exists. Do not create bootlog.txt.' % raw_file_name)
145
146        # if a single result, turn it into a list
147        if not isinstance(results, types.ListType):
148            results = [results]
149        for result in results:
150            # see if it passed
151            try:
152                passed = result.passed()
153            except NotImplementedError:
154                passed = None
155            if passed is False:
156                debug.log('Test %s FAILED %s' % (test.name, '(' + result.reason() + ')') )
157                retval = False
158            elif passed:
159                debug.verbose('Test %s PASSED' % test.name)
160
161            # write it to a file
162            name = result.name if result.name else 'results'
163            data_file_name = os.path.join(path, name + '.dat')
164            debug.verbose('create %s for processed output' % data_file_name)
165            data_file = open(data_file_name, 'w')
166            try:
167                result.to_file(data_file)
168                data_file.close()
169            except NotImplementedError:
170                debug.verbose('no processed output, remove %s' % data_file_name)
171                data_file.close()
172                os.remove(data_file_name)
173
174        return retval
175