1#!/usr/bin/python3.4
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18import time
19
20from acts import asserts
21from acts.test_utils.wifi.aware import aware_const as aconsts
22from acts.test_utils.wifi.aware import aware_test_utils as autils
23from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
24from acts.test_utils.wifi.rtt import rtt_const as rconsts
25from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
26from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
27
28
class StressRangeAwareTest(AwareBaseTest, RttBaseTest):
  """Test class for stress testing of RTT ranging to Wi-Fi Aware peers."""
  SERVICE_NAME = "GoogleTestServiceXY"

  def __init__(self, controllers):
    """Initialize both parent test classes explicitly (multiple inheritance).

    Args:
      controllers: passed unchanged to both AwareBaseTest and RttBaseTest.
    """
    AwareBaseTest.__init__(self, controllers)
    RttBaseTest.__init__(self, controllers)

  def setup_test(self):
    """Manual setup here due to multiple inheritance: explicitly execute the
    setup method from both parents."""
    AwareBaseTest.setup_test(self)
    RttBaseTest.setup_test(self)

  def teardown_test(self):
    """Manual teardown here due to multiple inheritance: explicitly execute the
    teardown method from both parents."""
    AwareBaseTest.teardown_test(self)
    RttBaseTest.teardown_test(self)

  #############################################################################

  def run_rtt_discovery(self, init_dut, resp_mac=None, resp_peer_id=None):
    """Perform single RTT measurement, using Aware, from the Initiator DUT to
    a Responder. The RTT Responder can be specified using its MAC address
    (obtained using out-of-band discovery) or its Peer ID (using Aware
    discovery). If both are provided, the MAC address takes precedence.

    Args:
      init_dut: RTT Initiator device
      resp_mac: MAC address of the RTT Responder device
      resp_peer_id: Peer ID of the RTT Responder device

    Returns:
      The first entry of the results list carried by the ranging result
      event (after validating it against the requested Responder), or None
      if waiting for that event raised queue.Empty.
    """
    asserts.assert_true(resp_mac is not None or resp_peer_id is not None,
                        "One of the Responder specifications (MAC or Peer ID)"
                        " must be provided!")
    use_mac = resp_mac is not None
    if use_mac:
      ranging_id = init_dut.droid.wifiRttStartRangingToAwarePeerMac(resp_mac)
    else:
      ranging_id = init_dut.droid.wifiRttStartRangingToAwarePeerId(
          resp_peer_id)

    # Only the wait for the result event is guarded: a missing event is
    # reported to the caller as None so that it can be counted, not raised.
    try:
      event = init_dut.ed.pop_event(
          rutils.decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT,
                                ranging_id),
          rutils.EVENT_TIMEOUT)
    except queue.Empty:
      return None

    result = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS][0]
    if use_mac:
      rutils.validate_aware_mac_result(result, resp_mac, "DUT")
    else:
      rutils.validate_aware_peer_id_result(result, resp_peer_id, "DUT")
    return result

  def test_stress_rtt_ib_discovery_set(self):
    """Perform a set of RTT measurements, using in-band (Aware) discovery, and
    switching Initiator and Responder roles repeatedly.

    Stress test: repeat ranging operations. Verify rate of success and
    stability of results.

    The loop runs at least self.stress_test_min_iteration_count iterations
    and, when self.stress_test_target_run_time_sec is non-zero, keeps going
    until that much time has elapsed. Each iteration ranges once from each
    device to the other.
    """
    p_dut = self.android_devices[0]
    s_dut = self.android_devices[1]

    (p_id, s_id, p_disc_id, s_disc_id,
     peer_id_on_sub, peer_id_on_pub) = autils.create_discovery_pair(
        p_dut,
        s_dut,
        p_config=autils.add_ranging_to_pub(autils.create_discovery_config(
            self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED), True),
        s_config=autils.add_ranging_to_pub(autils.create_discovery_config(
            self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE), True),
        device_startup_offset=self.device_startup_offset,
        msg_id=self.get_next_msg_id())

    results = []
    # Use a monotonic clock for elapsed time so that wall-clock adjustments
    # during a long stress run cannot shorten or extend the loop.
    start_clock = time.monotonic()
    iterations_done = 0
    run_time = 0
    while (iterations_done < self.stress_test_min_iteration_count or
           (self.stress_test_target_run_time_sec != 0 and
            run_time < self.stress_test_target_run_time_sec)):
      # One measurement in each direction per iteration.
      results.append(self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub))
      results.append(self.run_rtt_discovery(s_dut, resp_peer_id=peer_id_on_sub))

      iterations_done += 1
      run_time = time.monotonic() - start_clock

    stats = rutils.extract_stats(results, self.rtt_reference_distance_mm,
                                 self.rtt_reference_distance_margin_mm,
                                 self.rtt_min_expected_rssi_dbm,
                                 summary_only=True)
    self.log.debug("Stats: %s", stats)
    asserts.assert_true(stats['num_no_results'] == 0,
                        "Missing (timed-out) results", extras=stats)
    asserts.assert_false(stats['any_lci_mismatch'],
                         "LCI mismatch", extras=stats)
    asserts.assert_false(stats['any_lcr_mismatch'],
                         "LCR mismatch", extras=stats)
    asserts.assert_equal(stats['num_invalid_rssi'], 0, "Invalid RSSI",
                         extras=stats)
    # Failures are bounded as a percentage of all results.
    max_failures = (self.rtt_max_failure_rate_two_sided_rtt_percentage
                    * stats['num_results'] / 100)
    asserts.assert_true(stats['num_failures'] <= max_failures,
                        "Failure rate is too high", extras=stats)
    # Out-of-margin ranges are bounded as a percentage of successful results.
    max_out_of_margin = (
        self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage
        * stats['num_success_results'] / 100)
    asserts.assert_true(stats['num_range_out_of_margin'] <= max_out_of_margin,
                        "Results exceeding error margin rate is too high",
                        extras=stats)
138