1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Test script to execute Bluetooth basic functionality test cases.
18This test was designed to be run in a shield box.
19"""
20
21import threading
22import time
23
24from queue import Empty
25from acts.test_decorators import test_tracker_info
26from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
27from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
28from acts_contrib.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection
29from acts_contrib.test_utils.bt.bt_test_utils import write_read_verify_data
30
31
32class RfcommStressTest(BluetoothBaseTest):
33    default_timeout = 10
34    scan_discovery_time = 5
35    message = (
36        "Space: the final frontier. These are the voyages of "
37        "the starship Enterprise. Its continuing mission: to explore "
38        "strange new worlds, to seek out new life and new civilizations,"
39        " to boldly go where no man has gone before.")
40
41    def setup_class(self):
42        super().setup_class()
43        self.client_ad = self.android_devices[0]
44        self.server_ad = self.android_devices[1]
45
46    @test_tracker_info(uuid='10452f63-e1fd-4345-a4da-e00fc4609a69')
47    def test_rfcomm_connection_stress(self):
48        """Stress test an RFCOMM connection
49
50        Test the integrity of RFCOMM. Verify that file descriptors are cleared
51        out properly.
52
53        Steps:
54        1. Establish a bonding between two Android devices.
55        2. Write data to RFCOMM from the client droid.
56        3. Read data from RFCOMM from the server droid.
57        4. Stop the RFCOMM connection.
58        5. Repeat steps 2-4 1000 times.
59
60        Expected Result:
61        Each iteration should read and write to the RFCOMM connection
62        successfully.
63
64        Returns:
65          Pass if True
66          Fail if False
67
68        TAGS: Classic, Stress, RFCOMM
69        Priority: 1
70        """
71        iterations = 1000
72        for n in range(iterations):
73            if not orchestrate_rfcomm_connection(self.client_ad,
74                                                 self.server_ad):
75                return False
76            self.client_ad.droid.bluetoothRfcommStop()
77            self.server_ad.droid.bluetoothRfcommStop()
78            self.log.info("Iteration {} completed".format(n))
79        return True
80
81    @test_tracker_info(uuid='2dba96ec-4e5d-4394-85ef-b57b66399fbc')
82    def test_rfcomm_connection_write_msg_stress(self):
83        """Stress test an RFCOMM connection
84
85        Test the integrity of RFCOMM. Verify that file descriptors are cleared
86        out properly.
87
88        Steps:
89        1. Establish a bonding between two Android devices.
90        2. Write data to RFCOMM from the client droid.
91        3. Read data from RFCOMM from the server droid.
92        4. Stop the RFCOMM connection.
93        5. Repeat steps 2-4 1000 times.
94
95        Expected Result:
96        Each iteration should read and write to the RFCOMM connection
97        successfully.
98
99        Returns:
100          Pass if True
101          Fail if False0
102
103        TAGS: Classic, Stress, RFCOMM
104        Priority: 1
105        """
106        iterations = 1000
107        for n in range(iterations):
108            if not orchestrate_rfcomm_connection(self.client_ad,
109                                                 self.server_ad):
110                return False
111            if not write_read_verify_data(self.client_ad, self.server_ad,
112                                          self.message, False):
113                return False
114            self.client_ad.droid.bluetoothRfcommStop()
115            self.server_ad.droid.bluetoothRfcommStop()
116            self.log.info("Iteration {} completed".format(n))
117        return True
118
119    @test_tracker_info(uuid='78dca597-89c0-431c-a553-531f230fc3c0')
120    def test_rfcomm_read_write_stress(self):
121        """Stress test an RFCOMM connection's read and write capabilities
122
123        Test the integrity of RFCOMM. Verify that file descriptors are cleared
124        out properly.
125
126        Steps:
127        1. Establish a bonding between two Android devices.
128        2. Write data to RFCOMM from the client droid.
129        3. Read data from RFCOMM from the server droid.
130        4. Repeat steps 2-3 10000 times.
131        5. Stop the RFCOMM connection.
132
133        Expected Result:
134        Each iteration should read and write to the RFCOMM connection
135        successfully.
136
137        Returns:
138          Pass if True
139          Fail if False
140
141        TAGS: Classic, Stress, RFCOMM
142        Priority: 1
143        """
144        iterations = 1000
145        if not orchestrate_rfcomm_connection(self.client_ad, self.server_ad):
146            return False
147        for n in range(iterations):
148            self.log.info("Write message.")
149            if not write_read_verify_data(self.client_ad, self.server_ad,
150                                          self.message, False):
151                return False
152            self.log.info("Iteration {} completed".format(n))
153        self.client_ad.droid.bluetoothRfcommStop()
154        self.server_ad.droid.bluetoothRfcommStop()
155        return True
156