1#!/usr/bin/python
2#
3#
4# Copyright 2014, NICTA
5#
6# This software may be distributed and modified according to the terms of
7# the BSD 2-Clause license. Note that NO WARRANTY is provided.
8# See "LICENSE_BSD2.txt" for details.
9#
10# @TAG(NICTA_BSD)
11#
12#
13# Very simple command-line test runner.
14#
# Per-test timeouts are enforced with SIGALRM (see run_test).
16#
17
18from __future__ import print_function
19
20import argparse
21import atexit
22import datetime
23import fnmatch
24import memusage
25import os
26import signal
27import subprocess
28import sys
29import testspec
30import time
31import traceback
32
33# Try importing psutil.
34PS_UTIL_AVAILABLE = False
35try:
36    import psutil
37    from psutil import NoSuchProcess
38    PS_UTIL_AVAILABLE = True
39    if not hasattr(psutil.Process, "children") and hasattr(psutil.Process, "get_children"):
40        psutil.Process.children = psutil.Process.get_children
41except ImportError:
42    pass
43
# ANSI escape sequences used by output_color() to colorize terminal output.
ANSI_RESET = "\033[0m"
ANSI_RED = "\033[31;1m"
ANSI_GREEN = "\033[32m"
ANSI_YELLOW = "\033[33m"
# NOTE(review): SGR parameter 38 is normally the extended-colour introducer;
# plain white is usually 37 -- confirm which one is intended here.
ANSI_WHITE = "\033[38m"
ANSI_BOLD = "\033[1m"
50
def output_color(color, s):
    """Wrap the string `s` in the ANSI escape sequence `color`.

    The escape codes are only added when standard output is a terminal.
    When it is not (pipe, file), or when `sys.stdout` has been replaced by
    an object without a usable file descriptor, `s` is returned unchanged.
    """
    try:
        is_tty = os.isatty(sys.stdout.fileno())
    except (AttributeError, ValueError, OSError):
        # sys.stdout is None, closed, or an in-memory stream such as
        # io.StringIO: there is no terminal to colorize for.
        is_tty = False
    if is_tty:
        return color + s + ANSI_RESET
    return s
56
# Find a command in the PATH.
def which(file):
    """Return the full path of the executable `file` found on PATH.

    Each PATH entry is tried in order; the first entry that contains a
    regular file named `file` with execute permission wins. Returns None
    when no such file exists or when PATH is not set at all.
    """
    search_path = os.environ.get("PATH")
    if search_path is None:
        return None
    for path in search_path.split(os.pathsep):
        candidate = os.path.join(path, file)
        # isfile() rather than exists(): a directory that happens to share
        # the command's name is "executable" too, but cannot be run.
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None
64
#
# Kill a process and all of its children.
#
# We attempt to handle races where a PID goes away while we
# are looking at it, but not where a PID has been reused.
#
def kill_family(parent_pid):
    """Suspend and then SIGKILL `parent_pid` and all of its descendants.

    Does nothing when psutil could not be imported, or when the parent
    process no longer exists. Processes that vanish while we are working
    through the family are skipped silently.
    """
    if not PS_UTIL_AVAILABLE:
        return

    # Find the process and SIGSTOP it first, so that it cannot spawn
    # further children while we enumerate the existing ones.
    try:
        process = psutil.Process(parent_pid)
        process.suspend()
    except NoSuchProcess:
        # Race. Nothing more to do.
        return

    # Enumerating the children can itself race with the parent exiting.
    try:
        children = process.children(recursive=True)
    except NoSuchProcess:
        children = []

    # SIGSTOP every descendant that is still around.
    process_list = [process]
    for child in children:
        try:
            child.suspend()
        except NoSuchProcess:
            # Race.
            pass
        else:
            process_list.append(child)

    # Now SIGKILL everyone, descendants before the parent. A process may
    # still disappear between the suspend and the kill; that must not stop
    # us from killing the remaining ones.
    for p in reversed(process_list):
        try:
            p.send_signal(signal.SIGKILL)
        except NoSuchProcess:
            # Race.
            pass
103
#
# Run a single test.
#
# Return a tuple of (success, status, log, time_taken, memory_usage).
#
# Log only contains the output if verbose is *false*; otherwise, the
# log is output to stdout where we can't easily get to it.
#
def _to_text(data):
    """Return subprocess output as the native `str` type.

    Pipes yield bytes on Python 3; those are decoded as UTF-8 with
    undecodable bytes replaced. None becomes the empty string.
    """
    if data is None:
        return ""
    if isinstance(data, str):
        return data
    return data.decode("utf-8", "replace")


def run_test(test, verbose=False):
    """Run `test.command` through bash in `test.cwd` and report the result.

    Reads `test.command`, `test.cwd` and `test.timeout`. When a "pidspace"
    program is found on PATH the command is run through it.

    Returns a tuple (success, status, log, time_taken, memory_usage):
      success       True iff the command exited with return code 0.
      status        "pass", "TIMEOUT", "FAILED" or "ERROR".
      log           Captured stdout+stderr as text (empty in verbose mode,
                    where the output is streamed to stdout instead), or the
                    traceback text when status is "ERROR".
      time_taken    datetime.timedelta since the test was started.
      memory_usage  Value reported by the memusage poller, or None if the
                    test did not get that far.
    """
    # Construct the base command.
    command = ["bash", "-c", test.command]

    # If we have a "pidspace" program, use that to ensure that programs
    # that double-fork can't continue running after the parent command
    # dies.
    pidspace = which("pidspace")
    if pidspace is not None:
        command = [pidspace, "--"] + command

    # Print command and path.
    if verbose:
        print("\n")
        if os.path.abspath(test.cwd) != os.path.abspath(os.getcwd()):
            path = " [%s]" % os.path.relpath(test.cwd)
        else:
            path = ""
        print("    command: %s%s" % (test.command, path))

    # Start timing.
    start_time = datetime.datetime.now()

    def error_result():
        # Build the "ERROR" result from the exception currently being
        # handled; must be called from inside an except block.
        message = "Exception while running test:\n\n%s" % (traceback.format_exc())
        if verbose:
            print(message)
        return (False, "ERROR", message, datetime.datetime.now() - start_time, None)

    # Start the command. If this fails there is no child process, so
    # there is nothing to register for cleanup.
    try:
        process = subprocess.Popen(command,
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                stdin=subprocess.PIPE, cwd=test.cwd)
    except Exception:
        return error_result()

    # If our program exits for some reason, attempt to kill the process.
    atexit.register(lambda: kill_family(process.pid))

    # Setup an alarm at the timeout. This is armed before any output is
    # read so that the timeout also applies while streaming in verbose mode.
    was_timeout = [False]
    def alarm_handler(sig, _):
        was_timeout[0] = True
        kill_family(process.pid)
    signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(test.timeout)

    peak_mem_usage = None
    try:
        with memusage.process_poller(process.pid) as m:
            if verbose:
                # Stream the output line by line as it is produced.
                try:
                    for line in iter(process.stdout.readline, b""):
                        print(_to_text(line), end='')
                except Exception:
                    return error_result()

            # Wait for the command to finish.
            (output, _) = process.communicate()
            peak_mem_usage = m.peak_mem_usage()
    finally:
        # Cancel the alarm. Small race here (if the alarm fires just after
        # the process finished), but the returncode of our process should
        # still be 0, and hence we won't interpret the result as a timeout.
        signal.alarm(0)

    output = _to_text(output)
    if process.returncode == 0:
        status = "pass"
    elif was_timeout[0]:
        status = "TIMEOUT"
    else:
        status = "FAILED"
    return (process.returncode == 0, status, output, datetime.datetime.now() - start_time, peak_mem_usage)
183
def print_line(ch='-'):
    """Print a horizontal rule made of `ch` repeated 72 times."""
    rule = ch * 72
    print(rule)
186
# Print the first half of a status line.
def print_test_line_start(test_name, verbose=False):
    """Write "  running <test_name> ... " to stdout without a newline.

    The name (plus " ...") is left-aligned in a 25 character field. The
    output is flushed so it is visible while the test is still running.
    `verbose` is accepted but not used here.
    """
    label = test_name + " ..."
    sys.stdout.write("  running %-25s " % label)
    sys.stdout.flush()
191
def print_test_line_end(test_name, color, status, time_taken, mem, verbose=False):
    """Print the second half of a status line: the status and its extras.

    `status` is printed left-aligned in a 10 character field, wrapped in
    `color` via output_color(). If `time_taken` and/or `mem` are truthy they
    are appended in parentheses as "(time, mem)". In verbose mode a
    separator made of "=" is printed afterwards. `test_name` is not used.
    """
    mem_text = None
    if mem:
        # Report memory usage in gigabytes.
        gigabytes = round(float(mem) / 1024 / 1024 / 1024, 2)
        mem_text = '%5.2fGB' % gigabytes

    time_text = None
    if time_taken:
        # Strip milliseconds for better printing.
        whole_seconds = int(time_taken.total_seconds())
        time_text = '%8s' % datetime.timedelta(seconds=whole_seconds)

    extras = ', '.join([text for text in (time_text, mem_text) if text])
    if extras:
        suffix = '(%s)' % extras
    else:
        suffix = ''

    # Print status line.
    print(output_color(color, "%-10s" % status) + suffix)
    if verbose:
        print("")
        print_line("=")
        print()
    sys.stdout.flush()
209
def print_test_line(test_name, color, status, time_taken, mem, verbose=False):
    """Print a complete status line (start and end halves) in one go."""
    print_test_line_start(test_name, verbose=verbose)
    print_test_line_end(test_name, color, status, time_taken, mem,
                        verbose=verbose)
213
#
# Recursive glob
#
def rglob(base_dir, pattern):
    """Return the paths of all files below `base_dir` matching `pattern`.

    `pattern` is an fnmatch-style pattern applied to file names only (not
    to directories). Paths are returned in os.walk() order.
    """
    return [os.path.join(folder, name)
            for folder, _subdirs, names in os.walk(base_dir)
            for name in fnmatch.filter(names, pattern)]
223
#
# Exclusive rglob
#
def xrglob(base_dir, pattern, excl_dirs):
    """Recursive glob that skips excluded directory trees.

    Returns the paths of all files below `base_dir` whose name matches the
    fnmatch-style `pattern`, except files located in (or below) any
    directory listed in `excl_dirs`. Returned paths are built from
    `base_dir` as given (they are not made absolute).

    Directories are compared by absolute path and on whole path
    components, so excluding "foo" does not also exclude "foobar", and
    relative `base_dir` / `excl_dirs` values work as expected.
    """
    excluded = [os.path.abspath(d) for d in excl_dirs]

    def is_excluded(path):
        # True if `path` is one of the excluded directories or lies below one.
        path = os.path.abspath(path)
        for excl in excluded:
            if path == excl or path.startswith(excl.rstrip(os.sep) + os.sep):
                return True
        return False

    matches = []
    if is_excluded(base_dir):
        return matches
    for root, dirnames, filenames in os.walk(base_dir):
        # Prune excluded subtrees in place so os.walk never descends into them.
        dirnames[:] = [d for d in dirnames
                       if not is_excluded(os.path.join(root, d))]
        for filename in fnmatch.filter(filenames, pattern):
            matches.append(os.path.join(root, filename))
    return matches
239
240
#
# Run tests.
#
def _clip_log_lines(log, limit):
    """Split `log` into lines, keeping at most `limit` lines at each end.

    When the log has more than 2 * limit lines, the middle is replaced by a
    single "..." line. A limit of zero (or less) keeps only the "..." line.
    """
    lines = (log.rstrip("\n") + "\n").split("\n")
    limit = max(limit, 0)
    if len(lines) > 2 * limit:
        # Slice the tail from an explicit index: lines[-0:] would be the
        # whole list rather than an empty one.
        lines = lines[:limit] + ["..."] + lines[len(lines) - limit:]
    return lines


def main():
    """Command-line entry point: discover, run and report on tests.

    Parses the command line, loads the test specifications (tests.xml files
    below the search directory, or legacy specs with --legacy), runs the
    selected tests in order and prints one status line per test. A test
    whose dependencies include a failed or skipped test is skipped and
    counted as failed. Unless --brief is given, the (clipped) logs of
    failed tests are printed at the end.

    Always terminates via sys.exit(): status 0 when listing tests or when
    every test passed, status 1 when at least one test failed or was skipped.
    """
    # Parse arguments
    parser = argparse.ArgumentParser(description="Simple Regression Framework")
    parser.add_argument("-s", "--strict", action="store_true",
            help="be strict when parsing test XML files")
    parser.add_argument("-d", "--directory", action="store",
            metavar="DIR", help="directory to search for test files",
            default=os.getcwd())
    parser.add_argument("-x", "--exclude", action="append",
            metavar="DIR", help="directory to exclude while searching for test files (multiple -x can be given)",
            default=[])
    parser.add_argument("--brief", action="store_true",
            help="don't print failure logs at end of test run")
    parser.add_argument("-l", "--list", action="store_true",
            help="list known tests")
    parser.add_argument("--legacy", action="store_true",
            help="use legacy 'IsaMakefile' specs")
    parser.add_argument("-v", "--verbose", action="store_true",
            help="print test output")
    # type=int makes argparse reject non-numeric limits with a usage error
    # instead of a traceback after all the tests have already been run.
    parser.add_argument("--limit", action="store", type=int,
            help="set line limit for logs", default=40)
    parser.add_argument("tests", metavar="TESTS",
            help="tests to run (defaults to all tests)",
            nargs="*")
    args = parser.parse_args()

    # Search for test files:
    if not args.legacy:
        test_xml = sorted(xrglob(args.directory, "tests.xml", args.exclude))
        tests = testspec.parse_test_files(test_xml, strict=args.strict)
    else:
        # Fetch legacy tests.
        tests = testspec.legacy_testspec(args.directory)

    # List test names if requested.
    if args.list:
        for t in tests:
            print(t.name)
        sys.exit(0)

    # Calculate which tests should be run.
    if not args.tests:
        tests_to_run = tests
    else:
        desired_names = set(args.tests)
        bad_names = desired_names - set(t.name for t in tests)
        if bad_names:
            parser.error("Unknown test names: %s" % (", ".join(sorted(bad_names))))
        tests_to_run = [t for t in tests if t.name in desired_names]

    # If running at least one test, and psutil is not available, print a warning.
    if len(tests_to_run) > 0 and not PS_UTIL_AVAILABLE:
        print("\n"
              "Warning: 'psutil' module not available. Processes may not be correctly\n"
              "stopped. Run\n"
              "\n"
              "    pip install --user psutil\n"
              "\n"
              "to install.\n"
              "\n")

    # Run the tests.
    print("Running %d test(s)...\n" % len(tests_to_run))
    failed_tests = set()
    failed_test_log = []
    for t in tests_to_run:
        # Skip tests that depend on something that failed (or was itself
        # skipped); record them as failed so the skip propagates.
        if len(t.depends & failed_tests) > 0:
            print_test_line(t.name, ANSI_YELLOW, "skipped", None, None)
            failed_tests.add(t.name)
            continue

        # Run the test.
        print_test_line_start(t.name, verbose=args.verbose)
        (passed, status, log, time_taken, mem) = run_test(t, verbose=args.verbose)

        # Print result.
        if not passed:
            failed_tests.add(t.name)
            failed_test_log.append((t.name, log, time_taken))
            print_test_line_end(t.name, ANSI_RED, "%s *" % status, time_taken, mem, verbose=args.verbose)
        else:
            print_test_line_end(t.name, ANSI_GREEN, status, time_taken, mem, verbose=args.verbose)

    # Print failure summaries unless requested not to.
    if not args.brief and len(failed_test_log) > 0:
        print("")
        for (failed_test, log, _) in failed_test_log:
            print_line()
            print("TEST FAILURE: %s" % failed_test)
            print("")
            print("\n".join(_clip_log_lines(log, args.limit)))
        print_line()

    # Print summary.
    print(("\n\n"
            + output_color(ANSI_WHITE, "%d/%d tests succeeded.") + "\n")
            % (len(tests_to_run) - len(failed_tests), len(tests_to_run)))
    if len(failed_tests) > 0:
        print(output_color(ANSI_RED, "Tests failed.") + "\n")
        sys.exit(1)
    else:
        print(output_color(ANSI_GREEN, "All tests passed.") + "\n")
        sys.exit(0)
353
354
# Run the test runner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
357
358
359