1# Copyright (C) 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import re
16import sys
17import time
18from threading import Thread
19
20from mmi2grpc._helpers import assert_description, match_description
21from mmi2grpc._proxy import ProfileProxy
22from mmi2grpc._rootcanal import Dongle
23
24from pandora_experimental.gatt_grpc import GATT
25from pandora.host_grpc import Host
26from pandora.host_pb2 import PUBLIC, RANDOM
27from pandora.security_grpc import Security, SecurityStorage
28from pandora.security_pb2 import PairingEventAnswer
29from pandora_experimental.gatt_pb2 import (
30    INVALID_HANDLE,
31    READ_NOT_PERMITTED,
32    UNKNOWN_ERROR,
33    INSUFFICIENT_AUTHENTICATION,
34    ATTRIBUTE_NOT_FOUND,
35    APPLICATION_ERROR,
36    INVALID_ATTRIBUTE_LENGTH,
37    WRITE_NOT_PERMITTED,
38    PERMISSION_NONE,
39    PROPERTY_READ,
40    PROPERTY_WRITE,
41    PERMISSION_READ,
42    PERMISSION_WRITE,
43    PERMISSION_READ_ENCRYPTED,
44    PERMISSION_READ_ENCRYPTED_MITM,
45    PERMISSION_WRITE_ENCRYPTED,
46    PERMISSION_WRITE_ENCRYPTED_MITM,
47    ENABLE_NOTIFICATION_VALUE,
48    ENABLE_INDICATION_VALUE,
49)
50from pandora_experimental.gatt_pb2 import GattServiceParams
51from pandora_experimental.gatt_pb2 import GattCharacteristicParams
52from pandora_experimental.gatt_pb2 import GattDescriptorParams
53from pandora_experimental.gatt_pb2 import ReadCharacteristicResponse
54from pandora_experimental.gatt_pb2 import ReadCharacteristicsFromUuidResponse
55
# Test cases for which MMI_IUT_INITIATE_CONNECTION clears the GATT cache right
# after connecting, i.e. before any service discovery takes place.
NEEDS_CACHE_CLEARED = {"GATT/CL/GAD/BV-01-C", "GATT/CL/GAD/BV-06-C"}

# Test cases for which MMI_CONFIRM_PRIMARY_SERVICE skips the comparison with
# the services discovered by the client.
MMI_SERVER = {"GATT/SR/GAD/BV-01-C"}
65
# 128-bit UUID template; presumably the XXXX group is replaced by a 16-bit
# short UUID when UUIDs are formatted (confirm against formatUuid).
BASE_UUID = "0000XXXX-0000-1000-8000-00805F9B34FB"

# These UUIDs are used as reference for GATT server tests.
# Every value must be a well-formed 8-4-4-4-12 UUID string.
BASE_READ_WRITE_SERVICE_UUID = "0000FFFE-0000-1000-8000-00805F9B34FB"
# Was "0000FFD-...": the first group had only 7 hex digits (malformed UUID).
BASE_READ_CHARACTERISTIC_UUID = "0000FFFD-0000-1000-8000-00805F9B34FB"
BASE_WRITE_CHARACTERISTIC_UUID = "0000FFFA-0000-1000-8000-00805F9B34FB"
BASE_READ_WRITE_ENCRYPTED_CHARACTERISTIC_UUID = "0000FFF9-0000-1000-8000-00805F9B34FB"
BASE_READ_WRITE_ENCRYPTED_MITM_CHARACTERISTIC_UUID = "0000FFF8-0000-1000-8000-00805F9B34FB"
BASE_READ_DESCRIPTOR_UUID = "0000FFF7-0000-1000-8000-00805F9B34FB"
BASE_WRITE_DESCRIPTOR_UUID = "0000FFF6-0000-1000-8000-00805F9B34FB"
CUSTOM_SERVICE_UUID = "0000FFF5-0000-1000-8000-00805F9B34FB"
CUSTOM_CHARACTERISTIC_UUID = "0000FFF4-0000-1000-8000-00805F9B34FB"
78
79
80class GATTProxy(ProfileProxy):
81
82    def __init__(self, channel, rootcanal):
83        super().__init__(channel)
84        self.gatt = GATT(channel)
85        self.host = Host(channel)
86        self.security = Security(channel)
87        self.security_storage = SecurityStorage(channel)
88        self.rootcanal = rootcanal
89        self.connection = None
90        self.services = None
91        self.characteristics = None
92        self.descriptors = None
93        self.read_response = None
94        self.write_response = None
95        self.characteristic_notification_received = False
96        self.handles = []
97        self.written_over_length = False
98        self.last_added_service = None
99
100    def test_started(self, test: str, **kwargs):
101        self.rootcanal.select_pts_dongle(Dongle.CSR_RCK_PTS_DONGLE)
102
103        return "OK"
104
105    @assert_description
106    def MMI_IUT_INITIATE_CONNECTION(self, test, pts_addr: bytes, **kwargs):
107        """
108        Please initiate a GATT connection to the PTS.
109
110        Description: Verify that
111        the Implementation Under Test (IUT) can initiate GATT connect request to
112        PTS.
113        """
114
115        self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=pts_addr).connection
116        if test in NEEDS_CACHE_CLEARED:
117            self.gatt.ClearCache(connection=self.connection)
118        return "OK"
119
120    @assert_description
121    def MMI_IUT_INITIATE_DISCONNECTION(self, **kwargs):
122        """
123        Please initiate a GATT disconnection to the PTS.
124
125        Description: Verify
126        that the Implementation Under Test (IUT) can initiate GATT disconnect
127        request to PTS.
128        """
129
130        assert self.connection is not None
131        self.host.Disconnect(connection=self.connection)
132        self.connection = None
133        self.services = None
134        self.characteristics = None
135        self.descriptors = None
136        self.read_response = None
137        self.write_response = None
138        self.characteristic_notification_received = False
139        self.handles = []
140        self.written_over_length = False
141        self.last_added_service = None
142        return "OK"
143
144    @assert_description
145    def MMI_IUT_MTU_EXCHANGE(self, **kwargs):
146        """
147        Please send exchange MTU command to the PTS.
148
149        Description: Verify that
150        the Implementation Under Test (IUT) can send Exchange MTU command to the
151        tester.
152        """
153
154        assert self.connection is not None
155        self.gatt.ExchangeMTU(mtu=512, connection=self.connection)
156        return "OK"
157
158    def MMI_IUT_SEND_PREPARE_WRITE_REQUEST_VALID_SIZE(self, description: str, **kwargs):
159        """
160        Please send prepare write request with handle = 'XXXX'O and size = 'XXX'
161        to the PTS.
162
163        Description: Verify that the Implementation Under Test
164        (IUT) can send data according to negotiate MTU size.
165        """
166
167        assert self.connection is not None
168        matches = re.findall("'([a0-Z9]*)'O and size = '([a0-Z9]*)'", description)
169        handle = int(matches[0][0], 16)
170        data = bytes([1]) * int(matches[0][1])
171        self.gatt.WriteAttFromHandle(connection=self.connection,\
172                handle=handle, value=data)
173        return "OK"
174
175    @assert_description
176    def MMI_IUT_DISCOVER_PRIMARY_SERVICES(self, **kwargs):
177        """
178        Please send discover all primary services command to the PTS.
179        Description: Verify that the Implementation Under Test (IUT) can send
180        Discover All Primary Services.
181        """
182
183        assert self.connection is not None
184        self.services = self.gatt.DiscoverServices(connection=self.connection).services
185        return "OK"
186
187    def MMI_SEND_PRIMARY_SERVICE_UUID(self, description: str, **kwargs):
188        """
189        Please send discover primary services with UUID value set to 'XXXX'O to
190        the PTS.
191
192        Description: Verify that the Implementation Under Test (IUT)
193        can send Discover Primary Services UUID = 'XXXX'O.
194        """
195
196        assert self.connection is not None
197        uuid = formatUuid(re.findall("'([a0-Z9]*)'O", description)[0])
198        self.services = self.gatt.DiscoverServiceByUuid(connection=self.connection,\
199                uuid=uuid).services
200        return "OK"
201
202    def MMI_SEND_PRIMARY_SERVICE_UUID_128(self, description: str, **kwargs):
203        """
204        Please send discover primary services with UUID value set to
205        'XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX'O to the PTS.
206
207        Description:
208        Verify that the Implementation Under Test (IUT) can send Discover
209        Primary Services UUID = 'XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX'O.
210        """
211
212        assert self.connection is not None
213        uuid = formatUuid(re.findall("'([a0-Z9-]*)'O", description)[0])
214        self.services = self.gatt.DiscoverServiceByUuid(connection=self.connection,\
215                uuid=uuid).services
216        return "OK"
217
    def MMI_CONFIRM_PRIMARY_SERVICE_UUID(self, **kwargs):
        """
        Please confirm IUT received primary services uuid = 'XXXX'O , Service
        start handle = 'XXXX'O, end handle = 'XXXX'O in database. Click Yes if
        IUT received it, otherwise click No.

        Description: Verify that the
        Implementation Under Test (IUT) can send Discover primary service by
        UUID in database.
        """

        # Android doesn't store services discovered by UUID.
        # Nothing from the prompt is compared, so the answer is always "Yes".
        return "Yes"
231
    @assert_description
    def MMI_CONFIRM_NO_PRIMARY_SERVICE_SMALL(self, **kwargs):
        """
        Please confirm that IUT received NO service uuid found in the small
        database file. Click Yes if NO service found, otherwise click No.
        Description: Verify that the Implementation Under Test (IUT) can send
        Discover primary service by UUID in small database.
        """

        # NOTE(review): the docstring is presumably matched against the PTS
        # prompt by @assert_description - confirm before rewording it.
        # Android doesn't store services discovered by UUID.
        # No state is inspected here; the answer is always "Yes".
        return "Yes"
243
    def MMI_CONFIRM_PRIMARY_SERVICE_UUID_128(self, **kwargs):
        """
        Please confirm IUT received primary services uuid=
        'XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX'O, Service start handle =
        'XXXX'O, end handle = 'XXXX'O in database. Click Yes if IUT received it,
        otherwise click No.

        Description: Verify that the Implementation Under
        Test (IUT) can send Discover primary service by UUID in database.
        """

        # Android doesn't store services discovered by UUID.
        # Nothing from the prompt is compared, so the answer is always "Yes".
        return "Yes"
257
258    def MMI_CONFIRM_PRIMARY_SERVICE(self, test, description: str, **kwargs):
259        """
260        Please confirm IUT received primary services Primary Service = 'XXXX'O
261        Primary Service = 'XXXX'O  in database. Click Yes if IUT received it,
262        otherwise click No.
263
264        Description: Verify that the Implementation Under
265        Test (IUT) can send Discover all primary services in database.
266        """
267
268        if test not in MMI_SERVER:
269            assert self.services is not None
270            all_matches = list(map(formatUuid, re.findall("'([a0-Z9]*)'O", description)))
271            assert all(uuid in list(map(lambda service: service.uuid, self.services))\
272                    for uuid in all_matches)
273        return "OK"
274
275    @assert_description
276    def MMI_IUT_FIND_INCLUDED_SERVICES(self, **kwargs):
277        """
278        Please send discover all include services to the PTS to discover all
279        Include Service supported in the PTS. Discover primary service if
280        needed.
281
282        Description: Verify that the Implementation Under Test (IUT)
283        can send Discover all include services command.
284        """
285
286        assert self.connection is not None
287        self.services = self.gatt.DiscoverServices(connection=self.connection).services
288        return "OK"
289
290    @assert_description
291    def MMI_CONFIRM_NO_INCLUDE_SERVICE(self, **kwargs):
292        """
293        There is no include service in the database file.
294
295        Description: Verify
296        that the Implementation Under Test (IUT) can send Discover all include
297        services in database.
298        """
299
300        assert self.connection is not None
301        assert self.services is not None
302        for service in self.services:
303            assert len(service.included_services) == 0
304        return "OK"
305
306    def MMI_CONFIRM_INCLUDE_SERVICE(self, description: str, **kwargs):
307        """
308        Please confirm IUT received include services:
309
310        Attribute Handle = 'XXXX'O, Included Service Attribute handle = 'XXXX'O,
311        End Group Handle = 'XXXX'O, Service UUID = 'XXXX'O
312
313        Click Yes if IUT received it, otherwise click No.
314
315        Description: Verify
316        that the Implementation Under Test (IUT) can send Discover all include
317        services in database.
318        """
319
320        assert self.connection is not None
321        assert self.services is not None
322        """
323        Number of checks can vary but information is always the same,
324        so we need to iterate through the services and check if its included
325        services match one of these.
326        """
327        all_matches = re.findall("'([a0-Z9]*)'O", description)
328        found_services = 0
329        for service in self.services:
330            for i in range(0, len(all_matches), 4):
331                if compareIncludedServices(service,\
332                        (stringHandleToInt(all_matches[i])),\
333                        stringHandleToInt(all_matches[i + 1]),\
334                        formatUuid(all_matches[i + 3])):
335                    found_services += 1
336        assert found_services == (len(all_matches) / 4)
337        return "Yes"
338
339    def MMI_IUT_DISCOVER_SERVICE_UUID(self, description: str, **kwargs):
340        """
341        Discover all characteristics of service UUID= 'XXXX'O,  Service start
342        handle = 'XXXX'O, end handle = 'XXXX'O.
343
344        Description: Verify that the
345        Implementation Under Test (IUT) can send Discover all charactieristics
346        of a service.
347        """
348
349        assert self.connection is not None
350        service_uuid = formatUuid(re.findall("'([a0-Z9]*)'O", description)[0])
351        self.services = self.gatt.DiscoverServices(connection=self.connection).services
352        self.characteristics = getCharacteristicsForServiceUuid(self.services, service_uuid)
353        return "OK"
354
355    def MMI_CONFIRM_ALL_CHARACTERISTICS_SERVICE(self, description: str, **kwargs):
356        """
357        Please confirm IUT received all characteristics of service
358        handle='XXXX'O handle='XXXX'O handle='XXXX'O handle='XXXX'O
359        handle='XXXX'O handle='XXXX'O handle='XXXX'O handle='XXXX'O
360        handle='XXXX'O handle='XXXX'O handle='XXXX'O  in database. Click Yes if
361        IUT received it, otherwise click No.
362
363        Description: Verify that the
364        Implementation Under Test (IUT) can send Discover all characteristics of
365        a service in database.
366        """
367
368        assert self.characteristics is not None
369        all_matches = list(map(stringCharHandleToInt, re.findall("'([a0-Z9]*)'O", description)))
370        assert all(handle in list(map(lambda char: char.handle, self.characteristics))\
371                for handle in all_matches)
372        return "Yes"
373
374    def MMI_IUT_DISCOVER_SERVICE_UUID_RANGE(self, description: str, **kwargs):
375        """
376        Please send discover characteristics by UUID. Range start from handle =
377        'XXXX'O end handle = 'XXXX'O characteristics UUID = 0xXXXX'O.
378        Description: Verify that the Implementation Under Test (IUT) can send
379        Discover characteristics by UUID.
380        """
381
382        assert self.connection is not None
383        handles = re.findall("'([a0-Z9]*)'O", description)
384        """
385        PTS sends UUIDS description formatted differently in this MMI,
386        so we need to check for each known format.
387        """
388        uuid_match = re.findall("0x([a0-Z9]*)'O", description)
389        if len(uuid_match) == 0:
390            uuid_match = re.search("UUID = (.*)'O", description)
391            uuid = formatUuid(uuid_match[1])
392        else:
393            uuid = formatUuid(uuid_match[0])
394        self.services = self.gatt.DiscoverServices(connection=self.connection).services
395        self.characteristics = getCharacteristicsRange(self.services,\
396                stringHandleToInt(handles[0]), stringHandleToInt(handles[1]), uuid)
397        return "OK"
398
399    def MMI_CONFIRM_CHARACTERISTICS(self, description: str, **kwargs):
400        """
401        Please confirm IUT received characteristic handle='XXXX'O UUID='XXXX'O
402        in database. Click Yes if IUT received it, otherwise click No.
403        Description: Verify that the Implementation Under Test (IUT) can send
404        Discover primary service by UUID in database.
405        """
406
407        assert self.characteristics is not None
408        all_matches = re.findall("'([a0-Z9-]*)'O", description)
409        for characteristic in self.characteristics:
410            if characteristic.handle == stringHandleToInt(all_matches[0])\
411                    and characteristic.uuid == formatUuid(all_matches[1]):
412                return "Yes"
413        raise ValueError
414
415    @assert_description
416    def MMI_CONFIRM_NO_CHARACTERISTICSUUID_SMALL(self, **kwargs):
417        """
418        Please confirm that IUT received NO 128 bit uuid in the small database
419        file. Click Yes if NO handle found, otherwise click No.
420
421        Description:
422        Verify that the Implementation Under Test (IUT) can discover
423        characteristics by UUID in small database.
424        """
425
426        assert self.characteristics is not None
427        assert len(self.characteristics) == 0
428        return "OK"
429
430    def MMI_IUT_DISCOVER_DESCRIPTOR_RANGE(self, description: str, **kwargs):
431        """
432        Please send discover characteristics descriptor range start from handle
433        = 'XXXX'O end handle = 'XXXX'O to the PTS.
434
435        Description: Verify that the
436        Implementation Under Test (IUT) can send Discover characteristics
437        descriptor.
438        """
439
440        assert self.connection is not None
441        handles = re.findall("'([a0-Z9]*)'O", description)
442        self.services = self.gatt.DiscoverServices(connection=self.connection).services
443        self.descriptors = getDescriptorsRange(self.services,\
444                stringHandleToInt(handles[0]), stringHandleToInt(handles[1]))
445        return "OK"
446
447    def MMI_CONFIRM_CHARACTERISTICS_DESCRIPTORS(self, description: str, **kwargs):
448        """
449        Please confirm IUT received characteristic descriptors handle='XXXX'O
450        UUID=0xXXXX  in database. Click Yes if IUT received it, otherwise click
451        No.
452
453        Description: Verify that the Implementation Under Test (IUT) can
454        send Discover characteristic descriptors in database.
455        """
456
457        assert self.descriptors is not None
458        handle = stringHandleToInt(re.findall("'([a0-Z9]*)'O", description)[0])
459        uuid = formatUuid(re.search("UUID=0x(.*)  ", description)[1])
460        for descriptor in self.descriptors:
461            if descriptor.handle == handle and descriptor.uuid == uuid:
462                return "Yes"
463        raise ValueError
464
465    def MMI_IUT_DISCOVER_ALL_SERVICE_RECORD(self, pts_addr: bytes, description: str, **kwargs):
466        """
467        Please send Service Discovery to discover all primary Services. Click
468        YES if GATT='XXXX'O services are discovered, otherwise click No.
469        Description: Verify that the Implementation Under Test (IUT) can
470        discover basic rate all primary services.
471        """
472
473        uuid = formatSdpUuid(re.findall("'([a0-Z9]*)'O", description)[0])
474        self.services = self.gatt.DiscoverServicesSdp(address=pts_addr).service_uuids
475        assert uuid in self.services
476        return "Yes"
477
478    def MMI_IUT_SEND_READ_CHARACTERISTIC_HANDLE(self, description: str, **kwargs):
479        """
480        Please send read characteristic handle = 'XXXX'O to the PTS.
481        Description: Verify that the Implementation Under Test (IUT) can send
482        Read characteristic.
483        """
484
485        assert self.connection is not None
486        handle = stringHandleToInt(re.findall("'([a0-Z9]*)'O", description)[0])
487
488        def read():
489            nonlocal handle
490            self.read_response = self.gatt.ReadCharacteristicFromHandle(\
491                    connection=self.connection, handle=handle)
492
493        worker = Thread(target=read)
494        worker.start()
495        worker.join(timeout=30)
496        return "OK"
497
    @assert_description
    def MMI_IUT_READ_TIMEOUT(self, **kwargs):
        """
        Please wait for 30 seconds timeout to abort the procedure.

        Description:
        Verify that the Implementation Under Test (IUT) can handle timeout after
        send Read characteristic without receiving response in 30 seconds.
        """

        # No waiting happens in this handler.
        # NOTE(review): presumably the 30 s were already spent in the
        # join(timeout=30) of MMI_IUT_SEND_READ_CHARACTERISTIC_HANDLE - confirm.
        return "OK"
509
510    @assert_description
511    def MMI_IUT_CONFIRM_READ_INVALID_HANDLE(self, **kwargs):
512        """
513        Please confirm IUT received Invalid handle error. Click Yes if IUT
514        received it, otherwise click No.
515
516        Description: Verify that the
517        Implementation Under Test (IUT) indicate Invalid handle error when read
518        a characteristic.
519        """
520
521        if type(self.read_response) is ReadCharacteristicResponse:
522            assert self.read_response.status == INVALID_HANDLE
523        elif type(self.read_response) is ReadCharacteristicsFromUuidResponse:
524            assert self.read_response.characteristics_read is not None
525            assert INVALID_HANDLE in\
526                    list(map(lambda characteristic_read: characteristic_read.status,\
527                            self.read_response.characteristics_read))
528        return "Yes"
529
530    @assert_description
531    def MMI_IUT_CONFIRM_READ_NOT_PERMITTED(self, **kwargs):
532        """
533        Please confirm IUT received read is not permitted error. Click Yes if
534        IUT received it, otherwise click No.
535
536        Description: Verify that the
537        Implementation Under Test (IUT) indicate read is not permitted error
538        when read a characteristic.
539        """
540
541        # Android read error doesn't return an error code so we have to also
542        # compare to the generic error code here.
543        if type(self.read_response) is ReadCharacteristicResponse:
544            assert self.read_response.status == READ_NOT_PERMITTED or\
545                   self.read_response.status == UNKNOWN_ERROR
546        elif type(self.read_response) is ReadCharacteristicsFromUuidResponse:
547            assert self.read_response.characteristics_read is not None
548            status_list = list(map(lambda characteristic_read: characteristic_read.status,\
549                    self.read_response.characteristics_read))
550            assert READ_NOT_PERMITTED in status_list or\
551                     UNKNOWN_ERROR in status_list
552        return "Yes"
553
554    @assert_description
555    def MMI_IUT_CONFIRM_READ_AUTHENTICATION(self, **kwargs):
556        """
557        Please confirm IUT received authentication error. Click Yes if IUT
558        received it, otherwise click No.
559
560        Description: Verify that the
561        Implementation Under Test (IUT) indicate authentication error when read
562        a characteristic.
563        """
564
565        if type(self.read_response) is ReadCharacteristicResponse:
566            assert self.read_response.status == INSUFFICIENT_AUTHENTICATION
567        elif type(self.read_response) is ReadCharacteristicsFromUuidResponse:
568            assert self.read_response.characteristics_read is not None
569            assert INSUFFICIENT_AUTHENTICATION in\
570                    list(map(lambda characteristic_read: characteristic_read.status,\
571                            self.read_response.characteristics_read))
572        return "Yes"
573
574    def MMI_IUT_SEND_READ_CHARACTERISTIC_UUID(self, description: str, **kwargs):
575        """
576        Please send read using characteristic UUID = 'XXXX'O handle range =
577        'XXXX'O to 'XXXX'O to the PTS.
578
579        Description: Verify that the
580        Implementation Under Test (IUT) can send Read characteristic by UUID.
581        """
582
583        assert self.connection is not None
584        matches = re.findall("'([a0-Z9]*)'O", description)
585        self.read_response = self.gatt.ReadCharacteristicsFromUuid(\
586                connection=self.connection, uuid=formatUuid(matches[0]),\
587                start_handle=stringHandleToInt(matches[1]),\
588                end_handle=stringHandleToInt(matches[2]))
589        return "OK"
590
591    @assert_description
592    def MMI_IUT_CONFIRM_ATTRIBUTE_NOT_FOUND(self, **kwargs):
593        """
594        Please confirm IUT received attribute not found error. Click Yes if IUT
595        received it, otherwise click No.
596
597        Description: Verify that the
598        Implementation Under Test (IUT) indicate attribute not found error when
599        read a characteristic.
600        """
601
602        # Android read error doesn't return an error code so we have to also
603        # compare to the generic error code here.
604        if type(self.read_response) is ReadCharacteristicResponse:
605            assert self.read_response.status == ATTRIBUTE_NOT_FOUND or\
606                    self.read_response.status == UNKNOWN_ERROR
607        elif type(self.read_response) is ReadCharacteristicsFromUuidResponse:
608            assert self.read_response.characteristics_read is not None
609            status_list = list(map(lambda characteristic_read: characteristic_read.status,\
610                    self.read_response.characteristics_read))
611            assert ATTRIBUTE_NOT_FOUND in status_list or\
612                    UNKNOWN_ERROR in status_list
613        return "Yes"
614
615    def MMI_IUT_SEND_READ_GREATER_OFFSET(self, description: str, **kwargs):
616        """
617        Please send read to handle = 'XXXX'O and offset greater than 'XXXX'O to
618        the PTS.
619
620        Description: Verify that the Implementation Under Test (IUT)
621        can send Read with invalid offset.
622        """
623
624        # Android handles the read offset internally, so we just do read with handle here.
625        # Unfortunately for testing, this will always work.
626        assert self.connection is not None
627        handle = stringHandleToInt(re.findall("'([a0-Z9]*)'O", description)[0])
628        self.read_response = self.gatt.ReadCharacteristicFromHandle(\
629                connection=self.connection, handle=handle)
630        return "OK"
631
    @assert_description
    def MMI_IUT_CONFIRM_READ_INVALID_OFFSET(self, **kwargs):
        """
        Please confirm IUT received Invalid offset error. Click Yes if IUT
        received it, otherwise click No.

        Description: Verify that the
        Implementation Under Test (IUT) indicate Invalid offset error when read
        a characteristic.
        """

        # Android handles read offset internally, so we can't read with wrong offset.
        # The stored read response is therefore not inspected; the answer is
        # always "Yes".
        return "Yes"
645
646    @assert_description
647    def MMI_IUT_CONFIRM_READ_APPLICATION(self, **kwargs):
648        """
649        Please confirm IUT received Application error. Click Yes if IUT received
650        it, otherwise click No.
651
652        Description: Verify that the Implementation
653        Under Test (IUT) indicate Application error when read a characteristic.
654        """
655
656        if type(self.read_response) is ReadCharacteristicResponse:
657            assert self.read_response.status == APPLICATION_ERROR
658        elif type(self.read_response) is ReadCharacteristicsFromUuidResponse:
659            assert self.read_response.characteristics_read is not None
660            assert APPLICATION_ERROR in\
661                    list(map(lambda characteristic_read: characteristic_read.status,\
662                            self.read_response.characteristics_read))
663        return "Yes"
664
665    def MMI_IUT_CONFIRM_READ_CHARACTERISTIC_VALUE(self, description: str, **kwargs):
666        """
667        Please confirm IUT received characteristic value='XX'O in random
668        selected adopted database. Click Yes if IUT received it, otherwise click
669        No.
670
671        Description: Verify that the Implementation Under Test (IUT) can
672        send Read characteristic to PTS random select adopted database.
673        """
674
675        characteristic_value = bytes.fromhex(re.findall("'([a0-Z9]*)'O", description)[0])
676        if type(self.read_response) is ReadCharacteristicResponse:
677            assert self.read_response.value is not None
678            assert characteristic_value in self.read_response.value.value
679        elif type(self.read_response) is ReadCharacteristicsFromUuidResponse:
680            assert self.read_response.characteristics_read is not None
681            assert characteristic_value in list(map(\
682                    lambda characteristic_read: characteristic_read.value.value,\
683                    self.read_response.characteristics_read))
684        return "Yes"
685
686    def MMI_IUT_READ_BY_TYPE_UUID(self, description: str, **kwargs):
687        """
688        Please send read by type characteristic UUID = 'XXXX'O to the PTS.
689        Description: Verify that the Implementation Under Test (IUT) can send
690        Read characteristic.
691        """
692
693        assert self.connection is not None
694        matches = re.findall("'([a0-Z9]*)'O", description)
695        self.read_response = self.gatt.ReadCharacteristicsFromUuid(\
696                connection=self.connection, uuid=formatUuid(matches[0]),\
697                start_handle=0x0001,\
698                end_handle=0xffff)
699        return "OK"
700
701    def MMI_IUT_READ_BY_TYPE_UUID_ALT(self, description: str, **kwargs):
702        """
703        Please send read by type characteristic UUID =
704        'XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX'O to the PTS.
705
706        Description:
707        Verify that the Implementation Under Test (IUT) can send Read
708        characteristic.
709        """
710
711        assert self.connection is not None
712        uuid = formatUuid(re.findall("'([a0-Z9-]*)'O", description)[0])
713        self.read_response = self.gatt.ReadCharacteristicsFromUuid(\
714                connection=self.connection, uuid=uuid, start_handle=0x0001, end_handle=0xffff)
715        return "OK"
716
717    def MMI_IUT_CONFIRM_READ_HANDLE_VALUE(self, description: str, **kwargs):
718        """
719        Please confirm IUT Handle='XX'O characteristic
720        value='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'O in random
721        selected adopted database. Click Yes if it matches the IUT, otherwise
722        click No.
723
724        Description: Verify that the Implementation Under Test (IUT)
725        can send Read long characteristic to PTS random select adopted database.
726        """
727
728        bytes_value = bytes.fromhex(re.search("value='(.*)'O", description)[1])
729        if type(self.read_response) is ReadCharacteristicResponse:
730            assert self.read_response.value is not None
731            assert self.read_response.value.value == bytes_value
732        elif type(self.read_response) is ReadCharacteristicsFromUuidResponse:
733            assert self.read_response.characteristics_read is not None
734            assert bytes_value in list(map(\
735                    lambda characteristic_read: characteristic_read.value.value,\
736                    self.read_response.characteristics_read))
737        return "Yes"
738
739    def MMI_IUT_SEND_READ_DESCIPTOR_HANDLE(self, description: str, **kwargs):
740        """
741        Please send read characteristic descriptor handle = 'XXXX'O to the PTS.
742        Description: Verify that the Implementation Under Test (IUT) can send
743        Read characteristic descriptor.
744        """
745
746        assert self.connection is not None
747        handle = stringHandleToInt(re.findall("'([a0-Z9]*)'O", description)[0])
748        self.read_response = self.gatt.ReadCharacteristicDescriptorFromHandle(\
749                connection=self.connection, handle=handle)
750        return "OK"
751
752    def MMI_IUT_CONFIRM_READ_DESCRIPTOR_VALUE(self, description: str, **kwargs):
753        """
754        Please confirm IUT received Descriptor value='XXXXXXXX'O in random
755        selected adopted database. Click Yes if IUT received it, otherwise click
756        No.
757
758        Description: Verify that the Implementation Under Test (IUT) can
759        send Read Descriptor to PTS random select adopted database.
760        """
761
762        assert self.read_response.value is not None
763        bytes_value = bytes.fromhex(re.search("value='(.*)'O", description)[1])
764        assert self.read_response.value.value == bytes_value
765        return "Yes"
766
767    def MMI_IUT_SEND_WRITE_REQUEST(self, description: str, **kwargs):
768        """
769        Please send write request with characteristic handle = 'XXXX'O with <=
770        'X' byte of any octet value to the PTS.
771
772        Description: Verify that the
773        Implementation Under Test (IUT) can send write request.
774        """
775
776        assert self.connection is not None
777        matches = re.findall("'([a0-Z9]*)'O with <= '([a0-Z9]*)'", description)
778        handle = stringHandleToInt(matches[0][0])
779        data = bytes([1]) * int(matches[0][1])
780
781        def write():
782            nonlocal handle
783            nonlocal data
784            self.write_response = self.gatt.WriteAttFromHandle(connection=self.connection,\
785                handle=handle, value=data)
786
787        worker = Thread(target=write)
788        worker.start()
789        worker.join(timeout=30)
790        return "OK"
791
    @assert_description
    def MMI_IUT_WRITE_TIMEOUT(self, **kwargs):
        """
        Please wait for 30 second timeout to abort the procedure.

        Description:
        Verify that the Implementation Under Test (IUT) can handle timeout after
        send Write characteristic without receiving response in 30 seconds.
        """

        # No action is taken here; the prompt is acknowledged as-is.
        return "OK"
803
804    @assert_description
805    def MMI_IUT_CONFIRM_WRITE_INVALID_HANDLE(self, **kwargs):
806        """
807        Please confirm IUT received Invalid handle error. Click Yes if IUT
808        received it, otherwise click No.
809
810        Description: Verify that the
811        Implementation Under Test (IUT) indicate Invalid handle error when write
812        a characteristic.
813        """
814
815        assert self.write_response is not None
816        assert self.write_response.status == INVALID_HANDLE
817        return "Yes"
818
819    @assert_description
820    def MMI_IUT_CONFIRM_WRITE_NOT_PERMITTED(self, **kwargs):
821        """
822        Please confirm IUT received write is not permitted error. Click Yes if
823        IUT received it, otherwise click No.
824
825        Description: Verify that the
826        Implementation Under Test (IUT) indicate write is not permitted error
827        when write a characteristic.
828        """
829
830        assert self.write_response is not None
831        assert self.write_response.status == WRITE_NOT_PERMITTED
832        return "Yes"
833
834    def MMI_IUT_SEND_PREPARE_WRITE(self, description: str, **kwargs):
835        """
836        Please send prepare write request with handle = 'XXXX'O <= 'XX' byte of
837        any octet value to the PTS.
838
839        Description: Verify that the Implementation
840        Under Test (IUT) can send prepare write request.
841        """
842
843        assert self.connection is not None
844        matches = re.findall("'([a0-Z9]*)'O <= '([a0-Z9]*)'", description)
845        handle = stringHandleToInt(matches[0][0])
846        data = bytes([1]) * int(matches[0][1])
847        self.write_response = self.gatt.WriteAttFromHandle(connection=self.connection,\
848                handle=handle, value=data)
849        return "OK"
850
851    def _mmi_150(self, description: str, **kwargs):
852        """
853        Please send an ATT_Write_Request to Client Support Features handle =
854        'XXXX'O to enable Multiple Handle Value Notifications.
855
856        Discover all
857        characteristics if needed.
858        """
859
860        return self.MMI_IUT_WRITE_SUPPORT_FEATURE_MULTIPLE_HANDLE_VALUE(description, **kwargs)
861
862    def MMI_IUT_WRITE_SUPPORT_FEATURE_MULTIPLE_HANDLE_VALUE(self, description: str, **kwargs):
863        """
864        Please send an ATT_Write_Request to Client Support Features handle =
865        'XXXX'O to enable Multiple Handle Value Notifications.
866
867        Discover all
868        characteristics if needed.
869        """
870
871        assert self.connection is not None
872        handle = stringHandleToInt(re.findall("'([a0-Z9]*)'O", description)[0])
873        data = bytes([4])  # Multiple Handle Value Notifications
874        self.write_response = self.gatt.WriteAttFromHandle(connection=self.connection,\
875                handle=handle, value=data)
876        return "OK"
877
878    def MMI_IUT_SEND_PREPARE_WRITE_GREATER_OFFSET(self, description: str, **kwargs):
879        """
880        Please send prepare write request with handle = 'XXXX'O and offset
881        greater than 'XX' byte to the PTS.
882
883        Description: Verify that the
884        Implementation Under Test (IUT) can send prepare write request.
885        """
886
887        assert self.connection is not None
888        matches = re.findall("'([a0-Z9]*)'O and offset greater than '([a0-Z9]*)'", description)
889        handle = stringHandleToInt(matches[0][0])
890        # Android APIs does not permit offset write, however we can test this by writing a value
891        # longer than the characteristic's value size. As sometimes this MMI description will ask
892        # for values greater than 512 bytes, we have to check for this or Android Bluetooth will
893        # crash. Setting written_over_length to True in order to perform the check in next MMI.
894        offset = int(matches[0][1]) + 1
895        if offset <= 512:
896            data = bytes([1]) * offset
897            self.written_over_length = True
898        else:
899            data = bytes([1]) * 512
900        self.write_response = self.gatt.WriteAttFromHandle(connection=self.connection,\
901                handle=handle, value=data)
902        return "OK"
903
    @assert_description
    def MMI_IUT_SEND_EXECUTE_WRITE_REQUEST(self, **kwargs):
        """
        Please send execute write request to the PTS.

        Description: Verify that
        the Implementation Under Test (IUT) can send execute write request.
        """

        # PTS sends this MMI after MMI_IUT_SEND_PREPARE_WRITE_GREATER_OFFSET.
        # The write was already issued there, so there is nothing left to do
        # here but acknowledge the prompt.
        return "OK"
916
917    @assert_description
918    def MMI_IUT_CONFIRM_WRITE_INVALID_OFFSET(self, **kwargs):
919        """
920        Please confirm IUT received Invalid offset error. Click Yes if IUT
921        received it, otherwise click No.
922
923        Description: Verify that the
924        Implementation Under Test (IUT) indicate Invalid offset error when write
925        a characteristic.
926        """
927
928        assert self.write_response is not None
929        # See MMI_IUT_SEND_PREPARE_WRITE_GREATER_OFFSET
930        if self.written_over_length == True:
931            assert self.write_response.status == INVALID_ATTRIBUTE_LENGTH
932        return "OK"
933
934    def MMI_IUT_SEND_WRITE_REQUEST_GREATER(self, description: str, **kwargs):
935        """
936        Please send write request with characteristic handle = 'XXXX'O with
937        greater than 'X' byte of any octet value to the PTS.
938
939        Description:
940        Verify that the Implementation Under Test (IUT) can send write request.
941        """
942
943        assert self.connection is not None
944        matches = re.findall("'([a0-Z9]*)'O with greater than '([a0-Z9]*)'", description)
945        handle = stringHandleToInt(matches[0][0])
946        data = bytes([1]) * (int(matches[0][1]) + 1)
947        self.write_response = self.gatt.WriteAttFromHandle(connection=self.connection,\
948                handle=handle, value=data)
949        return "OK"
950
951    @assert_description
952    def MMI_IUT_CONFIRM_WRITE_INVALID_LENGTH(self, **kwargs):
953        """
954        Please confirm IUT received Invalid attribute value length error. Click
955        Yes if IUT received it, otherwise click No.
956
957        Description: Verify that
958        the Implementation Under Test (IUT) indicate Invalid attribute value
959        length error when write a characteristic.
960        """
961
962        assert self.write_response is not None
963        assert self.write_response.status == INVALID_ATTRIBUTE_LENGTH
964        return "OK"
965
966    def MMI_IUT_SEND_PREPARE_WRITE_REQUEST_GREATER(self, description: str, **kwargs):
967        """
968        Please send prepare write request with handle = 'XXXX'O with greater
969        than 'XX' byte of any octet value to the PTS.
970
971        Description: Verify that
972        the Implementation Under Test (IUT) can send prepare write request.
973        """
974
975        assert self.connection is not None
976        matches = re.findall("'([a0-Z9]*)'O with greater than '([a0-Z9]*)'", description)
977        handle = stringHandleToInt(matches[0][0])
978        data = bytes([1]) * (int(matches[0][1]) + 1)
979        self.write_response = self.gatt.WriteAttFromHandle(connection=self.connection,\
980                handle=handle, value=data)
981        return "OK"
982
983    def MMI_IUT_SEND_WRITE_COMMAND(self, description: str, **kwargs):
984        """
985        Please send write command with handle = 'XXXX'O with <= 'X' bytes of any
986        octet value to the PTS.
987
988        Description: Verify that the Implementation
989        Under Test (IUT) can send write request.
990        """
991
992        assert self.connection is not None
993        matches = re.findall("'([a0-Z9]*)'O with <= '([a0-Z9]*)'", description)
994        handle = stringHandleToInt(matches[0][0])
995        data = bytes([1]) * int(matches[0][1])
996        self.write_response = self.gatt.WriteAttFromHandle(connection=self.connection,\
997                handle=handle, value=data)
998        return "OK"
999
1000    def MMI_MAKE_IUT_CONNECTABLE(self, **kwargs):
1001        """
1002        Please prepare IUT into a connectable mode.
1003
1004        Description: Verify that
1005        the Implementation Under Test (IUT) can accept GATT connect request from
1006        PTS.
1007        """
1008        self.advertise = self.host.Advertise(
1009            legacy=True,
1010            connectable=True,
1011            own_address_type=PUBLIC,
1012        )
1013        self.pairing_events = self.security.OnPairing()
1014        time.sleep(1)
1015        self.gatt.RegisterService(service=GattServiceParams(
1016            uuid=BASE_READ_WRITE_SERVICE_UUID,
1017            characteristics=[
1018                GattCharacteristicParams(
1019                    uuid=BASE_READ_CHARACTERISTIC_UUID,
1020                    properties=PROPERTY_READ,
1021                    permissions=PERMISSION_READ,
1022                    descriptors=[
1023                        GattDescriptorParams(
1024                            uuid=BASE_READ_DESCRIPTOR_UUID,
1025                            properties=PROPERTY_READ,
1026                            permissions=PERMISSION_READ,
1027                        ),
1028                    ],
1029                ),
1030                GattCharacteristicParams(
1031                    uuid=BASE_WRITE_CHARACTERISTIC_UUID,
1032                    properties=PROPERTY_WRITE,
1033                    permissions=PERMISSION_WRITE,
1034                ),
1035                GattCharacteristicParams(
1036                    uuid=BASE_READ_WRITE_ENCRYPTED_CHARACTERISTIC_UUID,
1037                    properties=PROPERTY_READ | PROPERTY_WRITE,
1038                    permissions=PERMISSION_READ_ENCRYPTED | PERMISSION_WRITE_ENCRYPTED,
1039                ),
1040                GattCharacteristicParams(
1041                    uuid=BASE_READ_WRITE_ENCRYPTED_MITM_CHARACTERISTIC_UUID,
1042                    properties=PROPERTY_READ | PROPERTY_WRITE,
1043                    permissions=PERMISSION_READ_ENCRYPTED_MITM | PERMISSION_WRITE_ENCRYPTED_MITM,
1044                ),
1045            ],
1046        ))
1047
1048        return "OK"
1049
    def MMI_CONFIRM_IUT_PRIMARY_SERVICE_128(self, **kwargs):
        """
        Please confirm IUT have following primary services UUID= 'XXXX'O
        Service start handle = 'XXXX'O, end handle = 'XXXX'O. Click Yes if IUT
        have it, otherwise click No.

        Description: Verify that the
        Implementation Under Test (IUT) can respond Discover all primary
        services by UUID.
        """

        # Answered unconditionally; the UUID and handles in the prompt are not
        # compared against anything here.
        return "Yes"
1062
    def MMI_CONFIRM_CHARACTERISTICS_SERVICE(self, **kwargs):
        """
        Please confirm IUT have following characteristics in services UUID=
        'XXXX'O handle='XXXX'O handle='XXXX'O handle='XXXX'O handle='XXXX'O .
        Click Yes if IUT have it, otherwise click No.

        Description: Verify that
        the Implementation Under Test (IUT) can respond Discover all
        characteristics of a service.
        """

        # Answered unconditionally; the handles in the prompt are not compared
        # against anything here.
        return "Yes"
1075
    def MMI_CONFIRM_SERVICE_UUID(self, **kwargs):
        """
        Please confirm the following handles for GATT Service UUID = 0xXXXX.
        Start Handle = 0xXXXX
        End Handle = 0xXXXX
        """

        # Answered unconditionally; the handles in the prompt are not compared
        # against anything here.
        return "Yes"
1084
    def MMI_IUT_ENTER_HANDLE_INVALID(self, **kwargs):
        """
        Please input a handle(0x)(Range 0x0001-0xFFFF) that is known to be
        invalid.

        Description: Verify that the Implementation Under Test (IUT)
        can issue an Invalid Handle Response.
        """

        # Fixed answer: the top of the requested range, given as hex digits
        # without a "0x" prefix.
        return "FFFF"
1095
    def MMI_IUT_NO_SECURITY(self, **kwargs):
        """
        Please make sure IUT does not initiate security procedure.

        Description:
        PTS will delete bond information. Test case requires that no
        authentication or authorization procedure has been performed between the
        IUT and the test system.
        """

        # No action is taken here; the prompt is acknowledged as-is.
        return "OK"
1107
1108    def MMI_IUT_ENTER_UUID_READ_NOT_PERMITTED(self, **kwargs):
1109        """
1110        Enter UUID(0x) response with Read Not Permitted.
1111
1112        Description: Verify
1113        that the Implementation Under Test (IUT) can respond Read Not Permitted.
1114        """
1115
1116        self.last_added_service = self.gatt.RegisterService(service=GattServiceParams(
1117            uuid=CUSTOM_SERVICE_UUID,
1118            characteristics=[
1119                GattCharacteristicParams(
1120                    uuid=CUSTOM_CHARACTERISTIC_UUID,
1121                    properties=PROPERTY_READ,
1122                    permissions=PERMISSION_NONE,
1123                ),
1124            ],
1125        ))
1126        return CUSTOM_CHARACTERISTIC_UUID[4:8].upper()
1127
    def MMI_IUT_ENTER_HANDLE_READ_NOT_PERMITTED(self, **kwargs):
        """
        Please input a handle(0x)(Range 0x0001-0xFFFF) that doesn't permit
        reading (i.e. Read Not Permitted)

        Description: Verify that the
        Implementation Under Test (IUT) can issue a Read Not Permitted Response.
        """

        # NOTE(review): another method with this exact name is defined further
        # down at the same level. In Python the later definition replaces this
        # one, so this body is never the one that runs. Consider removing one
        # of the two.
        #
        # This variant answers with the handle of the first characteristic of
        # the most recently registered service, as four lowercase hex digits.
        return "{:04x}".format(self.last_added_service.service.characteristics[0].handle)
1138
    def MMI_IUT_ENTER_UUID_ATTRIBUTE_NOT_FOUND(self, **kwargs):
        """
        Enter UUID(0x) response with Attribute Not Found.

        Description: Verify
        that the Implementation Under Test (IUT) can respond Attribute Not
        Found.
        """

        # Answer with characters 4..7 of CUSTOM_CHARACTERISTIC_UUID,
        # upper-cased. Unlike the sibling "ENTER_UUID" MMIs, no service is
        # registered here (presumably so the attribute does not exist -
        # confirm).
        return CUSTOM_CHARACTERISTIC_UUID[4:8].upper()
1149
1150    def MMI_IUT_ENTER_UUID_INSUFFICIENT_AUTHENTICATION(self, **kwargs):
1151        """
1152        Enter UUID(0x) response with Insufficient Authentication.
1153
1154        Description:
1155        Verify that the Implementation Under Test (IUT) can respond Insufficient
1156        Authentication.
1157        """
1158
1159        self.last_added_service = self.gatt.RegisterService(service=GattServiceParams(
1160            uuid=CUSTOM_SERVICE_UUID,
1161            characteristics=[
1162                GattCharacteristicParams(
1163                    uuid=CUSTOM_CHARACTERISTIC_UUID,
1164                    properties=PROPERTY_READ,
1165                    permissions=PERMISSION_READ_ENCRYPTED,
1166                ),
1167            ],
1168        ))
1169        return CUSTOM_CHARACTERISTIC_UUID[4:8].upper()
1170
1171    def MMI_IUT_ENTER_HANDLE_INSUFFICIENT_AUTHENTICATION(self, **kwargs):
1172        """
1173        Enter Handle(0x)(Range 0x0001-0xFFFF) response with Insufficient
1174        Authentication.
1175
1176        Description: Verify that the Implementation Under Test
1177        (IUT) can respond Insufficient Authentication.
1178        """
1179
1180        return "{:04x}".format(self.last_added_service.service.characteristics[0].handle)
1181
1182    def MMI_IUT_ENTER_HANDLE_READ_NOT_PERMITTED(self, **kwargs):
1183        """
1184        Please input a handle(0x)(Range 0x0001-0xFFFF) that doesn't permit
1185        reading (i.e. Read Not Permitted)
1186
1187        Description: Verify that the
1188        Implementation Under Test (IUT) can issue a Read Not Permitted Response.
1189        """
1190
1191        self.last_added_service = self.gatt.RegisterService(service=GattServiceParams(
1192            uuid=CUSTOM_SERVICE_UUID,
1193            characteristics=[
1194                GattCharacteristicParams(
1195                    uuid=CUSTOM_CHARACTERISTIC_UUID,
1196                    properties=PROPERTY_READ,
1197                    permissions=PERMISSION_NONE,
1198                ),
1199            ],
1200        ))
1201        return "{:04x}".format(self.last_added_service.service.characteristics[0].handle)
1202
    def MMI_IUT_CONFIRM_READ_MULTIPLE_HANDLE_VALUES(self, **kwargs):
        """
        Please confirm IUT Handle pair = 'XXXX'O 'XXXX'O
        value='XXXXXXXXXXXXXXXXXXXXXXXXXXX in random selected
        adopted database. Click Yes if it matches the IUT, otherwise click No.
        Description: Verify that the Implementation Under Test (IUT) can send
        Read multiple characteristics.
        """

        # Answered unconditionally; the handles and value in the prompt are not
        # compared against anything here.
        return "OK"
1213
1214    def MMI_IUT_ENTER_HANDLE_WRITE_NOT_PERMITTED(self, **kwargs):
1215        """
1216        Enter Handle(0x) response with Write Not Permitted.
1217
1218        Description:
1219        Verify that the Implementation Under Test (IUT) can respond Write Not
1220        Permitted.
1221        """
1222
1223        self.last_added_service = self.gatt.RegisterService(service=GattServiceParams(
1224            uuid=CUSTOM_SERVICE_UUID,
1225            characteristics=[
1226                GattCharacteristicParams(
1227                    uuid=CUSTOM_CHARACTERISTIC_UUID,
1228                    properties=PROPERTY_WRITE,
1229                    permissions=PERMISSION_NONE,
1230                ),
1231            ],
1232        ))
1233        return "{:04x}".format(self.last_added_service.service.characteristics[0].handle)
1234
1235    def MMI_IUT_SEND_INDICATION(self, description: str, **kwargs):
1236        """
1237        Please write to client characteristic configuration handle = 'XXXX'O to
1238        enable indication to the PTS. Discover all characteristics if needed.
1239        Description: Verify that the Implementation Under Test (IUT) can receive
1240        indication sent from PTS.
1241        """
1242        assert self.connection is not None
1243        handle = stringHandleToInt(re.findall("'([a0-Z9]*)'O", description)[0])
1244        self.handles.append(handle)
1245        self.gatt.SetCharacteristicNotificationFromHandle(connection=self.connection,\
1246                handle=handle, enable_value=ENABLE_INDICATION_VALUE)
1247
1248        return "OK"
1249
1250    @assert_description
1251    def MMI_IUT_RECEIVE_INDICATION(self, **kwargs):
1252        """
1253        Please confirm IUT received indication from PTS. Click YES if received,
1254        otherwise NO.
1255
1256        Description: Verify that the Implementation Under Test
1257        (IUT) can receive indication send from PTS.
1258        """
1259        for handle in self.handles:
1260            self.characteristic_notification_received = self.gatt.WaitCharacteristicNotification(
1261                connection=self.connection, handle=handle).characteristic_notification_received
1262            assert self.characteristic_notification_received
1263
1264        return "OK"
1265
1266    def MMI_IUT_SEND_NOTIFICATION(self, description: str, **kwargs):
1267        """
1268        Please write to client characteristic configuration handle = 'XXXX'O to
1269        enable notification to the PTS. Discover all characteristics if needed.
1270        Description: Verify that the Implementation Under Test (IUT) can receive
1271        notification sent from PTS.
1272        """
1273        assert self.connection is not None
1274        handle = stringHandleToInt(re.findall("'([a0-Z9]*)'O", description)[0])
1275        self.handles.append(handle)
1276        self.gatt.SetCharacteristicNotificationFromHandle(connection=self.connection,\
1277                handle=handle, enable_value=ENABLE_NOTIFICATION_VALUE)
1278
1279        return "OK"
1280
1281    @assert_description
1282    def MMI_IUT_RECEIVE_NOTIFICATION(self, **kwargs):
1283        """
1284        Please confirm IUT received notification from PTS. Click YES if
1285        received, otherwise NO.
1286
1287        Description: Verify that the Implementation
1288        Under Test (IUT) can receive notification send from PTS.
1289        """
1290        for handle in self.handles:
1291            self.characteristic_notification_received = self.gatt.WaitCharacteristicNotification(
1292                connection=self.connection, handle=handle).characteristic_notification_received
1293            assert self.characteristic_notification_received
1294
1295        return "OK"
1296
    @assert_description
    def MMI_IUT_DELETE_SECUIRTY_KEY(self, pts_addr: bytes, **kwargs):
        """
        Please delete security key before connecting to PTS if IUT was bonded
        previously.
        """
        # Ask the security storage to delete the bond for the PTS address,
        # which is passed as a public address.
        self.security_storage.DeleteBond(public=pts_addr)

        return "OK"
1306
1307    def MMI_IUT_WRITE_SUPPORT_FEATURE(self, description: str, **kwargs):
1308        """
1309        Please write to client support feature handle = 'XXXX'O to enable Robust
1310        Caching. Discover all characteristics if needed.
1311        """
1312        assert self.connection is not None
1313        handle = stringHandleToInt(re.findall("'([a0-Z9]*)'O", description)[0])
1314        data = bytes([1])
1315        self.write_response = self.gatt.WriteAttFromHandle(connection=self.connection,\
1316                handle=handle, value=data)
1317
1318        return "OK"
1319
1320    @match_description
1321    def _mmi_2004(self, pts_addr: bytes, passkey: str, **kwargs):
1322        """
1323        Please confirm that 6 digit number is matched with (?P<passkey>[0-9]+).
1324        """
1325
1326        for event in self.pairing_events:
1327            if event.address == pts_addr and event.numeric_comparison == int(passkey):
1328                self.pairing_events.send(PairingEventAnswer(
1329                    event=event,
1330                    confirm=True,
1331                ))
1332                return "OK"
1333
1334        assert False
1335
1336    @match_description
1337    def MMI_CONFIRM_IUT_PRIMARY_SERVICE(self, services: str, **kwargs):
1338        """
1339        Please confirm IUT have following primary services UUID= (?P<services>.*?).
1340        Click Yes if IUT have it, otherwise click No.
1341        Description: Verify that the Implementation Under Test \(IUT\) can respond
1342        Discover all primary services by UUID.
1343        """
1344
1345        return "OK"
1346
1347    @match_description
1348    def MMI_CONFIRM_PASSKEY(self, pts_addr: bytes, passkey: str, **kwargs):
1349        """
1350        The secureId is (?P<passkey>[0-9]+).
1351        """
1352
1353        for event in self.pairing_events:
1354            if event.address == pts_addr and event.passkey_entry_request:
1355                self.pairing_events.send(PairingEventAnswer(event=event, passkey=int(passkey)))
1356                return "OK"
1357
1358        assert False
1359
1360
def stringHandleToInt(handle: str):
    """
    Convert a handle written as a hexadecimal string (e.g. '00FF') to an int.
    """
    return int(handle, base=16)
1363
1364
# In one test, the handles of discovered characteristics are one higher than
# the handles PTS refers to, hence the +1.
def stringCharHandleToInt(handle: str):
    """
    Convert a hexadecimal handle string to an int and add one.
    """
    pts_handle = int(handle, base=16)
    return pts_handle + 1
1368
1369
def formatUuid(uuid: str):
    """
    Formats PTS described UUIDs to be of the right format.
    Right format is: 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'
    PTS described format can be:
    - 'XXXX'
    - 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
    - 'XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX-XXXX'

    Any other input is returned upper-cased and otherwise unchanged.
    """
    uuid_len = len(uuid)
    if uuid_len == 4:
        # Put the 16-bit value into characters 4..7 of BASE_UUID by position.
        # `str.replace` would rewrite every occurrence of the text found at
        # that position, which is only right if that text occurs exactly once.
        return BASE_UUID[:4] + uuid.upper() + BASE_UUID[8:]
    if uuid_len in (32, 39):
        # Drop whatever dashes PTS used, then regroup as 8-4-4-4-12.
        digits = uuid.replace('-', '').upper()
        return '-'.join(
            (digits[:8], digits[8:12], digits[12:16], digits[16:20], digits[20:]))
    return uuid.upper()
1391
1392
# PTS asks for the wrong uuid for services discovered by SDP in some tests.
def formatSdpUuid(uuid: str):
    """
    Expand a 4-character PTS UUID into the full form based on BASE_UUID.

    When the fourth character is '1' it is replaced by 'f' first, to
    compensate for the wrong UUID PTS asks for.
    """
    if uuid[3] == '1':
        uuid = uuid[:3] + 'f'
    # Place the value into characters 4..7 of BASE_UUID by position.
    # `str.replace` would rewrite every occurrence of the text found at that
    # position, which is only right if that text occurs exactly once.
    return BASE_UUID[:4] + uuid.upper() + BASE_UUID[8:]
1398
1399
def compareIncludedServices(service, service_handle, included_handle, included_uuid):
    """
    Tell whether `service` has an included service matching the given values.

    The service_handle passed by the PTS is
    [primary service handle] + [included service number], with included
    services numbered from 1 in the order they are listed.
    """
    return any(
        service.handle == (service_handle - position)
        and candidate.handle == included_handle
        and candidate.uuid == included_uuid
        for position, candidate in enumerate(service.included_services, start=1))
1414
1415
def getCharacteristicsForServiceUuid(services, uuid):
    """
    Return the characteristics of the first service whose uuid equals `uuid`,
    or an empty list when no service matches.
    """
    matching = (service.characteristics for service in services if service.uuid == uuid)
    return next(matching, [])
1424
1425
def getCharacteristicsRange(services, start_handle, end_handle, uuid):
    """
    Collect, across all services and in order, the characteristics whose
    handle lies within [start_handle, end_handle] (both inclusive) and whose
    uuid equals `uuid`.
    """
    return [
        characteristic
        for service in services
        for characteristic in service.characteristics
        if start_handle <= characteristic.handle <= end_handle
        and characteristic.uuid == uuid
    ]
1439
1440
def getDescriptorsRange(services, start_handle, end_handle):
    """
    Collect, across all services and characteristics and in order, the
    descriptors whose handle lies within [start_handle, end_handle]
    (both inclusive).
    """
    return [
        descriptor
        for service in services
        for characteristic in service.characteristics
        for descriptor in characteristic.descriptors
        if start_handle <= descriptor.handle <= end_handle
    ]
1453