1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Client class to access the Floss manager interface."""
15
16from floss.pandora.floss import observer_base
17from floss.pandora.floss import utils
18
19
class ManagerCallbacks:
    """Observer interface for events emitted by the Manager interface.

    Subclass this and override the hooks of interest; each hook defined here
    is a no-op. The hooks are invoked once callbacks have been exported via
    register_callback.
    """

    def on_hci_device_changed(self, hci, present):
        """Called when the presence of an hci device is updated.

        Args:
            hci: Hci interface number.
            present: Whether this hci interface is appearing or disappearing.
        """

    def on_hci_enabled_changed(self, hci, enabled):
        """Called when an hci device is being enabled or disabled.

        Args:
            hci: Hci interface number.
            enabled: Whether this hci interface is being enabled or disabled.
        """
44
45
class FlossManagerClient(ManagerCallbacks):
    """Handles method calls to and callbacks from the Manager interface.

    The client keeps a cache, `adapters`, mapping an hci index to its last
    known enabled state. The cache is updated by the manager callbacks (once
    register_callbacks has been called) and is replaced wholesale by
    get_available_adapters.
    """

    MGR_SERVICE = 'org.chromium.bluetooth.Manager'
    MGR_INTERFACE = 'org.chromium.bluetooth.Manager'
    MGR_OBJECT = '/org/chromium/bluetooth/Manager'

    # Exported callback interface and objects
    CB_EXPORTED_INTF = 'org.chromium.bluetooth.ManagerCallback'
    CB_EXPORTED_OBJ_NAME = 'test_manager_client'

    class AdaptersNotParseable(Exception):
        """An entry in the result of GetAvailableAdapters was not parseable."""
        pass

    # NOTE(review): the docstring of the class below is D-Bus introspection
    # XML. It is presumably read at runtime when the object is exported with
    # node_info=None (see register_callbacks), so treat it as data and do not
    # reword it -- confirm against the bus library in use.
    class ExportedManagerCallbacks(observer_base.ObserverBase):
        """
        <node>
            <interface name="org.chromium.bluetooth.ManagerCallback">
                <method name="OnHciDeviceChanged">
                    <arg type="i" name="hci" direction="in" />
                    <arg type="b" name="present" direction="in" />
                </method>
                <method name="OnHciEnabledChanged">
                    <arg type="i" name="hci" direction="in" />
                    <arg type="b" name="enabled" direction="in" />
                </method>
            </interface>
        </node>
        """

        def __init__(self):
            """Construct exported callbacks object."""
            super().__init__()

        def OnHciDeviceChanged(self, hci, present):
            """Forward a device presence callback to every observer."""
            for observer in self.observers.values():
                observer.on_hci_device_changed(hci, present)

        def OnHciEnabledChanged(self, hci, enabled):
            """Forward a device enabled callback to every observer."""
            for observer in self.observers.values():
                observer.on_hci_enabled_changed(hci, enabled)

    def __init__(self, bus):
        """Construct the client.

        Args:
            bus: DBus bus over which we'll establish connections.
        """
        self.bus = bus

        # We don't register callbacks by default. The client owner must call
        # register_callbacks to do so.
        self.callbacks = None

        # Initialize hci devices and their power states
        self.adapters = {}

    def __del__(self):
        """Destructor: drops the reference to the exported callbacks."""
        # Assign instead of `del` so the destructor cannot raise
        # AttributeError when __init__ did not run to completion.
        self.callbacks = None

    @utils.glib_call(False)
    def has_proxy(self):
        """Checks whether manager proxy can be acquired."""
        return bool(self.proxy())

    def proxy(self):
        """Gets proxy object to manager interface for method calls."""
        return self.bus.get(self.MGR_SERVICE, self.MGR_OBJECT)[self.MGR_INTERFACE]

    @utils.glib_call(False)
    def register_callbacks(self):
        """Registers manager callbacks for this client if one doesn't already exist.

        Returns:
            True if callbacks are registered, either by this call or by an
            earlier one. NOTE(review): on failure the result is presumably the
            default passed to utils.glib_call (False) -- confirm in utils.
        """
        # Callbacks already registered
        if self.callbacks:
            return True

        # Create and publish callbacks. Work on a local until every step has
        # succeeded: if self.callbacks were set up front, a failure below
        # would leave it truthy and later calls would report success without
        # the daemon ever having been told about the callbacks.
        callbacks = self.ExportedManagerCallbacks()
        callbacks.add_observer('manager_client', self)
        objpath = utils.generate_dbus_cb_objpath(self.CB_EXPORTED_OBJ_NAME)
        registration = self.bus.register_object(objpath, callbacks, None)

        # Register published callbacks with manager daemon
        try:
            self.proxy().RegisterCallback(objpath)
        except Exception:
            # Best effort: undo the export so that a retry can publish again.
            # NOTE(review): assumes register_object returns a handle with an
            # unregister() method; skipped when it does not.
            unregister = getattr(registration, 'unregister', None)
            if callable(unregister):
                unregister()
            raise

        self.callbacks = callbacks
        return True

    @utils.glib_callback()
    def on_hci_device_changed(self, hci, present):
        """Handle device presence change.

        Args:
            hci: Hci interface number.
            present: Whether this hci interface is appearing or disappearing.
        """
        if present:
            # Keep the known enabled state if we already track this adapter;
            # a newly seen adapter starts out as not enabled.
            self.adapters.setdefault(hci, False)
        else:
            self.adapters.pop(hci, None)

    @utils.glib_callback()
    def on_hci_enabled_changed(self, hci, enabled):
        """Handle device enabled change.

        Args:
            hci: Hci interface number.
            enabled: Whether this hci interface is being enabled or disabled.
        """
        self.adapters[hci] = enabled

    def get_default_adapter(self):
        """Get the default adapter in use by the manager."""
        # TODO(abps): The default adapter is hci0 until we support multiple
        #             adapters.
        return 0

    def has_default_adapter(self):
        """Checks whether the default adapter exists in the cached adapters."""
        return self.get_default_adapter() in self.adapters

    @utils.glib_call()
    def start(self, hci):
        """Start a specific adapter.

        Args:
            hci: Hci interface number.
        """
        self.proxy().Start(hci)

    @utils.glib_call()
    def stop(self, hci):
        """Stop a specific adapter.

        Args:
            hci: Hci interface number.
        """
        self.proxy().Stop(hci)

    @utils.glib_call(False)
    def get_adapter_enabled(self, hci):
        """Checks whether a specific adapter is enabled (i.e. started).

        Args:
            hci: Hci interface number.
        """
        return bool(self.proxy().GetAdapterEnabled(hci))

    @utils.glib_call(False)
    def get_floss_enabled(self):
        """Gets whether Floss is enabled."""
        return bool(self.proxy().GetFlossEnabled())

    @utils.glib_call()
    def set_floss_enabled(self, enabled):
        """Sets whether Floss is enabled.

        Args:
            enabled: Value passed to the manager's SetFlossEnabled method.
        """
        self.proxy().SetFlossEnabled(enabled)

    @utils.glib_call([])
    def get_available_adapters(self):
        """Gets a list of currently available adapters and if they are enabled.

        Returns:
            List of (hci, enabled) tuples, one per entry reported by the
            manager.

        Raises:
            FlossManagerClient.AdaptersNotParseable: An entry lacks the
                'hci_interface' or 'enabled' key. The cache is left untouched
                in that case.
        """
        all_adapters = []

        for entry in self.proxy().GetAvailableAdapters():
            if 'hci_interface' not in entry or 'enabled' not in entry:
                raise FlossManagerClient.AdaptersNotParseable(f'Could not parse: {entry}')
            all_adapters.append((int(entry['hci_interface']), bool(entry['enabled'])))

        # This function call overwrites any existing cached values of
        # self.adapters that we may have gotten from observers.
        self.adapters = dict(all_adapters)

        return all_adapters
203