1#!/usr/bin/env python3.4 2# 3# Copyright 2017 - Google 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import queue 18import statistics 19import time 20 21from acts import asserts 22from acts.test_utils.wifi import wifi_test_utils as wutils 23from acts.test_utils.wifi.rtt import rtt_const as rconsts 24 25# arbitrary timeout for events 26EVENT_TIMEOUT = 10 27 28 29def decorate_event(event_name, id): 30 return '%s_%d' % (event_name, id) 31 32 33def wait_for_event(ad, event_name, timeout=EVENT_TIMEOUT): 34 """Wait for the specified event or timeout. 35 36 Args: 37 ad: The android device 38 event_name: The event to wait on 39 timeout: Number of seconds to wait 40 Returns: 41 The event (if available) 42 """ 43 prefix = '' 44 if hasattr(ad, 'pretty_name'): 45 prefix = '[%s] ' % ad.pretty_name 46 try: 47 event = ad.ed.pop_event(event_name, timeout) 48 ad.log.info('%s%s: %s', prefix, event_name, event['data']) 49 return event 50 except queue.Empty: 51 ad.log.info('%sTimed out while waiting for %s', prefix, event_name) 52 asserts.fail(event_name) 53 54def fail_on_event(ad, event_name, timeout=EVENT_TIMEOUT): 55 """Wait for a timeout period and looks for the specified event - fails if it 56 is observed. 57 58 Args: 59 ad: The android device 60 event_name: The event to wait for (and fail on its appearance) 61 """ 62 prefix = '' 63 if hasattr(ad, 'pretty_name'): 64 prefix = '[%s] ' % ad.pretty_name 65 try: 66 event = ad.ed.pop_event(event_name, timeout) 67 ad.log.info('%sReceived unwanted %s: %s', prefix, event_name, event['data']) 68 asserts.fail(event_name, extras=event) 69 except queue.Empty: 70 ad.log.info('%s%s not seen (as expected)', prefix, event_name) 71 return 72 73 74def config_privilege_override(dut, override_to_no_privilege): 75 """Configure the device to override the permission check and to disallow any 76 privileged RTT operations, e.g. disallow one-sided RTT to Responders (APs) 77 which do not support IEEE 802.11mc. 78 79 Args: 80 dut: Device to configure. 81 override_to_no_privilege: True to indicate no privileged ops, False for 82 default (which will allow privileged ops). 83 """ 84 dut.adb.shell("cmd wifirtt set override_assume_no_privilege %d" % ( 85 1 if override_to_no_privilege else 0)) 86 87 88def get_rtt_constrained_results(scanned_networks, support_rtt): 89 """Filter the input list and only return those networks which either support 90 or do not support RTT (IEEE 802.11mc.) 91 92 Args: 93 scanned_networks: A list of networks from scan results. 94 support_rtt: True - only return those APs which support RTT, False - only 95 return those APs which do not support RTT. 96 97 Returns: a sub-set of the scanned_networks per support_rtt constraint. 98 """ 99 matching_networks = [] 100 for network in scanned_networks: 101 if support_rtt: 102 if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER in network and 103 network[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]): 104 matching_networks.append(network) 105 else: 106 if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER not in network or 107 not network[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]): 108 matching_networks.append(network) 109 110 return matching_networks 111 112 113def scan_networks(dut): 114 """Perform a scan and return scan results. 115 116 Args: 117 dut: Device under test. 118 119 Returns: an array of scan results. 120 """ 121 wutils.start_wifi_connection_scan(dut) 122 return dut.droid.wifiGetScanResults() 123 124 125def scan_with_rtt_support_constraint(dut, support_rtt, repeat=0): 126 """Perform a scan and return scan results of APs: only those that support or 127 do not support RTT (IEEE 802.11mc) - per the support_rtt parameter. 128 129 Args: 130 dut: Device under test. 131 support_rtt: True - only return those APs which support RTT, False - only 132 return those APs which do not support RTT. 133 repeat: Re-scan this many times to find an RTT supporting network. 134 135 Returns: an array of scan results. 136 """ 137 for i in range(repeat + 1): 138 scan_results = scan_networks(dut) 139 aps = get_rtt_constrained_results(scan_results, support_rtt) 140 if len(aps) != 0: 141 return aps 142 143 return [] 144 145 146def select_best_scan_results(scans, select_count, lowest_rssi=-80): 147 """Select the strongest 'select_count' scans in the input list based on 148 highest RSSI. Exclude all very weak signals, even if results in a shorter 149 list. 150 151 Args: 152 scans: List of scan results. 153 select_count: An integer specifying how many scans to return at most. 154 lowest_rssi: The lowest RSSI to accept into the output. 155 Returns: a list of the strongest 'select_count' scan results from the scans 156 list. 157 """ 158 def takeRssi(element): 159 return element['level'] 160 161 result = [] 162 scans.sort(key=takeRssi, reverse=True) 163 for scan in scans: 164 if len(result) == select_count: 165 break 166 if scan['level'] < lowest_rssi: 167 break # rest are lower since we're sorted 168 result.append(scan) 169 170 return result 171 172 173def validate_ap_result(scan_result, range_result): 174 """Validate the range results: 175 - Successful if AP (per scan result) support 802.11mc (allowed to fail 176 otherwise) 177 - MAC of result matches the BSSID 178 179 Args: 180 scan_result: Scan result for the AP 181 range_result: Range result returned by the RTT API 182 """ 183 asserts.assert_equal(scan_result[wutils.WifiEnums.BSSID_KEY], range_result[ 184 rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID], 'MAC/BSSID mismatch') 185 if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER in scan_result and 186 scan_result[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]): 187 asserts.assert_true(range_result[rconsts.EVENT_CB_RANGING_KEY_STATUS] == 188 rconsts.EVENT_CB_RANGING_STATUS_SUCCESS, 189 'Ranging failed for an AP which supports 802.11mc!') 190 191 192def validate_ap_results(scan_results, range_results): 193 """Validate an array of ranging results against the scan results used to 194 trigger the range. The assumption is that the results are returned in the 195 same order as the request (which were the scan results). 196 197 Args: 198 scan_results: Scans results used to trigger the range request 199 range_results: Range results returned by the RTT API 200 """ 201 asserts.assert_equal( 202 len(scan_results), 203 len(range_results), 204 'Mismatch in length of scan results and range results') 205 206 # sort first based on BSSID/MAC 207 scan_results.sort(key=lambda x: x[wutils.WifiEnums.BSSID_KEY]) 208 range_results.sort( 209 key=lambda x: x[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID]) 210 211 for i in range(len(scan_results)): 212 validate_ap_result(scan_results[i], range_results[i]) 213 214 215def validate_aware_mac_result(range_result, mac, description): 216 """Validate the range result for an Aware peer specified with a MAC address: 217 - Correct MAC address. 218 219 The MAC addresses may contain ":" (which are ignored for the comparison) and 220 may be in any case (which is ignored for the comparison). 221 222 Args: 223 range_result: Range result returned by the RTT API 224 mac: MAC address of the peer 225 description: Additional content to print on failure 226 """ 227 mac1 = mac.replace(':', '').lower() 228 mac2 = range_result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING].replace(':', 229 '').lower() 230 asserts.assert_equal(mac1, mac2, 231 '%s: MAC mismatch' % description) 232 233def validate_aware_peer_id_result(range_result, peer_id, description): 234 """Validate the range result for An Aware peer specified with a Peer ID: 235 - Correct Peer ID 236 - MAC address information not available 237 238 Args: 239 range_result: Range result returned by the RTT API 240 peer_id: Peer ID of the peer 241 description: Additional content to print on failure 242 """ 243 asserts.assert_equal(peer_id, 244 range_result[rconsts.EVENT_CB_RANGING_KEY_PEER_ID], 245 '%s: Peer Id mismatch' % description) 246 asserts.assert_false(rconsts.EVENT_CB_RANGING_KEY_MAC in range_result, 247 '%s: MAC Address not empty!' % description) 248 249 250def extract_stats(results, range_reference_mm, range_margin_mm, min_rssi, 251 reference_lci=[], reference_lcr=[], summary_only=False): 252 """Extract statistics from a list of RTT results. Returns a dictionary 253 with results: 254 - num_results (success or fails) 255 - num_success_results 256 - num_no_results (e.g. timeout) 257 - num_failures 258 - num_range_out_of_margin (only for successes) 259 - num_invalid_rssi (only for successes) 260 - distances: extracted list of distances 261 - distance_std_devs: extracted list of distance standard-deviations 262 - rssis: extracted list of RSSI 263 - distance_mean 264 - distance_std_dev (based on distance - ignoring the individual std-devs) 265 - rssi_mean 266 - rssi_std_dev 267 - status_codes 268 - lcis: extracted list of all of the individual LCI 269 - lcrs: extracted list of all of the individual LCR 270 - any_lci_mismatch: True/False - checks if all LCI results are identical to 271 the reference LCI. 272 - any_lcr_mismatch: True/False - checks if all LCR results are identical to 273 the reference LCR. 274 - num_attempted_measurements: extracted list of all of the individual 275 number of attempted measurements. 276 - num_successful_measurements: extracted list of all of the individual 277 number of successful measurements. 278 - invalid_num_attempted: True/False - checks if number of attempted 279 measurements is non-zero for successful results. 280 - invalid_num_successful: True/False - checks if number of successful 281 measurements is non-zero for successful results. 282 283 Args: 284 results: List of RTT results. 285 range_reference_mm: Reference value for the distance (in mm) 286 range_margin_mm: Acceptable absolute margin for distance (in mm) 287 min_rssi: Acceptable minimum RSSI value. 288 reference_lci, reference_lcr: Reference values for LCI and LCR. 289 summary_only: Only include summary keys (reduce size). 290 291 Returns: A dictionary of stats. 292 """ 293 stats = {} 294 stats['num_results'] = 0 295 stats['num_success_results'] = 0 296 stats['num_no_results'] = 0 297 stats['num_failures'] = 0 298 stats['num_range_out_of_margin'] = 0 299 stats['num_invalid_rssi'] = 0 300 stats['any_lci_mismatch'] = False 301 stats['any_lcr_mismatch'] = False 302 stats['invalid_num_attempted'] = False 303 stats['invalid_num_successful'] = False 304 305 range_max_mm = range_reference_mm + range_margin_mm 306 range_min_mm = range_reference_mm - range_margin_mm 307 308 distances = [] 309 distance_std_devs = [] 310 rssis = [] 311 num_attempted_measurements = [] 312 num_successful_measurements = [] 313 status_codes = [] 314 lcis = [] 315 lcrs = [] 316 317 for i in range(len(results)): 318 result = results[i] 319 320 if result is None: # None -> timeout waiting for RTT result 321 stats['num_no_results'] = stats['num_no_results'] + 1 322 continue 323 stats['num_results'] = stats['num_results'] + 1 324 325 status_codes.append(result[rconsts.EVENT_CB_RANGING_KEY_STATUS]) 326 if status_codes[-1] != rconsts.EVENT_CB_RANGING_STATUS_SUCCESS: 327 stats['num_failures'] = stats['num_failures'] + 1 328 continue 329 stats['num_success_results'] = stats['num_success_results'] + 1 330 331 distance_mm = result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_MM] 332 distances.append(distance_mm) 333 if not range_min_mm <= distance_mm <= range_max_mm: 334 stats['num_range_out_of_margin'] = stats['num_range_out_of_margin'] + 1 335 distance_std_devs.append( 336 result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_STD_DEV_MM]) 337 338 rssi = result[rconsts.EVENT_CB_RANGING_KEY_RSSI] 339 rssis.append(rssi) 340 if not min_rssi <= rssi <= 0: 341 stats['num_invalid_rssi'] = stats['num_invalid_rssi'] + 1 342 343 num_attempted = result[ 344 rconsts.EVENT_CB_RANGING_KEY_NUM_ATTEMPTED_MEASUREMENTS] 345 num_attempted_measurements.append(num_attempted) 346 if num_attempted == 0: 347 stats['invalid_num_attempted'] = True 348 349 num_successful = result[ 350 rconsts.EVENT_CB_RANGING_KEY_NUM_SUCCESSFUL_MEASUREMENTS] 351 num_successful_measurements.append(num_successful) 352 if num_successful == 0: 353 stats['invalid_num_successful'] = True 354 355 lcis.append(result[rconsts.EVENT_CB_RANGING_KEY_LCI]) 356 if (result[rconsts.EVENT_CB_RANGING_KEY_LCI] != reference_lci): 357 stats['any_lci_mismatch'] = True 358 lcrs.append(result[rconsts.EVENT_CB_RANGING_KEY_LCR]) 359 if (result[rconsts.EVENT_CB_RANGING_KEY_LCR] != reference_lcr): 360 stats['any_lcr_mismatch'] = True 361 362 if len(distances) > 0: 363 stats['distance_mean'] = statistics.mean(distances) 364 if len(distances) > 1: 365 stats['distance_std_dev'] = statistics.stdev(distances) 366 if len(rssis) > 0: 367 stats['rssi_mean'] = statistics.mean(rssis) 368 if len(rssis) > 1: 369 stats['rssi_std_dev'] = statistics.stdev(rssis) 370 if not summary_only: 371 stats['distances'] = distances 372 stats['distance_std_devs'] = distance_std_devs 373 stats['rssis'] = rssis 374 stats['num_attempted_measurements'] = num_attempted_measurements 375 stats['num_successful_measurements'] = num_successful_measurements 376 stats['status_codes'] = status_codes 377 stats['lcis'] = lcis 378 stats['lcrs'] = lcrs 379 380 return stats 381 382 383def run_ranging(dut, aps, iter_count, time_between_iterations, 384 target_run_time_sec=0): 385 """Executing ranging to the set of APs. 386 387 Will execute a minimum of 'iter_count' iterations. Will continue to run 388 until execution time (just) exceeds 'target_run_time_sec'. 389 390 Args: 391 dut: Device under test 392 aps: A list of APs (Access Points) to range to. 393 iter_count: (Minimum) Number of measurements to perform. 394 time_between_iterations: Number of seconds to wait between iterations. 395 target_run_time_sec: The target run time in seconds. 396 397 Returns: a list of the events containing the RTT results (or None for a 398 failed measurement). 399 """ 400 max_peers = dut.droid.wifiRttMaxPeersInRequest() 401 402 asserts.assert_true(len(aps) > 0, "Need at least one AP!") 403 if len(aps) > max_peers: 404 aps = aps[0:max_peers] 405 406 events = {} # need to keep track per BSSID! 407 for ap in aps: 408 events[ap["BSSID"]] = [] 409 410 start_clock = time.time() 411 iterations_done = 0 412 run_time = 0 413 while iterations_done < iter_count or ( 414 target_run_time_sec != 0 and run_time < target_run_time_sec): 415 if iterations_done != 0 and time_between_iterations != 0: 416 time.sleep(time_between_iterations) 417 418 id = dut.droid.wifiRttStartRangingToAccessPoints(aps) 419 try: 420 event = dut.ed.pop_event( 421 decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT, id), EVENT_TIMEOUT) 422 range_results = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS] 423 asserts.assert_equal( 424 len(aps), 425 len(range_results), 426 'Mismatch in length of scan results and range results') 427 for result in range_results: 428 bssid = result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING] 429 asserts.assert_true(bssid in events, 430 "Result BSSID %s not in requested AP!?" % bssid) 431 asserts.assert_equal(len(events[bssid]), iterations_done, 432 "Duplicate results for BSSID %s!?" % bssid) 433 events[bssid].append(result) 434 except queue.Empty: 435 for ap in aps: 436 events[ap["BSSID"]].append(None) 437 438 iterations_done = iterations_done + 1 439 run_time = time.time() - start_clock 440 441 return events 442 443 444def analyze_results(all_aps_events, rtt_reference_distance_mm, 445 distance_margin_mm, min_expected_rssi, lci_reference, lcr_reference, 446 summary_only=False): 447 """Verifies the results of the RTT experiment. 448 449 Args: 450 all_aps_events: Dictionary of APs, each a list of RTT result events. 451 rtt_reference_distance_mm: Expected distance to the AP (source of truth). 452 distance_margin_mm: Accepted error marging in distance measurement. 453 min_expected_rssi: Minimum acceptable RSSI value 454 lci_reference, lcr_reference: Expected LCI/LCR values (arrays of bytes). 455 summary_only: Only include summary keys (reduce size). 456 """ 457 all_stats = {} 458 for bssid, events in all_aps_events.items(): 459 stats = extract_stats(events, rtt_reference_distance_mm, 460 distance_margin_mm, min_expected_rssi, 461 lci_reference, lcr_reference, summary_only) 462 all_stats[bssid] = stats 463 return all_stats 464