1# Lint as: python2, python3
2# Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import six.moves.http_client
7import logging
8import socket
9import tempfile
10import time
11import six.moves.xmlrpc_client
12
13import common
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import error
16from autotest_lib.client.common_lib.cros import retry
17
18try:
19    import jsonrpclib
20except ImportError:
21    jsonrpclib = None
22
23
24class RpcServerTracker(object):
25    """
26    This class keeps track of all the RPC server connections started on a remote
27    host. The caller can use either |xmlrpc_connect| or |jsonrpc_connect| to
28    start the required type of rpc server on the remote host.
29    The host will cleanup all the open RPC server connections on disconnect.
30    """
31
32    _RPC_PROXY_URL_FORMAT = 'http://localhost:%d'
33    _RPC_HOST_ADDRESS_FORMAT = 'localhost:%d'
34    _RPC_SHUTDOWN_POLLING_PERIOD_SECONDS = 2
35    _RPC_SHUTDOWN_TIMEOUT_SECONDS = 10
36
37    def __init__(self, host):
38        """
39        @param port: The host object associated with this instance of
40                     RpcServerTracker.
41        """
42        self._host = host
43        self._rpc_proxy_map = {}
44
45
46    def _setup_port(self, port, command_name, remote_pid=None):
47        """Sets up a tunnel process and register it to rpc_server_tracker.
48
49        Chrome OS on the target closes down most external ports for security.
50        We could open the port, but doing that would conflict with security
51        tests that check that only expected ports are open.  So, to get to
52        the port on the target we use an ssh tunnel.
53
54        This method assumes that xmlrpc and jsonrpc never conflict, since
55        we can only either have an xmlrpc or a jsonrpc server listening on
56        a remote port. As such, it enforces a single proxy->remote port
57        policy, i.e if one starts a jsonrpc proxy/server from port A->B,
58        and then tries to start an xmlrpc proxy forwarded to the same port,
59        the xmlrpc proxy will override the jsonrpc tunnel process, however:
60
61        1. None of the methods on the xmlrpc proxy will work because
62        the server listening on B is jsonrpc.
63
64        2. The xmlrpc client cannot initiate a termination of the JsonRPC
65        server, as the only use case currently is goofy, which is tied to
66        the factory image. It is much easier to handle a failed xmlrpc
67        call on the client than it is to terminate goofy in this scenario,
68        as doing the latter might leave the DUT in a hard to recover state.
69
70        With the current implementation newer rpc proxy connections will
71        terminate the tunnel processes of older rpc connections tunneling
72        to the same remote port. If methods are invoked on the client
73        after this has happened they will fail with connection closed errors.
74
75        @param port: The remote forwarding port.
76        @param command_name: The name of the remote process, to terminate
77                                using pkill.
78        @param remote_pid: The PID of the remote background process
79                            as a string.
80
81        @return the local port which is used for port forwarding on the ssh
82                    client.
83        """
84        self.disconnect(port)
85        local_port = utils.get_unused_port()
86        tunnel_proc = self._host.create_ssh_tunnel(port, local_port)
87        self._rpc_proxy_map[port] = (command_name, tunnel_proc, remote_pid)
88        return local_port
89
90
91    def _setup_rpc(self, port, command_name, remote_pid=None):
92        """Construct a URL for an rpc connection using ssh tunnel.
93
94        @param port: The remote forwarding port.
95        @param command_name: The name of the remote process, to terminate
96                              using pkill.
97        @param remote_pid: The PID of the remote background process
98                            as a string.
99
100        @return a url that we can use to initiate the rpc connection.
101        """
102        return self._RPC_PROXY_URL_FORMAT % self._setup_port(
103                port, command_name, remote_pid=remote_pid)
104
105
106    def tunnel_connect(self, port):
107        """Construct a host address using ssh tunnel.
108
109        @param port: The remote forwarding port.
110
111        @return a host address using ssh tunnel.
112        """
113        return self._RPC_HOST_ADDRESS_FORMAT % self._setup_port(port, None)
114
115
116    def xmlrpc_connect(self, command, port, command_name=None,
117                       ready_test_name=None, timeout_seconds=10,
118                       logfile=None, request_timeout_seconds=None,
119                       server_desc=None):
120        """Connect to an XMLRPC server on the host.
121
122        The `command` argument should be a simple shell command that
123        starts an XMLRPC server on the given `port`.  The command
124        must not daemonize, and must terminate cleanly on SIGTERM.
125        The command is started in the background on the host, and a
126        local XMLRPC client for the server is created and returned
127        to the caller.
128
129        Note that the process of creating an XMLRPC client makes no
130        attempt to connect to the remote server; the caller is
131        responsible for determining whether the server is running
132        correctly, and is ready to serve requests.
133
134        Optionally, the caller can pass ready_test_name, a string
135        containing the name of a method to call on the proxy.  This
136        method should take no parameters and return successfully only
137        when the server is ready to process client requests.  When
138        ready_test_name is set, xmlrpc_connect will block until the
139        proxy is ready, and throw a TestError if the server isn't
140        ready by timeout_seconds.
141
142        If a server is already running on the remote port, this
143        method will kill it and disconnect the tunnel process
144        associated with the connection before establishing a new one,
145        by consulting the rpc_proxy_map in disconnect.
146
147        @param command Shell command to start the server.
148        @param port Port number on which the server is expected to
149                    be serving.
150        @param command_name String to use as input to `pkill` to
151            terminate the XMLRPC server on the host.
152        @param ready_test_name String containing the name of a
153            method defined on the XMLRPC server.
154        @param timeout_seconds Number of seconds to wait
155            for the server to become 'ready.'  Will throw a
156            TestFail error if server is not ready in time.
157        @param logfile Logfile to send output when running
158            'command' argument.
159        @param request_timeout_seconds Timeout in seconds for an XMLRPC request.
160        @param server_desc: Extra text to report in socket.error descriptions.
161
162        """
163        # Clean up any existing state.  If the caller is willing
164        # to believe their server is down, we ought to clean up
165        # any tunnels we might have sitting around.
166        self.disconnect(port)
167        remote_pid = None
168        if command is not None:
169            if logfile:
170                remote_cmd = '%s > %s 2>&1' % (command, logfile)
171            else:
172                remote_cmd = command
173            remote_pid = self._host.run_background(remote_cmd)
174            logging.debug('Started XMLRPC server on host %s, pid = %s',
175                          self._host.hostname, remote_pid)
176
177        # Tunnel through SSH to be able to reach that remote port.
178        rpc_url = self._setup_rpc(port, command_name, remote_pid=remote_pid)
179        if not server_desc:
180            server_desc = "<%s '%s:%s'>" % (command_name or 'XMLRPC',
181                                            self._host.hostname, port)
182        server_desc = '%s (%s)' % (server_desc, rpc_url.replace('http://', ''))
183        if request_timeout_seconds is not None:
184            proxy = TimeoutXMLRPCServerProxy(
185                    rpc_url, timeout=request_timeout_seconds, allow_none=True)
186        else:
187            proxy = six.moves.xmlrpc_client.ServerProxy(rpc_url, allow_none=True)
188
189        if ready_test_name is not None:
190            # retry.retry logs each attempt; calculate delay_sec to
191            # keep log spam to a dull roar.
192            @retry.retry((socket.error,
193                          six.moves.xmlrpc_client.ProtocolError,
194                          six.moves.http_client.BadStatusLine),
195                         timeout_min=timeout_seconds / 60.0,
196                         delay_sec=min(max(timeout_seconds / 20.0, 0.1), 1))
197            def ready_test():
198                """ Call proxy.ready_test_name(). """
199                try:
200                    getattr(proxy, ready_test_name)()
201                except socket.error as e:
202                    e.filename = server_desc
203                    raise
204
205            try:
206                logging.info('Waiting %d seconds for XMLRPC server '
207                             'to start.', timeout_seconds)
208                ready_test()
209            except Exception as exc:
210                log_lines = []
211                if logfile:
212                    logging.warn('Failed to start XMLRPC server; getting log.')
213                    with tempfile.NamedTemporaryFile() as temp:
214                        self._host.get_file(logfile, temp.name)
215                        with open(temp.name) as f:
216                            log_lines = f.read().rstrip().splitlines()
217                else:
218                    logging.warn('Failed to start XMLRPC server; no log.')
219
220                logging.error(
221                        'Failed to start XMLRPC server:  %s.%s: %s.',
222                        type(exc).__module__, type(exc).__name__,
223                        str(exc).rstrip('.'))
224
225                if isinstance(exc, six.moves.http_client.BadStatusLine):
226                    # BadStatusLine: inject the last log line into the message,
227                    # using the 'line' and 'args' attributes.
228                    if log_lines:
229                        if exc.line:
230                            exc.line = '%s -- Log tail: %r' % (
231                                    exc.line, log_lines[-1])
232                        else:
233                            exc.line = 'Log tail: %r' % (
234                                    log_lines[-1])
235                        exc.args = (exc.line,)
236                elif isinstance(exc, socket.error):
237                    # socket.error: inject the last log line into the message,
238                    # using the 'filename' attribute.
239                    if log_lines:
240                        if exc.filename:
241                            exc.filename = '%s -- Log tail: %r' % (
242                                    exc.filename, log_lines[-1])
243                        else:
244                            exc.filename = 'Log tail: %r' % log_lines[-1]
245                elif log_lines:
246                    # Unusual failure: can't inject the last log line,
247                    # so report it via logging.
248                    logging.error('Log tail: %r', log_lines[-1])
249
250                if len(log_lines) > 1:
251                    # The failure messages include only the last line,
252                    # so report the whole thing if it had more lines.
253                    logging.error('Full XMLRPC server log:\n%s',
254                                  '\n'.join(log_lines))
255
256                self.disconnect(port)
257                raise
258        logging.info('XMLRPC server started successfully.')
259        return proxy
260
261
262    def jsonrpc_connect(self, port):
263        """Creates a jsonrpc proxy connection through an ssh tunnel.
264
265        This method exists to facilitate communication with goofy (which is
266        the default system manager on all factory images) and as such, leaves
267        most of the rpc server sanity checking to the caller. Unlike
268        xmlrpc_connect, this method does not facilitate the creation of a remote
269        jsonrpc server, as the only clients of this code are factory tests,
270        for which the goofy system manager is built in to the image and starts
271        when the target boots.
272
273        One can theoretically create multiple jsonrpc proxies all forwarded
274        to the same remote port, provided the remote port has an rpc server
275        listening. However, in doing so we stand the risk of leaking an
276        existing tunnel process, so we always disconnect any older tunnels
277        we might have through disconnect.
278
279        @param port: port on the remote host that is serving this proxy.
280
281        @return: The client proxy.
282        """
283        if not jsonrpclib:
284            logging.warning('Jsonrpclib could not be imported. Check that '
285                            'site-packages contains jsonrpclib.')
286            return None
287
288        proxy = jsonrpclib.jsonrpc.ServerProxy(self._setup_rpc(port, None))
289
290        logging.info('Established a jsonrpc connection through port %s.', port)
291        return proxy
292
293    def disconnect(self, port, pkill=True):
294        """Disconnect from an RPC server on the host.
295
296        Terminates the remote RPC server previously started for
297        the given `port`.  Also closes the local ssh tunnel created
298        for the connection to the host.  This function does not
299        directly alter the state of a previously returned RPC
300        client object; however disconnection will cause all
301        subsequent calls to methods on the object to fail.
302
303        This function does nothing if requested to disconnect a port
304        that was not previously connected via _setup_rpc.
305
306        @param port Port number passed to a previous call to `_setup_rpc()`.
307        @param pkill: if True, ssh in to the server and pkill the process.
308        """
309        if port not in self._rpc_proxy_map:
310            return
311        remote_name, tunnel_proc, remote_pid = self._rpc_proxy_map[port]
312        if pkill and remote_name:
313            # We use 'pkill' to find our target process rather than
314            # a PID, because the host may have rebooted since
315            # connecting, and we don't want to kill an innocent
316            # process with the same PID.
317            #
318            # 'pkill' helpfully exits with status 1 if no target
319            # process  is found, for which run() will throw an
320            # exception.  We don't want that, so we the ignore
321            # status.
322            self._host.run("pkill -f '%s'" % remote_name, ignore_status=True)
323            if remote_pid:
324                logging.info('Waiting for RPC server "%s" shutdown',
325                             remote_name)
326                start_time = time.time()
327                while (time.time() - start_time <
328                       self._RPC_SHUTDOWN_TIMEOUT_SECONDS):
329                    running_processes = self._host.run(
330                            "pgrep -f '%s'" % remote_name,
331                            ignore_status=True).stdout.split()
332                    if not remote_pid in running_processes:
333                        logging.info('Shut down RPC server.')
334                        break
335                    time.sleep(self._RPC_SHUTDOWN_POLLING_PERIOD_SECONDS)
336                else:
337                    raise error.TestError('Failed to shutdown RPC server %s' %
338                                          remote_name)
339
340        self._host.disconnect_ssh_tunnel(tunnel_proc)
341        del self._rpc_proxy_map[port]
342
343
344    def disconnect_all(self):
345        """Disconnect all known RPC proxy ports."""
346        for port in self._rpc_proxy_map.keys():
347            self.disconnect(port)
348
349
350class TimeoutXMLRPCServerProxy(six.moves.xmlrpc_client.ServerProxy):
351    """XMLRPC ServerProxy supporting timeout."""
352    def __init__(self, uri, timeout=20, *args, **kwargs):
353        """Initializes a TimeoutXMLRPCServerProxy.
354
355        @param uri: URI to a XMLRPC server.
356        @param timeout: Timeout in seconds for a XMLRPC request.
357        @param *args: args to xmlrpclib.ServerProxy.
358        @param **kwargs: kwargs to xmlrpclib.ServerProxy.
359
360        """
361        if timeout:
362            kwargs['transport'] = TimeoutXMLRPCTransport(timeout=timeout)
363        six.moves.xmlrpc_client.ServerProxy.__init__(self, uri, *args, **kwargs)
364
365
366class TimeoutXMLRPCTransport(six.moves.xmlrpc_client.Transport):
367    """A Transport subclass supporting timeout."""
368    def __init__(self, timeout=20, *args, **kwargs):
369        """Initializes a TimeoutXMLRPCTransport.
370
371        @param timeout: Timeout in seconds for a HTTP request through this transport layer.
372        @param *args: args to xmlrpclib.Transport.
373        @param **kwargs: kwargs to xmlrpclib.Transport.
374
375        """
376        six.moves.xmlrpc_client.Transport.__init__(self, *args, **kwargs)
377        self.timeout = timeout
378
379
380    def make_connection(self, host):
381        """Overwrites make_connection in xmlrpclib.Transport with timeout.
382
383        @param host: Host address to connect.
384
385        @return: A httplib.HTTPConnection connecting to host with timeout.
386
387        """
388        conn = six.moves.http_client.HTTPConnection(host, timeout=self.timeout)
389        return conn
390