1#!/usr/bin/env python3
2#   Copyright 2019 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16import asyncio
17import logging
18
19from blueberry.tests.gd.cert.gd_device import GdHostOnlyDevice
20from blueberry.tests.gd.cert.gd_device import MOBLY_CONTROLLER_CONFIG_NAME
21from blueberry.tests.gd.cert.os_utils import get_gd_root
22from blueberry.tests.topshim.lib.async_closable import AsyncClosable
23from blueberry.tests.topshim.lib.async_closable import asyncSafeClose
24from blueberry.tests.topshim.lib.oob_data import OobData
25from blueberry.tests.topshim.lib.gatt_client import GattClient
26
27
def create(configs):
    """
    Create the devices described by ``configs``.

    Delegates to get_instances_with_configs and hands back whatever it returns.

    @param configs the per-device configurations

    @return the devices built from ``configs``
    """
    devices = get_instances_with_configs(configs)
    return devices
30
31
def destroy(devices):
    """
    No-op: ``devices`` is accepted but nothing is done with it here.

    @param devices the devices previously returned by create (unused)
    """
    return None
34
35
def replace_vars_for_topshim(string, config):
    """
    Expand the placeholders in one command-line argument for the topshim facade.

    Substitutes $GD_ROOT, $(grpc_port), $(grpc_root_server_port),
    $(rootcanal_port), $(signal_port) and $(serial_number) with values taken
    from ``config`` (or get_gd_root() for $GD_ROOT), and swaps the
    "bluetooth_stack_with_facade" binary name for "bt_topshim_facade".

    @param string the argument text containing placeholders
    @param config mapping providing the substitution values; a missing
                  "serial_number" or "rootcanal_port" is treated as ""

    @return ``string`` with every placeholder replaced

    @raise Exception if the serial number is still the literal "DUT" or "CERT"
    """
    serial = config.get("serial_number")
    serial = "" if serial is None else serial

    rootcanal = config.get("rootcanal_port")
    rootcanal = "" if rootcanal is None else rootcanal

    # A serial number left at its template value means the config was never filled in.
    if serial in ("DUT", "CERT"):
        raise Exception("Did you forget to configure the serial number?")

    # Applied strictly in this order, one after another.
    substitutions = (
        ("$GD_ROOT", get_gd_root()),
        # We run bt_topshim_facade instead of bluetooth_stack_with_facade
        ("bluetooth_stack_with_facade", "bt_topshim_facade"),
        ("$(grpc_port)", config.get("grpc_port")),
        ("$(grpc_root_server_port)", config.get("grpc_root_server_port")),
        ("$(rootcanal_port)", rootcanal),
        ("$(signal_port)", config.get("signal_port")),
        ("$(serial_number)", serial),
    )

    expanded = string
    for placeholder, value in substitutions:
        expanded = expanded.replace(placeholder, value)
    return expanded
53
54
def get_instances_with_configs(configs):
    """
    Build and set up one GdHostOnlyDevice per entry in ``configs``.

    Every "cmd" argument of an entry is logged and passed through
    replace_vars_for_topshim before the device is constructed; setup() is
    called on each device before it joins the result.

    @param configs iterable of per-device config mappings

    @return list of the set-up devices, in the order of ``configs``
    """
    logging.info(configs)

    def resolve_arg(raw_arg, config):
        logging.debug(raw_arg)
        return replace_vars_for_topshim(raw_arg, config)

    def build_device(config):
        command = [resolve_arg(raw_arg, config) for raw_arg in config["cmd"]]
        is_verbose = bool(config.get('verbose_mode', False))
        new_device = GdHostOnlyDevice(
            config["grpc_port"],
            "-1",
            config["signal_port"],
            command,
            config["label"],
            MOBLY_CONTROLLER_CONFIG_NAME,
            config["name"],
            is_verbose,
        )
        new_device.setup()
        return new_device

    return [build_device(config) for config in configs]
69
70
# Transport selectors accepted as the `transport` argument of
# TopshimDevice.generate_local_oob_data (which defaults to TRANSPORT_LE).
# NOTE(review): TopshimDevice.create_bond defaults to the literal 1, which
# matches TRANSPORT_CLASSIC -- presumably intentional; confirm.
TRANSPORT_CLASSIC = 1
TRANSPORT_LE = 2
73
74
class TopshimDevice(AsyncClosable):
    """
    Blocking wrapper around the asynchronous topshim clients.

    Every public method runs the matching client coroutine to completion on
    the current asyncio event loop.  Methods that return a value do so in two
    steps: the client call yields an awaitable, which is then awaited for the
    parameters that the result is extracted from.
    """
    __adapter = None
    __gatt = None
    __security = None
    __hfp = None
    __hf_client = None

    def __init__(self, adapter, gatt, security, hfp, hf_client):
        """
        @param adapter client used for adapter-level calls (scan modes, event masks, discovery, ...)
        @param gatt client used for advertising calls
        @param security client used for bonding and OOB data calls
        @param hfp client used for SLC / HFP connection state calls
        @param hf_client stored and closed with the others; not otherwise used in this class
        """
        self.__adapter = adapter
        self.__gatt = gatt
        self.__security = security
        self.__hfp = hfp
        self.__hf_client = hf_client

    async def __le_rand_wrapper(self, async_fn):
        """
        Await `async_fn`, then call the adapter's le_rand() and
        _listen_for_event(LE_RAND); the value of the latter is discarded.

        @return the result of awaiting `async_fn`
        """
        result = await async_fn
        await self.__adapter.le_rand()
        # NOTE(review): `facade_pb2` is not imported anywhere in this file, so this
        # line raises NameError if the wrapper is ever called.  Nothing in this
        # class calls it today; import the generated proto module before using it.
        await self.__adapter._listen_for_event(facade_pb2.EventType.LE_RAND)
        return result

    def __post(self, async_fn):
        """
        Run `async_fn` to completion on the current event loop and return its result.
        """
        return asyncio.get_event_loop().run_until_complete(async_fn)

    async def close(self):
        """
        Implement abstract method to close out any streams or jobs.
        """
        await asyncSafeClose(self.__adapter)
        await asyncSafeClose(self.__gatt)
        await asyncSafeClose(self.__security)
        await asyncSafeClose(self.__hfp)
        await asyncSafeClose(self.__hf_client)

    def enable_inquiry_scan(self):
        """
        Enable inquiry scan on the adapter.

        @return tuple of (status, discovery_mode)
        """
        f = self.__post(self.__adapter.enable_inquiry_scan())
        return self.__post(self.__discovery_mode_waiter(f))

    def enable_page_scan(self):
        """
        Enable page scan on the adapter.

        @return tuple of (status, discovery_mode)
        """
        f = self.__post(self.__adapter.enable_page_scan())
        return self.__post(self.__discovery_mode_waiter(f))

    def disable_page_scan(self):
        """
        Disable page scan on the adapter.

        @return tuple of (status, discovery_mode)
        """
        f = self.__post(self.__adapter.disable_page_scan())
        return self.__post(self.__discovery_mode_waiter(f))

    async def __discovery_mode_waiter(self, f):
        """
        Await `f` and pull the first "status" and "AdapterScanMode" values out of it.

        @return tuple of (status, discovery_mode)
        """
        params = await f
        status, discovery_mode = params["status"].data[0], params["AdapterScanMode"].data[0]
        return (status, discovery_mode)

    def start_advertising(self):
        """
        Starts BLE Advertiser for the stack.
        Assumes stack defaults.  Which in our case would be RRPA
        """
        self.__post(self.__gatt.advertising_enable())

    def stop_advertising(self):
        """
        Stop BLE Advertiser.
        """
        self.__post(self.__gatt.advertising_disable())

    def start_scanning(self):
        """
        Not implemented; does nothing.
        """
        pass

    def stop_scanning(self):
        """
        Not implemented; does nothing.
        """
        pass

    def clear_event_mask(self):
        self.__post(self.__adapter.clear_event_mask())

    def clear_event_filter(self):
        self.__post(self.__adapter.clear_event_filter())

    def clear_filter_accept_list(self):
        self.__post(self.__adapter.clear_filter_accept_list())

    def disconnect_all_acls(self):
        self.__post(self.__adapter.disconnect_all_acls())

    def allow_wake_by_hid(self):
        self.__post(self.__adapter.allow_wake_by_hid())

    def set_default_event_mask_except(self, mask, le_mask):
        self.__post(self.__adapter.set_default_event_mask_except(mask, le_mask))

    def set_event_filter_inquiry_result_all_devices(self):
        self.__post(self.__adapter.set_event_filter_inquiry_result_all_devices())

    def set_event_filter_connection_setup_all_devices(self):
        self.__post(self.__adapter.set_event_filter_connection_setup_all_devices())

    def le_rand(self):
        self.__post(self.__adapter.le_rand())

    def create_bond(self, address, transport=1):
        """
        Create a bonding entry for a given address with a particular transport type.

        @param address address of the device to bond with
        @param transport transport type forwarded to the security client (defaults to 1)

        @return tuple of (bond_state, address)
        """
        f = self.__post(self.__security.create_bond(address, transport))
        return self.__post(self.__bond_change_waiter(f))

    def remove_bonded_device(self, address):
        """
        Removes a bonding entry for a given address.
        """
        self.__post(self.__security.remove_bond(address))

    async def __bond_change_waiter(self, f):
        """
        Await `f` and pull the first "bond_state" and "address" values out of it.

        @return tuple of (state, address)
        """
        params = await f
        state, address = params["bond_state"].data[0], params["address"].data[0]
        return (state, address)

    def generate_local_oob_data(self, transport=TRANSPORT_LE):
        """
        Generate local Out of Band data

        @param transport TRANSPORT_CLASSIC or TRANSPORT_LE

        @return an OobData built from the generated parameters
        """
        f = self.__post(self.__security.generate_local_oob_data(transport))

        async def waiter(f):
            params = await f
            return OobData(params["is_valid"].data[0], params["transport"].data[0], params["address"].data[0],
                           params["confirmation"].data[0], params["randomizer"].data[0])

        return self.__post(waiter(f))

    def set_local_io_caps(self, io_capability=0):
        """
        Set the local IO capabilities on the adapter.

        @param io_capability value forwarded to the adapter client (defaults to 0)

        @return tuple of (status, io_caps)
        """
        f = self.__post(self.__adapter.set_local_io_caps(io_capability))

        async def waiter(f):
            params = await f
            status, io_caps = params["status"].data[0], params["LocalIoCaps"].data[0]
            return (status, io_caps)

        return self.__post(waiter(f))

    def toggle_discovery(self, is_start):
        """
        Toggle discovery on the adapter.

        @param is_start value forwarded to the adapter client

        @return the first "discovery_state" value reported back
        """
        f = self.__post(self.__adapter.toggle_discovery(is_start))

        async def waiter(f):
            params = await f
            return params["discovery_state"].data[0]

        return self.__post(waiter(f))

    def find_device(self):
        """
        Attempts to find a discoverable device when discovery is toggled on.

        @return the first "BdAddr" value of the found device, or None if
                waiting for it failed (e.g. it timed out)
        """
        f = self.__post(self.__adapter.find_device())

        async def waiter(f):
            try:
                params = await f
                return params["BdAddr"].data[0]
            except (Exception, asyncio.CancelledError):
                # The future `f` has a timeout after 2s post which it is cancelled.
                # CancelledError is a BaseException on Python 3.8+, so it is named
                # explicitly; KeyboardInterrupt and SystemExit are left to propagate
                # instead of being reported as a timeout.
                print("No device was found. Timed out.")
            return None

        return self.__post(waiter(f))

    def start_slc(self, address):
        """
        Start an SLC with `address` through the hfp client.

        @return tuple of (state, address)
        """
        f = self.__post(self.__hfp.start_slc(address))
        return self.__post(self.__hfp_connection_state_waiter(f))

    def stop_slc(self, address):
        """
        Stop the SLC with `address` through the hfp client.

        @return tuple of (state, address)
        """
        f = self.__post(self.__hfp.stop_slc(address))
        return self.__post(self.__hfp_connection_state_waiter(f))

    def wait_for_hfp_connection_state_change(self):
        """
        Wait for the hfp client to report a connection state change.

        @return tuple of (state, address)
        """
        f = self.__post(self.__hfp.wait_for_hfp_connection_state_change())
        return self.__post(self.__hfp_connection_state_waiter(f))

    async def __hfp_connection_state_waiter(self, f):
        """
        Await `f`, split the resulting string on ", " and strip the first two fields.

        @return tuple of (state, address)
        """
        data = await f
        data_list = data.split(", ")
        state, address = data_list[0].strip(), data_list[1].strip()
        return (state, address)
261