1#!/usr/bin/python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18import time
19
20from acts import asserts
21from acts.test_decorators import test_tracker_info
22from acts.test_utils.wifi.aware import aware_const as aconsts
23from acts.test_utils.wifi.aware import aware_test_utils as autils
24from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
25from acts.test_utils.wifi.rtt import rtt_const as rconsts
26from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
27from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
28
29
30class RangeAwareTest(AwareBaseTest, RttBaseTest):
31  """Test class for RTT ranging to Wi-Fi Aware peers"""
  # Aware service name used for every discovery (publish/subscribe)
  # configuration created by this class
  SERVICE_NAME = "GoogleTestServiceXY"

  # Number of RTT iterations: passed as iter_count to the measurement sets and
  # used directly as the loop count of the negative tests
  NUM_ITER = 10

  # Time gap (in seconds) between iterations; the measurement loops skip the
  # sleep entirely when this is 0
  TIME_BETWEEN_ITERATIONS = 0

  # Time gap (in seconds) when switching between Initiator and Responder; only
  # applied when measurements are taken in both directions
  TIME_BETWEEN_ROLES = 4
42
43  def __init__(self, controllers):
44    AwareBaseTest.__init__(self, controllers)
45    RttBaseTest.__init__(self, controllers)
46
47  def setup_test(self):
48    """Manual setup here due to multiple inheritance: explicitly execute the
49    setup method from both parents."""
50    AwareBaseTest.setup_test(self)
51    RttBaseTest.setup_test(self)
52
53  def teardown_test(self):
54    """Manual teardown here due to multiple inheritance: explicitly execute the
55    teardown method from both parents."""
56    AwareBaseTest.teardown_test(self)
57    RttBaseTest.teardown_test(self)
58
59  #############################################################################
60
61  def run_rtt_discovery(self, init_dut, resp_mac=None, resp_peer_id=None):
62    """Perform single RTT measurement, using Aware, from the Initiator DUT to
63    a Responder. The RTT Responder can be specified using its MAC address
64    (obtained using out- of-band discovery) or its Peer ID (using Aware
65    discovery).
66
67    Args:
68      init_dut: RTT Initiator device
69      resp_mac: MAC address of the RTT Responder device
70      resp_peer_id: Peer ID of the RTT Responder device
71    """
72    asserts.assert_true(resp_mac is not None or resp_peer_id is not None,
73                        "One of the Responder specifications (MAC or Peer ID)"
74                        " must be provided!")
75    if resp_mac is not None:
76      id = init_dut.droid.wifiRttStartRangingToAwarePeerMac(resp_mac)
77    else:
78      id = init_dut.droid.wifiRttStartRangingToAwarePeerId(resp_peer_id)
79    try:
80      event = init_dut.ed.pop_event(rutils.decorate_event(
81          rconsts.EVENT_CB_RANGING_ON_RESULT, id), rutils.EVENT_TIMEOUT)
82      result = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS][0]
83      if resp_mac is not None:
84        rutils.validate_aware_mac_result(result, resp_mac, "DUT")
85      else:
86        rutils.validate_aware_peer_id_result(result, resp_peer_id, "DUT")
87      return result
88    except queue.Empty:
89      return None
90
91  def run_rtt_ib_discovery_set(self, do_both_directions, iter_count,
92      time_between_iterations, time_between_roles):
93    """Perform a set of RTT measurements, using in-band (Aware) discovery.
94
95    Args:
96      do_both_directions: False - perform all measurements in one direction,
97                          True - perform 2 measurements one in both directions.
98      iter_count: Number of measurements to perform.
99      time_between_iterations: Number of seconds to wait between iterations.
100      time_between_roles: Number of seconds to wait when switching between
101                          Initiator and Responder roles (only matters if
102                          do_both_directions=True).
103
104    Returns: a list of the events containing the RTT results (or None for a
105    failed measurement). If both directions are tested then returns a list of
106    2 elements: one set for each direction.
107    """
108    p_dut = self.android_devices[0]
109    s_dut = self.android_devices[1]
110
111    (p_id, s_id, p_disc_id, s_disc_id,
112     peer_id_on_sub, peer_id_on_pub) = autils.create_discovery_pair(
113        p_dut,
114        s_dut,
115        p_config=autils.add_ranging_to_pub(autils.create_discovery_config(
116            self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED), True),
117        s_config=autils.add_ranging_to_pub(autils.create_discovery_config(
118            self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE), True),
119        device_startup_offset=self.device_startup_offset,
120        msg_id=self.get_next_msg_id())
121
122    resultsPS = []
123    resultsSP = []
124    for i in range(iter_count):
125      if i != 0 and time_between_iterations != 0:
126        time.sleep(time_between_iterations)
127
128      # perform RTT from pub -> sub
129      resultsPS.append(
130        self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub))
131
132      if do_both_directions:
133        if time_between_roles != 0:
134          time.sleep(time_between_roles)
135
136        # perform RTT from sub -> pub
137        resultsSP.append(
138          self.run_rtt_discovery(s_dut, resp_peer_id=peer_id_on_sub))
139
140    return resultsPS if not do_both_directions else [resultsPS, resultsSP]
141
142  def run_rtt_oob_discovery_set(self, do_both_directions, iter_count,
143      time_between_iterations, time_between_roles):
144    """Perform a set of RTT measurements, using out-of-band discovery.
145
146    Args:
147      do_both_directions: False - perform all measurements in one direction,
148                          True - perform 2 measurements one in both directions.
149      iter_count: Number of measurements to perform.
150      time_between_iterations: Number of seconds to wait between iterations.
151      time_between_roles: Number of seconds to wait when switching between
152                          Initiator and Responder roles (only matters if
153                          do_both_directions=True).
154      enable_ranging: True to enable Ranging, False to disable.
155
156    Returns: a list of the events containing the RTT results (or None for a
157    failed measurement). If both directions are tested then returns a list of
158    2 elements: one set for each direction.
159    """
160    dut0 = self.android_devices[0]
161    dut1 = self.android_devices[1]
162
163    id0, mac0 = autils.attach_with_identity(dut0)
164    id1, mac1 = autils.attach_with_identity(dut1)
165
166    # wait for for devices to synchronize with each other - there are no other
167    # mechanisms to make sure this happens for OOB discovery (except retrying
168    # to execute the data-path request)
169    time.sleep(autils.WAIT_FOR_CLUSTER)
170
171    # start publisher(s) on the Responder(s) with ranging enabled
172    p_config = autils.add_ranging_to_pub(
173      autils.create_discovery_config(self.SERVICE_NAME,
174                                     aconsts.PUBLISH_TYPE_UNSOLICITED),
175      enable_ranging=True)
176    dut1.droid.wifiAwarePublish(id1, p_config)
177    autils.wait_for_event(dut1, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
178    if do_both_directions:
179      dut0.droid.wifiAwarePublish(id0, p_config)
180      autils.wait_for_event(dut0, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
181
182    results01 = []
183    results10 = []
184    for i in range(iter_count):
185      if i != 0 and time_between_iterations != 0:
186        time.sleep(time_between_iterations)
187
188      # perform RTT from dut0 -> dut1
189      results01.append(
190          self.run_rtt_discovery(dut0, resp_mac=mac1))
191
192      if do_both_directions:
193        if time_between_roles != 0:
194          time.sleep(time_between_roles)
195
196        # perform RTT from dut1 -> dut0
197        results10.append(
198            self.run_rtt_discovery(dut1, resp_mac=mac0))
199
200    return results01 if not do_both_directions else [results01, results10]
201
202  def verify_results(self, results, results_reverse_direction=None):
203    """Verifies the results of the RTT experiment.
204
205    Args:
206      results: List of RTT results.
207      results_reverse_direction: List of RTT results executed in the
208                                reverse direction. Optional.
209    """
210    stats = rutils.extract_stats(results, self.rtt_reference_distance_mm,
211                                 self.rtt_reference_distance_margin_mm,
212                                 self.rtt_min_expected_rssi_dbm)
213    stats_reverse_direction = None
214    if results_reverse_direction is not None:
215      stats_reverse_direction = rutils.extract_stats(results_reverse_direction,
216          self.rtt_reference_distance_mm, self.rtt_reference_distance_margin_mm,
217          self.rtt_min_expected_rssi_dbm)
218    self.log.debug("Stats: %s", stats)
219    if stats_reverse_direction is not None:
220      self.log.debug("Stats in reverse direction: %s", stats_reverse_direction)
221
222    extras = stats if stats_reverse_direction is None else {
223      "forward": stats,
224      "reverse": stats_reverse_direction}
225
226    asserts.assert_true(stats['num_no_results'] == 0,
227                        "Missing (timed-out) results", extras=extras)
228    asserts.assert_false(stats['any_lci_mismatch'],
229                         "LCI mismatch", extras=extras)
230    asserts.assert_false(stats['any_lcr_mismatch'],
231                         "LCR mismatch", extras=extras)
232    asserts.assert_false(stats['invalid_num_attempted'],
233                         "Invalid (0) number of attempts", extras=stats)
234    asserts.assert_false(stats['invalid_num_successful'],
235                         "Invalid (0) number of successes", extras=stats)
236    asserts.assert_equal(stats['num_invalid_rssi'], 0, "Invalid RSSI",
237                         extras=extras)
238    asserts.assert_true(
239        stats['num_failures'] <=
240          self.rtt_max_failure_rate_two_sided_rtt_percentage
241          * stats['num_results'] / 100,
242        "Failure rate is too high", extras=extras)
243    asserts.assert_true(
244        stats['num_range_out_of_margin']
245          <= self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage
246             * stats['num_success_results'] / 100,
247        "Results exceeding error margin rate is too high", extras=extras)
248
249    if stats_reverse_direction is not None:
250      asserts.assert_true(stats_reverse_direction['num_no_results'] == 0,
251                          "Missing (timed-out) results",
252                          extras=extras)
253      asserts.assert_false(stats['any_lci_mismatch'],
254                           "LCI mismatch", extras=extras)
255      asserts.assert_false(stats['any_lcr_mismatch'],
256                           "LCR mismatch", extras=extras)
257      asserts.assert_equal(stats['num_invalid_rssi'], 0, "Invalid RSSI",
258                           extras=extras)
259      asserts.assert_true(
260          stats_reverse_direction['num_failures']
261            <= self.rtt_max_failure_rate_two_sided_rtt_percentage
262                * stats['num_results'] / 100,
263          "Failure rate is too high", extras=extras)
264      asserts.assert_true(
265          stats_reverse_direction['num_range_out_of_margin']
266            <= self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage
267                * stats['num_success_results'] / 100,
268          "Results exceeding error margin rate is too high",
269          extras=extras)
270
271    asserts.explicit_pass("RTT Aware test done", extras=extras)
272
273  #############################################################################
274
275  @test_tracker_info(uuid="9e4e7ab4-2254-498c-9788-21e15ed9a370")
276  def test_rtt_oob_discovery_one_way(self):
277    """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
278    to communicate the MAC addresses to the peer. Test one-direction RTT only.
279    """
280    rtt_results = self.run_rtt_oob_discovery_set(do_both_directions=False,
281          iter_count=self.NUM_ITER,
282          time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
283          time_between_roles=self.TIME_BETWEEN_ROLES)
284    self.verify_results(rtt_results)
285
286  @test_tracker_info(uuid="22edba77-eeb2-43ee-875a-84437550ad84")
287  def test_rtt_oob_discovery_both_ways(self):
288    """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
289    to communicate the MAC addresses to the peer. Test RTT both-ways:
290    switching rapidly between Initiator and Responder.
291    """
292    rtt_results1, rtt_results2 = self.run_rtt_oob_discovery_set(
293        do_both_directions=True, iter_count=self.NUM_ITER,
294        time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
295        time_between_roles=self.TIME_BETWEEN_ROLES)
296    self.verify_results(rtt_results1, rtt_results2)
297
298  @test_tracker_info(uuid="18cef4be-95b4-4f7d-a140-5165874e7d1c")
299  def test_rtt_ib_discovery_one_way(self):
300    """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
301    to communicate the MAC addresses to the peer. Test one-direction RTT only.
302    """
303    rtt_results = self.run_rtt_ib_discovery_set(do_both_directions=False,
304           iter_count=self.NUM_ITER,
305           time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
306           time_between_roles=self.TIME_BETWEEN_ROLES)
307    self.verify_results(rtt_results)
308
309  @test_tracker_info(uuid="c67c8e70-c417-42d9-9bca-af3a89f1ddd9")
310  def test_rtt_ib_discovery_both_ways(self):
311    """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
312    to communicate the MAC addresses to the peer. Test RTT both-ways:
313    switching rapidly between Initiator and Responder.
314    """
315    rtt_results1, rtt_results2 = self.run_rtt_ib_discovery_set(
316        do_both_directions=True, iter_count=self.NUM_ITER,
317        time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
318        time_between_roles=self.TIME_BETWEEN_ROLES)
319    self.verify_results(rtt_results1, rtt_results2)
320
321  @test_tracker_info(uuid="54f9693d-45e5-4979-adbb-1b875d217c0c")
322  def test_rtt_without_initiator_aware(self):
323    """Try to perform RTT operation when there is no local Aware session (on the
324    Initiator). The Responder is configured normally: Aware on and a Publisher
325    with Ranging enable. Should FAIL."""
326    init_dut = self.android_devices[0]
327    resp_dut = self.android_devices[1]
328
329    # Enable a Responder and start a Publisher
330    resp_id = resp_dut.droid.wifiAwareAttach(True)
331    autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
332    resp_ident_event = autils.wait_for_event(resp_dut,
333                                         aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
334    resp_mac = resp_ident_event['data']['mac']
335
336    resp_config = autils.add_ranging_to_pub(
337        autils.create_discovery_config(self.SERVICE_NAME,
338                                       aconsts.PUBLISH_TYPE_UNSOLICITED),
339        enable_ranging=True)
340    resp_dut.droid.wifiAwarePublish(resp_id, resp_config)
341    autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
342
343    # Initiate an RTT to Responder (no Aware started on Initiator!)
344    results = []
345    num_no_responses = 0
346    num_successes = 0
347    for i in range(self.NUM_ITER):
348      result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
349      self.log.debug("result: %s", result)
350      results.append(result)
351      if result is None:
352        num_no_responses = num_no_responses + 1
353      elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS]
354            == rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
355        num_successes = num_successes + 1
356
357    asserts.assert_equal(num_no_responses, 0, "No RTT response?",
358                         extras={"data":results})
359    asserts.assert_equal(num_successes, 0, "Aware RTT w/o Aware should FAIL!",
360                         extras={"data":results})
361    asserts.explicit_pass("RTT Aware test done", extras={"data":results})
362
363  @test_tracker_info(uuid="87a69053-8261-4928-8ec1-c93aac7f3a8d")
364  def test_rtt_without_responder_aware(self):
365    """Try to perform RTT operation when there is no peer Aware session (on the
366    Responder). Should FAIL."""
367    init_dut = self.android_devices[0]
368    resp_dut = self.android_devices[1]
369
370    # Enable a Responder and start a Publisher
371    resp_id = resp_dut.droid.wifiAwareAttach(True)
372    autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
373    resp_ident_event = autils.wait_for_event(resp_dut,
374                                             aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
375    resp_mac = resp_ident_event['data']['mac']
376
377    resp_config = autils.add_ranging_to_pub(
378        autils.create_discovery_config(self.SERVICE_NAME,
379                                       aconsts.PUBLISH_TYPE_UNSOLICITED),
380        enable_ranging=True)
381    resp_dut.droid.wifiAwarePublish(resp_id, resp_config)
382    autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
383
384    # Disable Responder
385    resp_dut.droid.wifiAwareDestroy(resp_id)
386
387    # Enable the Initiator
388    init_id = init_dut.droid.wifiAwareAttach()
389    autils.wait_for_event(init_dut, aconsts.EVENT_CB_ON_ATTACHED)
390
391    # Initiate an RTT to Responder (no Aware started on Initiator!)
392    results = []
393    num_no_responses = 0
394    num_successes = 0
395    for i in range(self.NUM_ITER):
396      result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
397      self.log.debug("result: %s", result)
398      results.append(result)
399      if result is None:
400        num_no_responses = num_no_responses + 1
401      elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS]
402            == rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
403        num_successes = num_successes + 1
404
405    asserts.assert_equal(num_no_responses, 0, "No RTT response?",
406                         extras={"data":results})
407    asserts.assert_equal(num_successes, 0, "Aware RTT w/o Aware should FAIL!",
408                         extras={"data":results})
409    asserts.explicit_pass("RTT Aware test done", extras={"data":results})
410