#
# Copyright (c) 2009-2011, ETH Zurich.
# All rights reserved.
#
# This file is distributed under the terms in the attached LICENSE file.
# If you do not find this file, copies can be found by writing to:
# ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
#

import os
import types
import string
import datetime
import debug
import re


class Harness:
    """Run a test, record its raw output and post-process that output.

    All files are created inside the ``path`` directory handed to each
    method:

    * ``raw.txt``     - every chunk yielded by ``test.run()``, unmodified
    * ``menu.lst``    - written only for tests that provide ``get_modules``
    * ``bootlog.txt`` - ``raw.txt`` starting at the first boot marker line
    * ``<name>.dat``  - one file per result object, via ``result.to_file()``
    """

    RAW_FILE_NAME = 'raw.txt'
    MENU_LST_FILE_NAME = 'menu.lst'
    BOOT_FILE_NAME = 'bootlog.txt'
    # Matches what is left of a terminal colour code ("[0m", "[31m", ...)
    # once _clean_line has dropped the non-printable characters.
    TERM_FILTER = re.compile(r"\[\d\d?m")

    def _clean_line(self, line):
        """Return ``line`` without trailing whitespace, non-printable
        characters and terminal colour codes."""
        # ''.join() instead of filter(): filter() only returns a string on
        # Python 2, whereas this form yields a string on Python 2 and 3.
        printable = ''.join(c for c in line.rstrip() if c in string.printable)
        # Delete terminal color codes from output
        return self.TERM_FILTER.sub('', printable)

    @staticmethod
    def _to_text(line):
        """Return ``line`` as a text (unicode) string.

        Byte strings (the native ``str`` of Python 2) are decoded as ASCII
        with undecodable bytes replaced; text strings are returned as is.
        """
        if isinstance(line, bytes):
            return line.decode('ascii', 'replace')
        return line

    @staticmethod
    def _find_marker(lines, is_marker):
        """Return the index of the first line whose stripped content
        satisfies ``is_marker``, or None when no line does."""
        for idx, line in enumerate(lines):
            if is_marker(line.strip()):
                return idx
        return None

    def _process_raw_data(self, test, path):
        """Open raw.txt in ``path`` for reading and return the value of
        ``test.process_data(path, raw_file)``. The file is always closed."""
        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)
        debug.verbose('open %s for raw input' % raw_file_name)
        with open(raw_file_name, 'r') as raw_file:
            return test.process_data(path, raw_file)

    def _write_menu_lst_debug(self, test, build, machine, path):
        """Write the menu.lst data of ``test`` to ``path`` for debugging."""
        # Ignore for tests that do not implement get_modules
        if not hasattr(test, "get_modules"):
            return
        menu_lst_file_name = os.path.join(path, self.MENU_LST_FILE_NAME)
        debug.verbose("Writing menu.lst to %s" % menu_lst_file_name)
        with open(menu_lst_file_name, "w") as menu:
            menu.write(test.get_modules(build, machine).get_menu_data("/"))

    def run_test(self, build, machine, test, path):
        """Set up and run ``test``, dumping its output to raw.txt in ``path``.

        ``test.cleanup(machine)`` is always called once the raw file has been
        opened, whether the run finished, failed or was interrupted. A
        KeyboardInterrupt is reported and then re-raised.
        """
        # Open file for raw output from the victim
        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)
        debug.verbose('open %s for raw output' % raw_file_name)
        raw_file = open(raw_file_name, 'w')

        # run the test, dumping the output to the raw file as we go
        try:
            # the with block closes the raw file before cleanup runs, and a
            # failing close can no longer prevent the cleanup below
            with raw_file:
                debug.verbose('harness: setup test')
                test.setup(build, machine, path)
                self._write_menu_lst_debug(test, build, machine, path)
                debug.verbose('harness: run test')
                starttime = datetime.datetime.now()
                for out in test.run(build, machine, path):
                    # timedelta for the time this line was emitted from the
                    # start of the run
                    timestamp = datetime.datetime.now() - starttime
                    # format as string, discarding sub-second precision
                    timestr = str(timestamp).split('.', 1)[0]
                    debug.debug('[%s] %s' % (timestr, self._clean_line(out)))
                    # log full raw line (without timestamp) to output file
                    raw_file.write(out)
                debug.verbose('harness: output complete')
        except KeyboardInterrupt:
            # let the user know that we are on our way out
            debug.error('Interrupted! Performing cleanup...')
            raise
        finally:
            debug.verbose('harness: cleanup test')
            test.cleanup(machine)

    def process_output(self, test, path):
        """Process raw.txt and return array of output lines that begins with
        grubs output, avoids having encoding issues when generating other
        report files.

        The returned lines start at the first line that is exactly
        "root (nd)" or starts with "Kernel starting at address" (after
        stripping); when there is no such line all lines are returned. Every
        line is cleaned with _clean_line. If raw.txt does not exist, a list
        holding a single error message is returned instead.
        """
        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)

        if not os.path.exists(raw_file_name):
            # file did not exist
            return ["could not open %s to process test output" % raw_file_name]

        with open(raw_file_name, 'r') as rf:
            lines = rf.readlines()

        start = self._find_marker(
            lines,
            lambda s: s == "root (nd)" or
            s.startswith("Kernel starting at address"))
        if start is None:
            # An explicit "not found" result: a marker sitting on the last
            # line is no longer mistaken for a missing marker.
            debug.verbose('magic string "root (nd)" or "Kernel starting at address" not found, assuming no garbage in output')
            start = 0

        return [self._to_text(self._clean_line(l)) for l in lines[start:]]

    def extract_errors(self, test, path):
        """Return a list with ``results.reason()`` followed by the entries of
        ``results.errors`` (if the results object has such an attribute),
        where ``results`` is what ``test.process_data`` returns for raw.txt.
        """
        results = self._process_raw_data(test, path)

        errors = [results.reason()]
        try:
            errors.extend(results.errors)
        except (AttributeError, TypeError):
            # best effort: results without a usable 'errors' collection only
            # contribute their reason
            pass

        return errors

    def _write_bootlog(self, path):
        """Copy raw.txt, starting at the first boot marker line, to
        bootlog.txt. Nothing is written when raw.txt is missing or contains
        no marker line."""
        raw_file_name = os.path.join(path, self.RAW_FILE_NAME)
        if not os.path.exists(raw_file_name):
            debug.verbose('No file named %s exists. Do not create bootlog.txt.' % raw_file_name)
            return

        with open(raw_file_name, 'r') as rf:
            lines = rf.readlines()

        start = self._find_marker(
            lines,
            lambda s: s == "root (nd)" or
            "Barrelfish CPU driver starting" in s)
        if start is None:
            # Previously a missing marker left the loop index on the last
            # line, so a bootlog.txt holding only that line was written.
            debug.verbose('Magic string root (nd) not found, do not write bootlog.txt')
            return

        boot_file_name = os.path.join(path, self.BOOT_FILE_NAME)
        with open(boot_file_name, 'w') as wf:
            wf.writelines(lines[start:])

    def _write_result_file(self, result, path):
        """Write ``result`` to '<name>.dat' in ``path`` ('results.dat' when
        the result has no name). The file is removed again if the result
        raises NotImplementedError from ``to_file``."""
        name = result.name if result.name else 'results'
        data_file_name = os.path.join(path, name + '.dat')
        debug.verbose('create %s for processed output' % data_file_name)
        try:
            # with: the handle is closed on every exit path, including
            # exceptions other than NotImplementedError
            with open(data_file_name, 'w') as data_file:
                result.to_file(data_file)
        except NotImplementedError:
            debug.verbose('no processed output, remove %s' % data_file_name)
            os.remove(data_file_name)

    def process_results(self, test, path):
        """Process raw.txt with ``test.process_data`` and store the results.

        Writes bootlog.txt (see _write_bootlog) and one .dat file per result.
        Returns False if any result reports ``passed() is False``, otherwise
        True. Empty results count as success; a result whose ``passed()``
        raises NotImplementedError is neither a pass nor a failure.
        """
        # open raw file for input processing
        results = self._process_raw_data(test, path)
        if not results:
            debug.verbose('no results')
            return True  # no results, assume success

        # Process raw.txt and make a bootlog.txt that begins with grubs or
        # Barrelfish's output, avoids having encoding issues when viewing
        # logfiles
        self._write_bootlog(path)

        # if a single result, turn it into a list
        if not isinstance(results, list):
            results = [results]

        retval = True  # everything OK
        for result in results:
            # see if it passed
            try:
                passed = result.passed()
            except NotImplementedError:
                passed = None
            if passed is False:
                debug.log('Test %s FAILED %s' % (test.name, '(' + result.reason() + ')'))
                retval = False
            elif passed:
                debug.verbose('Test %s PASSED' % test.name)

            # write it to a file
            self._write_result_file(result, path)

        return retval