1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import grpc
16import logging
17
18from bumble.core import UUID as BumbleUUID, AdvertisingData
19from bumble.device import Device
20from bumble.gatt import Characteristic, CharacteristicValue, TemplateService
21from bumble.l2cap import Channel
22from bumble.pandora import utils
23from google.protobuf.empty_pb2 import Empty
24from pandora_experimental.dck_grpc_aio import DckServicer
25from typing import Optional
26
27
class DckGattService(TemplateService):
    """GATT service publishing the CCC Digital Key (DK) characteristics.

    Constructing the service registers an L2CAP channel server on the given
    device and declares four characteristics: the vehicle SPSM, the vehicle
    DK version, the device-selected DK version (write-only) and the vehicle
    antenna identifier.

    Attributes:
        psm: Value returned by ``device.register_l2cap_channel_server``.
        device_dk_version_value: Last value written by the peer to the
            "Device Selected DK version" characteristic, or ``None`` if it
            has not been written yet.
    """

    CCC_DK_UUID: BumbleUUID = BumbleUUID.from_16_bits(0xFFF5, 'Car Connectivity Consortium, LLC')
    UUID = CCC_DK_UUID
    UUID_SPSM = BumbleUUID("D3B5A130-9E23-4B3A-8BE4-6B1EE5F980A3", "Vehicle SPSM")
    UUID_SPSM_DK_VERSION = BumbleUUID("D3B5A130-9E23-4B3A-8BE4-6B1EE5B780A3", "DK version")
    UUID_DEVICE_DK_VERSION = BumbleUUID("BD4B9502-3F54-11EC-B919-0242AC120005", "Device Selected DK version")
    UUID_ANTENNA_IDENTIFIER = BumbleUUID("c6d7d4a1-e2b0-4e95-b576-df983d1a5d9f", "Vehicle Antenna Identifier")

    def __init__(self, device: Device):
        """Register the L2CAP server on ``device`` and build the characteristics.

        Args:
            device: Bumble device on which the L2CAP channel server is
                registered.
        """
        logger = logging.getLogger(__name__)

        def on_l2cap_server(channel: Channel) -> None:
            logger.info("--- DckGattService on_l2cap_server")

        self.device_dk_version_value: Optional[bytes] = None
        # PSM 0 is passed here; presumably this asks the stack to allocate
        # one, and the returned value is the PSM actually in use — TODO confirm.
        self.psm = device.register_l2cap_channel_server(0, on_l2cap_server)  # type: ignore

        # `bytes(int)` would build a zero-filled buffer of that length rather
        # than encode the number, so the PSM is serialized explicitly.
        # NOTE(review): 2-byte little-endian is assumed (usual GATT
        # convention) — confirm against the CCC Digital Key specification.
        spsm_value = self.psm.to_bytes(2, 'little')

        # CharacteristicValue delegates to callables: `read(connection)` and
        # `write(connection, value)`. The handlers below follow that shape.
        def read_spsm(connection: object) -> bytes:
            return spsm_value

        def read_empty(connection: object) -> bytes:
            return b''

        def on_device_version_write(connection: object, value: bytes) -> None:
            logger.info("--- DK Device Version Write: %r", value)
            self.device_dk_version_value = value

        characteristics = [
            Characteristic(
                DckGattService.UUID_SPSM,
                Characteristic.Properties.READ,
                Characteristic.READABLE,
                CharacteristicValue(read=read_spsm),  # type: ignore[no-untyped-call]
            ),
            Characteristic(
                DckGattService.UUID_SPSM_DK_VERSION,
                Characteristic.Properties.READ,
                Characteristic.READ_REQUIRES_ENCRYPTION,
                CharacteristicValue(read=read_empty),  # type: ignore[no-untyped-call]
            ),
            Characteristic(
                DckGattService.UUID_DEVICE_DK_VERSION,
                Characteristic.Properties.WRITE,
                # NOTE(review): a write-only characteristic declared with a
                # read permission — confirm whether WRITE_REQUIRES_ENCRYPTION
                # was intended. Left unchanged to preserve current behavior.
                Characteristic.READ_REQUIRES_ENCRYPTION,
                CharacteristicValue(write=on_device_version_write),  # type: ignore[no-untyped-call]
            ),
            Characteristic(
                DckGattService.UUID_ANTENNA_IDENTIFIER,
                Characteristic.Properties.READ,
                Characteristic.READABLE,
                CharacteristicValue(read=read_empty),  # type: ignore[no-untyped-call]
            ),
        ]

        super().__init__(characteristics)  # type: ignore[no-untyped-call]

    def get_advertising_data(self) -> bytes:
        """Return serialized advertising data for this service.

        The payload holds a single "service data, 16-bit UUID" field whose
        content is the CCC DK UUID.
        """
        # CCC Specification Digital-Key R3-1.2.0-r14
        # 19.2 LE Procedures AdvData field of ADV_IND

        return bytes(AdvertisingData([(AdvertisingData.SERVICE_DATA_16_BIT_UUID, bytes(DckGattService.CCC_DK_UUID))]))
83
84
class DckService(DckServicer):
    """Pandora experimental DCK servicer operating on a Bumble device.

    Attributes:
        device: Bumble device the GATT service gets added to.
        dck_gatt_service: The DCK GATT service once registered, else ``None``.
    """

    device: Device
    dck_gatt_service: Optional[DckGattService]

    def __init__(self, device: Device) -> None:
        """Store ``device`` and set up the service logger; nothing is registered yet."""
        root_logger = logging.getLogger()
        adapter_extra = {"service_name": "Dck", "device": device}
        self.log = utils.BumbleServerLoggerAdapter(root_logger, adapter_extra)
        self.device = device
        self.dck_gatt_service = None

    @utils.rpc
    def Register(self, request: Empty, context: grpc.ServicerContext) -> Empty:
        """Create the DCK GATT service and add it to the device, once.

        Later calls find the service already present and return without
        creating or adding anything.
        """
        # Already registered: nothing left to do.
        if self.dck_gatt_service is not None:
            return Empty()

        gatt_service = DckGattService(self.device)
        self.dck_gatt_service = gatt_service
        self.device.add_service(gatt_service)  # type: ignore[no-untyped-call]
        return Empty()
101