1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Client class to access the Floss scanner interface."""
15import copy
16import logging
17import uuid as uuid_module
18
19from floss.pandora.floss import floss_enums
20from floss.pandora.floss import observer_base
21from floss.pandora.floss import utils
22from gi.repository import GLib
23
24
25class BluetoothScannerCallbacks:
26    """Callbacks for the scanner interface.
27
28    Implement this to observe these callbacks when exporting callbacks via
29    register_callback.
30    """
31
32    def on_scanner_registered(self, uuid, scanner_id, status):
33        """Called when scanner registered.
34
35        Args:
36            uuid: The specific uuid to register it.
37            scanner_id: Scanner id of scanning set.
38            status: floss_enums.GattStatus.
39        """
40        pass
41
42    def on_scan_result(self, scan_result):
43        """Called when execute start_scan().
44
45        Args:
46            scan_result: The struct of ScanResult.
47        """
48        pass
49
50    def on_advertisement_found(self, scanner_id, scan_result):
51        """Called when advertisement found.
52
53        Args:
54            scanner_id: The scanner ID for scanner.
55            scan_result: The struct of ScanResult.
56        """
57        pass
58
59    def on_advertisement_lost(self, scanner_id, scan_result):
60        """Called when advertisement lost.
61
62        Args:
63            scanner_id: The scanner ID for scanner.
64            scan_result: The struct of ScanResult.
65        """
66        pass
67
68    def on_suspend_mode_change(self, suspend_mode):
69        """Called when suspend mode change.
70
71        Args:
72            suspend_mode: The suspend mode of Bluetooth.
73        """
74        pass
75
76
77class ScannerObj:
78    """The scanner object for Advertisement Monitor Tests.
79
80    This class creates instances of multiple scanners.
81    """
82
83    def __init__(self, scanner_id, uuid, status):
84        """Construction of a scanner object.
85
86        Args:
87            scanner_id: Scanner ID of scanning set.
88            uuid: The specific UUID for scanner.
89            status: GATT status.
90        """
91        self.scanner_id = scanner_id
92        self.uuid = uuid
93        self.status = status
94        self.events = {
95            'DeviceFound': 0,
96            'DeviceLost': 0,
97        }
98        self.target_devices = []
99
100    def get_event_count(self, event):
101        """Reads the event count.
102
103        Args:
104            event: Name of the specific event or 'All' for all events.
105        Returns:
106            Count of a specific event or dict of counts of all events.
107        """
108        if event == 'All':
109            return self.events
110
111        return self.events.get(event)
112
113    def add_event_count(self, event):
114        """Increase the event count by one.
115
116        Args:
117            event: Name of the event as a string.
118        """
119        self.events[event] += 1
120
121    def reset_event_count(self, event):
122        """Resets the event count.
123
124        Args:
125            event: Name of a specific event or 'All' for all events.
126            True on success, False otherwise.
127
128        Returns:
129            True if success, False otherwise.
130        """
131        if event == 'All':
132            for event_key in self.events:
133                self.events[event_key] = 0
134            return True
135
136        if event in self.events:
137            self.events[event] = 0
138            return True
139
140        return False
141
142    def set_target_devices(self, devices):
143        """Sets the target devices to the given scanner.
144
145        DeviceFound and DeviceLost will only be counted if it is triggered by a
146        target device.
147
148        Args:
149            devices: A list of devices in dbus object path.
150        """
151        self.target_devices = copy.deepcopy(devices)
152
153
154class FlossScannerClient(BluetoothScannerCallbacks):
155    """Handles method calls to and callbacks from the scanner interface."""
156
157    SCANNER_SERVICE = 'org.chromium.bluetooth'
158    SCANNER_INTERFACE = 'org.chromium.bluetooth.BluetoothGatt'
159    SCANNER_OBJECT_PATTERN = '/org/chromium/bluetooth/hci{}/gatt'
160
161    SCANNER_CB_INTF = 'org.chromium.bluetooth.ScannerCallback'
162    SCANNER_CB_OBJ_NAME = 'test_scanner_client'
163    FLOSS_RESPONSE_LATENCY_SECS = 3
164
165    class ExportedScannerCallbacks(observer_base.ObserverBase):
166        """
167        <node>
168            <interface name="org.chromium.bluetooth.ScannerCallback">
169                <method name="OnScannerRegistered">
170                    <arg type="ay" name="uuid" direction="in" />
171                    <arg type="y" name="scanner_id" direction="in" />
172                    <arg type="u" name="status" direction="in" />
173                </method>
174                <method name="OnScanResult">
175                    <arg type="a{sv}" name="scan_result" direction="in" />
176                </method>
177                <method name="OnAdvertisementFound">
178                    <arg type="y" name="scanner_id" direction="in" />
179                    <arg type="a{sv}" name="scan_result" direction="in" />
180                </method>
181                <method name="OnAdvertisementLost">
182                    <arg type="y" name="scanner_id" direction="in" />
183                    <arg type="a{sv}" name="scan_result" direction="in" />
184                </method>
185                <method name="OnSuspendModeChange">
186                    <arg type="u" name="suspend_mode" direction="in" />
187                </method>
188            </interface>
189        </node>
190        """
191
192        def __init__(self):
193            """Constructs exported callbacks object."""
194            observer_base.ObserverBase.__init__(self)
195
196        def OnScannerRegistered(self, uuid, scanner_id, status):
197            """Handles scanner registered callback.
198
199            Args:
200                uuid: The specific uuid to register it.
201                scanner_id: Scanner id of scanning set.
202                status: floss_enums.GattStatus.
203            """
204            for observer in self.observers.values():
205                observer.on_scanner_registered(uuid, scanner_id, status)
206
207        def OnScanResult(self, scan_result):
208            """Handles scan result callback.
209
210            Args:
211                scan_result: The struct of ScanResult.
212            """
213            for observer in self.observers.values():
214                observer.on_scan_result(scan_result)
215
216        def OnAdvertisementFound(self, scanner_id, scan_result):
217            """Handles advertisement found callback.
218
219            Args:
220                scanner_id: The scanner ID for scanner.
221                scan_result: The struct of ScanResult.
222            """
223            for observer in self.observers.values():
224                observer.on_advertisement_found(scanner_id, scan_result)
225
226        def OnAdvertisementLost(self, scanner_id, scan_result):
227            """Handles advertisement lost callback.
228
229            Args:
230                scanner_id: The scanner ID for scanner.
231                scan_result: The struct of ScanResult.
232            """
233            for observer in self.observers.values():
234                observer.on_advertisement_lost(scanner_id, scan_result)
235
236        def OnSuspendModeChange(self, suspend_mode):
237            """Handles suspend mode change callback.
238
239            Args:
240                suspend_mode: The suspend mode of Bluetooth.
241            """
242            for observer in self.observers.values():
243                observer.on_suspend_mode_change(suspend_mode)
244
245    def __init__(self, bus, hci):
246        """Constructs the client.
247
248        Args:
249            bus: D-Bus bus over which we'll establish connections.
250            hci: HCI adapter index. Get this value from `get_default_adapter`
251                    on FlossManagerClient.
252        """
253        self.bus = bus
254        self.hci = hci
255        self.objpath = self.SCANNER_OBJECT_PATTERN.format(hci)
256
257        # We don't register callbacks by default.
258        self.callbacks = None
259        self.callback_id = None
260        self.register_scanner_results = {}
261        self.scanners = {}
262
263    def __del__(self):
264        """Destructor."""
265        del self.callbacks
266
267    @utils.glib_callback()
268    def on_scanner_registered(self, uuid, scanner_id, status):
269        """Handles scanner registered callback.
270
271        Args:
272            uuid: The specific uuid to register it.
273            scanner_id: Scanner id of scanning set.
274            status: floss_enums.GattStatus.
275        """
276        logging.debug('on_scanner_registered: uuid: %s, scanner_id: %s status: %s', uuid, scanner_id, status)
277
278        # The uuid is returned as a list of bytes (128-bit UUID) so
279        # we need convert it to uuid object in order to store it in the
280        # dictionary as a key.
281        uuid_object = uuid_module.UUID(bytes=bytes(uuid))
282        self.register_scanner_results[uuid_object] = (scanner_id, status)
283
284        if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
285            return
286
287        # Creates a scanner object every time a new scanner registered.
288        scanner = ScannerObj(scanner_id, uuid_object, status)
289        self.scanners[scanner_id] = scanner
290
291    @utils.glib_callback()
292    def on_scan_result(self, scan_result):
293        """Handles scan result callback.
294
295        Args:
296            scan_result: The struct of ScanResult.
297        """
298        logging.debug('on_scan_result: scan_result: %s', scan_result)
299
300    @utils.glib_callback()
301    def on_advertisement_found(self, scanner_id, scan_result):
302        """Handles advertisement found callback.
303
304        Args:
305            scanner_id: The scanner ID for scanner.
306            scan_result: The struct of ScanResult.
307        """
308        logging.debug('on_advertisement_found: scanner_id: %s, scan_result: %s', scanner_id, scan_result)
309
310        # Update DeviceFound if the received address device exists in the
311        # target_device list.
312        if scan_result['address'] in self.scanners[scanner_id].target_devices:
313            self.scanners[scanner_id].add_event_count('DeviceFound')
314
315    @utils.glib_callback()
316    def on_advertisement_lost(self, scanner_id, scan_result):
317        """Handles advertisement lost callback.
318
319        Args:
320            scanner_id: The scanner ID for scanner.
321            scan_result: The struct of ScanResult.
322        """
323        logging.debug('on_advertisement_lost: scanner_id: %s, scan_result: %s', scanner_id, scan_result)
324
325        # Update DeviceLost if the received address device exists in the
326        # target_device list.
327        if scan_result['address'] in self.scanners[scanner_id].target_devices:
328            self.scanners[scanner_id].add_event_count('DeviceLost')
329
330    @utils.glib_callback()
331    def on_suspend_mode_change(self, suspend_mode):
332        """Handles suspend mode change callback.
333
334        Args:
335            suspend_mode: The suspend mode of Bluetooth.
336        """
337        logging.debug('on_suspend_mode_change: suspend_mode: %s', suspend_mode)
338
339    def make_dbus_scan_filter_pattern(self, start_position, ad_type, content):
340        """Makes struct for scan filter pattern D-Bus.
341
342        Args:
343            start_position: The start position of pattern.
344            ad_type: The type of pattern.
345            content: The content of pattern.
346
347        Returns:
348            Dictionary of scan filter pattern.
349        """
350        return {
351            'start_position': GLib.Variant('y', start_position),
352            'ad_type': GLib.Variant('y', ad_type),
353            'content': GLib.Variant('ay', content)
354        }
355
356    def make_dbus_scan_filter_condition(self, patterns):
357        """Makes struct for scan filter condition D-Bus.
358
359        Args:
360            patterns: The list of patterns used for conditions.
361
362        Returns:
363            Dictionary of scan filter condition.
364        """
365        return {'patterns': GLib.Variant('aa{sv}', patterns)}
366
367    def make_dbus_scan_filter(self, rssi_high_threshold, rssi_low_threshold, rssi_low_timeout, rssi_sampling_period,
368                              condition):
369        """Makes struct for scan filter D-Bus.
370
371        Args:
372            rssi_high_threshold: RSSI high threshold value.
373            rssi_low_threshold: RSSI low threshold value.
374            rssi_low_timeout: RSSI low timeout value.
375            rssi_sampling_period: The sampling interval in milliseconds.
376            condition: Struct of ScanFilterCondition.
377
378        Returns:
379            Dictionary of scan filter.
380        """
381        patterns = []
382        for c in condition:
383            patterns.append(self.make_dbus_scan_filter_pattern(c['start_position'], c['ad_type'], c['content']))
384        return {
385            'rssi_high_threshold': GLib.Variant('y', rssi_high_threshold),
386            'rssi_low_threshold': GLib.Variant('y', rssi_low_threshold),
387            'rssi_low_timeout': GLib.Variant('y', rssi_low_timeout),
388            'rssi_sampling_period': GLib.Variant('y', rssi_sampling_period),
389            'condition': GLib.Variant('a{sv}', self.make_dbus_scan_filter_condition(patterns))
390        }
391
392    def make_dbus_scan_settings(self, interval, window, scan_type):
393        """Makes struct for scan settings D-Bus.
394
395        Args:
396            interval: The interval value to setting scan.
397            window: The window value to setting scan.
398            scan_type: The type of scan.
399
400        Returns:
401            Dictionary of scan settings.
402        """
403        return {
404            'interval': GLib.Variant('i', interval),
405            'window': GLib.Variant('i', window),
406            'scan_type': GLib.Variant('u', scan_type)
407        }
408
409    @utils.glib_call(False)
410    def has_proxy(self):
411        """Checks whether scanner proxy can be acquired."""
412        return bool(self.proxy())
413
414    def proxy(self):
415        """Gets proxy object to scanner interface for method calls."""
416        return self.bus.get(self.SCANNER_SERVICE, self.objpath)[self.SCANNER_INTERFACE]
417
418    @utils.glib_call(False)
419    def register_scanner_callback(self):
420        """Registers scanner callbacks if it doesn't exist."""
421
422        if self.callbacks:
423            return True
424
425        # Create and publish callbacks
426        self.callbacks = self.ExportedScannerCallbacks()
427        self.callbacks.add_observer('scanner_client', self)
428        objpath = utils.generate_dbus_cb_objpath(self.SCANNER_CB_OBJ_NAME, self.hci)
429        self.bus.register_object(objpath, self.callbacks, None)
430
431        # Register published callbacks with scanner daemon
432        self.callback_id = self.proxy().RegisterScannerCallback(objpath)
433        return True
434
435    def register_callback_observer(self, name, observer):
436        """Add an observer for all callbacks.
437
438        Args:
439            name:
440                Name of the observer.
441            observer:
442                Observer that implements all callback classes.
443        """
444        if isinstance(observer, BluetoothScannerCallbacks):
445            self.callbacks.add_observer(name, observer)
446
447    def unregister_callback_observer(self, name, observer):
448        """Remove an observer for all callbacks.
449
450        Args:
451            name:
452                Name of the observer.
453            observer:
454                Observer that implements all callback classes.
455        """
456        if isinstance(observer, BluetoothScannerCallbacks):
457            self.callbacks.remove_observer(name, observer)
458
459    def wait_for_on_scanner_registered(self, uuid):
460        """Waits for register scanner.
461
462        Args:
463            uuid: The specific uuid for scanner.
464
465        Returns:
466            scanner_id, status for specific uuid on success,
467                 (None, None) otherwise.
468        """
469        try:
470            utils.poll_for_condition(condition=(lambda: uuid in self.register_scanner_results),
471                                     timeout=self.FLOSS_RESPONSE_LATENCY_SECS)
472        except TimeoutError:
473            logging.error('on_scanner_registered not called')
474            return None, None
475        scanner_id, status = self.register_scanner_results[uuid]
476
477        # Consume the result here because we have no straightforward timing
478        # to drop the info. We can't drop it in unregister_scanner because
479        # if the advertising failed to start then it makes no sense for the
480        # user to call unregister_scanner.
481        del self.register_scanner_results[uuid]
482        return scanner_id, status
483
484    @utils.glib_call(False)
485    def unregister_scanner_callback(self):
486        """Unregisters scanner callback for this client.
487
488        Returns:
489            True on success, False otherwise.
490        """
491        return self.proxy().UnregisterScannerCallback(self.callback_id)
492
493    @utils.glib_call(None)
494    def register_scanner(self):
495        """Registers scanner for the callback id.
496
497        Returns:
498            UUID of the registered scanner on success, None otherwise.
499        """
500        return uuid_module.UUID(bytes=bytes(self.proxy().RegisterScanner(self.callback_id)))
501
502    def register_scanner_sync(self):
503        """Registers scanner for the callback id.
504
505        Returns:
506             scanner_id of the registered scanner on success, None otherwise.
507        """
508        uuid = self.register_scanner()
509
510        # Failed if we have issue in D-bus (None).
511        if uuid is None:
512            logging.error('Failed to register the scanner')
513            return None
514
515        scanner_id, status = self.wait_for_on_scanner_registered(uuid)
516        if status is None:
517            return None
518
519        if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
520            logging.error('Failed to register the scanner with id: %s, status = %s', scanner_id, status)
521            return None
522        return scanner_id
523
524    @utils.glib_call(False)
525    def unregister_scanner(self, scanner_id):
526        """Unregisters scanner set using scanner id of set.
527
528        Args:
529            scanner_id: Scanner id of set scanning.
530
531        Returns:
532            True on success, False otherwise.
533        """
534        del self.scanners[scanner_id]
535        return self.proxy().UnregisterScanner(scanner_id)
536
537    @utils.glib_call(False)
538    def start_scan(self, scanner_id, settings, scan_filter):
539        """Starts scan.
540
541        Args:
542            scanner_id: Scanner id of set scanning.
543            settings: ScanSettings structure.
544            scan_filter: ScanFilter structure.
545
546        Returns:
547            True on success, False otherwise.
548        """
549        status = self.proxy().StartScan(scanner_id, settings, scan_filter)
550
551        if floss_enums.BtStatus(status) != floss_enums.BtStatus.SUCCESS:
552            logging.error('Failed to start the scanner with id: %s, status = %s', scanner_id, status)
553            return False
554        return True
555
556    @utils.glib_call(None)
557    def stop_scan(self, scanner_id):
558        """Stops scan set using scanner_id.
559
560        Args:
561            scanner_id: Scanner id of set scanning.
562
563        Returns:
564            floss_enums.BtStatus as int on success, None otherwise.
565        """
566        return self.proxy().StopScan(scanner_id)
567
568    @utils.glib_call(None)
569    def get_scan_suspend_mode(self):
570        """Gets scan suspend mode.
571
572        Returns:
573            SuspendMode as int on success, None otherwise.
574        """
575        return self.proxy().GetScanSuspendMode()
576
577    @utils.glib_call(None)
578    def is_msft_supported(self):
579        """Checks if MSFT supported.
580
581        Returns:
582            MSFT capability as boolean on success, None otherwise.
583        """
584        return self.proxy().IsMsftSupported()
585
586    def get_event_count(self, scanner_id, event):
587        """Reads the count of a particular event on the given scanner.
588
589        Args:
590            scanner_id: The scanner ID.
591            event: Name of the specific event or 'All' for all events.
592
593        Returns:
594            Count of the specific event or dict of counts of all events.
595        """
596        if scanner_id not in self.scanners:
597            return None
598
599        return self.scanners[scanner_id].get_event_count(event)
600
601    def reset_event_count(self, scanner_id, event):
602        """Resets the count of a particular event on the given scanner.
603
604        Args:
605            scanner_id: The scanner ID.
606            event: Name of the specific event or 'All' for all events.
607
608        Returns:
609            True on success, False otherwise.
610        """
611        if scanner_id not in self.scanners:
612            return False
613
614        return self.scanners[scanner_id].reset_event_count(event)
615
616    def set_target_devices(self, scanner_id, devices):
617        """Sets target devices to the given scanner.
618
619        DeviceFound and DeviceLost will only be counted if it is triggered
620        by a target device.
621
622        Args:
623            scanner_id: The scanner ID.
624            devices: A list of devices in dbus object path.
625
626        Returns:
627            True on success, False otherwise.
628        """
629        if scanner_id not in self.scanners:
630            return False
631
632        self.scanners[scanner_id].set_target_devices(devices)
633        return True
634
635    def remove_monitor(self, scanner_id):
636        """Removes the Advertisement Monitor object.
637
638        Args:
639            scanner_id: The scanner ID.
640
641        Returns:
642            True on success, False otherwise.
643        """
644        stop_scan = self.stop_scan(scanner_id)
645        unregister_scanner = self.unregister_scanner(scanner_id)
646
647        if stop_scan == floss_enums.BtStatus.SUCCESS:
648            stop_scan = True
649        else:
650            return False
651        return stop_scan and unregister_scanner
652