1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18import statistics
19import time
20
21from acts import asserts
22from acts.test_utils.wifi import wifi_test_utils as wutils
23from acts.test_utils.wifi.rtt import rtt_const as rconsts
24
# Default number of seconds to wait for an event before giving up. The value
# itself is arbitrary.
EVENT_TIMEOUT = 10
27
28
def decorate_event(event_name, id):
  """Build the per-request event name '<event_name>_<id>'.

  Args:
    event_name: Base name of the event.
    id: Numeric identifier appended (formatted with %d) after an underscore.

  Returns:
    The decorated event name string.
  """
  suffix = '%d' % id
  return '%s_%s' % (event_name, suffix)
31
32
def wait_for_event(ad, event_name, timeout=EVENT_TIMEOUT):
  """Pop the named event from the device's event dispatcher.

  The outcome is logged through ad.log; log lines are prefixed with the
  device's pretty_name when the device has that attribute.

  Args:
    ad: The android device
    event_name: Name of the event to wait for
    timeout: Number of seconds to wait for the event

  Returns:
    The popped event. If no event arrives within the timeout, the failure is
    reported through asserts.fail(event_name) instead.
  """
  tag = '[%s] ' % ad.pretty_name if hasattr(ad, 'pretty_name') else ''
  try:
    received = ad.ed.pop_event(event_name, timeout)
  except queue.Empty:
    ad.log.info('%sTimed out while waiting for %s', tag, event_name)
    asserts.fail(event_name)
  else:
    ad.log.info('%s%s: %s', tag, event_name, received['data'])
    return received
53
def fail_on_event(ad, event_name, timeout=EVENT_TIMEOUT):
  """Verify that the named event does NOT show up within the timeout.

  Waits up to 'timeout' seconds for the event. Seeing it is reported as a
  failure through asserts.fail (with the event attached as extras); timing
  out is the expected outcome and simply returns.

  Args:
    ad: The android device
    event_name: The event which must not appear
    timeout: Number of seconds to watch for the event
  """
  tag = '[%s] ' % ad.pretty_name if hasattr(ad, 'pretty_name') else ''
  try:
    unexpected = ad.ed.pop_event(event_name, timeout)
  except queue.Empty:
    # Nothing arrived: this is the success path.
    ad.log.info('%s%s not seen (as expected)', tag, event_name)
    return
  ad.log.info('%sReceived unwanted %s: %s', tag, event_name,
              unexpected['data'])
  asserts.fail(event_name, extras=unexpected)
72
73
def config_privilege_override(dut, override_to_no_privilege):
  """Set the device's 'override_assume_no_privilege' RTT setting over adb.

  Per the setting's purpose, enabling the override makes the device skip the
  permission check and disallow privileged RTT operations, e.g. one-sided RTT
  to Responders (APs) which do not support IEEE 802.11mc.

  Args:
    dut: Device to configure.
    override_to_no_privilege: Truthy to send 1 (no privileged ops), falsy to
                              send 0 (default, privileged ops allowed).
  """
  if override_to_no_privilege:
    flag = 1
  else:
    flag = 0
  command = "cmd wifirtt set override_assume_no_privilege %d" % flag
  dut.adb.shell(command)
86
87
def get_rtt_constrained_results(scanned_networks, support_rtt):
  """Split scan results by RTT (IEEE 802.11mc) responder support.

  A network counts as an RTT responder when it contains the
  rconsts.SCAN_RESULT_KEY_RTT_RESPONDER key with a truthy value.

  Args:
    scanned_networks: A list of networks from scan results.
    support_rtt: True - only return those APs which support RTT, False - only
                 return those APs which do not support RTT.

  Returns: the sub-list of scanned_networks (original order kept) matching
           the support_rtt constraint.
  """
  def is_responder(network):
    key = rconsts.SCAN_RESULT_KEY_RTT_RESPONDER
    return key in network and bool(network[key])

  want_responder = bool(support_rtt)
  return [network for network in scanned_networks
          if is_responder(network) == want_responder]
111
112
def scan_networks(dut):
  """Trigger a Wi-Fi connection scan and fetch the device's scan results.

  Args:
    dut: Device under test.

  Returns: whatever dut.droid.wifiGetScanResults() reports after the scan
           has been started via wutils.start_wifi_connection_scan().
  """
  wutils.start_wifi_connection_scan(dut)
  scan_results = dut.droid.wifiGetScanResults()
  return scan_results
123
124
def scan_with_rtt_support_constraint(dut, support_rtt, repeat=0):
  """Scan until at least one AP matching the RTT-support constraint is found.

  Performs up to 'repeat + 1' scans and returns the filtered results of the
  first scan which yields any match.

  Args:
    dut: Device under test.
    support_rtt: True - only return those APs which support RTT (IEEE
                 802.11mc), False - only return those APs which do not.
    repeat: Number of additional scans to attempt when a scan has no match.

  Returns: a list of matching scan results, or an empty list if no scan
           produced a match.
  """
  for _ in range(repeat + 1):
    matching = get_rtt_constrained_results(scan_networks(dut), support_rtt)
    if matching:
      return matching

  return []
144
145
def select_best_scan_results(scans, select_count, lowest_rssi=-80):
  """Select the strongest 'select_count' scans in the input list based on
  highest RSSI. Exclude all very weak signals, even if that results in a
  shorter list.

  The input list is left untouched: ranking is done on a sorted copy, so the
  caller's ordering of 'scans' is preserved.

  Args:
    scans: List of scan results; each must provide an RSSI under 'level'.
    select_count: An integer specifying how many scans to return at most. A
                  value of zero or less yields an empty list.
    lowest_rssi: The lowest RSSI to accept into the output.
  Returns: a new list of at most 'select_count' scan results, strongest
           first, each with RSSI >= lowest_rssi.
  """
  # sorted() is stable, so scans with equal RSSI keep their relative order.
  ranked = sorted(scans, key=lambda scan: scan['level'], reverse=True)

  result = []
  for scan in ranked:
    # '>=' (rather than '==') so a non-positive select_count cannot disable
    # the cap and return every acceptable scan.
    if len(result) >= select_count:
      break
    if scan['level'] < lowest_rssi:
      break  # everything after this one is weaker since we're sorted
    result.append(scan)

  return result
171
172
def validate_ap_result(scan_result, range_result):
  """Check a single range result against the scan result which triggered it.

  Checks performed:
  - The MAC reported in the range result equals the BSSID of the scan result.
  - If the scan result marks the AP as an RTT (802.11mc) responder, the
    ranging status must be success. Other APs are allowed to fail.

  Args:
    scan_result: Scan result for the AP
    range_result: Range result returned by the RTT API
  """
  expected_bssid = scan_result[wutils.WifiEnums.BSSID_KEY]
  reported_mac = range_result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID]
  asserts.assert_equal(expected_bssid, reported_mac, 'MAC/BSSID mismatch')

  responder_key = rconsts.SCAN_RESULT_KEY_RTT_RESPONDER
  if responder_key not in scan_result or not scan_result[responder_key]:
    return  # not an 802.11mc responder: ranging status is not checked

  status = range_result[rconsts.EVENT_CB_RANGING_KEY_STATUS]
  asserts.assert_true(status == rconsts.EVENT_CB_RANGING_STATUS_SUCCESS,
                      'Ranging failed for an AP which supports 802.11mc!')
190
191
def validate_ap_results(scan_results, range_results):
  """Validate a list of ranging results against the scan results used to
  trigger the range.

  Both lists must have the same length. They are sorted IN PLACE (scan
  results by BSSID, range results by MAC) and then validated pair-wise, so
  the order in which the results were returned does not matter.

  Args:
    scan_results: Scans results used to trigger the range request
    range_results: Range results returned by the RTT API
  """
  asserts.assert_equal(
      len(scan_results),
      len(range_results),
      'Mismatch in length of scan results and range results')

  def scan_bssid(scan):
    return scan[wutils.WifiEnums.BSSID_KEY]

  def range_mac(ranging):
    return ranging[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID]

  # line the two lists up by BSSID/MAC before comparing entry by entry
  scan_results.sort(key=scan_bssid)
  range_results.sort(key=range_mac)

  for scan, ranging in zip(scan_results, range_results):
    validate_ap_result(scan, ranging)
213
214
def validate_aware_mac_result(range_result, mac, description):
  """Validate that a range result belongs to the Aware peer with the given
  MAC address.

  Both addresses are compared after stripping ":" separators and lowering
  the case, so formatting differences do not matter.

  Args:
    range_result: Range result returned by the RTT API
    mac: MAC address of the peer
    description: Additional content to print on failure
  """
  def canonical(address):
    return address.replace(':', '').lower()

  expected = canonical(mac)
  actual = canonical(range_result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING])
  asserts.assert_equal(expected, actual, '%s: MAC mismatch' % description)
232
def validate_aware_peer_id_result(range_result, peer_id, description):
  """Validate the range result for an Aware peer specified with a Peer ID.

  Checks performed:
  - The result carries the expected Peer ID.
  - The result does not contain the MAC address key.

  Args:
    range_result: Range result returned by the RTT API
    peer_id: Peer ID of the peer
    description: Additional content to print on failure
  """
  reported_peer_id = range_result[rconsts.EVENT_CB_RANGING_KEY_PEER_ID]
  asserts.assert_equal(peer_id, reported_peer_id,
                       '%s: Peer Id mismatch' % description)

  has_mac = rconsts.EVENT_CB_RANGING_KEY_MAC in range_result
  asserts.assert_false(has_mac, '%s: MAC Address not empty!' % description)
248
249
def extract_stats(results, range_reference_mm, range_margin_mm, min_rssi,
    reference_lci=[], reference_lcr=[], summary_only=False):
  """Summarize a list of RTT results into a dictionary of statistics.

  Entries which are None count as "no result" (timeout waiting for the RTT
  result). Entries whose status is not success count as failures. All other
  measurements are only extracted from successful results.

  Always present keys:
     - num_results (success or fails)
     - num_success_results
     - num_no_results (e.g. timeout)
     - num_failures
     - num_range_out_of_margin (only for successes)
     - num_invalid_rssi (only for successes): RSSI outside [min_rssi, 0]
     - any_lci_mismatch: True if any successful result's LCI differs from
                         the reference LCI.
     - any_lcr_mismatch: True if any successful result's LCR differs from
                         the reference LCR.
     - invalid_num_attempted: True if any successful result reports zero
                              attempted measurements.
     - invalid_num_successful: True if any successful result reports zero
                               successful measurements.

  Present only when enough successful results exist:
     - distance_mean, rssi_mean (at least one successful result)
     - distance_std_dev, rssi_std_dev (at least two successful results);
       distance_std_dev is computed from the distances themselves, ignoring
       the individual std-devs.

  Present unless summary_only is set:
     - distances: extracted list of distances
     - distance_std_devs: extracted list of distance standard-deviations
     - rssis: extracted list of RSSI
     - num_attempted_measurements: extracted list of attempted measurements
     - num_successful_measurements: extracted list of successful measurements
     - status_codes: status of every non-None result (including failures)
     - lcis: extracted list of all of the individual LCI
     - lcrs: extracted list of all of the individual LCR

  Args:
    results: List of RTT results.
    range_reference_mm: Reference value for the distance (in mm)
    range_margin_mm: Acceptable absolute margin for distance (in mm)
    min_rssi: Acceptable minimum RSSI value.
    reference_lci, reference_lcr: Reference values for LCI and LCR. The list
      defaults are only compared against, never mutated, in this function.
    summary_only: Only include summary keys (reduce size).

  Returns: A dictionary of stats.
  """
  stats = {
      'num_results': 0,
      'num_success_results': 0,
      'num_no_results': 0,
      'num_failures': 0,
      'num_range_out_of_margin': 0,
      'num_invalid_rssi': 0,
      'any_lci_mismatch': False,
      'any_lcr_mismatch': False,
      'invalid_num_attempted': False,
      'invalid_num_successful': False,
  }

  upper_bound_mm = range_reference_mm + range_margin_mm
  lower_bound_mm = range_reference_mm - range_margin_mm

  distances, distance_std_devs, rssis = [], [], []
  num_attempted_measurements, num_successful_measurements = [], []
  status_codes, lcis, lcrs = [], [], []

  for result in results:
    if result is None:  # None -> timeout waiting for RTT result
      stats['num_no_results'] += 1
      continue
    stats['num_results'] += 1

    status = result[rconsts.EVENT_CB_RANGING_KEY_STATUS]
    status_codes.append(status)
    if status != rconsts.EVENT_CB_RANGING_STATUS_SUCCESS:
      stats['num_failures'] += 1
      continue
    stats['num_success_results'] += 1

    # Everything below applies to successful results only.
    distance_mm = result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_MM]
    distances.append(distance_mm)
    within_margin = lower_bound_mm <= distance_mm <= upper_bound_mm
    if not within_margin:
      stats['num_range_out_of_margin'] += 1
    distance_std_devs.append(
        result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_STD_DEV_MM])

    rssi = result[rconsts.EVENT_CB_RANGING_KEY_RSSI]
    rssis.append(rssi)
    rssi_is_valid = min_rssi <= rssi <= 0
    if not rssi_is_valid:
      stats['num_invalid_rssi'] += 1

    attempted = result[rconsts.EVENT_CB_RANGING_KEY_NUM_ATTEMPTED_MEASUREMENTS]
    num_attempted_measurements.append(attempted)
    if attempted == 0:
      stats['invalid_num_attempted'] = True

    successful = result[
        rconsts.EVENT_CB_RANGING_KEY_NUM_SUCCESSFUL_MEASUREMENTS]
    num_successful_measurements.append(successful)
    if successful == 0:
      stats['invalid_num_successful'] = True

    lci = result[rconsts.EVENT_CB_RANGING_KEY_LCI]
    lcis.append(lci)
    if lci != reference_lci:
      stats['any_lci_mismatch'] = True

    lcr = result[rconsts.EVENT_CB_RANGING_KEY_LCR]
    lcrs.append(lcr)
    if lcr != reference_lcr:
      stats['any_lcr_mismatch'] = True

  # The mean needs one sample, the (sample) standard deviation needs two.
  if distances:
    stats['distance_mean'] = statistics.mean(distances)
    if len(distances) > 1:
      stats['distance_std_dev'] = statistics.stdev(distances)
  if rssis:
    stats['rssi_mean'] = statistics.mean(rssis)
    if len(rssis) > 1:
      stats['rssi_std_dev'] = statistics.stdev(rssis)

  if not summary_only:
    detail_lists = (
        ('distances', distances),
        ('distance_std_devs', distance_std_devs),
        ('rssis', rssis),
        ('num_attempted_measurements', num_attempted_measurements),
        ('num_successful_measurements', num_successful_measurements),
        ('status_codes', status_codes),
        ('lcis', lcis),
        ('lcrs', lcrs),
    )
    for key, values in detail_lists:
      stats[key] = values

  return stats
381
382
def run_ranging(dut, aps, iter_count, time_between_iterations,
    target_run_time_sec=0):
  """Repeatedly range to a set of APs and collect the results per AP.

  Executes a minimum of 'iter_count' iterations and, when
  'target_run_time_sec' is non-zero, keeps iterating until the elapsed time
  reaches it. If more APs are supplied than the device reports it can range
  to in one request, only the first ones (up to that limit) are used.

  Args:
    dut: Device under test
    aps: A list of APs (Access Points) to range to.
    iter_count: (Minimum) Number of measurements to perform.
    time_between_iterations: Number of seconds to wait between iterations.
    target_run_time_sec: The target run time in seconds.

  Returns: a dictionary keyed by AP BSSID. Each value is a list with one
  entry per iteration: the RTT result for that AP, or None when no result
  event arrived within EVENT_TIMEOUT for that iteration.
  """
  peer_limit = dut.droid.wifiRttMaxPeersInRequest()

  asserts.assert_true(len(aps) > 0, "Need at least one AP!")
  if len(aps) > peer_limit:
    aps = aps[:peer_limit]

  # results are tracked per BSSID, one entry per iteration
  events = {ap["BSSID"]: [] for ap in aps}

  started_at = time.time()
  completed = 0
  elapsed = 0
  while True:
    minimum_met = completed >= iter_count
    time_met = target_run_time_sec == 0 or elapsed >= target_run_time_sec
    if minimum_met and time_met:
      break

    # pause between iterations, but not before the first one
    if completed != 0 and time_between_iterations != 0:
      time.sleep(time_between_iterations)

    request_id = dut.droid.wifiRttStartRangingToAccessPoints(aps)
    result_event_name = decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT,
                                       request_id)
    try:
      event = dut.ed.pop_event(result_event_name, EVENT_TIMEOUT)
    except queue.Empty:
      # no result event for this request: record a miss for every AP
      for ap in aps:
        events[ap["BSSID"]].append(None)
    else:
      range_results = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS]
      asserts.assert_equal(
          len(aps),
          len(range_results),
          'Mismatch in length of scan results and range results')
      for result in range_results:
        bssid = result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING]
        asserts.assert_true(bssid in events,
                            "Result BSSID %s not in requested AP!?" % bssid)
        # each AP must have exactly one entry per completed iteration
        asserts.assert_equal(len(events[bssid]), completed,
                             "Duplicate results for BSSID %s!?" % bssid)
        events[bssid].append(result)

    completed += 1
    elapsed = time.time() - started_at

  return events
442
443
def analyze_results(all_aps_events, rtt_reference_distance_mm,
    distance_margin_mm, min_expected_rssi, lci_reference, lcr_reference,
    summary_only=False):
  """Compute per-AP statistics for the results of an RTT experiment.

  Args:
    all_aps_events: Dictionary of APs (keyed by BSSID), each a list of RTT
                    result events.
    rtt_reference_distance_mm: Expected distance to the AP (source of truth).
    distance_margin_mm: Accepted error margin in distance measurement.
    min_expected_rssi: Minimum acceptable RSSI value
    lci_reference, lcr_reference: Expected LCI/LCR values (arrays of bytes).
    summary_only: Only include summary keys (reduce size).

  Returns: a dictionary keyed by BSSID whose values are the stats
           dictionaries produced by extract_stats() for that AP.
  """
  return {
      bssid: extract_stats(ap_events, rtt_reference_distance_mm,
                           distance_margin_mm, min_expected_rssi,
                           lci_reference, lcr_reference, summary_only)
      for bssid, ap_events in all_aps_events.items()
  }
464