1#!/usr/bin/env python
2
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import base64
8import json
9import logging
10import logging.handlers
11
12import common
13from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_sdp_socket
14from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket
15from autotest_lib.client.cros import constants
16from autotest_lib.client.cros import xmlrpc_server
17
18
19class BluetoothTesterXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate):
20    """Exposes Tester methods called remotely during Bluetooth autotests.
21
22    All instance methods of this object without a preceding '_' are exposed via
23    an XML-RPC server. This is not a stateless handler object, which means that
24    if you store state inside the delegate, that state will remain around for
25    future calls.
26    """
27
28    BR_EDR_LE_PROFILE = (
29            bluetooth_socket.MGMT_SETTING_POWERED |
30            bluetooth_socket.MGMT_SETTING_CONNECTABLE |
31            bluetooth_socket.MGMT_SETTING_PAIRABLE |
32            bluetooth_socket.MGMT_SETTING_SSP |
33            bluetooth_socket.MGMT_SETTING_BREDR |
34            bluetooth_socket.MGMT_SETTING_LE)
35
36    LE_PROFILE = (
37            bluetooth_socket.MGMT_SETTING_POWERED |
38            bluetooth_socket.MGMT_SETTING_CONNECTABLE |
39            bluetooth_socket.MGMT_SETTING_PAIRABLE |
40            bluetooth_socket.MGMT_SETTING_LE)
41
42    PROFILE_SETTINGS = {
43        'computer': BR_EDR_LE_PROFILE,
44        'peripheral': LE_PROFILE
45    }
46
47    PROFILE_CLASS = {
48        'computer': 0x000104,
49        'peripheral': None
50    }
51
52    PROFILE_NAMES = {
53        'computer': ('ChromeOS Bluetooth Tester', 'Tester'),
54        'peripheral': ('ChromeOS Bluetooth Tester', 'Tester')
55    }
56
57
58    def __init__(self):
59        super(BluetoothTesterXmlRpcDelegate, self).__init__()
60
61        # Open the Bluetooth Control socket to the kernel which provides us
62        # the needed raw management access to the Bluetooth Host Subsystem.
63        self._control = bluetooth_socket.BluetoothControlSocket()
64        # Open the Bluetooth SDP socket to the kernel which provides us the
65        # needed interface to use SDP commands.
66        self._sdp = bluetooth_sdp_socket.BluetoothSDPSocket()
67        # This is almost a constant, but it might not be forever.
68        self.index = 0
69
70
71    def setup(self, profile):
72        """Set up the tester with the given profile.
73
74        @param profile: Profile to use for this test, valid values are:
75                computer - a standard computer profile
76
77        @return True on success, False otherwise.
78
79        """
80        profile_settings = self.PROFILE_SETTINGS[profile]
81        profile_class = self.PROFILE_CLASS[profile]
82        (profile_name, profile_short_name) = self.PROFILE_NAMES[profile]
83
84        # Make sure the controller actually exists.
85        if self.index not in self._control.read_index_list():
86            logging.warning('Bluetooth Controller missing on tester')
87            return False
88
89        # Make sure all of the settings are supported by the controller.
90        ( address, bluetooth_version, manufacturer_id,
91          supported_settings, current_settings, class_of_device,
92          name, short_name ) = self._control.read_info(self.index)
93        if profile_settings & supported_settings != profile_settings:
94            logging.warning('Controller does not support requested settings')
95            logging.debug('Supported: %b; Requested: %b', supported_settings,
96                          profile_settings)
97            return False
98
99        # Before beginning, force the adapter power off, even if it's already
100        # off; this is enough to persuade an AP-mode Intel chip to accept
101        # settings.
102        if not self._control.set_powered(self.index, False):
103            logging.warning('Failed to power off adapter to accept settings')
104            return False
105
106        # Set the controller up as either BR/EDR only, LE only or Dual Mode.
107        # This is a bit tricky because it rejects commands outright unless
108        # it's in dual mode, so we actually have to figure out what changes
109        # we have to make, and we have to turn things on before we turn them
110        # off.
111        turn_on = (current_settings ^ profile_settings) & profile_settings
112        if turn_on & bluetooth_socket.MGMT_SETTING_BREDR:
113            if self._control.set_bredr(self.index, True) is None:
114                logging.warning('Failed to enable BR/EDR')
115                return False
116        if turn_on & bluetooth_socket.MGMT_SETTING_LE:
117            if self._control.set_le(self.index, True) is None:
118                logging.warning('Failed to enable LE')
119                return False
120
121        turn_off = (current_settings ^ profile_settings) & current_settings
122        if turn_off & bluetooth_socket.MGMT_SETTING_BREDR:
123            if self._control.set_bredr(self.index, False) is None:
124                logging.warning('Failed to disable BR/EDR')
125                return False
126        if turn_off & bluetooth_socket.MGMT_SETTING_LE:
127            if self._control.set_le(self.index, False) is None:
128                logging.warning('Failed to disable LE')
129                return False
130
131        # Adjust settings that are BR/EDR specific that we need to set before
132        # powering on the adapter, and would be rejected otherwise.
133        if profile_settings & bluetooth_socket.MGMT_SETTING_BREDR:
134            if (self._control.set_link_security(
135                    self.index,
136                    (profile_settings &
137                            bluetooth_socket.MGMT_SETTING_LINK_SECURITY))
138                        is None):
139                logging.warning('Failed to set link security setting')
140                return False
141            if (self._control.set_ssp(
142                    self.index,
143                    profile_settings & bluetooth_socket.MGMT_SETTING_SSP)
144                        is None):
145                logging.warning('Failed to set SSP setting')
146                return False
147            if (self._control.set_hs(
148                    self.index,
149                    profile_settings & bluetooth_socket.MGMT_SETTING_HS)
150                        is None):
151                logging.warning('Failed to set High Speed setting')
152                return False
153
154            # Split our the major and minor class; it's listed as a kernel bug
155            # that we supply these to the kernel without shifting the bits over
156            # to take out the CoD format field, so this might have to change
157            # one day.
158            major_class = (profile_class & 0x00ff00) >> 8
159            minor_class = profile_class & 0x0000ff
160            if (self._control.set_device_class(
161                    self.index, major_class, minor_class)
162                        is None):
163                logging.warning('Failed to set device class')
164                return False
165
166        # Setup generic settings that apply to either BR/EDR, LE or dual-mode
167        # that still require the power to be off.
168        if (self._control.set_connectable(
169                self.index,
170                profile_settings & bluetooth_socket.MGMT_SETTING_CONNECTABLE)
171                    is None):
172            logging.warning('Failed to set connectable setting')
173            return False
174        if (self._control.set_pairable(
175                self.index,
176                profile_settings & bluetooth_socket.MGMT_SETTING_PAIRABLE)
177                    is None):
178            logging.warning('Failed to set pairable setting')
179            return False
180
181        if (self._control.set_local_name(
182                    self.index, profile_name, profile_short_name)
183                    is None):
184            logging.warning('Failed to set local name')
185            return False
186
187        # Now the settings have been set, power up the adapter.
188        if not self._control.set_powered(
189                self.index,
190                profile_settings & bluetooth_socket.MGMT_SETTING_POWERED):
191            logging.warning('Failed to set powered setting')
192            return False
193
194        # Fast connectable can only be set once the controller is powered,
195        # and only when BR/EDR is enabled.
196        if profile_settings & bluetooth_socket.MGMT_SETTING_BREDR:
197            # Wait for the device class set event, this happens after the
198            # power up "command complete" event when we've pre-set the class
199            # even though it's a side-effect of doing that.
200            self._control.wait_for_events(
201                    self.index,
202                    ( bluetooth_socket.MGMT_EV_CLASS_OF_DEV_CHANGED, ))
203
204            if (self._control.set_fast_connectable(
205                    self.index,
206                    profile_settings &
207                    bluetooth_socket.MGMT_SETTING_FAST_CONNECTABLE)
208                        is None):
209                logging.warning('Failed to set fast connectable setting')
210                return False
211
212        # Fetch the settings again and make sure they're all set correctly,
213        # including the BR/EDR flag.
214        ( address, bluetooth_version, manufacturer_id,
215          supported_settings, current_settings, class_of_device,
216          name, short_name ) = self._control.read_info(self.index)
217
218        # Check generic settings.
219        if profile_settings != current_settings:
220            logging.warning('Controller settings did not match those set: '
221                            '%x != %x', current_settings, profile_settings)
222            return False
223        if name != profile_name:
224            logging.warning('Local name did not match that set: "%s" != "%s"',
225                            name, profile_name)
226            return False
227        elif short_name != profile_short_name:
228            logging.warning('Short name did not match that set: "%s" != "%s"',
229                            short_name, profile_short_name)
230            return False
231
232        # Check BR/EDR specific settings.
233        if profile_settings & bluetooth_socket.MGMT_SETTING_BREDR:
234            if class_of_device != profile_class:
235                if class_of_device & 0x00ffff == profile_class & 0x00ffff:
236                    logging.warning('Class of device matched that set, but '
237                                    'Service Class field did not: %x != %x '
238                                    'Reboot Tester? ',
239                                    class_of_device, profile_class)
240                else:
241                    logging.warning('Class of device did not match that set: '
242                                    '%x != %x', class_of_device, profile_class)
243                return False
244
245        return True
246
247
248    def set_discoverable(self, discoverable, timeout=0):
249        """Set the discoverable state of the controller.
250
251        @param discoverable: Whether controller should be discoverable.
252        @param timeout: Timeout in seconds before disabling discovery again,
253                ignored when discoverable is False, must not be zero when
254                discoverable is True.
255
256        @return True on success, False otherwise.
257
258        """
259        settings = self._control.set_discoverable(self.index,
260                                                  discoverable, timeout)
261        return settings & bluetooth_socket.MGMT_SETTING_DISCOVERABLE
262
263
264    def read_info(self):
265        """Read the adapter information from the Kernel.
266
267        @return the information as a JSON-encoded tuple of:
268          ( address, bluetooth_version, manufacturer_id,
269            supported_settings, current_settings, class_of_device,
270            name, short_name )
271
272        """
273        return json.dumps(self._control.read_info(self.index))
274
275
276    def set_advertising(self, advertising):
277        """Set the whether the controller is advertising via LE.
278
279        @param advertising: Whether controller should advertise via LE.
280
281        @return True on success, False otherwise.
282
283        """
284        settings = self._control.set_advertising(self.index, advertising)
285        return settings & bluetooth_socket.MGMT_SETTING_ADVERTISING
286
287
288    def discover_devices(self, br_edr=True, le_public=True, le_random=True):
289        """Discover remote devices.
290
291        Activates device discovery and collects the set of devices found,
292        returning them as a list.
293
294        @param br_edr: Whether to detect BR/EDR devices.
295        @param le_public: Whether to detect LE Public Address devices.
296        @param le_random: Whether to detect LE Random Address devices.
297
298        @return List of devices found as JSON-encoded tuples with the format
299                (address, address_type, rssi, flags, base64-encoded eirdata),
300                or False if discovery could not be started.
301
302        """
303        address_type = 0
304        if br_edr:
305            address_type |= 0x1
306        if le_public:
307            address_type |= 0x2
308        if le_random:
309            address_type |= 0x4
310
311        set_type = self._control.start_discovery(self.index, address_type)
312        if set_type != address_type:
313            logging.warning('Discovery address type did not match that set: '
314                            '%x != %x', set_type, address_type)
315            return False
316
317        devices = self._control.get_discovered_devices(self.index)
318        return json.dumps([
319                (address, address_type, rssi, flags,
320                 base64.encodestring(eirdata))
321                for address, address_type, rssi, flags, eirdata in devices
322        ])
323
324
325    def connect(self, address):
326        """Connect to device with the given address
327
328        @param address: Bluetooth address.
329
330        """
331        self._sdp.connect(address)
332        return True
333
334
335    def service_search_request(self, uuids, max_rec_cnt, preferred_size=32,
336                               forced_pdu_size=None, invalid_request=False):
337        """Send a Service Search Request
338
339        @param uuids: List of UUIDs (as integers) to look for.
340        @param max_rec_cnt: Maximum count of returned service records.
341        @param preferred_size: Preffered size of UUIDs in bits (16, 32, or 128).
342        @param forced_pdu_size: Use certain PDU size parameter instead of
343               calculating actual length of sequence.
344        @param invalid_request: Whether to send request with intentionally
345               invalid syntax for testing purposes (bool flag).
346
347        @return list of found services' service record handles or Error Code
348
349        """
350        return json.dumps(
351                self._sdp.service_search_request(
352                 uuids, max_rec_cnt, preferred_size, forced_pdu_size,
353                 invalid_request)
354        )
355
356
357    def service_attribute_request(self, handle, max_attr_byte_count, attr_ids,
358                                  forced_pdu_size=None, invalid_request=None):
359        """Send a Service Attribute Request
360
361        @param handle: service record from which attribute values are to be
362               retrieved.
363        @param max_attr_byte_count: maximum number of bytes of attribute data to
364               be returned in the response to this request.
365        @param attr_ids: a list, where each element is either an attribute ID
366               or a range of attribute IDs.
367        @param forced_pdu_size: Use certain PDU size parameter instead of
368               calculating actual length of sequence.
369        @param invalid_request: Whether to send request with intentionally
370               invalid syntax for testing purposes (string with raw request).
371
372        @return list of found attributes IDs and their values or Error Code
373
374        """
375        return json.dumps(
376                self._sdp.service_attribute_request(
377                 handle, max_attr_byte_count, attr_ids, forced_pdu_size,
378                 invalid_request)
379        )
380
381
382    def service_search_attribute_request(self, uuids, max_attr_byte_count,
383                                         attr_ids, preferred_size=32,
384                                         forced_pdu_size=None,
385                                         invalid_request=None):
386        """Send a Service Search Attribute Request
387
388        @param uuids: list of UUIDs (as integers) to look for.
389        @param max_attr_byte_count: maximum number of bytes of attribute data to
390               be returned in the response to this request.
391        @param attr_ids: a list, where each element is either an attribute ID
392               or a range of attribute IDs.
393        @param preferred_size: Preffered size of UUIDs in bits (16, 32, or 128).
394        @param forced_pdu_size: Use certain PDU size parameter instead of
395               calculating actual length of sequence.
396        @param invalid_request: Whether to send request with intentionally
397               invalid syntax for testing purposes (string to be prepended
398               to correct request).
399
400        @return list of found attributes IDs and their values or Error Code
401
402        """
403        return json.dumps(
404                self._sdp.service_search_attribute_request(
405                 uuids, max_attr_byte_count, attr_ids, preferred_size,
406                 forced_pdu_size, invalid_request)
407        )
408
409
410if __name__ == '__main__':
411    logging.basicConfig(level=logging.DEBUG)
412    handler = logging.handlers.SysLogHandler(address = '/dev/log')
413    formatter = logging.Formatter(
414            'bluetooth_tester_xmlrpc_server: [%(levelname)s] %(message)s')
415    handler.setFormatter(formatter)
416    logging.getLogger().addHandler(handler)
417    logging.debug('bluetooth_tester_xmlrpc_server main...')
418    server = xmlrpc_server.XmlRpcServer(
419            'localhost',
420            constants.BLUETOOTH_TESTER_XMLRPC_SERVER_PORT)
421    server.register_delegate(BluetoothTesterXmlRpcDelegate())
422    server.run()
423