1#!/usr/bin/env python3
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import queue
19
20from blueberry.facade import common_pb2 as common
21from blueberry.tests.gd.cert.closable import Closable
22from blueberry.tests.gd.cert.closable import safeClose
23from blueberry.tests.gd.cert.truth import assertThat
24from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects
25from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes
26from blueberry.tests.gd_sl4a.lib.bt_constants import scan_result
27from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test
28
29
30class LeScanner(Closable):
31
32    is_scanning = False
33    device = None
34    filter_list = None
35    scan_settings = None
36    scan_callback = None
37
38    def __init__(self, device):
39        self.device = device
40
41    def __wait_for_scan_result_event(self, expected_event_name, timeout=60):
42        try:
43            event_info = self.device.ed.pop_event(expected_event_name, timeout)
44        except queue.Empty as error:
45            logging.error("Could not find scan result event: %s", expected_event_name)
46            return None
47        return event_info['data']['Result']['deviceInfo']['address']
48
49    def scan_for_address_expect_none(self, address, addr_type):
50        if self.is_scanning:
51            print("Already scanning!")
52            return None
53        self.is_scanning = True
54        logging.info("Start scanning for identity address {} or type {}".format(address, addr_type))
55        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
56        self.device.sl4a.bleSetScanSettingsLegacy(False)
57        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
58        expected_event_name = scan_result.format(self.scan_callback)
59
60        # Start scanning on SL4A DUT
61        self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type)
62        self.device.sl4a.bleBuildScanFilter(self.filter_list)
63        self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback)
64
65        # Verify that scan result is received on SL4A DUT
66        advertising_address = self.__wait_for_scan_result_event(expected_event_name, 1)
67        assertThat(advertising_address).isNone()
68        logging.info("Filter advertisement with address {}".format(advertising_address))
69        return advertising_address
70
71    def scan_for_address(self, address, addr_type):
72        if self.is_scanning:
73            print("Already scanning!")
74            return None
75        self.is_scanning = True
76        logging.info("Start scanning for identity address {} or type {}".format(address, addr_type))
77        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
78        self.device.sl4a.bleSetScanSettingsLegacy(False)
79        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
80        expected_event_name = scan_result.format(self.scan_callback)
81
82        # Start scanning on SL4A DUT
83        self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type)
84        self.device.sl4a.bleBuildScanFilter(self.filter_list)
85        self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback)
86
87        # Verify that scan result is received on SL4A DUT
88        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
89        assertThat(advertising_address).isNotNone()
90        logging.info("Filter advertisement with address {}".format(advertising_address))
91        return advertising_address
92
93    def scan_for_address_with_irk(self, address, addr_type, irk):
94        if self.is_scanning:
95            print("Already scanning!")
96            return None
97        self.is_scanning = True
98        logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk))
99        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
100        self.device.sl4a.bleSetScanSettingsLegacy(False)
101        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
102        expected_event_name = scan_result.format(self.scan_callback)
103
104        # Start scanning on SL4A DUT
105        self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk)
106        self.device.sl4a.bleBuildScanFilter(self.filter_list)
107        self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback)
108
109        # Verify that scan result is received on SL4A DUT
110        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
111        assertThat(advertising_address).isNotNone()
112        logging.info("Filter advertisement with address {}".format(advertising_address))
113        return advertising_address
114
115    def scan_for_address_with_irk_pending_intent(self, address, addr_type, irk):
116        if self.is_scanning:
117            print("Already scanning!")
118            return None
119        self.is_scanning = True
120        logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk))
121        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
122        self.device.sl4a.bleSetScanSettingsLegacy(False)
123        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
124        # Hard code here since callback index iterates and will cause this to fail if ran
125        # Second as the impl in SL4A sends this since it's a single callback for broadcast.
126        expected_event_name = "BleScan1onScanResults"
127
128        # Start scanning on SL4A DUT
129        self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk)
130        self.device.sl4a.bleBuildScanFilter(self.filter_list)
131        self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings)
132
133        # Verify that scan result is received on SL4A DUT
134        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
135        assertThat(advertising_address).isNotNone()
136        logging.info("Filter advertisement with address {}".format(advertising_address))
137        return advertising_address
138
139    def scan_for_name(self, name):
140        if self.is_scanning:
141            print("Already scanning!")
142            return
143        self.is_scanning = True
144        logging.info("Start scanning for name {}".format(name))
145        self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
146        self.device.sl4a.bleSetScanSettingsLegacy(False)
147        self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a)
148        expected_event_name = scan_result.format(1)
149        self.device.ed.clear_events(expected_event_name)
150
151        # Start scanning on SL4A DUT
152        self.device.sl4a.bleSetScanFilterDeviceName(name)
153        self.device.sl4a.bleBuildScanFilter(self.filter_list)
154        self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings)
155
156        # Verify that scan result is received on SL4A DUT
157        advertising_address = self.__wait_for_scan_result_event(expected_event_name)
158        assertThat(advertising_address).isNotNone()
159        logging.info("Filter advertisement with address {}".format(advertising_address))
160        return advertising_address
161
162    def stop_scanning(self):
163        """
164        Warning: no java callback registered for this
165        """
166        if self.is_scanning:
167            logging.info("Stopping scan")
168            self.device.sl4a.bleStopBleScan(self.scan_callback)
169            self.is_scanning = False
170
171    def close(self):
172        self.stop_scanning()
173        self.device = None
174