1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import multiprocessing
7import os
8import threading
9
10from autotest_lib.client.common_lib import autotemp
11from autotest_lib.server import utils
12
# Command line for the background "master" ssh process that owns the
# multiplexed connection. It is a %-format template expecting a mapping with
# the keys 'socket', 'user', 'port' (must be an int because of %d) and
# 'hostname'. Fragments are joined with single spaces.
_MASTER_SSH_COMMAND_TEMPLATE = ' '.join([
    '/usr/bin/ssh -a -x -N',
    '-o ControlMaster=yes',  # Create multiplex socket.
    '-o ControlPath=%(socket)s',
    '-o StrictHostKeyChecking=no',
    '-o UserKnownHostsFile=/dev/null',
    '-o BatchMode=yes',
    '-o ConnectTimeout=30',
    '-o ServerAliveInterval=900',
    '-o ServerAliveCountMax=3',
    '-o ConnectionAttempts=4',
    '-o Protocol=2',
    '-l %(user)s -p %(port)d %(hostname)s',
])
26
27
class MasterSsh(object):
    """Manages multiplex ssh connection.

    A background ssh process is started with ControlMaster=yes and a
    ControlPath socket inside a private temp directory. Other ssh invocations
    can share that connection by adding `ssh_option` to their command line.
    """

    def __init__(self, hostname, user, port):
        """
        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening on (formatted with %d).
        """
        self._hostname = hostname
        self._user = user
        self._port = port

        # BgJob running the master ssh process, or None when not started.
        self._master_job = None
        # autotemp.tempdir holding the ControlPath socket, or None.
        self._master_tempdir = None

        # Serializes start/stop of the master connection (see maybe_start).
        self._lock = multiprocessing.Lock()

    def __del__(self):
        # If __init__ did not get as far as creating the lock there is nothing
        # to release, and close() would raise AttributeError from a finalizer.
        if getattr(self, '_lock', None) is not None:
            self.close()

    @property
    def _socket_path(self):
        """Path of the ControlPath socket inside the master temp dir."""
        return os.path.join(self._master_tempdir.name, 'socket')

    @property
    def ssh_option(self):
        """Returns the ssh option to use this multiplexed ssh.

        Returns an empty string when no master temp dir exists, i.e. before
        maybe_start() has been called or after close(). A non-empty value
        does not by itself guarantee that the master process is still alive;
        maybe_start() is what checks and restarts it.
        """
        if not self._master_tempdir:
            return ''
        return '-o ControlPath=%s' % (self._socket_path,)

    def maybe_start(self, timeout=5):
        """Starts the background process to run multiplex ssh connection.

        If there already is a background process running, this does nothing.
        If there is a stale process or a stale socket, first clean them up,
        then create a background process.

        @param timeout: timeout in seconds (default 5) to wait for master ssh
                        connection to be established. If timeout is reached, a
                        warning message is logged, but no other action is
                        taken.
        """
        # Multiple processes might try in parallel to clean up the old master
        # ssh connection and create a new one, therefore use a lock to protect
        # against race conditions.
        with self._lock:
            # If a previously started master SSH connection is not running
            # anymore, it needs to be cleaned up and then restarted.
            if (self._master_job and (not os.path.exists(self._socket_path) or
                                      self._master_job.sp.poll() is not None)):
                logging.info(
                        'Master ssh connection to %s is down.', self._hostname)
                self._close_internal()

            if not self._master_job:
                self._start_internal(timeout)

    def _start_internal(self, timeout):
        """Starts a new master ssh process. Caller must hold _lock.

        @param timeout: seconds to wait for the control socket to appear.
        """
        # A temp dir without a job would be orphaned by the assignment below.
        if self._master_tempdir:
            self._close_internal()

        # Create a shared socket in a temp location.
        self._master_tempdir = autotemp.tempdir(
                unique_id='ssh-master', dir='/tmp')

        started = False
        try:
            # Start the master SSH connection in the background.
            master_cmd = _MASTER_SSH_COMMAND_TEMPLATE % {
                    'hostname': self._hostname,
                    'user': self._user,
                    'port': self._port,
                    'socket': self._socket_path,
            }
            logging.info(
                    'Starting master ssh connection \'%s\'', master_cmd)
            self._master_job = utils.BgJob(
                    master_cmd, nickname='master-ssh',
                    stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL,
                    unjoinable=True)
            started = True
        finally:
            if not started:
                # Don't leak the temp dir, and don't let ssh_option advertise
                # a ControlPath that no master process will ever create.
                self._close_internal()

        # To prevent a race between the master ssh connection startup and its
        # first attempted use, wait for socket file to exist before returning.
        try:
            utils.poll_for_condition(
                    condition=lambda: os.path.exists(self._socket_path),
                    timeout=timeout,
                    sleep_interval=0.2,
                    desc='Wait for a socket file to exist')
        except utils.TimeoutError:
            # Log the issue but don't raise: the next maybe_start() call sees
            # the missing socket and restarts the connection.
            logging.info('Timed out waiting for master-ssh connection '
                         'to be established.')

    def close(self):
        """Releases all resources used by multiplexed ssh connection."""
        with self._lock:
            self._close_internal()

    def _close_internal(self):
        """Kills the master process and removes its temp dir.

        Caller must hold _lock. The attributes are reset before cleanup so
        that a failure while killing the process still removes the temp dir
        and leaves the object in a restartable state.
        """
        job, self._master_job = self._master_job, None
        tempdir, self._master_tempdir = self._master_tempdir, None
        try:
            if job:
                logging.debug('Nuking ssh master_job')
                utils.nuke_subprocess(job.sp)
        finally:
            if tempdir:
                logging.debug('Cleaning ssh master_tempdir')
                tempdir.clean()
133
134
class ConnectionPool(object):
    """Holds SSH multiplex connection instances, one per endpoint."""

    def __init__(self):
        # Maps (hostname, user, port) -> MasterSsh.
        self._pool = {}
        # Guards self._pool against concurrent get()/shutdown() callers.
        self._lock = threading.Lock()

    def get(self, hostname, user, port):
        """Returns MasterSsh instance for the given endpoint.

        If the pool holds the instance already, returns it. If not, create the
        instance, and returns it.

        Caller has the responsibility to call maybe_start() before using it.

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening.
        """
        key = (hostname, user, port)
        logging.debug('Get master ssh connection for %s@%s:%d', user, hostname,
                      port)

        with self._lock:
            conn = self._pool.get(key)
            if conn is None:
                conn = MasterSsh(hostname, user, port)
                self._pool[key] = conn
            return conn

    def shutdown(self):
        """Closes all ssh multiplex connections.

        The instances stay in the pool, so a later get() returns the same
        object, which maybe_start() can start again.
        """
        # Snapshot under the lock: iterating the live dict while get() inserts
        # would raise RuntimeError, and dict.itervalues() is Python 2 only.
        with self._lock:
            conns = list(self._pool.values())
        # Close outside the pool lock so a slow close() doesn't block get().
        for conn in conns:
            conn.close()
169