1# Copyright 2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import grpc 16import logging 17 18from bumble.core import UUID as BumbleUUID, AdvertisingData 19from bumble.device import Device 20from bumble.gatt import Characteristic, CharacteristicValue, TemplateService 21from bumble.l2cap import Channel 22from bumble.pandora import utils 23from google.protobuf.empty_pb2 import Empty 24from pandora_experimental.dck_grpc_aio import DckServicer 25from typing import Optional 26 27 28class DckGattService(TemplateService): 29 CCC_DK_UUID: BumbleUUID = BumbleUUID.from_16_bits(0xFFF5, 'Car Connectivity Consortium, LLC') 30 UUID = CCC_DK_UUID 31 UUID_SPSM = BumbleUUID("D3B5A130-9E23-4B3A-8BE4-6B1EE5F980A3", "Vehicle SPSM") 32 UUID_SPSM_DK_VERSION = BumbleUUID("D3B5A130-9E23-4B3A-8BE4-6B1EE5B780A3", "DK version") 33 UUID_DEVICE_DK_VERSION = BumbleUUID("BD4B9502-3F54-11EC-B919-0242AC120005", "Device Selected DK version") 34 UUID_ANTENNA_IDENTIFIER = BumbleUUID("c6d7d4a1-e2b0-4e95-b576-df983d1a5d9f", "Vehicle Antenna Identifier") 35 36 def __init__(self, device: Device): 37 logger = logging.getLogger(__name__) 38 39 def on_l2cap_server(channel: Channel) -> None: 40 logger.info(f"--- DckGattService on_l2cap_server") 41 42 self.device_dk_version_value = None 43 self.psm = device.register_l2cap_channel_server(0, on_l2cap_server) # type: ignore 44 45 def on_device_version_write(value: bytes) -> None: 46 logger.info(f"--- DK Device Version Write: {value!r}") 47 self.device_dk_version_value = value 48 49 characteristics = [ 50 Characteristic( 51 DckGattService.UUID_SPSM, 52 Characteristic.Properties.READ, 53 Characteristic.READABLE, 54 CharacteristicValue(read=bytes(self.psm)), # type: ignore[no-untyped-call] 55 ), 56 Characteristic( 57 DckGattService.UUID_SPSM_DK_VERSION, 58 Characteristic.Properties.READ, 59 Characteristic.READ_REQUIRES_ENCRYPTION, 60 CharacteristicValue(read=b''), # type: ignore[no-untyped-call] 61 ), 62 Characteristic( 63 DckGattService.UUID_DEVICE_DK_VERSION, 64 Characteristic.Properties.WRITE, 65 Characteristic.READ_REQUIRES_ENCRYPTION, 66 CharacteristicValue(write=on_device_version_write), # type: ignore[no-untyped-call] 67 ), 68 Characteristic( 69 DckGattService.UUID_ANTENNA_IDENTIFIER, 70 Characteristic.READ, 71 Characteristic.READABLE, 72 CharacteristicValue(read=b''), # type: ignore[no-untyped-call] 73 ), 74 ] 75 76 super().__init__(characteristics) # type: ignore[no-untyped-call] 77 78 def get_advertising_data(self) -> bytes: 79 # CCC Specification Digital-Key R3-1.2.0-r14 80 # 19.2 LE Procedures AdvData field of ADV_IND 81 82 return bytes(AdvertisingData([(AdvertisingData.SERVICE_DATA_16_BIT_UUID, bytes(DckGattService.CCC_DK_UUID))])) 83 84 85class DckService(DckServicer): 86 device: Device 87 dck_gatt_service: Optional[DckGattService] 88 89 def __init__(self, device: Device) -> None: 90 self.log = utils.BumbleServerLoggerAdapter(logging.getLogger(), {"service_name": "Dck", "device": device}) 91 self.device = device 92 self.dck_gatt_service = None 93 94 @utils.rpc 95 def Register(self, request: Empty, context: grpc.ServicerContext) -> Empty: 96 if self.dck_gatt_service is None: 97 self.dck_gatt_service = DckGattService(self.device) 98 self.device.add_service(self.dck_gatt_service) # type: ignore[no-untyped-call] 99 100 return Empty() 101