1#!/usr/bin/python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18
19from acts import asserts
20from acts.test_decorators import test_tracker_info
21from acts.test_utils.wifi.aware import aware_const as aconsts
22from acts.test_utils.wifi.aware import aware_test_utils as autils
23from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
24
25KEY_ID = "id"
26KEY_TX_OK_COUNT = "tx_ok_count"
27KEY_TX_FAIL_COUNT = "tx_fail_count"
28KEY_RX_COUNT = "rx_count"
29
30
31class MessagesStressTest(AwareBaseTest):
32  """Set of stress tests for Wi-Fi Aware L2 (layer 2) message exchanges."""
33
34  # Number of iterations in the stress test (number of messages)
35  NUM_ITERATIONS = 100
36
37  # Maximum permitted percentage of messages which fail to be transmitted
38  # correctly
39  MAX_TX_FAILURE_PERCENTAGE = 2
40
41  # Maximum permitted percentage of messages which are received more than once
42  # (indicating, most likely, that the ACK wasn't received and the message was
43  # retransmitted)
44  MAX_DUPLICATE_RX_PERCENTAGE = 2
45
46  SERVICE_NAME = "GoogleTestServiceXY"
47
48  def __init__(self, controllers):
49    AwareBaseTest.__init__(self, controllers)
50
51  def init_info(self, msg, id, messages_by_msg, messages_by_id):
52    """Initialize the message data structures.
53
54    Args:
55      msg: message text
56      id: message id
57      messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
58      messages_by_id: {id -> text}
59    """
60    messages_by_msg[msg] = {}
61    messages_by_msg[msg][KEY_ID] = id
62    messages_by_msg[msg][KEY_TX_OK_COUNT] = 0
63    messages_by_msg[msg][KEY_TX_FAIL_COUNT] = 0
64    messages_by_msg[msg][KEY_RX_COUNT] = 0
65    messages_by_id[id] = msg
66
67  def wait_for_tx_events(self, dut, num_msgs, messages_by_msg, messages_by_id):
68    """Wait for messages to be transmitted and update data structures.
69
70    Args:
71      dut: device under test
72      num_msgs: number of expected message tx
73      messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
74      messages_by_id: {id -> text}
75    """
76    num_ok_tx_confirmations = 0
77    num_fail_tx_confirmations = 0
78    num_unexpected_ids = 0
79    tx_events_regex = "%s|%s" % (aconsts.SESSION_CB_ON_MESSAGE_SEND_FAILED,
80                                 aconsts.SESSION_CB_ON_MESSAGE_SENT)
81    while num_ok_tx_confirmations + num_fail_tx_confirmations < num_msgs:
82      try:
83        events = dut.ed.pop_events(tx_events_regex, autils.EVENT_TIMEOUT)
84        for event in events:
85          if (event["name"] != aconsts.SESSION_CB_ON_MESSAGE_SENT and
86              event["name"] != aconsts.SESSION_CB_ON_MESSAGE_SEND_FAILED):
87            asserts.fail("Unexpected event: %s" % event)
88          is_tx_ok = event["name"] == aconsts.SESSION_CB_ON_MESSAGE_SENT
89
90          id = event["data"][aconsts.SESSION_CB_KEY_MESSAGE_ID]
91          if id in messages_by_id:
92            msg = messages_by_id[id]
93            if is_tx_ok:
94              messages_by_msg[msg][
95                  KEY_TX_OK_COUNT] = messages_by_msg[msg][KEY_TX_OK_COUNT] + 1
96              if messages_by_msg[msg][KEY_TX_OK_COUNT] == 1:
97                num_ok_tx_confirmations = num_ok_tx_confirmations + 1
98            else:
99              messages_by_msg[msg][KEY_TX_FAIL_COUNT] = (
100                  messages_by_msg[msg][KEY_TX_FAIL_COUNT] + 1)
101              if messages_by_msg[msg][KEY_TX_FAIL_COUNT] == 1:
102                num_fail_tx_confirmations = num_fail_tx_confirmations + 1
103          else:
104            self.log.warning(
105                "Tx confirmation of unknown message ID received: %s", event)
106            num_unexpected_ids = num_unexpected_ids + 1
107      except queue.Empty:
108        self.log.warning("[%s] Timed out waiting for any MESSAGE_SEND* event - "
109                         "assuming the rest are not coming", dut.pretty_name)
110        break
111
112    return (num_ok_tx_confirmations, num_fail_tx_confirmations,
113            num_unexpected_ids)
114
115  def wait_for_rx_events(self, dut, num_msgs, messages_by_msg):
116    """Wait for messages to be received and update data structures
117
118    Args:
119      dut: device under test
120      num_msgs: number of expected messages to receive
121      messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
122    """
123    num_rx_msgs = 0
124    while num_rx_msgs < num_msgs:
125      try:
126        event = dut.ed.pop_event(aconsts.SESSION_CB_ON_MESSAGE_RECEIVED,
127                                 autils.EVENT_TIMEOUT)
128        msg = event["data"][aconsts.SESSION_CB_KEY_MESSAGE_AS_STRING]
129        if msg not in messages_by_msg:
130          messages_by_msg[msg] = {}
131          messages_by_msg[msg][KEY_ID] = -1
132          messages_by_msg[msg][KEY_TX_OK_COUNT] = 0
133          messages_by_msg[msg][KEY_TX_FAIL_COUNT] = 0
134          messages_by_msg[msg][KEY_RX_COUNT] = 1
135
136        messages_by_msg[msg][
137            KEY_RX_COUNT] = messages_by_msg[msg][KEY_RX_COUNT] + 1
138        if messages_by_msg[msg][KEY_RX_COUNT] == 1:
139          num_rx_msgs = num_rx_msgs + 1
140      except queue.Empty:
141        self.log.warning(
142            "[%s] Timed out waiting for ON_MESSAGE_RECEIVED event - "
143            "assuming the rest are not coming", dut.pretty_name)
144        break
145
146  def analyze_results(self, results, messages_by_msg):
147    """Analyze the results of the stress message test and add to the results
148    dictionary
149
150    Args:
151      results: result dictionary into which to add data
152      messages_by_msg: {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
153    """
154    results["raw_data"] = messages_by_msg
155    results["tx_count_success"] = 0
156    results["tx_count_duplicate_success"] = 0
157    results["tx_count_fail"] = 0
158    results["tx_count_duplicate_fail"] = 0
159    results["tx_count_neither"] = 0
160    results["tx_count_tx_ok_but_no_rx"] = 0
161    results["rx_count"] = 0
162    results["rx_count_duplicate"] = 0
163    results["rx_count_no_ok_tx_indication"] = 0
164    results["rx_count_fail_tx_indication"] = 0
165    results["rx_count_no_tx_message"] = 0
166
167    for msg, data in messages_by_msg.items():
168      if data[KEY_TX_OK_COUNT] > 0:
169        results["tx_count_success"] = results["tx_count_success"] + 1
170      if data[KEY_TX_OK_COUNT] > 1:
171        results["tx_count_duplicate_success"] = (
172            results["tx_count_duplicate_success"] + 1)
173      if data[KEY_TX_FAIL_COUNT] > 0:
174        results["tx_count_fail"] = results["tx_count_fail"] + 1
175      if data[KEY_TX_FAIL_COUNT] > 1:
176        results[
177            "tx_count_duplicate_fail"] = results["tx_count_duplicate_fail"] + 1
178      if (data[KEY_TX_OK_COUNT] == 0 and data[KEY_TX_FAIL_COUNT] == 0 and
179          data[KEY_ID] != -1):
180        results["tx_count_neither"] = results["tx_count_neither"] + 1
181      if data[KEY_TX_OK_COUNT] > 0 and data[KEY_RX_COUNT] == 0:
182        results["tx_count_tx_ok_but_no_rx"] = (
183            results["tx_count_tx_ok_but_no_rx"] + 1)
184      if data[KEY_RX_COUNT] > 0:
185        results["rx_count"] = results["rx_count"] + 1
186      if data[KEY_RX_COUNT] > 1:
187        results["rx_count_duplicate"] = results["rx_count_duplicate"] + 1
188      if data[KEY_RX_COUNT] > 0 and data[KEY_TX_OK_COUNT] == 0:
189        results["rx_count_no_ok_tx_indication"] = (
190            results["rx_count_no_ok_tx_indication"] + 1)
191      if data[KEY_RX_COUNT] > 0 and data[KEY_TX_FAIL_COUNT] > 0:
192        results["rx_count_fail_tx_indication"] = (
193            results["rx_count_fail_tx_indication"] + 1)
194      if data[KEY_RX_COUNT] > 0 and data[KEY_ID] == -1:
195        results[
196            "rx_count_no_tx_message"] = results["rx_count_no_tx_message"] + 1
197
198  #######################################################################
199
200  @test_tracker_info(uuid="e88c060f-4ca7-41c1-935a-d3d62878ec0b")
201  def test_stress_message(self):
202    """Stress test for bi-directional message transmission and reception."""
203    p_dut = self.android_devices[0]
204    s_dut = self.android_devices[1]
205
206    # Start up a discovery session
207    discovery_data = autils.create_discovery_pair(
208        p_dut,
209        s_dut,
210        p_config=autils.create_discovery_config(
211            self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
212        s_config=autils.create_discovery_config(self.SERVICE_NAME,
213                                                aconsts.SUBSCRIBE_TYPE_PASSIVE),
214        device_startup_offset=self.device_startup_offset,
215        msg_id=self.get_next_msg_id())
216    p_id = discovery_data[0]
217    s_id = discovery_data[1]
218    p_disc_id = discovery_data[2]
219    s_disc_id = discovery_data[3]
220    peer_id_on_sub = discovery_data[4]
221    peer_id_on_pub = discovery_data[5]
222
223    # Store information on Tx & Rx messages
224    messages_by_msg = {}  # keyed by message text
225    # {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
226    messages_by_id = {}  # keyed by message ID {id -> text}
227
228    # send all messages at once (one in each direction)
229    for i in range(self.NUM_ITERATIONS):
230      msg_p2s = "Message Publisher -> Subscriber #%d" % i
231      next_msg_id = self.get_next_msg_id()
232      self.init_info(msg_p2s, next_msg_id, messages_by_msg, messages_by_id)
233      p_dut.droid.wifiAwareSendMessage(p_disc_id, peer_id_on_pub, next_msg_id,
234                                       msg_p2s, 0)
235
236      msg_s2p = "Message Subscriber -> Publisher #%d" % i
237      next_msg_id = self.get_next_msg_id()
238      self.init_info(msg_s2p, next_msg_id, messages_by_msg, messages_by_id)
239      s_dut.droid.wifiAwareSendMessage(s_disc_id, peer_id_on_sub, next_msg_id,
240                                       msg_s2p, 0)
241
242    # wait for message tx confirmation
243    (p_tx_ok_count, p_tx_fail_count, p_tx_unknown_id) = self.wait_for_tx_events(
244        p_dut, self.NUM_ITERATIONS, messages_by_msg, messages_by_id)
245    (s_tx_ok_count, s_tx_fail_count, s_tx_unknown_id) = self.wait_for_tx_events(
246        s_dut, self.NUM_ITERATIONS, messages_by_msg, messages_by_id)
247    self.log.info("Transmission done: pub=%d, sub=%d transmitted successfully",
248                  p_tx_ok_count, s_tx_ok_count)
249
250    # wait for message rx confirmation (giving it the total number of messages
251    # transmitted rather than just those transmitted correctly since sometimes
252    # the Tx doesn't get that information correctly. I.e. a message the Tx
253    # thought was not transmitted correctly is actually received - missing ACK?
254    # bug?)
255    self.wait_for_rx_events(p_dut, self.NUM_ITERATIONS, messages_by_msg)
256    self.wait_for_rx_events(s_dut, self.NUM_ITERATIONS, messages_by_msg)
257
258    # analyze results
259    results = {}
260    results["tx_count"] = 2 * self.NUM_ITERATIONS
261    results["tx_unknown_ids"] = p_tx_unknown_id + s_tx_unknown_id
262    self.analyze_results(results, messages_by_msg)
263
264    # clear errors
265    asserts.assert_equal(results["tx_unknown_ids"], 0, "Message ID corruption",
266                         results)
267    asserts.assert_equal(results["tx_count_neither"], 0,
268                         "Tx message with no success or fail indication",
269                         results)
270    asserts.assert_equal(results["tx_count_duplicate_fail"], 0,
271                         "Duplicate Tx fail messages", results)
272    asserts.assert_equal(results["tx_count_duplicate_success"], 0,
273                         "Duplicate Tx success messages", results)
274    asserts.assert_equal(results["rx_count_no_tx_message"], 0,
275                         "Rx message which wasn't sent - message corruption?",
276                         results)
277    asserts.assert_equal(results["tx_count_tx_ok_but_no_rx"], 0,
278                         "Tx got ACK but Rx didn't get message", results)
279
280    # possibly ok - but flag since most frequently a bug
281    asserts.assert_equal(results["rx_count_no_ok_tx_indication"], 0,
282                         "Message received but Tx didn't get ACK", results)
283    asserts.assert_equal(results["rx_count_fail_tx_indication"], 0,
284                         "Message received but Tx didn't get ACK", results)
285
286    # permissible failures based on thresholds
287    asserts.assert_true(results["tx_count_fail"] <= (
288          self.MAX_TX_FAILURE_PERCENTAGE * self.NUM_ITERATIONS / 100),
289                        "Number of Tx failures exceeds threshold",
290                        extras=results)
291    asserts.assert_true(results["rx_count_duplicate"] <= (
292        self.MAX_DUPLICATE_RX_PERCENTAGE * self.NUM_ITERATIONS / 100),
293                        "Number of duplicate Rx exceeds threshold",
294                        extras=results)
295
296    asserts.explicit_pass("test_stress_message done", extras=results)