1#/usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
This test script exercises different test cases with a lot of BLE beacon traffic.
18
19This test script was designed with this setup in mind:
20Shield box one: Android Device as DUT. 7x Sprout devices acting as 192 beacons
21"""
22
23import threading
24
25from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
26from acts.test_utils.bt.BleEnum import AdvertiseSettingsAdvertiseMode
27from acts.test_utils.bt.BleEnum import ScanSettingsScanMode
28from acts.test_utils.bt.bt_test_utils import adv_succ
29from acts.test_utils.bt.bt_test_utils import batch_scan_result
30from acts.test_utils.bt.bt_test_utils import scan_result
31from acts.test_utils.bt.bt_test_utils import generate_ble_advertise_objects
32from acts.test_utils.bt.bt_test_utils import generate_ble_scan_objects
33from acts.test_utils.bt.bt_test_utils import reset_bluetooth
34from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
35from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
36
37
class BeaconSwarmTest(BluetoothBaseTest):
    """LE scanning tests run against a swarm of BLE beacons.

    The first Android device (scn_ad) performs all scans. Every Android
    device whose serial is listed in user_params['beacon_devices'] is asked to
    start user_params['beacon_count'] advertisements during setup_class.
    """
    # Timeout handed to every pop_event call (presumably seconds; confirm
    # against the event dispatcher).
    default_timeout = 10
    # Number of advertisements confirmed as started.
    beacon_swarm_count = 0
    # Class-level defaults; __init__ replaces the lists with per-instance
    # ones so that instances never share mutable state.
    advertising_device_name_list = []
    discovered_mac_address_list = []
    # Serializes updates that advertiser threads make to beacon_swarm_count
    # and advertising_device_name_list.
    _swarm_lock = threading.Lock()
    # threading.Event used to stop _restart_special_advertisements_thread;
    # None means "nobody will ever ask it to stop".
    _restart_stop_event = None

    def __init__(self, controllers):
        """Initialize the base test and pick the scanning device.

        Args:
          controllers: Passed unchanged to BluetoothBaseTest.__init__.
        """
        BluetoothBaseTest.__init__(self, controllers)
        self.scn_ad = self.android_devices[0]
        self.beacon_swarm_count = 0
        self.advertising_device_name_list = []
        self.discovered_mac_address_list = []

    def setup_test(self):
        """Forget discovered addresses and clear queued events on all devices.

        Returns:
          True, always.
        """
        self.discovered_mac_address_list = []
        for a in self.android_devices:
            a.ed.clear_all_events()
        return True

    def teardown_test(self):
        """Reset Bluetooth on the scanning device after each test.

        Returns:
          True, always (the result of reset_bluetooth is not inspected).
        """
        reset_bluetooth([self.scn_ad])
        return True

    def setup_class(self):
        """Prepare all devices for Bluetooth testing and start the beacons.

        Returns:
          False if device setup fails or too few beacons started, otherwise
          True.
        """
        if not setup_multiple_devices_for_bt_test(self.android_devices):
            return False
        return self._start_special_advertisements()

    def cleanup_class(self):
        """Reset Bluetooth on every device; returns reset_bluetooth's result."""
        return reset_bluetooth(self.android_devices)

    def on_fail(self, test_name, begin_time):
        """Collect btsnoop logs and reset the scanner after a failed test.

        Args:
          test_name: Name of the failed test, forwarded to take_btsnoop_logs.
          begin_time: Unused here.
        """
        take_btsnoop_logs(self.android_devices, self, test_name)
        reset_bluetooth([self.scn_ad])

    def _load_beacon_config(self):
        """Read the beacon configuration from user_params.

        Returns:
          A (beacon_serials, beacon_count) tuple. A value whose key is
          missing keeps its default ([] and 0 respectively).
        """
        beacon_serials = []
        beacon_count = 0
        try:
            beacon_serials = self.user_params['beacon_devices']
            beacon_count = self.user_params['beacon_count']
        except (AttributeError, KeyError):
            # A missing key raises KeyError; a missing user_params attribute
            # raises AttributeError. Both mean "no beacons configured".
            self.log.info(
                "No controllable devices connected to create beacons with."
                " Continuing...")
        return beacon_serials, beacon_count

    def _run_advertiser_threads(self, beacon_serials, beacon_count,
                                restart=False):
        """Run one advertiser thread per beacon device and wait for them all.

        Args:
          beacon_serials: Serials of the devices that should advertise.
          beacon_count: Number of advertisements to start on each device.
          restart: Forwarded to _start_advertisements_thread.
        """
        threads = []
        for ad in self.android_devices:
            if ad.serial not in beacon_serials:
                continue
            # The worker needs the device object itself, not its droid/ed.
            thread = threading.Thread(
                target=self._start_advertisements_thread,
                args=(ad, beacon_count, restart))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()

    def _record_mac_address(self, mac_address):
        """Remember a scanned address.

        Returns:
          True if the address had not been seen before, False otherwise.
        """
        if mac_address in self.discovered_mac_address_list:
            return False
        self.discovered_mac_address_list.append(mac_address)
        return True

    def _stop_scans(self, scan_callbacks):
        """Stop the given scans on the scanner, attempting every one of them.

        Args:
          scan_callbacks: Scan callbacks previously passed to bleStartBleScan.

        Returns:
          True if every stop call succeeded, False if any of them raised.
        """
        all_stopped = True
        for scan_callback in scan_callbacks:
            try:
                self.scn_ad.droid.bleStopBleScan(scan_callback)
            except Exception as err:
                all_stopped = False
                self.log.info("Failed to stop scan {}: {}".format(
                    scan_callback, err))
        return all_stopped

    def _start_advertisements_thread(self, ad, beacon_count, restart=False):
        """Start beacon_count LE advertisements on one device.

        Intended to run in its own thread. Failures are logged rather than
        raised. Each confirmed advertisement increments beacon_swarm_count,
        also when restart is True.

        Args:
          ad: The Android device to advertise from.
          beacon_count: Number of advertisements to start.
          restart: If True, reset Bluetooth on the device first and give up
            when that reset raises.
        """
        droid, dispatcher = ad.droid, ad.ed
        if restart:
            try:
                reset_bluetooth([ad])
            except Exception:
                self.log.debug("Failed resetting Bluetooth, continuing...")
                return
        try:
            for _ in range(beacon_count):
                droid.bleSetAdvertiseDataIncludeDeviceName(True)
                droid.bleSetAdvertiseSettingsAdvertiseMode(
                    AdvertiseSettingsAdvertiseMode.ADVERTISE_MODE_LOW_LATENCY.
                    value)
                advertise_callback, advertise_data, advertise_settings = (
                    generate_ble_advertise_objects(droid))
                droid.bleStartBleAdvertising(advertise_callback,
                                             advertise_data,
                                             advertise_settings)
                try:
                    dispatcher.pop_event(
                        adv_succ.format(advertise_callback),
                        self.default_timeout)
                    with self._swarm_lock:
                        self.beacon_swarm_count += 1
                    local_bt_name = droid.bluetoothGetLocalName()
                    with self._swarm_lock:
                        if (local_bt_name not in
                                self.advertising_device_name_list):
                            self.advertising_device_name_list.append(
                                local_bt_name)
                # The exception must not be bound to the dispatcher's name:
                # Python 3 deletes the bound name when the handler exits.
                except Exception as err:
                    self.log.info("Advertising failed due to " + str(err))
                self.log.info("Beacons active: {}".format(
                    self.beacon_swarm_count))
        except Exception:
            self.log.debug(
                "Something went wrong in starting advertisements, continuing.")
        return

    def _start_special_advertisements(self):
        """Start the configured advertisements on every beacon device.

        Returns:
          False if fewer than beacon_count * len(beacon_devices)
          advertisements were confirmed, otherwise True.
        """
        self.log.info("Setting up advertisements.")
        beacon_serials, beacon_count = self._load_beacon_config()
        self._run_advertiser_threads(beacon_serials, beacon_count)
        if self.beacon_swarm_count < (beacon_count * len(beacon_serials)):
            self.log.error("Not enough beacons advertising: {}".format(
                self.beacon_swarm_count))
            return False
        return True

    def _restart_special_advertisements_thread(self):
        """Repeatedly reset Bluetooth and restart advertising on the beacons.

        Intended to run in a background thread. The loop ends once
        self._restart_stop_event is set; if that attribute is None the loop
        never ends by itself.

        Returns:
          True once the loop has ended.
        """
        beacon_serials, beacon_count = self._load_beacon_config()
        if not any(ad.serial in beacon_serials
                   for ad in self.android_devices):
            # Nothing to restart; looping would only spin and flood the log.
            self.log.info("No beacon devices to restart.")
            return True
        stop_event = self._restart_stop_event
        while stop_event is None or not stop_event.is_set():
            self.log.info("Restarting advertisements.")
            self._run_advertiser_threads(
                beacon_serials, beacon_count, restart=True)
        return True

    def test_swarm_1000_on_scan_result(self):
        """Test LE scanning in a mass beacon deployment.

        Test finding 1000 LE scan results in a mass beacon deployment.

        Steps:
        1. Assume that mass beacon deployment is setup.
        2. Set LE scanning mode to low latency.
        3. Start LE scan.
        4. Pop scan results off the event dispatcher 1000 times.
        5. Stop LE scanning.

        Expected Result:
        1000 scan results should be found without any exceptions.

        Returns:
          Pass if True
          Fail if False

        TAGS: LE, Scanning, Beacon
        Priority: 1
        """
        self.scn_ad.droid.bleSetScanSettingsScanMode(
            ScanSettingsScanMode.SCAN_MODE_LOW_LATENCY.value)
        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
            self.scn_ad.droid)
        self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
                                          scan_callback)
        try:
            # 1000 results, as the test name and the steps above state.
            for _ in range(1000):
                event_info = self.scn_ad.ed.pop_event(
                    scan_result.format(scan_callback), self.default_timeout)
                mac_address = event_info['data']['Result']['deviceInfo'][
                    'address']
                if self._record_mac_address(mac_address):
                    self.log.info("Discovered {} different devices.".format(
                        len(self.discovered_mac_address_list)))
            self.log.debug("Discovered {} different devices.".format(len(
                self.discovered_mac_address_list)))
        finally:
            # Stop the scan even when popping an event raised.
            self.scn_ad.droid.bleStopBleScan(scan_callback)
        return True

    def test_swarm_10000_on_batch_scan_result(self):
        """Test LE batch scanning in a mass beacon deployment.

        Test finding 10000 LE batch scan results in a mass beacon deployment.

        Steps:
        1. Assume that mass beacon deployment is setup.
        2. Set LE scanning mode to low latency and report delay millis to 1
        second.
        3. Start LE scan.
        4. Pop batch scan results off the event dispatcher 10000 times.
        5. Stop LE scanning.

        Expected Result:
        10000 batch scan results should be found without any exceptions.

        Returns:
          Pass if True
          Fail if False

        TAGS: LE, Scanning, Beacon
        Priority: 1
        """
        self.scn_ad.droid.bleSetScanSettingsScanMode(
            ScanSettingsScanMode.SCAN_MODE_LOW_LATENCY.value)
        self.scn_ad.droid.bleSetScanSettingsReportDelayMillis(1000)
        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
            self.scn_ad.droid)
        self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
                                          scan_callback)
        try:
            for _ in range(10000):
                event_info = self.scn_ad.ed.pop_event(
                    batch_scan_result.format(scan_callback),
                    self.default_timeout)
                for result in event_info['data']['Results']:
                    self._record_mac_address(result['deviceInfo']['address'])
            self.log.info("Discovered {} different devices.".format(len(
                self.discovered_mac_address_list)))
        finally:
            # Stop the scan even when popping an event raised.
            self.scn_ad.droid.bleStopBleScan(scan_callback)
        return True

    def test_swarm_scan_result_filter_each_device_name(self):
        """Test basic LE scan filtering in a mass beacon deployment.

        Test finding LE scan results in a mass beacon deployment. This
        test specifically tests scan filtering of different device names and
        that each device name is found.

        Steps:
        1. Assume that mass beacon deployment is setup with device names
        advertising.
        2. Set LE scanning mode to low latency.
        3. Filter device name from one of the known advertising device names
        4. Start LE scan.
        5. Pop scan results matching the scan filter.
        6. Stop LE scanning.
        7. Repeat steps 2-6 until all advertising device names are found.

        Expected Result:
        All advertising beacons are found by their device name.

        Returns:
          Pass if True
          Fail if False

        TAGS: LE, Scanning, Beacon, Filtering
        Priority: 1
        """
        for filter_name in self.advertising_device_name_list:
            self.scn_ad.droid.bleSetScanSettingsScanMode(
                ScanSettingsScanMode.SCAN_MODE_LOW_LATENCY.value)
            filter_list, scan_settings, scan_callback = (
                generate_ble_scan_objects(self.scn_ad.droid))
            scan_started = False
            try:
                self.scn_ad.droid.bleSetScanFilterDeviceName(filter_name)
                self.scn_ad.droid.bleBuildScanFilter(filter_list)
                self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
                                                  scan_callback)
                scan_started = True
                self.log.debug(self.scn_ad.ed.pop_event(
                    scan_result.format(scan_callback), self.default_timeout))
            except Exception:
                self.log.info("Couldn't find advertiser name {}.".format(
                    filter_name))
                if scan_started:
                    # Do not leave the scan running on the failure path.
                    self._stop_scans([scan_callback])
                return False
            self.scn_ad.droid.bleStopBleScan(scan_callback)
        return True

    def test_swarm_rotate_addresses(self):
        """Test LE scanning while the beacons are restarted continuously.

        Test finding LE scan results in a mass beacon deployment. A
        background thread keeps resetting Bluetooth and restarting the
        advertisements on the beacon devices (intended to rotate their
        addresses so that the scanning device thinks there are thousands of
        devices nearby).

        Steps:
        1. Assume that mass beacon deployment is setup with device names
        advertising.
        2. Set LE scanning mode to low latency on 28 scan instances.
        3. Start LE scan on each of the scan instances.
        4. Continuously pop scan results from each scan instance.
        5. Restart the advertising devices in the background.
        6. Repeat steps 4-5 until at least 10,000 results were popped.
        7. Stop the background restarts and all LE scans.

        Expected Result:
        The Bluetooth stack doesn't crash.

        Returns:
          Pass if True
          Fail if False

        TAGS: LE, Scanning, Beacon
        Priority: 1
        """
        scan_callback_list = []
        stop_event = threading.Event()
        self._restart_stop_event = stop_event
        restart_thread = threading.Thread(
            target=self._restart_special_advertisements_thread,
            args=())
        # Never let the restart loop keep the interpreter alive.
        restart_thread.daemon = True
        scans_stopped = False
        try:
            for _ in range(28):
                self.scn_ad.droid.bleSetScanSettingsScanMode(
                    ScanSettingsScanMode.SCAN_MODE_LOW_LATENCY.value)
                filter_list, scan_settings, scan_callback = (
                    generate_ble_scan_objects(self.scn_ad.droid))
                self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
                                                  scan_callback)
                scan_callback_list.append(scan_callback)
            restart_thread.start()
            n = 0
            while n < 10000:
                for cb in scan_callback_list:
                    event_info = self.scn_ad.ed.pop_event(
                        scan_result.format(cb), self.default_timeout)
                    mac_address = event_info['data']['Result']['deviceInfo'][
                        'address']
                    self._record_mac_address(mac_address)
                    self.log.info("Discovered {} different devices.".format(
                        len(self.discovered_mac_address_list)))
                    n += 1
        finally:
            # Ask the restart loop to finish and wait for the round that is
            # in progress, so no restart is still running after this test.
            stop_event.set()
            if restart_thread.is_alive():
                restart_thread.join()
            # Every started scan instance must be stopped, not only the last.
            scans_stopped = self._stop_scans(scan_callback_list)
        return scans_stopped
345