# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for network_controller_backend.NetworkControllerBackend.

The tests run the controller against fake platform backend, forwarder
factory and replay server objects defined in this file.
"""

import os
import mock
import unittest

from telemetry.internal import forwarders
from telemetry.internal.platform import network_controller_backend
from telemetry.util import wpr_modes


# Ports reported by FakeReplayServer when the caller does not request
# specific ones (installed as FakeReplayServer.DEFAULT_PORTS in setUp).
DEFAULT_PORTS = forwarders.PortSet(http=1111, https=2222, dns=3333)
FORWARDER_HOST_IP = '123.321.123.321'
# mkdtemp is mocked in setUp to return '[tempdir]'.
EXPECTED_WPR_CA_CERT_PATH = os.path.join('[tempdir]', 'testca.pem')


class FakePlatformBackend(object):
  """Stand-in platform backend used by the controller under test.

  Records whether a test CA is currently "installed" and serves the WPR port
  pairs configured through SetWprPortPairs.
  """

  def __init__(self):
    self.forwarder_factory = FakeForwarderFactory()
    self.supports_test_ca = True
    self.is_test_ca_installed = False
    # When True, InstallTestCa raises after flagging the CA as installed.
    self.faulty_cert_installer = False
    self.wpr_port_pairs = None
    # Normally test using all default ports.
    self.SetWprPortPairs(http=(0, 0), https=(0, 0), dns=(0, 0))

  def SetWprPortPairs(self, http, https, dns):
    """Set the port pairs returned by GetWprPortPairs.

    Args:
      http: Tuple unpacked into a forwarders.PortPair; the tests in this file
          pass (local, remote) port numbers.
      https: Same as http, for the https port.
      dns: Same as http, or None to leave the dns pair unset.
    """
    self.wpr_port_pairs = forwarders.PortPairs(
        forwarders.PortPair(*http),
        forwarders.PortPair(*https),
        forwarders.PortPair(*dns) if dns is not None else None)

  def GetWprPortPairs(self):
    """Return the port pairs last set through SetWprPortPairs."""
    return self.wpr_port_pairs

  def InstallTestCa(self, ca_cert_path):
    """Mark the test CA as installed; raise if faulty_cert_installer is set."""
    del ca_cert_path  # Unused argument.
    self.is_test_ca_installed = True
    # Exception is raised after setting the "installed" value to confirm that
    # cleanup code is being called in case of errors.
    if self.faulty_cert_installer:
      raise Exception('Cert install failed!')

  def RemoveTestCa(self):
    """Mark the test CA as no longer installed."""
    self.is_test_ca_installed = False


class FakeForwarderFactory(object):
  """Forwarder factory that reports a fixed host IP."""

  def __init__(self):
    self.host_ip = FORWARDER_HOST_IP

  def Create(self, port_pairs):
    """Return a plain forwarders.Forwarder wrapping the given port pairs."""
    return forwarders.Forwarder(port_pairs)


class FakeReplayServer(object):
  """Replacement for webpagereplay.ReplayServer (patched in during setUp).

  It only stores its constructor arguments and tracks a running flag.
  """

  DEFAULT_PORTS = NotImplemented  # Will be assigned during test setUp.

  def __init__(self, archive_path, host_ip, http_port, https_port, dns_port,
               replay_args):
    self.archive_path = archive_path
    self.host_ip = host_ip
    # A falsy port (e.g. 0) falls back to the matching default port; a dns
    # port of None stays None rather than picking up the default.
    self.ports = forwarders.PortSet(
        http_port or self.DEFAULT_PORTS.http,
        https_port or self.DEFAULT_PORTS.https,
        (dns_port or self.DEFAULT_PORTS.dns) if dns_port is not None else None)
    self.replay_args = replay_args
    self.is_running = False

  def StartServer(self):
    """Flag the server as running and return its ports.

    Asserts that the server is not already running.
    """
    assert not self.is_running
    self.is_running = True
    return self.ports

  def StopServer(self):
    """Flag the server as stopped; asserts that it was running."""
    assert self.is_running
    self.is_running = False


class TestNetworkControllerBackend(
    network_controller_backend.NetworkControllerBackend):
  """Expose some private properties for testing purposes."""

  @property
  def wpr_ca_cert_path(self):
    return self._wpr_ca_cert_path

  @property
  def replay_server(self):
    return self._wpr_server

  @property
  def forwarder(self):
    return self._forwarder

  @property
  def platform_backend(self):
    return self._platform_backend


class NetworkControllerBackendTest(unittest.TestCase):
  """Tests for NetworkControllerBackend built on the fakes defined above."""

  def Patch(self, *args, **kwargs):
    """Patch an object for the duration of a test, and return its mock."""
    patcher = mock.patch(*args, **kwargs)
    mock_object = patcher.start()
    self.addCleanup(patcher.stop)
    return mock_object

  def PatchImportedModule(self, name):
    """Shorthand to patch a module imported by network_controller_backend."""
    return self.Patch(
        'telemetry.internal.platform.network_controller_backend.%s' % name)

  def setUp(self):
    # Always use our FakeReplayServer.
    FakeReplayServer.DEFAULT_PORTS = DEFAULT_PORTS  # Use global defaults.
    self.Patch(
        'telemetry.internal.util.webpagereplay.ReplayServer', FakeReplayServer)

    # Pretend that only some predefined set of files exist.
    def fake_path_exists(filename):
      return filename in ['some-archive.wpr', 'another-archive.wpr']

    self.Patch('os.path.exists', side_effect=fake_path_exists)

    # Mock some imported modules.
    mock_certutils = self.PatchImportedModule('certutils')
    mock_certutils.openssl_import_error = None
    mock_certutils.generate_dummy_ca_cert.return_value = ('-', '-')

    mock_platformsettings = self.PatchImportedModule('platformsettings')
    mock_platformsettings.HasSniSupport.return_value = True

    mock_tempfile = self.PatchImportedModule('tempfile')
    mock_tempfile.mkdtemp.return_value = '[tempdir]'

    self.PatchImportedModule('shutil')

    self.network_controller_backend = TestNetworkControllerBackend(
        FakePlatformBackend())

  def testOpenCloseController(self):
    b = self.network_controller_backend
    self.assertFalse(b.is_open)
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])  # Also installs test CA.
    self.assertTrue(b.is_open)
    self.assertTrue(b.is_test_ca_installed)
    self.assertTrue(b.platform_backend.is_test_ca_installed)
    b.Close()  # Also removes test CA.
    self.assertFalse(b.is_open)
    self.assertFalse(b.is_test_ca_installed)
    self.assertFalse(b.platform_backend.is_test_ca_installed)
    b.Close()  # It's fine to close a closed controller.
    self.assertFalse(b.is_open)

  def testOpeningOpenControllerRaises(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    with self.assertRaises(AssertionError):
      b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])

  def testInstallTestCaFailure(self):
    b = self.network_controller_backend
    b.platform_backend.faulty_cert_installer = True
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])  # Try to install test CA.

    # Test CA is not installed, but the controller is otherwise open and safe
    # to use.
    self.assertTrue(b.is_open)
    self.assertFalse(b.is_test_ca_installed)
    self.assertFalse(b.platform_backend.is_test_ca_installed)
    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)

    b.Close()  # No test CA to remove.
    self.assertFalse(b.is_open)
    self.assertFalse(b.is_test_ca_installed)
    self.assertFalse(b.platform_backend.is_test_ca_installed)

  def testStartStopReplay(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    self.assertFalse(b.is_replay_active)

    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertTrue(b.replay_server.is_running)
    self.assertIsNotNone(b.forwarder.port_pairs)

    # Keep references: the checks below inspect the objects that were in use
    # before StopReplay was called.
    old_replay_server = b.replay_server
    old_forwarder = b.forwarder
    b.StopReplay()
    self.assertFalse(b.is_replay_active)
    self.assertFalse(old_replay_server.is_running)
    self.assertIsNone(old_forwarder.port_pairs)
    self.assertTrue(b.is_open)  # Controller is still open.

    b.Close()
    self.assertFalse(b.is_open)

  def testClosingControllerAlsoStopsReplay(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertTrue(b.replay_server.is_running)
    self.assertIsNotNone(b.forwarder.port_pairs)

    old_replay_server = b.replay_server
    old_forwarder = b.forwarder
    b.Close()
    self.assertFalse(b.is_replay_active)
    self.assertFalse(old_replay_server.is_running)
    self.assertIsNone(old_forwarder.port_pairs)
    self.assertFalse(b.is_open)

  def testReplayOnClosedControllerRaises(self):
    b = self.network_controller_backend
    self.assertFalse(b.is_open)
    with self.assertRaises(AssertionError):
      b.StartReplay('some-archive.wpr')

  def testReplayWithSameArgsReuseServer(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])

    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertTrue(b.replay_server.is_running)

    old_replay_server = b.replay_server
    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertIs(b.replay_server, old_replay_server)
    self.assertTrue(b.replay_server.is_running)

  def testReplayWithDifferentArgsUseDifferentServer(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])

    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertTrue(b.replay_server.is_running)

    old_replay_server = b.replay_server
    b.StartReplay('another-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertIsNot(b.replay_server, old_replay_server)
    self.assertTrue(b.replay_server.is_running)
    self.assertFalse(old_replay_server.is_running)

  def testReplayWithoutArchivePathDoesNotStopReplay(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])

    b.StartReplay('some-archive.wpr')
    self.assertTrue(b.is_replay_active)
    self.assertTrue(b.replay_server.is_running)
    old_replay_server = b.replay_server

    b.StartReplay(None)
    self.assertTrue(b.is_replay_active)
    self.assertIs(b.replay_server, old_replay_server)
    self.assertTrue(b.replay_server.is_running)
    self.assertEqual(b.replay_server.archive_path, 'some-archive.wpr')

  def testModeOffDoesNotCreateReplayServer(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_OFF, ['--some-arg'])
    b.StartReplay('may-or-may-not-exist.wpr')
    self.assertFalse(b.is_replay_active)
    self.assertIsNone(b.replay_server)
    self.assertIsNone(b.forwarder)

  def testBadArchivePathRaises(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    with self.assertRaises(network_controller_backend.ArchiveDoesNotExistError):
      b.StartReplay('does-not-exist.wpr')

  def testBadArchivePathOnRecordIsOkay(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_RECORD, ['--some-arg'])
    b.StartReplay('does-not-exist-yet.wpr')  # Does not raise.
    self.assertTrue(b.is_replay_active)

  def testReplayServerSettings(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_RECORD, ['--some-arg'])
    b.StartReplay('some-archive.wpr')

    # Externally visible properties
    self.assertTrue(b.is_replay_active)
    self.assertEqual(b.host_ip, FORWARDER_HOST_IP)
    self.assertEqual(b.wpr_device_ports, DEFAULT_PORTS)

    # Private replay server settings.
    self.assertTrue(b.replay_server.is_running)
    self.assertEqual(b.replay_server.archive_path, 'some-archive.wpr')
    self.assertEqual(b.replay_server.host_ip, FORWARDER_HOST_IP)
    self.assertEqual(b.replay_server.replay_args, [
        '--some-arg', '--record', '--inject_scripts=',
        '--should_generate_certs',
        '--https_root_ca_cert_path=%s' % EXPECTED_WPR_CA_CERT_PATH])

  def testReplayServerOffSettings(self):
    b = self.network_controller_backend
    # NOTE(review): FakePlatformBackend defines no wpr_ca_cert_path and
    # nothing visible in this file reads it; confirm whether the controller
    # still uses this attribute before removing the assignment.
    b.platform_backend.wpr_ca_cert_path = 'CERT_FILE'
    b.Open(wpr_modes.WPR_OFF, ['--some-arg'])
    b.StartReplay('some-archive.wpr')

    self.assertFalse(b.is_replay_active)
    self.assertEqual(b.host_ip, FORWARDER_HOST_IP)
    self.assertIsNone(b.wpr_device_ports)
    self.assertIsNone(b.replay_server)

  def testUseDefaultPorts(self):
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertEqual(b.replay_server.ports, DEFAULT_PORTS)
    self.assertEqual(b.wpr_device_ports, DEFAULT_PORTS)

    # Invariant
    self.assertEqual(b.forwarder.port_pairs.local_ports, b.replay_server.ports)
    self.assertEqual(b.forwarder.port_pairs.remote_ports, b.wpr_device_ports)

  def testUseDefaultLocalPorts(self):
    b = self.network_controller_backend
    b.platform_backend.SetWprPortPairs(
        http=(0, 8888), https=(0, 4444), dns=(0, 2222))
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertEqual(b.replay_server.ports, DEFAULT_PORTS)
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(8888, 4444, 2222))

    # Invariant
    self.assertEqual(b.forwarder.port_pairs.local_ports, b.replay_server.ports)
    self.assertEqual(b.forwarder.port_pairs.remote_ports, b.wpr_device_ports)

  def testUseSpecificPorts(self):
    b = self.network_controller_backend
    b.platform_backend.SetWprPortPairs(
        http=(88, 8888), https=(44, 4444), dns=None)
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertEqual(b.replay_server.ports, forwarders.PortSet(88, 44, None))
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(8888, 4444, None))

    # Invariant
    self.assertEqual(b.forwarder.port_pairs.local_ports, b.replay_server.ports)
    self.assertEqual(b.forwarder.port_pairs.remote_ports, b.wpr_device_ports)

  def testRestartReplayShouldReusePorts(self):
    # setUp restores FakeReplayServer.DEFAULT_PORTS before every test, so the
    # class-level overrides below do not leak into other tests of this class.
    FakeReplayServer.DEFAULT_PORTS = forwarders.PortSet(123, 456, 789)
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(123, 456, 789))

    # If replay restarts, the factory may use a different set of default ports.
    FakeReplayServer.DEFAULT_PORTS = forwarders.PortSet(987, 654, 321)
    b.StartReplay('another-archive.wpr')

    # However same ports must be used, because apps/browsers may already be
    # configured to use the old set of ports.
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(123, 456, 789))

  def testNewControllerSessionMayUseDifferentPorts(self):
    FakeReplayServer.DEFAULT_PORTS = forwarders.PortSet(123, 456, 789)
    b = self.network_controller_backend
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(123, 456, 789))
    b.Close()

    # If replay restarts, the factory may use a different set of default ports.
    FakeReplayServer.DEFAULT_PORTS = forwarders.PortSet(987, 654, 321)
    b.Open(wpr_modes.WPR_REPLAY, ['--some-arg'])
    b.StartReplay('some-archive.wpr')

    # This time the network controller session was closed between replay's,
    # so it's fine to use a different set of ports.
    self.assertEqual(b.wpr_device_ports, forwarders.PortSet(987, 654, 321))