1# Copyright (c) 2012 The Chromium OS Authors.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4#
5# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
6# Licensed to PSF under a Contributor Agreement.
7# See http://www.python.org/2.4/license for licensing details.
8
9"""Subprocess execution
10
11This module holds a subclass of subprocess.Popen with our own required
12features, mainly that we get access to the subprocess output while it
13is running rather than just at the end. This makes it easier to show
14progress information and filter output in real time.
15"""
16
17import errno
18import os
19import pty
20import select
21import subprocess
22import sys
23import unittest
24
25
# Re-export these here so the caller does not need to import subprocess also.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT

# Sentinel for stdout/stderr: pipe the output through a pty.
# This must not collide with any of subprocess's own special values. On
# Python 3 those are PIPE (-1), STDOUT (-2) and DEVNULL (-3); using -3 here
# would make stdout=subprocess.DEVNULL indistinguishable from PIPE_PTY.
PIPE_PTY = -100

# Module-wide flag polled by Popen.communicate_filter(): set it to False to
# have running subprocesses terminated.
stay_alive = True
31
32
class Popen(subprocess.Popen):
    """Like subprocess.Popen with ptys and incremental output

    This class deals with running a child process and filtering its output on
    both stdout and stderr while it is running. We do this so we can monitor
    progress, and possibly relay the output to the user if requested.

    The class is similar to subprocess.Popen, the equivalent is something like:

        Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    But this class has many fewer features, and two enhancements:

    1. Rather than getting the output data only at the end, this class sends it
         to a provided operation as it arrives.
    2. We use pseudo terminals so that the child will hopefully flush its output
         to us as soon as it is produced, rather than waiting for the end of a
         line.

    Use communicate_filter() to handle output from the subprocess.

    """

    def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
                 shell=False, cwd=None, env=None, **kwargs):
        """Cut-down constructor

        Args:
            args: Program and arguments for subprocess to execute.
            stdin: See subprocess.Popen()
            stdout: See subprocess.Popen(), except that we support the sentinel
                    value of cros_subprocess.PIPE_PTY.
            stderr: See subprocess.Popen(), except that we support the sentinel
                    value of cros_subprocess.PIPE_PTY.
            shell: See subprocess.Popen()
            cwd: Working directory to change to for subprocess, or None if none.
            env: Environment to use for this subprocess, or None to inherit parent.
            kwargs: No other arguments are supported at the moment.    Passing other
                    arguments will cause a ValueError to be raised.

        Raises:
            ValueError: if any unsupported keyword argument is passed. This is
                checked before anything else, so no pty is allocated and no
                subprocess is started in that case.
        """
        # Insist that unit tests exist for other arguments we don't support.
        # Do this first so a rejected call has no side effects.
        if kwargs:
            raise ValueError("Unit tests do not test extra args - please add tests")

        stdout_pty = None
        stderr_pty = None
        started = False
        try:
            # For PIPE_PTY, hand the slave half of a new pty to the subprocess
            if stdout == PIPE_PTY:
                stdout_pty = pty.openpty()
                stdout = stdout_pty[1]
            if stderr == PIPE_PTY:
                stderr_pty = pty.openpty()
                stderr = stderr_pty[1]

            super(Popen, self).__init__(args, stdin=stdin, stdout=stdout,
                                        stderr=stderr, shell=shell, cwd=cwd,
                                        env=env)
            started = True
        finally:
            for pty_pair in (stdout_pty, stderr_pty):
                if pty_pair is None:
                    continue
                master_fd, slave_fd = pty_pair
                # The parent never uses the slave half; it must be closed here
                # so that we see end-of-file once the child has finished.
                os.close(slave_fd)
                # If the child could not be started nobody will read from the
                # master half either, so don't leak it.
                if not started:
                    os.close(master_fd)

        # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
        # We want to use the master half on our end from now on.    Setting this here
        # does make some assumptions about the implementation of subprocess, but
        # those assumptions are pretty minor.

        # Note that if stderr is STDOUT, then self.stderr will be set to None by
        # this constructor.
        if stdout_pty is not None:
            self.stdout = os.fdopen(stdout_pty[0])
        if stderr_pty is not None:
            self.stderr = os.fdopen(stderr_pty[0])

    def convert_data(self, data):
        """Convert stdout/stderr data to the correct format for output

        Args:
            data: Data to convert, or None for b''

        Returns:
            b'' if data is None, otherwise data unchanged (communicate_filter()
            passes in the bytearray it collected)
        """
        if data is None:
            return b''
        return data

    def _forward_output(self, child_stream, tag, received, combined, output,
                        read_set):
        """Read one fragment from a child stream and pass it on

        Args:
            child_stream: File to read from (self.stdout or self.stderr)
            tag: Stream identifier handed to output() (sys.stdout or
                 sys.stderr)
            received: bytearray collecting everything read from child_stream
            combined: bytearray collecting the interleaved data
            output: Function to call with the fragment, or None
            read_set: List of files still being read; child_stream is closed
                      and removed from it when no more data is available

        Returns:
            True if output() asked for the process to be terminated
        """
        data = b''
        # We will get an error on read if the pty is closed
        try:
            data = os.read(child_stream.fileno(), 1024)
        except OSError:
            pass
        if not data:
            child_stream.close()
            read_set.remove(child_stream)
            return False
        received += data
        combined += data
        return bool(output and output(tag, data))

    def communicate_filter(self, output, input_buf=''):
        """Interact with process: Read data from stdout and stderr.

        This method runs until end-of-file is reached, then waits for the
        subprocess to terminate.

        The output function is sent all output from the subprocess and must be
        defined like this:

            def output([self,] stream, data)
            Args:
                stream: the stream the output was received on, which will be
                        sys.stdout or sys.stderr.
                data: bytes containing the data

            Returns:
                True to terminate the process

        Once the output function has returned True, the request stays in force:
        a later fragment for which it returns a false value does not cancel it.

        Note: The data read is buffered in memory, so do not use this
        method if the data size is large or unlimited.

        Args:
            output: Function to call with each fragment of output, or None to
                just collect the data.
            input_buf: Data to send to the subprocess's stdin (only used if
                stdin was set up as a pipe). Text is encoded as UTF-8 before
                being written.

        Returns:
            A tuple (stdout, stderr, combined) which is the data received on
            stdout, stderr and the combined data (interleaved stdout and stderr).

            Note that the interleaved output will only be sensible if you have
            set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
            the timing of the output in the subprocess. If a subprocess flips
            between stdout and stderr quickly in succession, by the time we come to
            read the output from each we may see several lines in each, and will read
            all the stdout lines, then all the stderr lines. So the interleaving
            may not be correct. In this case you might want to pass
            stderr=cros_subprocess.STDOUT to the constructor.

            This feature is still useful for subprocesses where stderr is
            rarely used and indicates an error.

            Note also that if you set stderr to STDOUT, then stderr will be empty
            and the combined output will just be the same as stdout.
        """

        read_set = []
        write_set = []
        stdout = None # Return
        stderr = None # Return

        # os.write() only accepts bytes, so encode text input up front
        if isinstance(input_buf, type(u'')):
            input_buf = input_buf.encode('utf-8')

        if self.stdin:
            # Flush stdio buffer.    This might block, if the user has
            # been writing to .stdin in an uncontrolled fashion.
            self.stdin.flush()
            if input_buf:
                write_set.append(self.stdin)
            else:
                self.stdin.close()
        if self.stdout:
            read_set.append(self.stdout)
            stdout = bytearray()
        if self.stderr and self.stderr != self.stdout:
            read_set.append(self.stderr)
            stderr = bytearray()
        combined = bytearray()

        stop_now = False
        input_offset = 0
        while read_set or write_set:
            try:
                rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
            except select.error as e:
                if e.args[0] == errno.EINTR:
                    continue
                raise

            # The select() timeout above lets us notice this flag even when
            # the child is producing no output
            if not stay_alive:
                self.terminate()

            if self.stdin in wlist:
                # When select has indicated that the file is writable,
                # we can write up to PIPE_BUF bytes without risk
                # blocking.    POSIX defines PIPE_BUF >= 512
                chunk = input_buf[input_offset : input_offset + 512]
                bytes_written = os.write(self.stdin.fileno(), chunk)
                input_offset += bytes_written
                if input_offset >= len(input_buf):
                    self.stdin.close()
                    write_set.remove(self.stdin)

            # Only ever set stop_now here, never clear it, so that a stop
            # request from one stream is not lost when the other stream's
            # handler returns a false value in the same pass.
            if self.stdout in rlist:
                if self._forward_output(self.stdout, sys.stdout, stdout,
                                        combined, output, read_set):
                    stop_now = True
            if self.stderr in rlist:
                if self._forward_output(self.stderr, sys.stderr, stderr,
                                        combined, output, read_set):
                    stop_now = True
            if stop_now:
                self.terminate()

        # All data exchanged.    Convert anything not collected (None) to b''.
        stdout = self.convert_data(stdout)
        stderr = self.convert_data(stderr)
        combined = self.convert_data(combined)

        self.wait()
        return (stdout, stderr, combined)
245
246
247# Just being a unittest.TestCase gives us 14 public methods.    Unless we
248# disable this, we can only have 6 tests in a TestCase.    That's not enough.
249#
250# pylint: disable=R0904
251
class TestSubprocess(unittest.TestCase):
    """Our simple unit test for this module"""

    class MyOperation:
        """Provides a operation that we can pass to Popen"""
        def __init__(self, input_to_send=None):
            """Constructor to set up the operation and possible input.

            Args:
                input_to_send: a text string to send when we first get input. We will
                    add a carriage return and newline to the string.
            """
            # Popen hands us bytes, so collect the data as bytes too
            self.stdout_data = b''
            self.stderr_data = b''
            self.combined_data = b''
            self.stdin_pipe = None
            self.stdin_read_pipe = None
            self._stdin_write_pipe = None
            self._input_to_send = input_to_send
            if input_to_send:
                read_fd, write_fd = os.pipe()
                self.stdin_read_pipe = read_fd
                self._stdin_write_pipe = os.fdopen(write_fd, 'w')

        def close(self):
            """Close both ends of the stdin pipe, if one was created"""
            if self._stdin_write_pipe is not None:
                self._stdin_write_pipe.close()
                self._stdin_write_pipe = None
            if self.stdin_read_pipe is not None:
                os.close(self.stdin_read_pipe)
                self.stdin_read_pipe = None

        def output(self, stream, data):
            """Output handler for Popen. Stores the data for later comparison

            Args:
                stream: sys.stdout or sys.stderr, indicating where data came from
                data: bytes received from the subprocess
            """
            if stream == sys.stdout:
                self.stdout_data += data
            if stream == sys.stderr:
                self.stderr_data += data
            self.combined_data += data

            # Output the input string if we have one.
            if self._input_to_send:
                self._stdin_write_pipe.write(self._input_to_send + '\r\n')
                self._stdin_write_pipe.flush()

    def _basic_check(self, plist, oper):
        """Basic checks that the output looks sane.

        Args:
            plist: (stdout, stderr, combined) tuple from communicate_filter()
            oper: MyOperation whose output() handler saw the same data
        """
        self.assertEqual(plist[0], oper.stdout_data)
        self.assertEqual(plist[1], oper.stderr_data)
        self.assertEqual(plist[2], oper.combined_data)

        # The total length of stdout and stderr should equal the combined length
        self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))

    def test_simple(self):
        """Simple redirection: Get process list"""
        oper = TestSubprocess.MyOperation()
        plist = Popen(['ps']).communicate_filter(oper.output)
        self._basic_check(plist, oper)

    def test_stderr(self):
        """Check stdout and stderr"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo fred >/dev/stderr && false || echo bad'
        plist = Popen([cmd], shell=True).communicate_filter(oper.output)
        self._basic_check(plist, oper)
        self.assertEqual(plist[0], b'bad\r\n')
        self.assertEqual(plist[1], b'fred\r\n')

    def test_shell(self):
        """Check with and without shell works"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo test >/dev/stderr'
        self.assertRaises(OSError, Popen, [cmd], shell=False)
        plist = Popen([cmd], shell=True).communicate_filter(oper.output)
        self._basic_check(plist, oper)
        self.assertEqual(len(plist[0]), 0)
        self.assertEqual(plist[1], b'test\r\n')

    def test_list_args(self):
        """Check with and without shell works using list arguments"""
        oper = TestSubprocess.MyOperation()
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=False).communicate_filter(oper.output)
        self._basic_check(plist, oper)
        expected = (' '.join(cmd[1:]) + '\r\n').encode('utf-8')
        self.assertEqual(plist[0], expected)
        self.assertEqual(len(plist[1]), 0)

        oper = TestSubprocess.MyOperation()

        # this should be interpreted as 'echo' with the other args dropped
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=True).communicate_filter(oper.output)
        self._basic_check(plist, oper)
        self.assertEqual(plist[0], b'\r\n')

    def test_cwd(self):
        """Check we can change directory"""
        for shell in (False, True):
            oper = TestSubprocess.MyOperation()
            plist = Popen('pwd', shell=shell, cwd='/tmp').communicate_filter(
                oper.output)
            self._basic_check(plist, oper)
            self.assertEqual(plist[0], b'/tmp\r\n')

    def test_env(self):
        """Check we can change environment"""
        for add in (False, True):
            oper = TestSubprocess.MyOperation()
            # Work on a copy so that the test does not change our own
            # environment, and make sure FRED only appears when we add it
            env = dict(os.environ)
            env.pop('FRED', None)
            if add:
                env['FRED'] = 'fred'
            cmd = 'echo $FRED'
            plist = Popen(cmd, shell=True, env=env).communicate_filter(oper.output)
            self._basic_check(plist, oper)
            self.assertEqual(plist[0], b'fred\r\n' if add else b'\r\n')

    def test_extra_args(self):
        """Check we can't add extra arguments"""
        self.assertRaises(ValueError, Popen, 'true', close_fds=False)

    def test_basic_input(self):
        """Check that incremental input works

        We set up a subprocess which will prompt for name. When we see this prompt
        we send the name as input to the process. It should then print the name
        properly to stdout.
        """
        oper = TestSubprocess.MyOperation('Flash')
        # Make sure the pipe is released even if an assertion below fails
        self.addCleanup(oper.close)
        prompt = 'What is your name?: '
        cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
        plist = Popen([cmd], stdin=oper.stdin_read_pipe,
                shell=True).communicate_filter(oper.output)
        self._basic_check(plist, oper)
        self.assertEqual(len(plist[1]), 0)
        self.assertEqual(plist[0],
                         prompt.encode('utf-8') + b'Hello Flash\r\r\n')

    def test_isatty(self):
        """Check that ptys appear as terminals to the subprocess"""
        oper = TestSubprocess.MyOperation()
        cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
                'else echo "not %d" >&%d; fi;')
        both_cmds = ''
        for fd in (1, 2):
            both_cmds += cmd % (fd, fd, fd, fd, fd)
        plist = Popen(both_cmds, shell=True).communicate_filter(oper.output)
        self._basic_check(plist, oper)
        self.assertEqual(plist[0], b'terminal 1\r\n')
        self.assertEqual(plist[1], b'terminal 2\r\n')

        # Now try with PIPE and make sure it is not a terminal
        oper = TestSubprocess.MyOperation()
        plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                shell=True).communicate_filter(oper.output)
        self._basic_check(plist, oper)
        self.assertEqual(plist[0], b'not 1\n')
        self.assertEqual(plist[1], b'not 2\n')
399
# Run the unit tests above when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
402