1#! /usr/bin/env python
2
3# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
4# Copyright (C) 2003 by Tim Potter <tpot@samba.org>
5#
6# This program is free software; you can redistribute it and/or
7# modify it under the terms of the GNU General Public License as
8# published by the Free Software Foundation; either version 2 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14# General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19# USA
20
21"""comfychair: a Python-based instrument of software torture.
22
23Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
24Copyright (C) 2003 by Tim Potter <tpot@samba.org>
25
26This is a test framework designed for testing programs written in
27Python, or (through a fork/exec interface) any other language.
28
29For more information, see the file README.comfychair.
30
31To run a test suite based on ComfyChair, just run it as a program.
32"""
33
34import sys, re
35
36
37class TestCase:
38    """A base class for tests.  This class defines required functions which
39    can optionally be overridden by subclasses.  It also provides some
40    utility functions for"""
41
42    def __init__(self):
43        self.test_log = ""
44        self.background_pids = []
45        self._cleanups = []
46        self._enter_rundir()
47        self._save_environment()
48        self.add_cleanup(self.teardown)
49
50
51    # --------------------------------------------------
52    # Save and restore directory
53    def _enter_rundir(self):
54        import os
55        self.basedir = os.getcwd()
56        self.add_cleanup(self._restore_directory)
57        self.rundir = os.path.join(self.basedir,
58                                   'testtmp',
59                                   self.__class__.__name__)
60        self.tmpdir = os.path.join(self.rundir, 'tmp')
61        os.system("rm -fr %s" % self.rundir)
62        os.makedirs(self.tmpdir)
63        os.system("mkdir -p %s" % self.rundir)
64        os.chdir(self.rundir)
65
66    def _restore_directory(self):
67        import os
68        os.chdir(self.basedir)
69
70    # --------------------------------------------------
71    # Save and restore environment
72    def _save_environment(self):
73        import os
74        self._saved_environ = os.environ.copy()
75        self.add_cleanup(self._restore_environment)
76
77    def _restore_environment(self):
78        import os
79        os.environ.clear()
80        os.environ.update(self._saved_environ)
81
82
83    def setup(self):
84        """Set up test fixture."""
85        pass
86
87    def teardown(self):
88        """Tear down test fixture."""
89        pass
90
91    def runtest(self):
92        """Run the test."""
93        pass
94
95
96    def add_cleanup(self, c):
97        """Queue a cleanup to be run when the test is complete."""
98        self._cleanups.append(c)
99
100
101    def fail(self, reason = ""):
102        """Say the test failed."""
103        raise AssertionError(reason)
104
105
106    #############################################################
107    # Requisition methods
108
109    def require(self, predicate, message):
110        """Check a predicate for running this test.
111
112If the predicate value is not true, the test is skipped with a message explaining
113why."""
114        if not predicate:
115            raise NotRunError, message
116
117    def require_root(self):
118        """Skip this test unless run by root."""
119        import os
120        self.require(os.getuid() == 0,
121                     "must be root to run this test")
122
123    #############################################################
124    # Assertion methods
125
126    def assert_(self, expr, reason = ""):
127        if not expr:
128            raise AssertionError(reason)
129
130    def assert_equal(self, a, b):
131        if not a == b:
132            raise AssertionError("assertEquals failed: %s" % `(a, b)`)
133
134    def assert_notequal(self, a, b):
135        if a == b:
136            raise AssertionError("assertNotEqual failed: %s" % `(a, b)`)
137
138    def assert_re_match(self, pattern, s):
139        """Assert that a string matches a particular pattern
140
141        Inputs:
142          pattern      string: regular expression
143          s            string: to be matched
144
145        Raises:
146          AssertionError if not matched
147          """
148        if not re.match(pattern, s):
149            raise AssertionError("string does not match regexp\n"
150                                 "    string: %s\n"
151                                 "    re: %s" % (`s`, `pattern`))
152
153    def assert_re_search(self, pattern, s):
154        """Assert that a string *contains* a particular pattern
155
156        Inputs:
157          pattern      string: regular expression
158          s            string: to be searched
159
160        Raises:
161          AssertionError if not matched
162          """
163        if not re.search(pattern, s):
164            raise AssertionError("string does not contain regexp\n"
165                                 "    string: %s\n"
166                                 "    re: %s" % (`s`, `pattern`))
167
168
169    def assert_no_file(self, filename):
170        import os.path
171        assert not os.path.exists(filename), ("file exists but should not: %s" % filename)
172
173
174    #############################################################
175    # Methods for running programs
176
177    def runcmd_background(self, cmd):
178        import os
179        self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"
180        pid = os.fork()
181        if pid == 0:
182            # child
183            try:
184                os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
185            finally:
186                os._exit(127)
187        self.test_log = self.test_log + "pid: %d\n" % pid
188        return pid
189
190
191    def runcmd(self, cmd, expectedResult = 0):
192        """Run a command, fail if the command returns an unexpected exit
193        code.  Return the output produced."""
194        rc, output, stderr = self.runcmd_unchecked(cmd)
195        if rc != expectedResult:
196            raise AssertionError("""command returned %d; expected %s: \"%s\"
197stdout:
198%s
199stderr:
200%s""" % (rc, expectedResult, cmd, output, stderr))
201
202        return output, stderr
203
204
205    def run_captured(self, cmd):
206        """Run a command, capturing stdout and stderr.
207
208        Based in part on popen2.py
209
210        Returns (waitstatus, stdout, stderr)."""
211        import os, types
212        pid = os.fork()
213        if pid == 0:
214            # child
215            try:
216                pid = os.getpid()
217                openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
218
219                outfd = os.open('%d.out' % pid, openmode, 0666)
220                os.dup2(outfd, 1)
221                os.close(outfd)
222
223                errfd = os.open('%d.err' % pid, openmode, 0666)
224                os.dup2(errfd, 2)
225                os.close(errfd)
226
227                if isinstance(cmd, types.StringType):
228                    cmd = ['/bin/sh', '-c', cmd]
229
230                os.execvp(cmd[0], cmd)
231            finally:
232                os._exit(127)
233        else:
234            # parent
235            exited_pid, waitstatus = os.waitpid(pid, 0)
236            stdout = open('%d.out' % pid).read()
237            stderr = open('%d.err' % pid).read()
238            return waitstatus, stdout, stderr
239
240
241    def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
242        """Invoke a command; return (exitcode, stdout, stderr)"""
243        import os
244        waitstatus, stdout, stderr = self.run_captured(cmd)
245        assert not os.WIFSIGNALED(waitstatus), \
246               ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))
247        rc = os.WEXITSTATUS(waitstatus)
248        self.test_log = self.test_log + ("""Run command: %s
249Wait status: %#x (exit code %d, signal %d)
250stdout:
251%s
252stderr:
253%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),
254         stdout, stderr))
255        if skip_on_noexec and rc == 127:
256            # Either we could not execute the command or the command
257            # returned exit code 127.  According to system(3) we can't
258            # tell the difference.
259            raise NotRunError, "could not execute %s" % `cmd`
260        return rc, stdout, stderr
261
262
263    def explain_failure(self, exc_info = None):
264        print "test_log:"
265        print self.test_log
266
267
268    def log(self, msg):
269        """Log a message to the test log.  This message is displayed if
270        the test fails, or when the runtests function is invoked with
271        the verbose option."""
272        self.test_log = self.test_log + msg + "\n"
273
274
275class NotRunError(Exception):
276    """Raised if a test must be skipped because of missing resources"""
277    def __init__(self, value = None):
278        self.value = value
279
280
281def _report_error(case, debugger):
282    """Ask the test case to explain failure, and optionally run a debugger
283
284    Input:
285      case         TestCase instance
286      debugger     if true, a debugger function to be applied to the traceback
287"""
288    import sys
289    ex = sys.exc_info()
290    print "-----------------------------------------------------------------"
291    if ex:
292        import traceback
293        traceback.print_exc(file=sys.stdout)
294    case.explain_failure()
295    print "-----------------------------------------------------------------"
296
297    if debugger:
298        tb = ex[2]
299        debugger(tb)
300
301
302def runtests(test_list, verbose = 0, debugger = None):
303    """Run a series of tests.
304
305    Inputs:
306      test_list    sequence of TestCase classes
307      verbose      print more information as testing proceeds
308      debugger     debugger object to be applied to errors
309
310    Returns:
311      unix return code: 0 for success, 1 for failures, 2 for test failure
312    """
313    import traceback
314    ret = 0
315    for test_class in test_list:
316        print "%-30s" % _test_name(test_class),
317        # flush now so that long running tests are easier to follow
318        sys.stdout.flush()
319
320        obj = None
321        try:
322            try: # run test and show result
323                obj = test_class()
324                obj.setup()
325                obj.runtest()
326                print "OK"
327            except KeyboardInterrupt:
328                print "INTERRUPT"
329                _report_error(obj, debugger)
330                ret = 2
331                break
332            except NotRunError, msg:
333                print "NOTRUN, %s" % msg.value
334            except:
335                print "FAIL"
336                _report_error(obj, debugger)
337                ret = 1
338        finally:
339            while obj and obj._cleanups:
340                try:
341                    apply(obj._cleanups.pop())
342                except KeyboardInterrupt:
343                    print "interrupted during teardown"
344                    _report_error(obj, debugger)
345                    ret = 2
346                    break
347                except:
348                    print "error during teardown"
349                    _report_error(obj, debugger)
350                    ret = 1
351        # Display log file if we're verbose
352        if ret == 0 and verbose:
353            obj.explain_failure()
354
355    return ret
356
357
358def _test_name(test_class):
359    """Return a human-readable name for a test class.
360    """
361    try:
362        return test_class.__name__
363    except:
364        return `test_class`
365
366
367def print_help():
368    """Help for people running tests"""
369    import sys
370    print """%s: software test suite based on ComfyChair
371
372usage:
373    To run all tests, just run this program.  To run particular tests,
374    list them on the command line.
375
376options:
377    --help              show usage message
378    --list              list available tests
379    --verbose, -v       show more information while running tests
380    --post-mortem, -p   enter Python debugger on error
381""" % sys.argv[0]
382
383
384def print_list(test_list):
385    """Show list of available tests"""
386    for test_class in test_list:
387        print "    %s" % _test_name(test_class)
388
389
390def main(tests, extra_tests=[]):
391    """Main entry point for test suites based on ComfyChair.
392
393    inputs:
394      tests       Sequence of TestCase subclasses to be run by default.
395      extra_tests Sequence of TestCase subclasses that are available but
396                  not run by default.
397
398Test suites should contain this boilerplate:
399
400    if __name__ == '__main__':
401        comfychair.main(tests)
402
403This function handles standard options such as --help and --list, and
404by default runs all tests in the suggested order.
405
406Calls sys.exit() on completion.
407"""
408    from sys import argv
409    import getopt, sys
410
411    opt_verbose = 0
412    debugger = None
413
414    opts, args = getopt.getopt(argv[1:], 'pv',
415                               ['help', 'list', 'verbose', 'post-mortem'])
416    for opt, opt_arg in opts:
417        if opt == '--help':
418            print_help()
419            return
420        elif opt == '--list':
421            print_list(tests + extra_tests)
422            return
423        elif opt == '--verbose' or opt == '-v':
424            opt_verbose = 1
425        elif opt == '--post-mortem' or opt == '-p':
426            import pdb
427            debugger = pdb.post_mortem
428
429    if args:
430        all_tests = tests + extra_tests
431        by_name = {}
432        for t in all_tests:
433            by_name[_test_name(t)] = t
434        which_tests = []
435        for name in args:
436            which_tests.append(by_name[name])
437    else:
438        which_tests = tests
439
440    sys.exit(runtests(which_tests, verbose=opt_verbose,
441                      debugger=debugger))
442
443
444if __name__ == '__main__':
445    print __doc__
446