# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5import os
6import mock
7import unittest
8
9from telemetry.internal import forwarders
10from telemetry.internal.platform import network_controller_backend
11from telemetry.util import wpr_modes
12
13
# Ports that FakeReplayServer falls back to whenever it is handed a falsy
# (e.g. 0) port number.
DEFAULT_PORTS = forwarders.PortSet(
    http=1111,
    https=2222,
    dns=3333)

# Host IP exposed by FakeForwarderFactory; tests only compare it for equality.
FORWARDER_HOST_IP = '123.321.123.321'

# CA cert path the tests expect to see in the replay args, given that setUp
# makes the mocked tempfile.mkdtemp return '[tempdir]'.
EXPECTED_WPR_CA_CERT_PATH = os.path.join(
    '[tempdir]',
    'testca.pem')
17
18
class FakePlatformBackend(object):
  """Minimal stand-in for a platform backend, tailored to these tests.

  Attributes:
    forwarder_factory: A FakeForwarderFactory instance.
    supports_test_ca: Set to True by the constructor.
    is_test_ca_installed: True after InstallTestCa, False after RemoveTestCa.
    faulty_cert_installer: When set to True, InstallTestCa raises.
    wpr_port_pairs: The forwarders.PortPairs returned by GetWprPortPairs.
  """

  def __init__(self):
    self.forwarder_factory = FakeForwarderFactory()
    self.supports_test_ca = True
    self.is_test_ca_installed = False
    self.faulty_cert_installer = False
    self.wpr_port_pairs = None
    # Normally test using all default ports.
    self.SetWprPortPairs(http=(0, 0), https=(0, 0), dns=(0, 0))

  def SetWprPortPairs(self, http, https, dns):
    """Replace the port pairs handed out by GetWprPortPairs.

    Args:
      http: A (local_port, remote_port) sequence unpacked into a
          forwarders.PortPair.
      https: Same shape as http.
      dns: Same shape as http, or None to configure no DNS pair at all.
    """
    self.wpr_port_pairs = forwarders.PortPairs(
        forwarders.PortPair(*http),
        forwarders.PortPair(*https),
        forwarders.PortPair(*dns) if dns is not None else None)

  def GetWprPortPairs(self):
    """Return the port pairs most recently set via SetWprPortPairs."""
    return self.wpr_port_pairs

  def InstallTestCa(self, ca_cert_path):
    """Mark the test CA as installed, then fail if configured to do so.

    Args:
      ca_cert_path: Ignored by this fake.

    Raises:
      Exception: If faulty_cert_installer is True.
    """
    del ca_cert_path  # Unused argument.
    self.is_test_ca_installed = True
    # Exception is raised after setting the "installed" value to confirm that
    # cleanup code is being called in case of errors.
    if self.faulty_cert_installer:
      raise Exception('Cert install failed!')

  def RemoveTestCa(self):
    """Mark the test CA as no longer installed."""
    self.is_test_ca_installed = False
48
49
class FakeForwarderFactory(object):
  """Forwarder factory fake that hands out plain forwarders.Forwarder objects."""

  def __init__(self):
    # Fixed host IP so tests can compare against FORWARDER_HOST_IP.
    self.host_ip = FORWARDER_HOST_IP

  def Create(self, port_pairs):
    """Build and return a forwarders.Forwarder for the given port pairs."""
    new_forwarder = forwarders.Forwarder(port_pairs)
    return new_forwarder
56
57
class FakeReplayServer(object):
  """In-memory replacement for the replay server patched in by setUp.

  It records its constructor arguments and tracks a running flag instead of
  launching anything.
  """

  DEFAULT_PORTS = NotImplemented  # Will be assigned during test setUp.

  def __init__(self, archive_path, host_ip, http_port, https_port, dns_port,
               replay_args):
    """Record the arguments and resolve the ports this server will report.

    Args:
      archive_path: Stored as-is on the instance.
      host_ip: Stored as-is on the instance.
      http_port: Port to report; a falsy value selects DEFAULT_PORTS.http.
      https_port: Port to report; a falsy value selects DEFAULT_PORTS.https.
      dns_port: None means no DNS port and is preserved as None; any other
          falsy value selects DEFAULT_PORTS.dns.
      replay_args: Stored as-is on the instance.
    """
    self.archive_path = archive_path
    self.host_ip = host_ip
    # Spelled out as a branch: "a or b if c else d" parses as
    # "(a or b) if c else d", which is easy to misread.
    if dns_port is None:
      resolved_dns_port = None
    else:
      resolved_dns_port = dns_port or self.DEFAULT_PORTS.dns
    self.ports = forwarders.PortSet(
        http_port or self.DEFAULT_PORTS.http,
        https_port or self.DEFAULT_PORTS.https,
        resolved_dns_port)
    self.replay_args = replay_args
    self.is_running = False

  def StartServer(self):
    """Mark the server as running and return its ports.

    Asserts that the server is not already running.
    """
    assert not self.is_running
    self.is_running = True
    return self.ports

  def StopServer(self):
    """Mark the server as stopped; asserts that it was running."""
    assert self.is_running
    self.is_running = False
80
81
class TestNetworkControllerBackend(
    network_controller_backend.NetworkControllerBackend):
  """NetworkControllerBackend with read-only views of its private state.

  Exists purely so that the tests can inspect internals.
  """

  @property
  def forwarder(self):
    """The value of the private _forwarder attribute."""
    return self._forwarder

  @property
  def platform_backend(self):
    """The value of the private _platform_backend attribute."""
    return self._platform_backend

  @property
  def replay_server(self):
    """The value of the private _wpr_server attribute."""
    return self._wpr_server

  @property
  def wpr_ca_cert_path(self):
    """The value of the private _wpr_ca_cert_path attribute."""
    return self._wpr_ca_cert_path
101
102
class NetworkControllerBackendTest(unittest.TestCase):
  """Exercises TestNetworkControllerBackend against the fakes in this file."""

  def Patch(self, *args, **kwargs):
    """Patch an object for the duration of a test, and return its mock."""
    patcher = mock.patch(*args, **kwargs)
    mock_object = patcher.start()
    self.addCleanup(patcher.stop)
    return mock_object

  def PatchImportedModule(self, name):
    """Shorthand to patch a module imported by network_controller_backend."""
    return self.Patch(
        'telemetry.internal.platform.network_controller_backend.%s' % name)

  def setUp(self):
    # Always use our FakeReplayServer. Its default ports are class-level state
    # that individual tests overwrite, so put the previous value back once the
    # test is over instead of leaking it to whatever runs next.
    self.addCleanup(
        setattr, FakeReplayServer, 'DEFAULT_PORTS',
        FakeReplayServer.DEFAULT_PORTS)
    FakeReplayServer.DEFAULT_PORTS = DEFAULT_PORTS  # Use global defaults.
    self.Patch(
        'telemetry.internal.util.webpagereplay.ReplayServer', FakeReplayServer)

    # Pretend that only some predefined set of files exist.
    def fake_path_exists(filename):
      return filename in ['some-archive.wpr', 'another-archive.wpr']

    self.Patch('os.path.exists', side_effect=fake_path_exists)

    # Mock some imported modules.
    mock_certutils = self.PatchImportedModule('certutils')
    mock_certutils.openssl_import_error = None
    mock_certutils.generate_dummy_ca_cert.return_value = ('-', '-')

    mock_platformsettings = self.PatchImportedModule('platformsettings')
    mock_platformsettings.HasSniSupport.return_value = True

    mock_tempfile = self.PatchImportedModule('tempfile')
    mock_tempfile.mkdtemp.return_value = '[tempdir]'

    self.PatchImportedModule('shutil')

    self.network_controller_backend = TestNetworkControllerBackend(
        FakePlatformBackend())

  def testOpenCloseController(self):
    b = self.network_controller_backend
    self.assertFalse(b.is_open)
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])  # Also installs test CA.
    self.assertTrue(b.is_open)
    self.assertTrue(b.is_test_ca_installed)
    self.assertTrue(b.platform_backend.is_test_ca_installed)
    b.Close()  # Also removes test CA.
    self.assertFalse(b.is_open)
    self.assertFalse(b.is_test_ca_installed)
    self.assertFalse(b.platform_backend.is_test_ca_installed)
    b.Close()  # It's fine to close a closed controller.
    self.assertFalse(b.is_open)

  def testOpeningOpenControllerRaises(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    with self.assertRaises(AssertionError):
      b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])

  def testInstallTestCaFailure(self):
    b = self.network_controller_backend
    b.platform_backend.faulty_cert_installer = True
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])  # Try to install test CA.

    # Test CA is not installed, but the controller is otherwise open and safe
    # to use.
    self.assertTrue(b.is_open)
    self.assertFalse(b.is_test_ca_installed)
    self.assertFalse(b.platform_backend.is_test_ca_installed)
    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)

    b.Close()  # No test CA to remove.
    self.assertFalse(b.is_open)
    self.assertFalse(b.is_test_ca_installed)
    self.assertFalse(b.platform_backend.is_test_ca_installed)

  def testStartStopReplay(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    self.assertFalse(b.is_replay_active)

    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertTrue(b.replay_server.is_running)
    self.assertIsNotNone(b.forwarder.port_pairs)

    # Keep references: the controller drops its own once replay is stopped.
    old_replay_server = b.replay_server
    old_forwarder = b.forwarder
    b.StopReplay()
    self.assertFalse(b.is_replay_active)
    self.assertFalse(old_replay_server.is_running)
    self.assertIsNone(old_forwarder.port_pairs)
    self.assertTrue(b.is_open)  # Controller is still open.

    b.Close()
    self.assertFalse(b.is_open)

  def testClosingControllerAlsoStopsReplay(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertTrue(b.replay_server.is_running)
    self.assertIsNotNone(b.forwarder.port_pairs)

    old_replay_server = b.replay_server
    old_forwarder = b.forwarder
    b.Close()
    self.assertFalse(b.is_replay_active)
    self.assertFalse(old_replay_server.is_running)
    self.assertIsNone(old_forwarder.port_pairs)
    self.assertFalse(b.is_open)

  def testReplayOnClosedControllerRaises(self):
    b = self.network_controller_backend
    self.assertFalse(b.is_open)
    with self.assertRaises(AssertionError):
      b.StartReplay('some-archive.wpr')

  def testReplayWithSameArgsReuseServer(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])

    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertTrue(b.replay_server.is_running)

    old_replay_server = b.replay_server
    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertIs(b.replay_server, old_replay_server)
    self.assertTrue(b.replay_server.is_running)

  def testReplayWithDifferentArgsUseDifferentServer(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])

    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertTrue(b.replay_server.is_running)

    old_replay_server = b.replay_server
    b.StartReplay('another-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertIsNot(b.replay_server, old_replay_server)
    self.assertTrue(b.replay_server.is_running)
    self.assertFalse(old_replay_server.is_running)

  def testReplayWithoutArchivePathDoesNotStopReplay(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])

    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertTrue(b.replay_server.is_running)
    old_replay_server = b.replay_server

    b.StartReplay(None)
    self.assertTrue(b.is_replay_active)
    self.assertIs(b.replay_server, old_replay_server)
    self.assertTrue(b.replay_server.is_running)
    self.assertEqual(b.replay_server.archive_path, 'some-archive.wpr')

  def testModeOffDoesNotCreateReplayServer(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_OFF, ['--some-arg'])
    b.StartReplay('may-or-may-not-exist.wpr')
    self.assertFalse(b.is_replay_active)
    self.assertIsNone(b.replay_server)
    self.assertIsNone(b.forwarder)

  def testBadArchivePathRaises(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    with self.assertRaises(network_controller_backend.ArchiveDoesNotExistError):
      b.StartReplay('does-not-exist.wpr')

  def testBadArchivePathOnRecordIsOkay(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_RECORD, ['--some-arg'])
    b.StartReplay('does-not-exist-yet.wpr')  # Does not raise.
    self.assertTrue(b.is_replay_active)

  def testReplayServerSettings(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_RECORD, ['--some-arg'])
    b.StartReplay('some-archive.wpr')

    # Externally visible properties
    self.assertTrue(b.is_replay_active)
    self.assertEqual(b.host_ip, FORWARDER_HOST_IP)
    self.assertEqual(b.wpr_device_ports, DEFAULT_PORTS)

    # Private replay server settings.
    self.assertTrue(b.replay_server.is_running)
    self.assertEqual(b.replay_server.archive_path, 'some-archive.wpr')
    self.assertEqual(b.replay_server.host_ip, FORWARDER_HOST_IP)
    self.assertEqual(b.replay_server.replay_args, [
        '--some-arg', '--record', '--inject_scripts=',
        '--should_generate_certs',
        '--https_root_ca_cert_path=%s' % EXPECTED_WPR_CA_CERT_PATH])

  def testReplayServerOffSettings(self):
    b = self.network_controller_backend
    # NOTE(review): nothing in this file reads wpr_ca_cert_path from the
    # platform backend; kept in case the code under test does -- confirm.
    b.platform_backend.wpr_ca_cert_path = 'CERT_FILE'
    b.Open(wpr_modes.WPR_OFF, ['--some-arg'])
    b.StartReplay('some-archive.wpr')

    self.assertFalse(b.is_replay_active)
    self.assertEqual(b.host_ip, FORWARDER_HOST_IP)
    self.assertIsNone(b.wpr_device_ports)
    self.assertIsNone(b.replay_server)

  def testUseDefaultPorts(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertEqual(b.replay_server.ports, DEFAULT_PORTS)
    self.assertEqual(b.wpr_device_ports, DEFAULT_PORTS)

    # Invariant
    self.assertEqual(b.forwarder.port_pairs.local_ports, b.replay_server.ports)
    self.assertEqual(b.forwarder.port_pairs.remote_ports, b.wpr_device_ports)

  def testUseDefaultLocalPorts(self):
    b = self.network_controller_backend
    b.platform_backend.SetWprPortPairs(
        http=(0, 8888), https=(0, 4444), dns=(0, 2222))
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertEqual(b.replay_server.ports, DEFAULT_PORTS)
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(8888, 4444, 2222))

    # Invariant
    self.assertEqual(b.forwarder.port_pairs.local_ports, b.replay_server.ports)
    self.assertEqual(b.forwarder.port_pairs.remote_ports, b.wpr_device_ports)

  def testUseSpecificPorts(self):
    b = self.network_controller_backend
    b.platform_backend.SetWprPortPairs(
        http=(88, 8888), https=(44, 4444), dns=None)
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertEqual(b.replay_server.ports, forwarders.PortSet(88, 44, None))
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(8888, 4444, None))

    # Invariant
    self.assertEqual(b.forwarder.port_pairs.local_ports, b.replay_server.ports)
    self.assertEqual(b.forwarder.port_pairs.remote_ports, b.wpr_device_ports)

  def testRestartReplayShouldReusePorts(self):
    FakeReplayServer.DEFAULT_PORTS = forwarders.PortSet(123, 456, 789)
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(123, 456, 789))

    # If replay restarts, the factory may use a different set of default ports.
    FakeReplayServer.DEFAULT_PORTS = forwarders.PortSet(987, 654, 321)
    b.StartReplay('another-archive.wpr')

    # However same ports must be used, because apps/browsers may already be
    # configured to use the old set of ports.
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(123, 456, 789))

  def testNewControllerSessionMayUseDifferentPorts(self):
    FakeReplayServer.DEFAULT_PORTS = forwarders.PortSet(123, 456, 789)
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(123, 456, 789))
    b.Close()

    # If replay restarts, the factory may use a different set of default ports.
    FakeReplayServer.DEFAULT_PORTS = forwarders.PortSet(987, 654, 321)
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')

    # This time the network controller session was closed between replays,
    # so it's fine to use a different set of ports.
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(987, 654, 321))
387