1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import sys
7import threading
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.cros.enterprise import enterprise_policy_base
11from autotest_lib.client.cros.enterprise.network_config import ProxyConfig
12from SocketServer import ThreadingTCPServer, StreamRequestHandler
13from telemetry.core import exceptions as telemetry_exceptions
14
15
class ProxyHandler(StreamRequestHandler):
    """Request handler used by the Threaded Proxy Listener."""

    def handle(self):
        """
        Record the start of the request line, then reject the request.

        At most 40 characters of the first line of the request are read.
        The text is stripped of surrounding whitespace, logged, and handed
        to the owning server so it can be added to the list of requests.

        Note: Every request is answered with an HTTP 504 error.

        """
        # Only the leading 40 chars of the request line are inspected.
        request_line = self.rfile.readline(40)
        request_line = request_line.strip()
        logging.debug('ProxyHandler::handle(): <%s>', request_line)
        self.server.store_requests_received(request_line)

        # Always refuse: the listener only records what was asked for.
        response = ('HTTP/1.1 504 Gateway Timeout\r\n'
                    'Connection: close\r\n\r\n')
        self.wfile.write(response)
34
35
class ThreadedProxyServer(ThreadingTCPServer):
    """
    Provide a Threaded Proxy Server to service and save requests.

    Define a Threaded Proxy Server which services requests, and allows the
    handler to save all requests.

    """
    # Allow the listening address to be reused by a new server. This is
    # declared on this subclass rather than assigned onto ThreadingTCPServer,
    # so other users of the stdlib class in the same process are unaffected.
    allow_reuse_address = True

    def __init__(self, server_address, HandlerClass):
        """
        Constructor.

        @param server_address: tuple of server IP and port to listen on.
        @param HandlerClass: the RequestHandler class to instantiate per req.

        """
        # Created before the base constructor runs so the list already
        # exists by the time any handler can call store_requests_received().
        self.requests_received = []
        ThreadingTCPServer.__init__(self, server_address, HandlerClass)

    def store_requests_received(self, request):
        """
        Add received request to list.

        @param request: request received by the proxy server.

        """
        self.requests_received.append(request)
64
65
class ProxyListener(object):
    """
    Provide a Proxy Listener to detect connect requests.

    Define a proxy listener to detect when a CONNECT request is seen at the
    given |server_address|, and record all requests received. Requests
    received are exposed to the caller.

    """
    def __init__(self, server_address):
        """
        Constructor.

        Creates the server and the thread that will run its serve loop; the
        thread is not started until run() is called.

        @param server_address: tuple of server IP and port to listen on.

        """
        self._server = ThreadedProxyServer(server_address, ProxyHandler)
        self._thread = threading.Thread(target=self._server.serve_forever)

    def run(self):
        """Start the server by activating its thread."""
        self._thread.start()

    def stop(self):
        """
        Stop the server and its thread.

        Safe to call even if run() was never called; in that case only the
        listening socket is closed.

        """
        if self._thread.is_alive():
            # serve_forever() only returns once shutdown() is requested;
            # closing the socket alone does not end the serve loop.
            self._server.shutdown()
            self._thread.join()
        # Release the listening socket after the serve loop has stopped.
        self._server.server_close()

    def get_requests_received(self):
        """
        Get list of received requests.

        @returns the server's own list object (not a copy).

        """
        return self._server.requests_received

    def reset_requests_received(self):
        """Clear list of received requests by installing a new empty list."""
        self._server.requests_received = []
101
102
class policy_ProxySettings(enterprise_policy_base.EnterprisePolicyTest):
    """
    Test effect of ProxySettings policy on Chrome OS behavior.

    This test verifies the behavior of Chrome OS for specific configurations
    of the ProxySettings use policy: None (undefined), ProxyMode=direct,
    ProxyMode=fixed_servers, ProxyMode=pac_script. None means that the policy
    value is not set. This induces the default behavior, equivalent to what is
    seen by an un-managed user.

    When ProxySettings is None (undefined) or ProxyMode=direct, then no proxy
    server should be used. When ProxyMode=fixed_servers or pac_script, then
    the proxy server address specified by the ProxyServer or ProxyPacUrl
    entry should be used.

    """
    version = 1

    def initialize(self, **kwargs):
        """
        Initialize this test.

        Sets the test constants, runs the base class initialization, then
        starts the local proxy listener and the web server.

        @param kwargs: keyword arguments forwarded to the base initialize().

        """
        self._initialize_test_constants()
        super(policy_ProxySettings, self).initialize(**kwargs)
        # Listen on all interfaces ('') at the port the policies point to.
        self._proxy_server = ProxyListener(('', self.PROXY_PORT))
        self._proxy_server.run()
        # NOTE(review): start_webserver() is not defined in this file;
        # presumably inherited, and presumably serves PAC_FILE - confirm.
        self.start_webserver()


    def _initialize_test_constants(self):
        """Initialize test-specific constants, some from class constants."""
        self.POLICY_NAME = 'ProxySettings'
        self.PROXY_PORT = 3128
        self.PAC_FILE = 'proxy_test.pac'
        # WEB_HOST is not defined in this file; presumably a base class
        # constant.
        self.PAC_URL = '%s/%s' % (self.WEB_HOST, self.PAC_FILE)
        self.BYPASS_URLS = ['www.google.com', 'www.googleapis.com']
        self.FIXED_PROXY = {
            'ProxyBypassList': ','.join(self.BYPASS_URLS),
            'ProxyMode': 'fixed_servers',
            'ProxyServer': 'localhost:%s' % self.PROXY_PORT
        }
        self.PAC_PROXY = {
            'ProxyMode': 'pac_script',
            'ProxyPacUrl': self.PAC_URL
        }
        self.DIRECT_PROXY = {
            'ProxyMode': 'direct'
        }
        self.TEST_URL = 'http://www.cnn.com/'
        # Test cases that set the ProxySettings policy directly; the value
        # is the policy dictionary, or None to leave the policy unset.
        self.TEST_CASES = {
            'FixedProxy_UseFixedProxy': self.FIXED_PROXY,
            'PacProxy_UsePacFile': self.PAC_PROXY,
            'DirectProxy_UseNoProxy': self.DIRECT_PROXY,
            'NotSet_UseNoProxy': None,
        }
        # Test cases (names ending in '_ONC') that configure the proxy
        # through the OpenNetworkConfiguration policy instead.
        self.PROXY_CONFIGS = {
            'DirectProxy_UseNoProxy_ONC': ProxyConfig(type='Direct'),
            'PacProxy_UsePacFile_ONC': ProxyConfig(type='PAC',
                                           pac_url=self.PAC_URL),
            'FixedProxy_UseFixedProxy_ONC': ProxyConfig(type='Manual',
                                                host='localhost',
                                                port=self.PROXY_PORT,
                                                exclude_urls=self.BYPASS_URLS)
        }


    def cleanup(self):
        """
        Stop proxy server and cleanup.

        The base class cleanup always runs, even when stopping the proxy
        listener raises or when the listener was never created.

        """
        try:
            # initialize() may have failed before the listener was created.
            proxy_server = getattr(self, '_proxy_server', None)
            if proxy_server is not None:
                proxy_server.stop()
        finally:
            super(policy_ProxySettings, self).cleanup()


    def navigate_to_url_with_retry(self, url, total_tries=1):
        """
        Navigate to url, attempting total_tries times if it fails to load.

        Only page load timeouts are retried; any other exception propagates
        immediately. If total_tries is less than 1, no navigation is
        attempted.

        @param url: string of the url to load.
        @param total_tries: number of attempts to load the page.

        @raises: error.TestError if page load times out on the last attempt.

        """
        for i in range(total_tries):
            try:
                self.navigate_to_url(url)
            except telemetry_exceptions.TimeoutError as e:
                if i == total_tries - 1:
                    # Out of attempts: report the failure to the framework.
                    logging.error('Timeout error: %s [%s].', str(e),
                                  sys.exc_info())
                    raise error.TestError('Could not load %s after '
                                          '%s tries.' % (url, total_tries))
                else:
                    logging.debug('Retrying page load of %s.', url)
                    logging.debug('Timeout error: %s.', str(e))
            else:
                # Page loaded; no further attempts are needed.
                break


    def _test_proxy_configuration(self, mode):
        """
        Verify CrOS enforces the specified ProxySettings configuration.

        Clears the listener's recorded requests, loads TEST_URL, then checks
        whether any request recorded by the proxy listener contains TEST_URL.

        @param mode: Type of proxy: None, 'direct', 'fixed_servers' or
                     'pac_script'.

        @raises error.TestFail if behavior does not match expected, or if
                mode is not one of the recognized values.

        """
        self._proxy_server.reset_requests_received()
        self.navigate_to_url_with_retry(url=self.TEST_URL, total_tries=2)
        proxied_requests = self._proxy_server.get_requests_received()

        matching_requests = [request for request in proxied_requests
                             if self.TEST_URL in request]
        logging.info('matching_requests: %s', matching_requests)

        if mode is None or mode == 'direct':
            # No proxy expected: the listener must not have seen TEST_URL.
            if matching_requests:
                raise error.TestFail('Requests should not have been sent '
                                     'through the proxy server.')
        elif mode == 'fixed_servers' or mode == 'pac_script':
            # Proxy expected: the listener must have seen TEST_URL.
            if not matching_requests:
                raise error.TestFail('Requests should have been sent '
                                     'through the proxy server.')
        else:
            raise error.TestFail('Unrecognized Mode %s' % mode)


    def run_once(self, case):
        """
        Setup and run the test configured for the specified test case.

        Sets up a proxy using either the ProxyMode policy or ONC policy.

        @param case: Name of the test case to run: see TEST_CASES, or
                     PROXY_CONFIGS for names ending in '_ONC'.

        @raises KeyError if case is not a known test case name.

        """
        if case.endswith('_ONC'):
            proxy = self.PROXY_CONFIGS[case]
            self.setup_case(user_policies={
                'OpenNetworkConfiguration': proxy.policy()
            })
            mode = proxy.mode()
        else:
            case_value = self.TEST_CASES[case]
            self.setup_case(user_policies={self.POLICY_NAME: case_value})
            # An unset policy (None) has no ProxyMode entry.
            mode = case_value['ProxyMode'] if case_value else None

        self._test_proxy_configuration(mode)
249