1'''This class extends pexpect.spawn to specialize setting up SSH connections.
2This adds methods for login, logout, and expecting the shell prompt.
3
4PEXPECT LICENSE
5
6    This license is approved by the OSI and FSF as GPL-compatible.
7        http://opensource.org/licenses/isc-license.txt
8
9    Copyright (c) 2012, Noah Spurrier <noah@noah.org>
10    PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
11    PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
12    COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
13    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
14    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
16    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
17    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
18    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
19    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20
21'''
22
23from pexpect import ExceptionPexpect, TIMEOUT, EOF, spawn
24import time
25import os
26import sys
27import re
28
29__all__ = ['ExceptionPxssh', 'pxssh']
30
31# Exception classes used by this module.
32class ExceptionPxssh(ExceptionPexpect):
33    '''Raised for pxssh exceptions.
34    '''
35
36if sys.version_info > (3, 0):
37    from shlex import quote
38else:
39    _find_unsafe = re.compile(r'[^\w@%+=:,./-]').search
40
41    def quote(s):
42        """Return a shell-escaped version of the string *s*."""
43        if not s:
44            return "''"
45        if _find_unsafe(s) is None:
46            return s
47
48        # use single quotes, and put single quotes into double quotes
49        # the string $'b is then quoted as '$'"'"'b'
50        return "'" + s.replace("'", "'\"'\"'") + "'"
51
52class pxssh (spawn):
53    '''This class extends pexpect.spawn to specialize setting up SSH
54    connections. This adds methods for login, logout, and expecting the shell
55    prompt. It does various tricky things to handle many situations in the SSH
56    login process. For example, if the session is your first login, then pxssh
57    automatically accepts the remote certificate; or if you have public key
58    authentication setup then pxssh won't wait for the password prompt.
59
60    pxssh uses the shell prompt to synchronize output from the remote host. In
61    order to make this more robust it sets the shell prompt to something more
62    unique than just $ or #. This should work on most Borne/Bash or Csh style
63    shells.
64
65    Example that runs a few commands on a remote server and prints the result::
66
67        from pexpect import pxssh
68        import getpass
69        try:
70            s = pxssh.pxssh()
71            hostname = raw_input('hostname: ')
72            username = raw_input('username: ')
73            password = getpass.getpass('password: ')
74            s.login(hostname, username, password)
75            s.sendline('uptime')   # run a command
76            s.prompt()             # match the prompt
77            print(s.before)        # print everything before the prompt.
78            s.sendline('ls -l')
79            s.prompt()
80            print(s.before)
81            s.sendline('df')
82            s.prompt()
83            print(s.before)
84            s.logout()
85        except pxssh.ExceptionPxssh as e:
86            print("pxssh failed on login.")
87            print(e)
88
89    Example showing how to specify SSH options::
90
91        from pexpect import pxssh
92        s = pxssh.pxssh(options={
93                            "StrictHostKeyChecking": "no",
94                            "UserKnownHostsFile": "/dev/null"})
95        ...
96
97    Note that if you have ssh-agent running while doing development with pxssh
98    then this can lead to a lot of confusion. Many X display managers (xdm,
99    gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
100    dialog box popup asking for a password during development. You should turn
101    off any key agents during testing. The 'force_password' attribute will turn
102    off public key authentication. This will only work if the remote SSH server
103    is configured to allow password logins. Example of using 'force_password'
104    attribute::
105
106            s = pxssh.pxssh()
107            s.force_password = True
108            hostname = raw_input('hostname: ')
109            username = raw_input('username: ')
110            password = getpass.getpass('password: ')
111            s.login (hostname, username, password)
112
113    `debug_command_string` is only for the test suite to confirm that the string
114    generated for SSH is correct, using this will not allow you to do
115    anything other than get a string back from `pxssh.pxssh.login()`.
116    '''
117
118    def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None,
119                    logfile=None, cwd=None, env=None, ignore_sighup=True, echo=True,
120                    options={}, encoding=None, codec_errors='strict',
121                    debug_command_string=False):
122
123        spawn.__init__(self, None, timeout=timeout, maxread=maxread,
124                       searchwindowsize=searchwindowsize, logfile=logfile,
125                       cwd=cwd, env=env, ignore_sighup=ignore_sighup, echo=echo,
126                       encoding=encoding, codec_errors=codec_errors)
127
128        self.name = '<pxssh>'
129
130        #SUBTLE HACK ALERT! Note that the command that SETS the prompt uses a
131        #slightly different string than the regular expression to match it. This
132        #is because when you set the prompt the command will echo back, but we
133        #don't want to match the echoed command. So if we make the set command
134        #slightly different than the regex we eliminate the problem. To make the
135        #set command different we add a backslash in front of $. The $ doesn't
136        #need to be escaped, but it doesn't hurt and serves to make the set
137        #prompt command different than the regex.
138
139        # used to match the command-line prompt
140        self.UNIQUE_PROMPT = r"\[PEXPECT\][\$\#] "
141        self.PROMPT = self.UNIQUE_PROMPT
142
143        # used to set shell command-line prompt to UNIQUE_PROMPT.
144        self.PROMPT_SET_SH = r"PS1='[PEXPECT]\$ '"
145        self.PROMPT_SET_CSH = r"set prompt='[PEXPECT]\$ '"
146        self.SSH_OPTS = ("-o'RSAAuthentication=no'"
147                + " -o 'PubkeyAuthentication=no'")
148# Disabling host key checking, makes you vulnerable to MITM attacks.
149#                + " -o 'StrictHostKeyChecking=no'"
150#                + " -o 'UserKnownHostsFile /dev/null' ")
151        # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
152        # displaying a GUI password dialog. I have not figured out how to
153        # disable only SSH_ASKPASS without also disabling X11 forwarding.
154        # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
155        #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
156        self.force_password = False
157
158        self.debug_command_string = debug_command_string
159
160        # User defined SSH options, eg,
161        # ssh.otions = dict(StrictHostKeyChecking="no",UserKnownHostsFile="/dev/null")
162        self.options = options
163
164    def levenshtein_distance(self, a, b):
165        '''This calculates the Levenshtein distance between a and b.
166        '''
167
168        n, m = len(a), len(b)
169        if n > m:
170            a,b = b,a
171            n,m = m,n
172        current = range(n+1)
173        for i in range(1,m+1):
174            previous, current = current, [i]+[0]*n
175            for j in range(1,n+1):
176                add, delete = previous[j]+1, current[j-1]+1
177                change = previous[j-1]
178                if a[j-1] != b[i-1]:
179                    change = change + 1
180                current[j] = min(add, delete, change)
181        return current[n]
182
183    def try_read_prompt(self, timeout_multiplier):
184        '''This facilitates using communication timeouts to perform
185        synchronization as quickly as possible, while supporting high latency
186        connections with a tunable worst case performance. Fast connections
187        should be read almost immediately. Worst case performance for this
188        method is timeout_multiplier * 3 seconds.
189        '''
190
191        # maximum time allowed to read the first response
192        first_char_timeout = timeout_multiplier * 0.5
193
194        # maximum time allowed between subsequent characters
195        inter_char_timeout = timeout_multiplier * 0.1
196
197        # maximum time for reading the entire prompt
198        total_timeout = timeout_multiplier * 3.0
199
200        prompt = self.string_type()
201        begin = time.time()
202        expired = 0.0
203        timeout = first_char_timeout
204
205        while expired < total_timeout:
206            try:
207                prompt += self.read_nonblocking(size=1, timeout=timeout)
208                expired = time.time() - begin # updated total time expired
209                timeout = inter_char_timeout
210            except TIMEOUT:
211                break
212
213        return prompt
214
215    def sync_original_prompt (self, sync_multiplier=1.0):
216        '''This attempts to find the prompt. Basically, press enter and record
217        the response; press enter again and record the response; if the two
218        responses are similar then assume we are at the original prompt.
219        This can be a slow function. Worst case with the default sync_multiplier
220        can take 12 seconds. Low latency connections are more likely to fail
221        with a low sync_multiplier. Best case sync time gets worse with a
222        high sync multiplier (500 ms with default). '''
223
224        # All of these timing pace values are magic.
225        # I came up with these based on what seemed reliable for
226        # connecting to a heavily loaded machine I have.
227        self.sendline()
228        time.sleep(0.1)
229
230        try:
231            # Clear the buffer before getting the prompt.
232            self.try_read_prompt(sync_multiplier)
233        except TIMEOUT:
234            pass
235
236        self.sendline()
237        x = self.try_read_prompt(sync_multiplier)
238
239        self.sendline()
240        a = self.try_read_prompt(sync_multiplier)
241
242        self.sendline()
243        b = self.try_read_prompt(sync_multiplier)
244
245        ld = self.levenshtein_distance(a,b)
246        len_a = len(a)
247        if len_a == 0:
248            return False
249        if float(ld)/len_a < 0.4:
250            return True
251        return False
252
253    ### TODO: This is getting messy and I'm pretty sure this isn't perfect.
254    ### TODO: I need to draw a flow chart for this.
255    ### TODO: Unit tests for SSH tunnels, remote SSH command exec, disabling original prompt sync
256    def login (self, server, username, password='', terminal_type='ansi',
257                original_prompt=r"[#$]", login_timeout=10, port=None,
258                auto_prompt_reset=True, ssh_key=None, quiet=True,
259                sync_multiplier=1, check_local_ip=True,
260                password_regex=r'(?i)(?:password:)|(?:passphrase for key)',
261                ssh_tunnels={}, spawn_local_ssh=True,
262                sync_original_prompt=True, ssh_config=None):
263        '''This logs the user into the given server.
264
265        It uses
266        'original_prompt' to try to find the prompt right after login. When it
267        finds the prompt it immediately tries to reset the prompt to something
268        more easily matched. The default 'original_prompt' is very optimistic
269        and is easily fooled. It's more reliable to try to match the original
270        prompt as exactly as possible to prevent false matches by server
271        strings such as the "Message Of The Day". On many systems you can
272        disable the MOTD on the remote server by creating a zero-length file
273        called :file:`~/.hushlogin` on the remote server. If a prompt cannot be found
274        then this will not necessarily cause the login to fail. In the case of
275        a timeout when looking for the prompt we assume that the original
276        prompt was so weird that we could not match it, so we use a few tricks
277        to guess when we have reached the prompt. Then we hope for the best and
278        blindly try to reset the prompt to something more unique. If that fails
279        then login() raises an :class:`ExceptionPxssh` exception.
280
281        In some situations it is not possible or desirable to reset the
282        original prompt. In this case, pass ``auto_prompt_reset=False`` to
283        inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
284        uses a unique prompt in the :meth:`prompt` method. If the original prompt is
285        not reset then this will disable the :meth:`prompt` method unless you
286        manually set the :attr:`PROMPT` attribute.
287
288        Set ``password_regex`` if there is a MOTD message with `password` in it.
289        Changing this is like playing in traffic, don't (p)expect it to match straight
290        away.
291
292        If you require to connect to another SSH server from the your original SSH
293        connection set ``spawn_local_ssh`` to `False` and this will use your current
294        session to do so. Setting this option to `False` and not having an active session
295        will trigger an error.
296
297        Set ``ssh_key`` to a file path to an SSH private key to use that SSH key
298        for the session authentication.
299        Set ``ssh_key`` to `True` to force passing the current SSH authentication socket
300        to the desired ``hostname``.
301
302        Set ``ssh_config`` to a file path string of an SSH client config file to pass that
303        file to the client to handle itself. You may set any options you wish in here, however
304        doing so will require you to post extra information that you may not want to if you
305        run into issues.
306        '''
307
308        session_regex_array = ["(?i)are you sure you want to continue connecting", original_prompt, password_regex, "(?i)permission denied", "(?i)terminal type", TIMEOUT]
309        session_init_regex_array = []
310        session_init_regex_array.extend(session_regex_array)
311        session_init_regex_array.extend(["(?i)connection closed by remote host", EOF])
312
313        ssh_options = ''.join([" -o '%s=%s'" % (o, v) for (o, v) in self.options.items()])
314        if quiet:
315            ssh_options = ssh_options + ' -q'
316        if not check_local_ip:
317            ssh_options = ssh_options + " -o'NoHostAuthenticationForLocalhost=yes'"
318        if self.force_password:
319            ssh_options = ssh_options + ' ' + self.SSH_OPTS
320        if ssh_config is not None:
321            if spawn_local_ssh and not os.path.isfile(ssh_config):
322                raise ExceptionPxssh('SSH config does not exist or is not a file.')
323            ssh_options = ssh_options + '-F ' + ssh_config
324        if port is not None:
325            ssh_options = ssh_options + ' -p %s'%(str(port))
326        if ssh_key is not None:
327            # Allow forwarding our SSH key to the current session
328            if ssh_key==True:
329                ssh_options = ssh_options + ' -A'
330            else:
331                if spawn_local_ssh and not os.path.isfile(ssh_key):
332                    raise ExceptionPxssh('private ssh key does not exist or is not a file.')
333                ssh_options = ssh_options + ' -i %s' % (ssh_key)
334
335        # SSH tunnels, make sure you know what you're putting into the lists
336        # under each heading. Do not expect these to open 100% of the time,
337        # The port you're requesting might be bound.
338        #
339        # The structure should be like this:
340        # { 'local': ['2424:localhost:22'],  # Local SSH tunnels
341        # 'remote': ['2525:localhost:22'],   # Remote SSH tunnels
342        # 'dynamic': [8888] } # Dynamic/SOCKS tunnels
343        if ssh_tunnels!={} and isinstance({},type(ssh_tunnels)):
344            tunnel_types = {
345                'local':'L',
346                'remote':'R',
347                'dynamic':'D'
348            }
349            for tunnel_type in tunnel_types:
350                cmd_type = tunnel_types[tunnel_type]
351                if tunnel_type in ssh_tunnels:
352                    tunnels = ssh_tunnels[tunnel_type]
353                    for tunnel in tunnels:
354                        if spawn_local_ssh==False:
355                            tunnel = quote(str(tunnel))
356                        ssh_options = ssh_options + ' -' + cmd_type + ' ' + str(tunnel)
357        cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
358        if self.debug_command_string:
359            return(cmd)
360
361        # Are we asking for a local ssh command or to spawn one in another session?
362        if spawn_local_ssh:
363            spawn._spawn(self, cmd)
364        else:
365            self.sendline(cmd)
366
367        # This does not distinguish between a remote server 'password' prompt
368        # and a local ssh 'passphrase' prompt (for unlocking a private key).
369        i = self.expect(session_init_regex_array, timeout=login_timeout)
370
371        # First phase
372        if i==0:
373            # New certificate -- always accept it.
374            # This is what you get if SSH does not have the remote host's
375            # public key stored in the 'known_hosts' cache.
376            self.sendline("yes")
377            i = self.expect(session_regex_array)
378        if i==2: # password or passphrase
379            self.sendline(password)
380            i = self.expect(session_regex_array)
381        if i==4:
382            self.sendline(terminal_type)
383            i = self.expect(session_regex_array)
384        if i==7:
385            self.close()
386            raise ExceptionPxssh('Could not establish connection to host')
387
388        # Second phase
389        if i==0:
390            # This is weird. This should not happen twice in a row.
391            self.close()
392            raise ExceptionPxssh('Weird error. Got "are you sure" prompt twice.')
393        elif i==1: # can occur if you have a public key pair set to authenticate.
394            ### TODO: May NOT be OK if expect() got tricked and matched a false prompt.
395            pass
396        elif i==2: # password prompt again
397            # For incorrect passwords, some ssh servers will
398            # ask for the password again, others return 'denied' right away.
399            # If we get the password prompt again then this means
400            # we didn't get the password right the first time.
401            self.close()
402            raise ExceptionPxssh('password refused')
403        elif i==3: # permission denied -- password was bad.
404            self.close()
405            raise ExceptionPxssh('permission denied')
406        elif i==4: # terminal type again? WTF?
407            self.close()
408            raise ExceptionPxssh('Weird error. Got "terminal type" prompt twice.')
409        elif i==5: # Timeout
410            #This is tricky... I presume that we are at the command-line prompt.
411            #It may be that the shell prompt was so weird that we couldn't match
412            #it. Or it may be that we couldn't log in for some other reason. I
413            #can't be sure, but it's safe to guess that we did login because if
414            #I presume wrong and we are not logged in then this should be caught
415            #later when I try to set the shell prompt.
416            pass
417        elif i==6: # Connection closed by remote host
418            self.close()
419            raise ExceptionPxssh('connection closed')
420        else: # Unexpected
421            self.close()
422            raise ExceptionPxssh('unexpected login response')
423        if sync_original_prompt:
424            if not self.sync_original_prompt(sync_multiplier):
425                self.close()
426                raise ExceptionPxssh('could not synchronize with original prompt')
427        # We appear to be in.
428        # set shell prompt to something unique.
429        if auto_prompt_reset:
430            if not self.set_unique_prompt():
431                self.close()
432                raise ExceptionPxssh('could not set shell prompt '
433                                     '(received: %r, expected: %r).' % (
434                                         self.before, self.PROMPT,))
435        return True
436
437    def logout (self):
438        '''Sends exit to the remote shell.
439
440        If there are stopped jobs then this automatically sends exit twice.
441        '''
442        self.sendline("exit")
443        index = self.expect([EOF, "(?i)there are stopped jobs"])
444        if index==1:
445            self.sendline("exit")
446            self.expect(EOF)
447        self.close()
448
449    def prompt(self, timeout=-1):
450        '''Match the next shell prompt.
451
452        This is little more than a short-cut to the :meth:`~pexpect.spawn.expect`
453        method. Note that if you called :meth:`login` with
454        ``auto_prompt_reset=False``, then before calling :meth:`prompt` you must
455        set the :attr:`PROMPT` attribute to a regex that it will use for
456        matching the prompt.
457
458        Calling :meth:`prompt` will erase the contents of the :attr:`before`
459        attribute even if no prompt is ever matched. If timeout is not given or
460        it is set to -1 then self.timeout is used.
461
462        :return: True if the shell prompt was matched, False if the timeout was
463                 reached.
464        '''
465
466        if timeout == -1:
467            timeout = self.timeout
468        i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
469        if i==1:
470            return False
471        return True
472
473    def set_unique_prompt(self):
474        '''This sets the remote prompt to something more unique than ``#`` or ``$``.
475        This makes it easier for the :meth:`prompt` method to match the shell prompt
476        unambiguously. This method is called automatically by the :meth:`login`
477        method, but you may want to call it manually if you somehow reset the
478        shell prompt. For example, if you 'su' to a different user then you
479        will need to manually reset the prompt. This sends shell commands to
480        the remote host to set the prompt, so this assumes the remote host is
481        ready to receive commands.
482
483        Alternatively, you may use your own prompt pattern. In this case you
484        should call :meth:`login` with ``auto_prompt_reset=False``; then set the
485        :attr:`PROMPT` attribute to a regular expression. After that, the
486        :meth:`prompt` method will try to match your prompt pattern.
487        '''
488
489        self.sendline("unset PROMPT_COMMAND")
490        self.sendline(self.PROMPT_SET_SH) # sh-style
491        i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
492        if i == 0: # csh-style
493            self.sendline(self.PROMPT_SET_CSH)
494            i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
495            if i == 0:
496                return False
497        return True
498
499# vi:ts=4:sw=4:expandtab:ft=python:
500