1#! /usr/bin/env python 2 3# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org> 4# Copyright (C) 2003 by Tim Potter <tpot@samba.org> 5# 6# This program is free software; you can redistribute it and/or 7# modify it under the terms of the GNU General Public License as 8# published by the Free Software Foundation; either version 3 of the 9# License, or (at your option) any later version. 10# 11# This program is distributed in the hope that it will be useful, but 12# WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14# General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with this program; if not, see <http://www.gnu.org/licenses/>. 18 19"""comfychair: a Python-based instrument of software torture. 20 21Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org> 22Copyright (C) 2003 by Tim Potter <tpot@samba.org> 23 24This is a test framework designed for testing programs written in 25Python, or (through a fork/exec interface) any other language. 26 27For more information, see the file README.comfychair. 28 29To run a test suite based on ComfyChair, just run it as a program. 30""" 31 32import sys, re 33 34 35class TestCase: 36 """A base class for tests. This class defines required functions which 37 can optionally be overridden by subclasses. It also provides some 38 utility functions for""" 39 40 def __init__(self): 41 self.test_log = "" 42 self.background_pids = [] 43 self._cleanups = [] 44 self._enter_rundir() 45 self._save_environment() 46 self.add_cleanup(self.teardown) 47 48 49 # -------------------------------------------------- 50 # Save and restore directory 51 def _enter_rundir(self): 52 import os 53 self.basedir = os.getcwd() 54 self.add_cleanup(self._restore_directory) 55 self.rundir = os.path.join(self.basedir, 56 'testtmp', 57 self.__class__.__name__) 58 self.tmpdir = os.path.join(self.rundir, 'tmp') 59 os.system("rm -fr %s" % self.rundir) 60 os.makedirs(self.tmpdir) 61 os.system("mkdir -p %s" % self.rundir) 62 os.chdir(self.rundir) 63 64 def _restore_directory(self): 65 import os 66 os.chdir(self.basedir) 67 68 # -------------------------------------------------- 69 # Save and restore environment 70 def _save_environment(self): 71 import os 72 self._saved_environ = os.environ.copy() 73 self.add_cleanup(self._restore_environment) 74 75 def _restore_environment(self): 76 import os 77 os.environ.clear() 78 os.environ.update(self._saved_environ) 79 80 81 def setup(self): 82 """Set up test fixture.""" 83 pass 84 85 def teardown(self): 86 """Tear down test fixture.""" 87 pass 88 89 def runtest(self): 90 """Run the test.""" 91 pass 92 93 94 def add_cleanup(self, c): 95 """Queue a cleanup to be run when the test is complete.""" 96 self._cleanups.append(c) 97 98 99 def fail(self, reason = ""): 100 """Say the test failed.""" 101 raise AssertionError(reason) 102 103 104 ############################################################# 105 # Requisition methods 106 107 def require(self, predicate, message): 108 """Check a predicate for running this test. 109 110If the predicate value is not true, the test is skipped with a message explaining 111why.""" 112 if not predicate: 113 raise NotRunError, message 114 115 def require_root(self): 116 """Skip this test unless run by root.""" 117 import os 118 self.require(os.getuid() == 0, 119 "must be root to run this test") 120 121 ############################################################# 122 # Assertion methods 123 124 def assert_(self, expr, reason = ""): 125 if not expr: 126 raise AssertionError(reason) 127 128 def assert_equal(self, a, b): 129 if not a == b: 130 raise AssertionError("assertEquals failed: %s" % `(a, b)`) 131 132 def assert_notequal(self, a, b): 133 if a == b: 134 raise AssertionError("assertNotEqual failed: %s" % `(a, b)`) 135 136 def assert_re_match(self, pattern, s): 137 """Assert that a string matches a particular pattern 138 139 Inputs: 140 pattern string: regular expression 141 s string: to be matched 142 143 Raises: 144 AssertionError if not matched 145 """ 146 if not re.match(pattern, s): 147 raise AssertionError("string does not match regexp\n" 148 " string: %s\n" 149 " re: %s" % (`s`, `pattern`)) 150 151 def assert_re_search(self, pattern, s): 152 """Assert that a string *contains* a particular pattern 153 154 Inputs: 155 pattern string: regular expression 156 s string: to be searched 157 158 Raises: 159 AssertionError if not matched 160 """ 161 if not re.search(pattern, s): 162 raise AssertionError("string does not contain regexp\n" 163 " string: %s\n" 164 " re: %s" % (`s`, `pattern`)) 165 166 167 def assert_no_file(self, filename): 168 import os.path 169 assert not os.path.exists(filename), ("file exists but should not: %s" % filename) 170 171 172 ############################################################# 173 # Methods for running programs 174 175 def runcmd_background(self, cmd): 176 import os 177 self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n" 178 pid = os.fork() 179 if pid == 0: 180 # child 181 try: 182 os.execvp("/bin/sh", ["/bin/sh", "-c", cmd]) 183 finally: 184 os._exit(127) 185 self.test_log = self.test_log + "pid: %d\n" % pid 186 return pid 187 188 189 def runcmd(self, cmd, expectedResult = 0): 190 """Run a command, fail if the command returns an unexpected exit 191 code. Return the output produced.""" 192 rc, output, stderr = self.runcmd_unchecked(cmd) 193 if rc != expectedResult: 194 raise AssertionError("""command returned %d; expected %s: \"%s\" 195stdout: 196%s 197stderr: 198%s""" % (rc, expectedResult, cmd, output, stderr)) 199 200 return output, stderr 201 202 203 def run_captured(self, cmd): 204 """Run a command, capturing stdout and stderr. 205 206 Based in part on popen2.py 207 208 Returns (waitstatus, stdout, stderr).""" 209 import os, types 210 pid = os.fork() 211 if pid == 0: 212 # child 213 try: 214 pid = os.getpid() 215 openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC 216 217 outfd = os.open('%d.out' % pid, openmode, 0666) 218 os.dup2(outfd, 1) 219 os.close(outfd) 220 221 errfd = os.open('%d.err' % pid, openmode, 0666) 222 os.dup2(errfd, 2) 223 os.close(errfd) 224 225 if isinstance(cmd, types.StringType): 226 cmd = ['/bin/sh', '-c', cmd] 227 228 os.execvp(cmd[0], cmd) 229 finally: 230 os._exit(127) 231 else: 232 # parent 233 exited_pid, waitstatus = os.waitpid(pid, 0) 234 stdout = open('%d.out' % pid).read() 235 stderr = open('%d.err' % pid).read() 236 return waitstatus, stdout, stderr 237 238 239 def runcmd_unchecked(self, cmd, skip_on_noexec = 0): 240 """Invoke a command; return (exitcode, stdout, stderr)""" 241 import os 242 waitstatus, stdout, stderr = self.run_captured(cmd) 243 assert not os.WIFSIGNALED(waitstatus), \ 244 ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus))) 245 rc = os.WEXITSTATUS(waitstatus) 246 self.test_log = self.test_log + ("""Run command: %s 247Wait status: %#x (exit code %d, signal %d) 248stdout: 249%s 250stderr: 251%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus), 252 stdout, stderr)) 253 if skip_on_noexec and rc == 127: 254 # Either we could not execute the command or the command 255 # returned exit code 127. According to system(3) we can't 256 # tell the difference. 257 raise NotRunError, "could not execute %s" % `cmd` 258 return rc, stdout, stderr 259 260 261 def explain_failure(self, exc_info = None): 262 print "test_log:" 263 print self.test_log 264 265 266 def log(self, msg): 267 """Log a message to the test log. This message is displayed if 268 the test fails, or when the runtests function is invoked with 269 the verbose option.""" 270 self.test_log = self.test_log + msg + "\n" 271 272 273class NotRunError(Exception): 274 """Raised if a test must be skipped because of missing resources""" 275 def __init__(self, value = None): 276 self.value = value 277 278 279def _report_error(case, debugger): 280 """Ask the test case to explain failure, and optionally run a debugger 281 282 Input: 283 case TestCase instance 284 debugger if true, a debugger function to be applied to the traceback 285""" 286 import sys 287 ex = sys.exc_info() 288 print "-----------------------------------------------------------------" 289 if ex: 290 import traceback 291 traceback.print_exc(file=sys.stdout) 292 case.explain_failure() 293 print "-----------------------------------------------------------------" 294 295 if debugger: 296 tb = ex[2] 297 debugger(tb) 298 299 300def runtests(test_list, verbose = 0, debugger = None): 301 """Run a series of tests. 302 303 Inputs: 304 test_list sequence of TestCase classes 305 verbose print more information as testing proceeds 306 debugger debugger object to be applied to errors 307 308 Returns: 309 unix return code: 0 for success, 1 for failures, 2 for test failure 310 """ 311 import traceback 312 ret = 0 313 for test_class in test_list: 314 print "%-30s" % _test_name(test_class), 315 # flush now so that long running tests are easier to follow 316 sys.stdout.flush() 317 318 obj = None 319 try: 320 try: # run test and show result 321 obj = test_class() 322 obj.setup() 323 obj.runtest() 324 print "OK" 325 except KeyboardInterrupt: 326 print "INTERRUPT" 327 _report_error(obj, debugger) 328 ret = 2 329 break 330 except NotRunError, msg: 331 print "NOTRUN, %s" % msg.value 332 except: 333 print "FAIL" 334 _report_error(obj, debugger) 335 ret = 1 336 finally: 337 while obj and obj._cleanups: 338 try: 339 apply(obj._cleanups.pop()) 340 except KeyboardInterrupt: 341 print "interrupted during teardown" 342 _report_error(obj, debugger) 343 ret = 2 344 break 345 except: 346 print "error during teardown" 347 _report_error(obj, debugger) 348 ret = 1 349 # Display log file if we're verbose 350 if ret == 0 and verbose: 351 obj.explain_failure() 352 353 return ret 354 355 356def _test_name(test_class): 357 """Return a human-readable name for a test class. 358 """ 359 try: 360 return test_class.__name__ 361 except: 362 return `test_class` 363 364 365def print_help(): 366 """Help for people running tests""" 367 import sys 368 print """%s: software test suite based on ComfyChair 369 370usage: 371 To run all tests, just run this program. To run particular tests, 372 list them on the command line. 373 374options: 375 --help show usage message 376 --list list available tests 377 --verbose, -v show more information while running tests 378 --post-mortem, -p enter Python debugger on error 379""" % sys.argv[0] 380 381 382def print_list(test_list): 383 """Show list of available tests""" 384 for test_class in test_list: 385 print " %s" % _test_name(test_class) 386 387 388def main(tests, extra_tests=[]): 389 """Main entry point for test suites based on ComfyChair. 390 391 inputs: 392 tests Sequence of TestCase subclasses to be run by default. 393 extra_tests Sequence of TestCase subclasses that are available but 394 not run by default. 395 396Test suites should contain this boilerplate: 397 398 if __name__ == '__main__': 399 comfychair.main(tests) 400 401This function handles standard options such as --help and --list, and 402by default runs all tests in the suggested order. 403 404Calls sys.exit() on completion. 405""" 406 from sys import argv 407 import getopt, sys 408 409 opt_verbose = 0 410 debugger = None 411 412 opts, args = getopt.getopt(argv[1:], 'pv', 413 ['help', 'list', 'verbose', 'post-mortem']) 414 for opt, opt_arg in opts: 415 if opt == '--help': 416 print_help() 417 return 418 elif opt == '--list': 419 print_list(tests + extra_tests) 420 return 421 elif opt == '--verbose' or opt == '-v': 422 opt_verbose = 1 423 elif opt == '--post-mortem' or opt == '-p': 424 import pdb 425 debugger = pdb.post_mortem 426 427 if args: 428 all_tests = tests + extra_tests 429 by_name = {} 430 for t in all_tests: 431 by_name[_test_name(t)] = t 432 which_tests = [] 433 for name in args: 434 which_tests.append(by_name[name]) 435 else: 436 which_tests = tests 437 438 sys.exit(runtests(which_tests, verbose=opt_verbose, 439 debugger=debugger)) 440 441 442if __name__ == '__main__': 443 print __doc__ 444