#!/usr/bin/env python3.4 # # Copyright (C) 2016 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. import pprint import queue import acts.base_test import acts.test_utils.wifi.wifi_test_utils as wutils import acts.utils from acts import asserts from acts.controllers.sl4a_lib import rpc_client WifiEnums = wutils.WifiEnums # Macros for RttParam keywords RttParam = WifiEnums.RttParam # Macros for RttManager Rtt = WifiEnums.Rtt RttBW = WifiEnums.RttBW RttPreamble = WifiEnums.RttPreamble RttPeerType = WifiEnums.RttPeerType RttType = WifiEnums.RttType ScanResult = WifiEnums.ScanResult RTT_MARGIN_OF_ERROR = WifiEnums.RTT_MARGIN_OF_ERROR class WifiRTTRangingError(Exception): """Error in WifiScanner Rtt.""" class WifiRttManagerTest(acts.base_test.BaseTestClass): """Tests for wifi's RttManager APIs.""" tests = None MAX_RTT_AP = 10 def __init__(self, controllers): acts.base_test.BaseTestClass.__init__(self, controllers) self.tests = ("test_support_check", "test_invalid_params", "test_capability_check", "test_rtt_ranging_single_AP_stress", "test_regular_scan_then_rtt_ranging_stress", "test_gscan_then_rtt_ranging_stress") def setup_class(self): self.dut = self.android_devices[0] wutils.wifi_test_device_init(self.dut) required_params = ("support_models", "stress_num", "vht80_5g", "actual_distance") self.unpack_userparams(required_params) asserts.assert_true( self.actual_distance >= 5, "Actual distance should be no shorter than 5 meters.") self.visible_networks = (self.vht80_5g, ) self.default_rtt_params = { RttParam.request_type: RttType.TYPE_TWO_SIDED, RttParam.device_type: RttPeerType.PEER_TYPE_AP, RttParam.preamble: RttPreamble.PREAMBLE_HT, RttParam.bandwidth: RttBW.BW_80_SUPPORT } # Expected capability for devices that don't support RTT. rtt_cap_neg = { 'lcrSupported': False, 'bwSupported': 0, 'twoSided11McRttSupported': False, 'preambleSupported': 0, 'oneSidedRttSupported': False, 'lciSupported': False } rtt_cap_shamu = { 'lcrSupported': False, 'bwSupported': 0x1C, 'twoSided11McRttSupported': True, 'preambleSupported': 6, 'oneSidedRttSupported': False, 'lciSupported': False } rtt_cap_bullhead = { 'lcrSupported': True, 'bwSupported': 0x1C, 'twoSided11McRttSupported': True, 'preambleSupported': 7, 'oneSidedRttSupported': True, 'lciSupported': True } rtt_cap_angler = { 'lcrSupported': True, 'bwSupported': 0x1C, 'twoSided11McRttSupported': True, 'preambleSupported': 6, 'oneSidedRttSupported': False, 'lciSupported': True } self.rtt_cap_table = { "hammerhead": rtt_cap_neg, "shamu": rtt_cap_shamu, "volantis": rtt_cap_neg, "volantisg": rtt_cap_neg, "bullhead": rtt_cap_bullhead, "angler": rtt_cap_angler } """Helper Functions""" def invalid_params_logic(self, rtt_params): try: self.dut.droid.wifiRttStartRanging([rtt_params]) except rpc_client.Sl4aApiError as e: e_str = str(e) asserts.assert_true( "IllegalArgumentException" in e_str, "Missing IllegalArgumentException in %s." % e_str) msg = "Got expected exception with invalid param %s." % rtt_params self.log.info(msg) def get_rtt_results(self, rtt_params): """Starts RTT ranging and get results. Args: rtt_params: A list of dicts each representing an RttParam. Returns: Rtt ranging results. """ self.log.debug("Start ranging with:\n%s" % pprint.pformat(rtt_params)) idx = self.dut.droid.wifiRttStartRanging(rtt_params) event = None try: event = self.dut.ed.pop_events("WifiRttRanging%d" % idx, 30) if event[0]["name"].endswith("onSuccess"): results = event[0]["data"]["Results"] result_len = len(results) param_len = len(rtt_params) asserts.assert_true(result_len == param_len, "Expected %d results, got %d." % (param_len, result_len)) # Add acceptable margin of error to results, which will be used # during result processing. for i, r in enumerate(results): bw_mode = rtt_params[i][RttParam.bandwidth] r[RttParam.margin] = RTT_MARGIN_OF_ERROR[bw_mode] self.log.debug(pprint.pformat(event)) return event except queue.Empty: self.log.error("Waiting for RTT event timed out.") return None def network_selector(self, network_info): """Decides if a network should be used for rtt ranging. There are a few conditions: 1. This network supports 80211mc. 2. This network's info matches certain conditions. This is added to better control which networks to range against instead of blindly use all 80211mc networks in air. Args: network_info: A dict representing a WiFi network. Returns: True if the input network should be used for ranging, False otherwise. """ target_params = { "is80211McRTTResponder": True, WifiEnums.BSSID_KEY: self.vht80_5g[WifiEnums.BSSID_KEY], } for k, v in target_params.items(): if k not in network_info: return False if type(network_info[k]) is str: network_info[k] = network_info[k].lower() v = v.lower() if network_info[k] != v: return False return True def regular_scan_for_rtt_networks(self): """Scans for 11mc-capable WiFi networks using regular wifi scan. Networks are selected based on self.network_selector. Returns: A list of networks that have RTTResponders. """ wutils.start_wifi_connection_scan(self.dut) networks = self.dut.droid.wifiGetScanResults() rtt_networks = [] for nw in networks: if self.network_selector(nw): rtt_networks.append(nw) return rtt_networks def gscan_for_rtt_networks(self): """Scans for 11mc-capable WiFi networks using wifi gscan. Networks are selected based on self.network_selector. Returns: A list of networks that have RTTResponders. """ s = { "reportEvents": WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT, "band": WifiEnums.WIFI_BAND_BOTH, "periodInMs": 10000, "numBssidsPerScan": 32 } idx = wutils.start_wifi_single_scan(self.android_devices[0], s)["Index"] self.log.info("Scan index is %d" % idx) event_name = "WifiScannerScan%donFullResult" % idx def condition(event): nw = event["data"]["Results"][0] return self.network_selector(nw) rtt_networks = [] try: for i in range(len(self.visible_networks)): event = self.dut.ed.wait_for_event(event_name, condition, 30) rtt_networks.append(event["data"]["Results"][0]) self.log.info("Waiting for gscan to finish.") event_name = "WifiScannerScan%donResults" % idx event = self.dut.ed.pop_event(event_name, 300) total_network_cnt = len(event["data"]["Results"][0]["ScanResults"]) self.log.info("Found %d networks in total." % total_network_cnt) self.log.debug(rtt_networks) return rtt_networks except queue.Empty: self.log.error("Timed out waiting for gscan result.") def process_rtt_events(self, events): """Processes rtt ranging events. Validates RTT event types. Validates RTT response status and measured RTT values. Enforces success rate. Args: events: A list of callback results from RTT ranging. """ total = aborted = failure = invalid = out_of_range = 0 for e in events: if e["name"].endswith("onAborted"): aborted += 1 if e["name"].endswith("onFailure"): failure += 1 if e["name"].endswith("onSuccess"): results = e["data"]["Results"] for r in results: total += 1 # Status needs to be "success". status = r["status"] if status != Rtt.STATUS_SUCCESS: self.log.warning("Got error status %d." % status) invalid += 1 continue # RTT value should be positive. value = r["rtt"] if value <= 0: self.log.warning("Got error RTT value %d." % value) invalid += 1 continue # Vadlidate values in successful responses. acd = self.actual_distance margin = r[RttParam.margin] # If the distance is >= 0, check distance only. d = r["distance"] / 100.0 if d > 0: # Distance should be in acceptable range. is_d_valid = (acd - margin) <= d <= acd + (margin) if not is_d_valid: self.log.warning( ("Reported distance %.2fm is out of the" " acceptable range %.2f±%.2fm.") % (d, acd, margin)) out_of_range += 1 continue # Check if the RTT value is in range. d = (value / 2) / 1E10 * wutils.SPEED_OF_LIGHT is_rtt_valid = (acd - margin) <= d <= (acd + margin) if not is_rtt_valid: self.log.warning(( "Distance calculated from RTT value %d - %.2fm is " "out of the acceptable range %.2f±%dm.") % (value, d, acd, margin)) out_of_range += 1 continue # Check if the RSSI value is in range. rssi = r["rssi"] # average rssi in 0.5dB steps, e.g. 143 implies -71.5dB, # so the valid range is 0 to 200 is_rssi_valid = 0 <= rssi <= 200 if not is_rssi_valid: self.log.warning(("Reported RSSI %d is out of the" " acceptable range 0-200") % rssi) out_of_range += 1 continue self.log.info(( "Processed %d RTT events. %d aborted, %s failed. Among" " the %d responses in successful callbacks, %s are invalid, %s has" " RTT values that are out of range.") % (len(events), aborted, failure, total, invalid, out_of_range)) asserts.assert_true(total > 0, "No RTT response received.") # Percentage of responses that are valid should be >= 90%. valid_total = float(total - invalid) valid_response_rate = valid_total / total self.log.info("%.2f%% of the responses are valid." % (valid_response_rate * 100)) asserts.assert_true(valid_response_rate >= 0.9, "Valid response rate is below 90%%.") # Among the valid responses, the percentage of having an in-range RTT # value should be >= 67%. valid_value_rate = (total - invalid - out_of_range) / valid_total self.log.info("%.2f%% of valid responses have in-range RTT value" % (valid_value_rate * 100)) msg = "In-range response rate is below 67%%." asserts.assert_true(valid_value_rate >= 0.67, msg) def scan_then_rtt_ranging_stress_logic(self, scan_func): """Test logic to scan then do rtt ranging based on the scan results. Steps: 1. Start scan and get scan results. 2. Filter out the networks that support rtt in scan results. 3. Start rtt ranging against those networks that support rtt. 4. Repeat 5. Process RTT events. Args: scan_func: A function that does a wifi scan and only returns the networks that support rtt in the scan results. Returns: True if rtt behaves as expected, False otherwise. """ total = self.stress_num failed = 0 all_results = [] for i in range(total): self.log.info("Iteration %d" % i) rtt_networks = scan_func() if not rtt_networks: self.log.warning("Found no rtt network, skip this iteration.") failed += 1 continue self.log.debug("Found rtt networks:%s" % rtt_networks) rtt_params = [] for rn in rtt_networks: rtt_params.append(self.rtt_config_from_scan_result(rn)) results = self.get_rtt_results(rtt_params) if results: self.log.debug(results) all_results += results self.process_rtt_events(all_results) def rtt_config_from_scan_result(self, scan_result): """Creates an Rtt configuration based on the scan result of a network. """ scan_result_channel_width_to_rtt = { ScanResult.CHANNEL_WIDTH_20MHZ: RttBW.BW_20_SUPPORT, ScanResult.CHANNEL_WIDTH_40MHZ: RttBW.BW_40_SUPPORT, ScanResult.CHANNEL_WIDTH_80MHZ: RttBW.BW_80_SUPPORT, ScanResult.CHANNEL_WIDTH_160MHZ: RttBW.BW_160_SUPPORT, ScanResult.CHANNEL_WIDTH_80MHZ_PLUS_MHZ: RttBW.BW_160_SUPPORT } p = {} freq = scan_result[RttParam.frequency] p[RttParam.frequency] = freq p[RttParam.BSSID] = scan_result[WifiEnums.BSSID_KEY] if freq > 5000: p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT else: p[RttParam.preamble] = RttPreamble.PREAMBLE_HT cf0 = scan_result[RttParam.center_freq0] if cf0 > 0: p[RttParam.center_freq0] = cf0 cf1 = scan_result[RttParam.center_freq1] if cf1 > 0: p[RttParam.center_freq1] = cf1 cw = scan_result["channelWidth"] p[RttParam.channel_width] = cw p[RttParam.bandwidth] = scan_result_channel_width_to_rtt[cw] if scan_result["is80211McRTTResponder"]: p[RttParam.request_type] = RttType.TYPE_TWO_SIDED else: p[RttParam.request_type] = RttType.TYPE_ONE_SIDED return p """Tests""" def test_invalid_params(self): """Tests the sanity check function in RttManager. """ param_list = [{ RttParam.device_type: 3 }, { RttParam.device_type: 1, RttParam.request_type: 3 }, { RttParam.device_type: 1, RttParam.request_type: 1, RttParam.BSSID: None }, { RttParam.BSSID: "xxxxxxxx", RttParam.number_burst: 1 }, { RttParam.number_burst: 0, RttParam.num_samples_per_burst: -1 }, { RttParam.num_samples_per_burst: 32 }, { RttParam.num_samples_per_burst: 5, RttParam.num_retries_per_measurement_frame: -1 }, { RttParam.num_retries_per_measurement_frame: 4 }, { RttParam.num_retries_per_measurement_frame: 2, RttParam.num_retries_per_FTMR: -1 }, { RttParam.num_retries_per_FTMR: 4 }] for param in param_list: self.invalid_params_logic(param) return True def test_support_check(self): """No device supports device-to-device RTT; only shamu and volantis devices support device-to-ap RTT. """ model = acts.utils.trim_model_name(self.dut.model) asserts.assert_true(self.dut.droid.wifiIsDeviceToDeviceRttSupported(), "Device to device is supposed to be supported.") if any([model in m for m in self.support_models]): asserts.assert_true(self.dut.droid.wifiIsDeviceToApRttSupported(), "%s should support device-to-ap RTT." % model) self.log.info("%s supports device-to-ap RTT as expected." % model) else: asserts.assert_false( self.dut.droid.wifiIsDeviceToApRttSupported(), "%s should not support device-to-ap RTT." % model) self.log.info( ("%s does not support device-to-ap RTT as expected.") % model) asserts.abort_class( "Device %s does not support RTT, abort." % model) return True def test_capability_check(self): """Checks the capabilities params are reported as expected. """ caps = self.dut.droid.wifiRttGetCapabilities() asserts.assert_true(caps, "Unable to get rtt capabilities.") self.log.debug("Got rtt capabilities %s" % caps) model = acts.utils.trim_model_name(self.dut.model) asserts.assert_true(model in self.rtt_cap_table, "Unknown model %s" % model) expected_caps = self.rtt_cap_table[model] for k, v in expected_caps.items(): asserts.assert_true(k in caps, "%s missing in capabilities." % k) asserts.assert_true(v == caps[k], "Expected %s for %s, got %s." % (v, k, caps[k])) return True def test_discovery(self): """Make sure all the expected 11mc BSSIDs are discovered properly, and they are all reported as 802.11mc Rtt Responder. Procedures: 1. Scan for wifi networks. Expect: All the RTT networks show up in scan results and their "is80211McRTTResponder" is True. All the non-RTT networks show up in scan results and their "is80211McRTTResponder" is False. """ wutils.start_wifi_connection_scan(self.dut) scan_results = self.dut.droid.wifiGetScanResults() self.log.debug(scan_results) for n in visible_networks: asserts.assert_true( wutils.match_networks(n, scan_results), "Network %s was not discovered properly." % n) return True def test_missing_bssid(self): """Start Rtt ranging with a config that does not have BSSID set. Should not get onSuccess. """ p = {} p[RttParam.request_type] = RttType.TYPE_TWO_SIDED p[RttParam.device_type] = RttPeerType.PEER_TYPE_AP p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT p[RttParam.bandwidth] = RttBW.BW_80_SUPPORT p[RttParam.frequency] = self.vht80_5g[WifiEnums.frequency_key] p[RttParam.center_freq0] = self.vht80_5g[RttParam.center_freq0] results = self.get_rtt_results([p]) asserts.assert_true(results, "Did not get any result.") self.log.info(pprint.pformat(results)) def test_rtt_ranging_single_AP_stress(self): """Stress test for Rtt against one AP. Steps: 1. Do RTT ranging against the self.vht80_5g BSSID. 2. Repeat self.stress_num times. 3. Verify RTT results. """ p = {} p[RttParam.request_type] = RttType.TYPE_TWO_SIDED p[RttParam.device_type] = RttPeerType.PEER_TYPE_AP p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT p[RttParam.bandwidth] = RttBW.BW_80_SUPPORT p[RttParam.BSSID] = self.vht80_5g[WifiEnums.BSSID_KEY] p[RttParam.frequency] = self.vht80_5g[WifiEnums.frequency_key] p[RttParam.center_freq0] = self.vht80_5g[RttParam.center_freq0] p[RttParam.channel_width] = ScanResult.CHANNEL_WIDTH_80MHZ all_results = [] for i in range(self.stress_num): self.log.info("RTT Ranging iteration %d" % (i + 1)) results = self.get_rtt_results([p]) if results: all_results += results else: self.log.warning("Did not get result for iteration %d." % i) frate = self.process_rtt_events(all_results) def test_regular_scan_then_rtt_ranging_stress(self): """Stress test for regular scan then start rtt ranging against the RTT compatible networks found by the scan. Steps: 1. Start a WiFi connection scan. 2. Get scan results. 3. Find all the 11mc capable BSSIDs and choose the ones to use (self.network_selector) 4. Do RTT ranging against the selected BSSIDs, with the info from the scan results. 5. Repeat self.stress_num times. 6. Verify RTT results. """ scan_func = self.regular_scan_for_rtt_networks self.scan_then_rtt_ranging_stress_logic(scan_func) def test_gscan_then_rtt_ranging_stress(self): """Stress test for gscan then start rtt ranging against the RTT compatible networks found by the scan. Steps: 1. Start a WifiScanner single shot scan on all channels. 2. Wait for full scan results of the expected 11mc capable BSSIDs. 3. Wait for single shot scan to finish on all channels. 4. Do RTT ranging against the selected BSSIDs, with the info from the scan results. 5. Repeat self.stress_num times. 6. Verify RTT results. """ scan_func = self.gscan_for_rtt_networks self.scan_then_rtt_ranging_stress_logic(scan_func)