1#!/usr/bin/env python3.4 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import re 18import sys 19import random 20import time 21 22import acts.controllers.packet_capture as packet_capture 23import acts.signals as signals 24import acts_contrib.test_utils.wifi.rpm_controller_utils as rutils 25import acts_contrib.test_utils.wifi.wifi_datastore_utils as dutils 26import acts_contrib.test_utils.wifi.wifi_test_utils as wutils 27 28from acts import asserts 29from acts.base_test import BaseTestClass 30from acts.controllers.ap_lib import hostapd_constants 31from acts.test_decorators import test_tracker_info 32from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest 33 34WifiEnums = wutils.WifiEnums 35 36WAIT_BEFORE_CONNECTION = 1 37SINGLE_BAND = 1 38DUAL_BAND = 2 39 40TIMEOUT = 60 41TEST = 'test_' 42PING_ADDR = 'www.google.com' 43 44NUM_LINK_PROBES = 3 45PROBE_DELAY_SEC = 1 46 47 48class WifiChaosTest(WifiBaseTest): 49 """ Tests for wifi IOT 50 51 Test Bed Requirement: 52 * One Android device 53 * Wi-Fi IOT networks visible to the device 54 """ 55 56 def __init__(self, configs): 57 BaseTestClass.__init__(self, configs) 58 self.generate_interop_tests() 59 60 def randomize_testcases(self): 61 """Randomize the list of hosts and build a random order of tests, 62 based on SSIDs, keeping all the relevant bands of an AP together. 63 64 """ 65 temp_tests = list() 66 hosts = self.user_params['interop_host'] 67 68 random.shuffle(hosts) 69 70 for host in hosts: 71 ssid_2g = None 72 ssid_5g = None 73 info = dutils.show_device(host) 74 75 # Based on the information in datastore, add to test list if 76 # AP has 2G band. 77 if 'ssid_2g' in info: 78 ssid_2g = info['ssid_2g'] 79 temp_tests.append(TEST + ssid_2g) 80 81 # Based on the information in datastore, add to test list if 82 # AP has 5G band. 83 if 'ssid_5g' in info: 84 ssid_5g = info['ssid_5g'] 85 temp_tests.append(TEST + ssid_5g) 86 87 self.tests = temp_tests 88 89 def generate_interop_testcase(self, base_test, testcase_name, ssid_dict): 90 """Generates a single test case from the given data. 91 92 Args: 93 base_test: The base test case function to run. 94 testcase_name: The name of the test case. 95 ssid_dict: The information about the network under test. 96 """ 97 ssid = testcase_name 98 test_tracker_uuid = ssid_dict[testcase_name]['uuid'] 99 hostname = ssid_dict[testcase_name]['host'] 100 if not testcase_name.startswith('test_'): 101 testcase_name = 'test_%s' % testcase_name 102 test_case = test_tracker_info(uuid=test_tracker_uuid)( 103 lambda: base_test(ssid, hostname)) 104 setattr(self, testcase_name, test_case) 105 self.tests.append(testcase_name) 106 107 def generate_interop_tests(self): 108 for ssid_dict in self.user_params['interop_ssid']: 109 testcase_name = list(ssid_dict)[0] 110 self.generate_interop_testcase(self.interop_base_test, 111 testcase_name, ssid_dict) 112 self.randomize_testcases() 113 114 def setup_class(self): 115 super().setup_class() 116 self.dut = self.android_devices[0] 117 self.admin = 'admin' + str(random.randint(1000001, 12345678)) 118 wutils.wifi_test_device_init(self.dut) 119 # Set country code explicitly to "US". 120 wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US) 121 122 asserts.assert_true( 123 self.lock_pcap(), 124 "Could not lock a Packet Capture. Aborting Interop test.") 125 126 wutils.wifi_toggle_state(self.dut, True) 127 128 def lock_pcap(self): 129 """Lock a Packet Capturere to use for the test.""" 130 131 # Get list of devices from the datastore. 132 locked_pcap = False 133 devices = dutils.get_devices() 134 135 for device in devices: 136 137 device_name = device['hostname'] 138 device_type = device['ap_label'] 139 if device_type == 'PCAP' and not device['lock_status']: 140 if dutils.lock_device(device_name, self.admin): 141 self.pcap_host = device_name 142 host = device['ip_address'] 143 self.log.info("Locked Packet Capture device: %s" % device_name) 144 locked_pcap = True 145 break 146 else: 147 self.log.warning("Failed to lock %s PCAP." % device_name) 148 149 if not locked_pcap: 150 return False 151 152 pcap_config = {'ssh_config':{'user':'root'} } 153 pcap_config['ssh_config']['host'] = host 154 155 self.pcap = packet_capture.PacketCapture(pcap_config) 156 return True 157 158 def setup_test(self): 159 super().setup_test() 160 self.dut.droid.wakeLockAcquireBright() 161 self.dut.droid.wakeUpNow() 162 163 def on_pass(self, test_name, begin_time): 164 wutils.stop_pcap(self.pcap, self.pcap_procs, True) 165 166 def on_fail(self, test_name, begin_time): 167 # Sleep to ensure all failed packets are captured. 168 time.sleep(5) 169 wutils.stop_pcap(self.pcap, self.pcap_procs, False) 170 super().on_fail(test_name, begin_time) 171 172 def teardown_test(self): 173 super().teardown_test() 174 self.dut.droid.wakeLockRelease() 175 self.dut.droid.goToSleepNow() 176 177 def teardown_class(self): 178 # Unlock the PCAP. 179 if not dutils.unlock_device(self.pcap_host): 180 self.log.warning("Failed to unlock %s PCAP. Check in datastore.") 181 182 183 """Helper Functions""" 184 185 def scan_and_connect_by_id(self, network, net_id): 186 """Scan for network and connect using network id. 187 188 Args: 189 net_id: Integer specifying the network id of the network. 190 191 """ 192 ssid = network[WifiEnums.SSID_KEY] 193 wutils.start_wifi_connection_scan_and_ensure_network_found(self.dut, 194 ssid) 195 wutils.wifi_connect_by_id(self.dut, net_id) 196 197 def run_ping(self, sec): 198 """Run ping for given number of seconds. 199 200 Args: 201 sec: Time in seconds to run teh ping traffic. 202 203 """ 204 self.log.info("Finding Gateway...") 205 route_response = self.dut.adb.shell("ip route get 8.8.8.8") 206 gateway_ip = re.search('via (.*) dev', str(route_response)).group(1) 207 self.log.info("Gateway IP = %s" % gateway_ip) 208 self.log.info("Running ping for %d seconds" % sec) 209 result = self.dut.adb.shell("ping -w %d %s" % (sec, gateway_ip), 210 timeout=sec + 1) 211 self.log.debug("Ping Result = %s" % result) 212 if "100% packet loss" in result: 213 raise signals.TestFailure("100% packet loss during ping") 214 215 def send_link_probes(self, network): 216 """ 217 Send link probes, and verify that the device and AP did not crash. 218 Also verify that at least one link probe succeeded. 219 220 Steps: 221 1. Send a few link probes. 222 2. Ensure that the device and AP did not crash (by checking that the 223 device remains connected to the expected network). 224 """ 225 results = wutils.send_link_probes( 226 self.dut, NUM_LINK_PROBES, PROBE_DELAY_SEC) 227 228 self.log.info("Link Probe results: %s" % (results,)) 229 230 wifi_info = self.dut.droid.wifiGetConnectionInfo() 231 expected = network[WifiEnums.SSID_KEY] 232 actual = wifi_info[WifiEnums.SSID_KEY] 233 asserts.assert_equal( 234 expected, actual, 235 "Device did not remain connected after sending link probes!") 236 237 def unlock_and_turn_off_ap(self, hostname, rpm_port, rpm_ip): 238 """UNlock the AP in datastore and turn off the AP. 239 240 Args: 241 hostname: Hostname of the AP. 242 rpm_port: Port number on the RPM for the AP. 243 rpm_ip: RPM's IP address. 244 245 """ 246 # Un-Lock AP in datastore. 247 self.log.debug("Un-lock AP in datastore") 248 if not dutils.unlock_device(hostname): 249 self.log.warning("Failed to unlock %s AP. Check AP in datastore.") 250 # Turn OFF AP from the RPM port. 251 rutils.turn_off_ap(rpm_port, rpm_ip) 252 253 def run_connect_disconnect(self, network, hostname, rpm_port, rpm_ip, 254 release_ap): 255 """Run connect/disconnect to a given network in loop. 256 257 Args: 258 network: Dict, network information. 259 hostname: Hostanme of the AP to connect to. 260 rpm_port: Port number on the RPM for the AP. 261 rpm_ip: Port number on the RPM for the AP. 262 release_ap: Flag to determine if we should turn off the AP yet. 263 264 Raises: TestFailure if the network connection fails. 265 266 """ 267 for attempt in range(5): 268 try: 269 begin_time = time.time() 270 ssid = network[WifiEnums.SSID_KEY] 271 net_id = self.dut.droid.wifiAddNetwork(network) 272 asserts.assert_true(net_id != -1, "Add network %s failed" % network) 273 self.log.info("Connecting to %s" % ssid) 274 self.scan_and_connect_by_id(network, net_id) 275 self.run_ping(10) 276 # TODO(b/133369482): uncomment once bug is resolved 277 # self.send_link_probes(network) 278 wutils.wifi_forget_network(self.dut, ssid) 279 time.sleep(WAIT_BEFORE_CONNECTION) 280 except Exception as e: 281 self.log.error("Connection to %s network failed on the %d " 282 "attempt with exception %s." % (ssid, attempt, e)) 283 # TODO:(bmahadev) Uncomment after scan issue is fixed. 284 # self.dut.take_bug_report(ssid, begin_time) 285 # self.dut.cat_adb_log(ssid, begin_time) 286 if release_ap: 287 self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip) 288 raise signals.TestFailure("Failed to connect to %s" % ssid) 289 290 def get_band_and_chan(self, ssid): 291 """Get the band and channel information from SSID. 292 293 Args: 294 ssid: SSID of the network. 295 296 """ 297 ssid_info = ssid.split('_') 298 self.band = ssid_info[-1] 299 for item in ssid_info: 300 # Skip over the router model part. 301 if 'ch' in item and item != ssid_info[0]: 302 self.chan = re.search(r'(\d+)',item).group(0) 303 return 304 raise signals.TestFailure("Channel information not found in SSID.") 305 306 def interop_base_test(self, ssid, hostname): 307 """Base test for all the connect-disconnect interop tests. 308 309 Args: 310 ssid: string, SSID of the network to connect to. 311 hostname: string, hostname of the AP. 312 313 Steps: 314 1. Lock AP in datstore. 315 2. Turn on AP on the rpm switch. 316 3. Run connect-disconnect in loop. 317 4. Turn off AP on the rpm switch. 318 5. Unlock AP in datastore. 319 320 """ 321 network = {} 322 network['password'] = 'password' 323 network['SSID'] = ssid 324 release_ap = False 325 wutils.reset_wifi(self.dut) 326 327 # Lock AP in datastore. 328 self.log.info("Lock AP in datastore") 329 330 ap_info = dutils.show_device(hostname) 331 332 # If AP is locked by a different test admin, then we skip. 333 if ap_info['lock_status'] and ap_info['locked_by'] != self.admin: 334 raise signals.TestSkip("AP %s is locked, skipping test" % hostname) 335 336 if not dutils.lock_device(hostname, self.admin): 337 self.log.warning("Failed to lock %s AP. Unlock AP in datastore" 338 " and try again.") 339 raise signals.TestFailure("Failed to lock AP") 340 341 band = SINGLE_BAND 342 if ('ssid_2g' in ap_info) and ('ssid_5g' in ap_info): 343 band = DUAL_BAND 344 if (band == SINGLE_BAND) or ( 345 band == DUAL_BAND and '5G' in ssid): 346 release_ap = True 347 348 # Get AP RPM attributes and Turn ON AP. 349 rpm_ip = ap_info['rpm_ip'] 350 rpm_port = ap_info['rpm_port'] 351 352 rutils.turn_on_ap(self.pcap, ssid, rpm_port, rpm_ip=rpm_ip) 353 self.log.info("Finished turning ON AP.") 354 # Experimental. Some APs take upto a min to come online. 355 time.sleep(60) 356 357 self.get_band_and_chan(ssid) 358 self.pcap.configure_monitor_mode(self.band, self.chan) 359 self.pcap_procs = wutils.start_pcap( 360 self.pcap, self.band.lower(), self.test_name) 361 self.run_connect_disconnect(network, hostname, rpm_port, rpm_ip, 362 release_ap) 363 364 # Un-lock only if it's a single band AP or we are running the last band. 365 if release_ap: 366 self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip) 367