1#! /usr/bin/env python 2 3# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org> 4# Copyright (C) 2003 by Tim Potter <tpot@samba.org> 5# 6# This program is free software; you can redistribute it and/or 7# modify it under the terms of the GNU General Public License as 8# published by the Free Software Foundation; either version 2 of the 9# License, or (at your option) any later version. 10# 11# This program is distributed in the hope that it will be useful, but 12# WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14# General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with this program; if not, write to the Free Software 18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 19# USA 20 21"""comfychair: a Python-based instrument of software torture. 22 23Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org> 24Copyright (C) 2003 by Tim Potter <tpot@samba.org> 25 26This is a test framework designed for testing programs written in 27Python, or (through a fork/exec interface) any other language. 28 29For more information, see the file README.comfychair. 30 31To run a test suite based on ComfyChair, just run it as a program. 32""" 33 34import sys, re 35 36 37class TestCase: 38 """A base class for tests. This class defines required functions which 39 can optionally be overridden by subclasses. It also provides some 40 utility functions for""" 41 42 def __init__(self): 43 self.test_log = "" 44 self.background_pids = [] 45 self._cleanups = [] 46 self._enter_rundir() 47 self._save_environment() 48 self.add_cleanup(self.teardown) 49 50 51 # -------------------------------------------------- 52 # Save and restore directory 53 def _enter_rundir(self): 54 import os 55 self.basedir = os.getcwd() 56 self.add_cleanup(self._restore_directory) 57 self.rundir = os.path.join(self.basedir, 58 'testtmp', 59 self.__class__.__name__) 60 self.tmpdir = os.path.join(self.rundir, 'tmp') 61 os.system("rm -fr %s" % self.rundir) 62 os.makedirs(self.tmpdir) 63 os.system("mkdir -p %s" % self.rundir) 64 os.chdir(self.rundir) 65 66 def _restore_directory(self): 67 import os 68 os.chdir(self.basedir) 69 70 # -------------------------------------------------- 71 # Save and restore environment 72 def _save_environment(self): 73 import os 74 self._saved_environ = os.environ.copy() 75 self.add_cleanup(self._restore_environment) 76 77 def _restore_environment(self): 78 import os 79 os.environ.clear() 80 os.environ.update(self._saved_environ) 81 82 83 def setup(self): 84 """Set up test fixture.""" 85 pass 86 87 def teardown(self): 88 """Tear down test fixture.""" 89 pass 90 91 def runtest(self): 92 """Run the test.""" 93 pass 94 95 96 def add_cleanup(self, c): 97 """Queue a cleanup to be run when the test is complete.""" 98 self._cleanups.append(c) 99 100 101 def fail(self, reason = ""): 102 """Say the test failed.""" 103 raise AssertionError(reason) 104 105 106 ############################################################# 107 # Requisition methods 108 109 def require(self, predicate, message): 110 """Check a predicate for running this test. 111 112If the predicate value is not true, the test is skipped with a message explaining 113why.""" 114 if not predicate: 115 raise NotRunError, message 116 117 def require_root(self): 118 """Skip this test unless run by root.""" 119 import os 120 self.require(os.getuid() == 0, 121 "must be root to run this test") 122 123 ############################################################# 124 # Assertion methods 125 126 def assert_(self, expr, reason = ""): 127 if not expr: 128 raise AssertionError(reason) 129 130 def assert_equal(self, a, b): 131 if not a == b: 132 raise AssertionError("assertEquals failed: %s" % `(a, b)`) 133 134 def assert_notequal(self, a, b): 135 if a == b: 136 raise AssertionError("assertNotEqual failed: %s" % `(a, b)`) 137 138 def assert_re_match(self, pattern, s): 139 """Assert that a string matches a particular pattern 140 141 Inputs: 142 pattern string: regular expression 143 s string: to be matched 144 145 Raises: 146 AssertionError if not matched 147 """ 148 if not re.match(pattern, s): 149 raise AssertionError("string does not match regexp\n" 150 " string: %s\n" 151 " re: %s" % (`s`, `pattern`)) 152 153 def assert_re_search(self, pattern, s): 154 """Assert that a string *contains* a particular pattern 155 156 Inputs: 157 pattern string: regular expression 158 s string: to be searched 159 160 Raises: 161 AssertionError if not matched 162 """ 163 if not re.search(pattern, s): 164 raise AssertionError("string does not contain regexp\n" 165 " string: %s\n" 166 " re: %s" % (`s`, `pattern`)) 167 168 169 def assert_no_file(self, filename): 170 import os.path 171 assert not os.path.exists(filename), ("file exists but should not: %s" % filename) 172 173 174 ############################################################# 175 # Methods for running programs 176 177 def runcmd_background(self, cmd): 178 import os 179 self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n" 180 pid = os.fork() 181 if pid == 0: 182 # child 183 try: 184 os.execvp("/bin/sh", ["/bin/sh", "-c", cmd]) 185 finally: 186 os._exit(127) 187 self.test_log = self.test_log + "pid: %d\n" % pid 188 return pid 189 190 191 def runcmd(self, cmd, expectedResult = 0): 192 """Run a command, fail if the command returns an unexpected exit 193 code. Return the output produced.""" 194 rc, output, stderr = self.runcmd_unchecked(cmd) 195 if rc != expectedResult: 196 raise AssertionError("""command returned %d; expected %s: \"%s\" 197stdout: 198%s 199stderr: 200%s""" % (rc, expectedResult, cmd, output, stderr)) 201 202 return output, stderr 203 204 205 def run_captured(self, cmd): 206 """Run a command, capturing stdout and stderr. 207 208 Based in part on popen2.py 209 210 Returns (waitstatus, stdout, stderr).""" 211 import os, types 212 pid = os.fork() 213 if pid == 0: 214 # child 215 try: 216 pid = os.getpid() 217 openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC 218 219 outfd = os.open('%d.out' % pid, openmode, 0666) 220 os.dup2(outfd, 1) 221 os.close(outfd) 222 223 errfd = os.open('%d.err' % pid, openmode, 0666) 224 os.dup2(errfd, 2) 225 os.close(errfd) 226 227 if isinstance(cmd, types.StringType): 228 cmd = ['/bin/sh', '-c', cmd] 229 230 os.execvp(cmd[0], cmd) 231 finally: 232 os._exit(127) 233 else: 234 # parent 235 exited_pid, waitstatus = os.waitpid(pid, 0) 236 stdout = open('%d.out' % pid).read() 237 stderr = open('%d.err' % pid).read() 238 return waitstatus, stdout, stderr 239 240 241 def runcmd_unchecked(self, cmd, skip_on_noexec = 0): 242 """Invoke a command; return (exitcode, stdout, stderr)""" 243 import os 244 waitstatus, stdout, stderr = self.run_captured(cmd) 245 assert not os.WIFSIGNALED(waitstatus), \ 246 ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus))) 247 rc = os.WEXITSTATUS(waitstatus) 248 self.test_log = self.test_log + ("""Run command: %s 249Wait status: %#x (exit code %d, signal %d) 250stdout: 251%s 252stderr: 253%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus), 254 stdout, stderr)) 255 if skip_on_noexec and rc == 127: 256 # Either we could not execute the command or the command 257 # returned exit code 127. According to system(3) we can't 258 # tell the difference. 259 raise NotRunError, "could not execute %s" % `cmd` 260 return rc, stdout, stderr 261 262 263 def explain_failure(self, exc_info = None): 264 print "test_log:" 265 print self.test_log 266 267 268 def log(self, msg): 269 """Log a message to the test log. This message is displayed if 270 the test fails, or when the runtests function is invoked with 271 the verbose option.""" 272 self.test_log = self.test_log + msg + "\n" 273 274 275class NotRunError(Exception): 276 """Raised if a test must be skipped because of missing resources""" 277 def __init__(self, value = None): 278 self.value = value 279 280 281def _report_error(case, debugger): 282 """Ask the test case to explain failure, and optionally run a debugger 283 284 Input: 285 case TestCase instance 286 debugger if true, a debugger function to be applied to the traceback 287""" 288 import sys 289 ex = sys.exc_info() 290 print "-----------------------------------------------------------------" 291 if ex: 292 import traceback 293 traceback.print_exc(file=sys.stdout) 294 case.explain_failure() 295 print "-----------------------------------------------------------------" 296 297 if debugger: 298 tb = ex[2] 299 debugger(tb) 300 301 302def runtests(test_list, verbose = 0, debugger = None): 303 """Run a series of tests. 304 305 Inputs: 306 test_list sequence of TestCase classes 307 verbose print more information as testing proceeds 308 debugger debugger object to be applied to errors 309 310 Returns: 311 unix return code: 0 for success, 1 for failures, 2 for test failure 312 """ 313 import traceback 314 ret = 0 315 for test_class in test_list: 316 print "%-30s" % _test_name(test_class), 317 # flush now so that long running tests are easier to follow 318 sys.stdout.flush() 319 320 obj = None 321 try: 322 try: # run test and show result 323 obj = test_class() 324 obj.setup() 325 obj.runtest() 326 print "OK" 327 except KeyboardInterrupt: 328 print "INTERRUPT" 329 _report_error(obj, debugger) 330 ret = 2 331 break 332 except NotRunError, msg: 333 print "NOTRUN, %s" % msg.value 334 except: 335 print "FAIL" 336 _report_error(obj, debugger) 337 ret = 1 338 finally: 339 while obj and obj._cleanups: 340 try: 341 apply(obj._cleanups.pop()) 342 except KeyboardInterrupt: 343 print "interrupted during teardown" 344 _report_error(obj, debugger) 345 ret = 2 346 break 347 except: 348 print "error during teardown" 349 _report_error(obj, debugger) 350 ret = 1 351 # Display log file if we're verbose 352 if ret == 0 and verbose: 353 obj.explain_failure() 354 355 return ret 356 357 358def _test_name(test_class): 359 """Return a human-readable name for a test class. 360 """ 361 try: 362 return test_class.__name__ 363 except: 364 return `test_class` 365 366 367def print_help(): 368 """Help for people running tests""" 369 import sys 370 print """%s: software test suite based on ComfyChair 371 372usage: 373 To run all tests, just run this program. To run particular tests, 374 list them on the command line. 375 376options: 377 --help show usage message 378 --list list available tests 379 --verbose, -v show more information while running tests 380 --post-mortem, -p enter Python debugger on error 381""" % sys.argv[0] 382 383 384def print_list(test_list): 385 """Show list of available tests""" 386 for test_class in test_list: 387 print " %s" % _test_name(test_class) 388 389 390def main(tests, extra_tests=[]): 391 """Main entry point for test suites based on ComfyChair. 392 393 inputs: 394 tests Sequence of TestCase subclasses to be run by default. 395 extra_tests Sequence of TestCase subclasses that are available but 396 not run by default. 397 398Test suites should contain this boilerplate: 399 400 if __name__ == '__main__': 401 comfychair.main(tests) 402 403This function handles standard options such as --help and --list, and 404by default runs all tests in the suggested order. 405 406Calls sys.exit() on completion. 407""" 408 from sys import argv 409 import getopt, sys 410 411 opt_verbose = 0 412 debugger = None 413 414 opts, args = getopt.getopt(argv[1:], 'pv', 415 ['help', 'list', 'verbose', 'post-mortem']) 416 for opt, opt_arg in opts: 417 if opt == '--help': 418 print_help() 419 return 420 elif opt == '--list': 421 print_list(tests + extra_tests) 422 return 423 elif opt == '--verbose' or opt == '-v': 424 opt_verbose = 1 425 elif opt == '--post-mortem' or opt == '-p': 426 import pdb 427 debugger = pdb.post_mortem 428 429 if args: 430 all_tests = tests + extra_tests 431 by_name = {} 432 for t in all_tests: 433 by_name[_test_name(t)] = t 434 which_tests = [] 435 for name in args: 436 which_tests.append(by_name[name]) 437 else: 438 which_tests = tests 439 440 sys.exit(runtests(which_tests, verbose=opt_verbose, 441 debugger=debugger)) 442 443 444if __name__ == '__main__': 445 print __doc__ 446