1#
2#   Copyright 2019 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16from datetime import timedelta
17
18from mobly import asserts
19
20from bluetooth_packets_python3 import l2cap_packets
21from bluetooth_packets_python3 import RawBuilder
22from bluetooth_packets_python3.l2cap_packets import FcsType
23from bluetooth_packets_python3.l2cap_packets import Final
24from bluetooth_packets_python3.l2cap_packets import Poll
25from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly
26from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
27from cert.behavior import when, anything, wait_until
28from cert.event_stream import EventStream
29from cert.gd_base_test import GdBaseTestClass
30from cert.matchers import L2capMatchers
31from cert.metadata import metadata
32from cert.py_l2cap import PyL2cap
33from cert.truth import assertThat
34from facade import common_pb2
35from google.protobuf import empty_pb2 as empty_proto
36from l2cap.classic.cert.cert_l2cap import CertL2cap
37from l2cap.classic.facade_pb2 import RetransmissionFlowControlMode
38from neighbor.facade import facade_pb2 as neighbor_facade
39
# Raw bytes of the sample payload; receive-side tests compare against this.
SAMPLE_PACKET_DATA = b"\x19\x26\x08\x17"
# Iterating a bytes object yields ints, so list() hands RawBuilder the
# individual byte values of the sample payload.
SAMPLE_PACKET = RawBuilder(list(SAMPLE_PACKET_DATA))
43
44
class L2capTestBase(GdBaseTestClass):
    """Shared fixture for the classic L2CAP cert tests.

    Starts the DUT with the 'L2CAP' module and the cert device with the
    'HCI_INTERFACES' module, wraps both in PyL2cap / CertL2cap helpers for
    every test, and provides helpers that bring up the ACL link and open
    dynamic channels from either side.
    """

    def setup_class(self):
        """Start both stacks with the modules these tests rely on."""
        super().setup_class(dut_module='L2CAP', cert_module='HCI_INTERFACES')

    def setup_test(self):
        """Read both device addresses and create the per-test L2CAP helpers."""
        super().setup_test()

        self.dut_address = self.dut.hci_controller.GetMacAddressSimple()
        cert_address = common_pb2.BluetoothAddress(
            address=self.cert.controller_read_only_property.ReadLocalAddress(empty_proto.Empty()).address)

        self.dut_l2cap = PyL2cap(self.dut, cert_address)
        self.cert_l2cap = CertL2cap(self.cert)

    def teardown_test(self):
        """Close both L2CAP helpers, then run the base-class teardown.

        Every step sits in a ``finally`` so that an exception raised while
        closing one helper cannot prevent the other helper from being closed
        or the base-class teardown from running.
        """
        try:
            self.cert_l2cap.close()
        finally:
            try:
                self.dut_l2cap.close()
            finally:
                super().teardown_test()

    def _setup_link_from_cert(self):
        """Enable page scan on the DUT and connect ACL to it from the cert."""
        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
        self.cert_l2cap.connect_acl(self.dut_address)

    def _open_unconfigured_channel_from_cert(self,
                                             signal_id=1,
                                             scid=0x0101,
                                             psm=0x33,
                                             mode=RetransmissionFlowControlMode.BASIC,
                                             fcs=None):
        """Open a channel from the cert side without running configuration.

        Args:
            signal_id: signal id handed to the cert's open_channel call.
            scid: source channel id handed to the cert's open_channel call.
            psm: PSM registered on the DUT and requested by the cert.
            mode: retransmission/flow-control mode the DUT registers with.
            fcs: forwarded to the cert's open_channel call.

        Returns:
            (dut_channel, cert_channel) tuple.
        """
        # The DUT must register the PSM before the cert asks to connect to it.
        dut_channel = self.dut_l2cap.register_dynamic_channel(psm, mode)
        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid, fcs=fcs)

        return (dut_channel, cert_channel)

    def _open_channel_from_cert(self,
                                signal_id=1,
                                scid=0x0101,
                                psm=0x33,
                                mode=RetransmissionFlowControlMode.BASIC,
                                fcs=None,
                                req_config_options=None,
                                rsp_config_options=None):
        """Open a channel from the cert side and configure it in both directions.

        Args:
            signal_id, scid, psm, mode, fcs: forwarded to
                _open_unconfigured_channel_from_cert.
            req_config_options: options for the cert's configuration request.
                When None, ERTM options built with ``fcs`` are used in ERTM
                mode and an empty list otherwise.
            rsp_config_options: options for the cert's reply to the DUT's
                configuration request. When None and ``fcs`` is given, ERTM
                options built with that ``fcs`` are used; when both are None
                no custom reply is registered here.

        Returns:
            (dut_channel, cert_channel) tuple.
        """
        request_matcher = L2capMatchers.ConfigurationRequestView(scid)
        # Register the reply behavior before the channel is opened, so it is
        # already in place when the DUT's configuration request shows up.
        if rsp_config_options is not None:
            when(self.cert_l2cap).on_config_req(request_matcher).then().send_configuration_response(
                options=rsp_config_options)
        elif fcs is not None:
            when(self.cert_l2cap).on_config_req(request_matcher).then().send_configuration_response(
                options=CertL2cap.config_option_ertm(fcs=fcs))

        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(signal_id, scid, psm, mode, fcs)
        if req_config_options is None:
            req_config_options = CertL2cap.config_option_ertm(
                fcs=fcs) if mode == RetransmissionFlowControlMode.ERTM else []

        cert_channel.send_configure_request(req_config_options)

        cert_channel.verify_configuration_response()

        # Wait until the DUT's own configuration request has been seen once.
        wait_until(self.cert_l2cap).on_config_req(request_matcher).times(1)

        assertThat(cert_channel.is_configured()).isTrue()

        return (dut_channel, cert_channel)

    def _open_channel_from_dut(self, psm=0x33, mode=RetransmissionFlowControlMode.BASIC):
        """Open a channel from the DUT side and configure it in both directions.

        Args:
            psm: PSM the DUT connects to on the cert.
            mode: retransmission/flow-control mode; in ERTM mode the cert's
                configuration request carries the default ERTM options,
                otherwise it carries no options.

        Returns:
            (dut_channel, cert_channel) tuple.
        """
        dut_channel_future = self.dut_l2cap.connect_dynamic_channel_to_cert(psm, mode)
        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(psm)
        dut_channel = dut_channel_future.get_channel()

        cert_channel.verify_configuration_request_and_respond()
        outgoing_config = []
        if mode == RetransmissionFlowControlMode.ERTM:
            outgoing_config = CertL2cap.config_option_ertm()
        cert_channel.send_configure_request(outgoing_config)
        cert_channel.verify_configuration_response()

        return (dut_channel, cert_channel)
125
126
127class L2capTest(L2capTestBase):
128
129    def test_connect_dynamic_channel_and_send_data(self):
130        self._setup_link_from_cert()
131
132        (dut_channel, cert_channel) = self._open_channel_from_cert()
133
134        dut_channel.send(b'abc')
135        assertThat(cert_channel).emits(L2capMatchers.Data(b'abc'))
136
137    def test_receive_packet_from_unknown_channel(self):
138        self._setup_link_from_cert()
139
140        (dut_channel, cert_channel) = self._open_channel_from_cert(scid=0x41, psm=0x33)
141
142        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
143            0x99, 0, Final.NOT_SET, 1, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
144        self.cert_l2cap.send_acl(i_frame)
145        assertThat(cert_channel).emitsNone(L2capMatchers.SFrame(req_seq=4), timeout=timedelta(seconds=1))
146
147    def test_open_two_channels(self):
148        self._setup_link_from_cert()
149
150        self._open_channel_from_cert(signal_id=1, scid=0x41, psm=0x41)
151        self._open_channel_from_cert(signal_id=2, scid=0x43, psm=0x43)
152
153    def test_connect_and_send_data_ertm_no_segmentation(self):
154        self._setup_link_from_cert()
155
156        (dut_channel, cert_channel) = self._open_channel_from_cert(
157            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
158
159        dut_channel.send(b'abc' * 34)
160        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc' * 34))
161
162        cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET)
163        # todo verify received?
164
165    @metadata(pts_test_id="L2CAP/COS/CED/BV-01-C", pts_test_name="Request Connection")
166    def test_basic_operation_request_connection(self):
167        """
168        Verify that the IUT is able to request the connection establishment for
169        an L2CAP data channel and initiate the configuration procedure.
170        """
171        self._setup_link_from_cert()
172        (dut_channel, cert_channel) = self._open_channel_from_dut()
173
174    @metadata(pts_test_id="L2CAP/COS/CED/BV-03-C", pts_test_name="Send data")
175    def test_send_data(self):
176        """
177        Verify that the IUT is able to send DATA
178        """
179        self._setup_link_from_cert()
180
181        (dut_channel, cert_channel) = self._open_channel_from_cert()
182        dut_channel.send(b'hello')
183        assertThat(cert_channel).emits(L2capMatchers.Data(b'hello'))
184
185    @metadata(pts_test_id="L2CAP/COS/CED/BV-04-C", pts_test_name="Disconnect")
186    def test_disconnect(self):
187        """
188        Verify that the IUT is able to disconnect the data channel
189        """
190        self._setup_link_from_cert()
191
192        (dut_channel, cert_channel) = self._open_channel_from_cert()
193        dut_channel.close_channel()
194        cert_channel.verify_disconnect_request()
195
196    @metadata(pts_test_id="L2CAP/COS/CED/BV-05-C", pts_test_name="Accept connection")
197    def test_accept_connection(self):
198        """
199        Also verify that DUT can send 48 bytes PDU (minimal MTU)
200        """
201        self._setup_link_from_cert()
202
203        (dut_channel, cert_channel) = self._open_channel_from_cert()
204        dut_channel.send(b'a' * 48)
205        assertThat(cert_channel).emits(L2capMatchers.Data(b'a' * 48))
206
207    @metadata(pts_test_id="L2CAP/COS/CED/BV-07-C", pts_test_name="Accept Disconnect")
208    def test_accept_disconnect(self):
209        """
210        Verify that the IUT is able to respond to the request to disconnect the
211        data channel
212        """
213        self._setup_link_from_cert()
214
215        (dut_channel, cert_channel) = self._open_channel_from_cert()
216        cert_channel.disconnect_and_verify()
217
218    @metadata(pts_test_id="L2CAP/COS/CED/BV-08-C", pts_test_name="Disconnect on Timeout")
219    def test_disconnect_on_timeout(self):
220        """
221        Verify that the IUT disconnects the data channel and shuts down this
222        channel if no response occurs
223        """
224        self._setup_link_from_cert()
225
226        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert()
227
228        assertThat(self.cert_l2cap.get_control_channel()).emitsNone(L2capMatchers.ConfigurationResponse())
229        # TODO: Verify that IUT sends disconnect request (not mandated)
230
231    @metadata(pts_test_id="L2CAP/COS/CED/BV-09-C", pts_test_name="Receive Multi-Command Packet")
232    def test_receive_multi_command_packet(self):
233        """
234        Verify that the IUT is able to receive more than one signaling command in one L2CAP
235        packet.
236        """
237        self._setup_link_from_cert()
238
239        psm = 0x33
240        self.dut_l2cap.connect_dynamic_channel_to_cert(psm)
241        self.cert_l2cap.verify_and_respond_open_channel_from_remote_and_send_config_req(psm)
242
243        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.ConfigurationResponse())
244
245    @metadata(pts_test_id="L2CAP/COS/CED/BV-11-C", pts_test_name="Configure MTU size")
246    def test_configure_mtu_size(self):
247        """
248        Verify that the IUT is able to configure the supported MTU size
249        """
250        self._setup_link_from_cert()
251        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert()
252        cert_channel.send_configure_request(CertL2cap.config_option_mtu_explicit(672))
253        cert_channel.verify_configuration_request_and_respond()
254        # TODO: Probably remove verify_configuration_request_and_respond
255
256    @metadata(pts_test_id="L2CAP/COS/CFD/BV-01-C", pts_test_name="Continuation Flag")
257    def test_continuation_flag(self):
258        """
259        Verify the IUT is able to receive configuration requests that have the
260        continuation flag set
261        """
262        self._setup_link_from_cert()
263
264        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert()
265
266        # Send configuration request with CONTINUE
267        mtu_opt = l2cap_packets.MtuConfigurationOption()
268        mtu_opt.mtu = 0x1234
269        cert_channel.send_configure_request([mtu_opt], 2, l2cap_packets.Continuation.CONTINUE)
270
271        flush_timeout_option = l2cap_packets.FlushTimeoutConfigurationOption()
272        flush_timeout_option.flush_timeout = 65535
273        cert_channel.send_configure_request([flush_timeout_option], 3, l2cap_packets.Continuation.END)
274
275        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.ConfigurationResponse(), at_least_times=2)
276
277    @metadata(pts_test_id="L2CAP/COS/CFD/BV-02-C", pts_test_name="Negotiation with Reject")
278    def test_retry_config_after_rejection(self):
279        """
280        Verify that the IUT is able to perform negotiation while the Lower
281        Tester rejects the proposed configuration parameter values
282        """
283        self._setup_link_from_cert()
284        scid = 0x41
285        when(self.cert_l2cap).on_config_req(
286            L2capMatchers.ConfigurationRequestView(scid)).then().send_configuration_response(
287                result=l2cap_packets.ConfigurationResponseResult.UNACCEPTABLE_PARAMETERS,
288                options=CertL2cap.config_option_mtu_explicit(200)).send_configuration_response(options=[])
289        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(scid=scid)
290
291        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.ConfigurationRequest(), at_least_times=2)
292
293    @metadata(pts_test_id="L2CAP/COS/CFD/BV-03-C", pts_test_name="Send Requested Options")
294    def test_send_requested_options(self):
295        """
296        Verify that the IUT can receive a configuration request with no options
297        and send the requested options to the Lower Tester
298        """
299        self._setup_link_from_cert()
300        (dut_channel, cert_channel) = self._open_channel_from_cert(scid=0x41, psm=0x33)
301
302        # TODO(hsz) implement me!
303
304    @metadata(pts_test_id="L2CAP/COS/CFD/BV-08-C", pts_test_name="Non-blocking Config Response")
305    def test_non_blocking_config_response(self):
306        """
307        Verify that the IUT does not block transmitting L2CAP_ConfigRsp while
308        waiting for L2CAP_ConfigRsp from the Lower Tester
309        """
310        self._setup_link_from_cert()
311
312        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert()
313
314        cert_channel.send_configure_request([])
315        cert_channel.verify_configuration_response()
316        cert_channel.verify_configuration_request_and_respond()
317
318        # TODO(hsz) implement me!
319
320    @metadata(pts_test_id="L2CAP/COS/CFD/BV-09-C", pts_test_name="Mandatory 48 Byte MTU")
321    def test_mandatory_48_byte_mtu(self):
322        """
323        Verify that the IUT can support mandatory 48 byte MTU
324        """
325        self._setup_link_from_cert()
326        (dut_channel,
327         cert_channel) = self._open_channel_from_cert(req_config_options=CertL2cap.config_option_mtu_explicit(48))
328
329        dut_channel.send(b"a" * 44)
330        assertThat(cert_channel).emits(L2capMatchers.Data(b"a" * 44))
331
332    @metadata(pts_test_id="L2CAP/COS/CFD/BV-11-C", pts_test_name="Negotiation of Unsupported Parameter")
333    def test_negotiation_of_unsupported_parameter(self):
334        """
335        Verify that the IUT can negotiate when the Lower Tester proposes an unsupported configuration
336        parameter value.
337        """
338        self._setup_link_from_cert()
339        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert()
340
341        cert_channel.send_configure_request(CertL2cap.config_option_mtu_explicit(20))
342        # Invalid because minimum is 48
343
344        cert_channel.verify_configuration_response(l2cap_packets.ConfigurationResponseResult.UNACCEPTABLE_PARAMETERS)
345
346    @metadata(pts_test_id="L2CAP/COS/CFD/BV-12-C", pts_test_name="Unknown Option Response")
347    def test_config_unknown_options_with_hint(self):
348        """
349        Verify that the IUT can give the appropriate error code when the Lower
350        Tester proposes any number of unknown options that are optional
351        NOTE: In GD stack, ExtendedWindowSizeOption in unsupported
352        """
353        self._setup_link_from_cert()
354
355        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert()
356
357        unknown_opt_hint = l2cap_packets.ExtendedWindowSizeOption()
358        unknown_opt_hint.max_window_size = 20
359        unknown_opt_hint.is_hint = l2cap_packets.ConfigurationOptionIsHint.OPTION_IS_A_HINT
360
361        for i in range(10):
362            cert_channel.send_configure_request([unknown_opt_hint] * i)
363            cert_channel.verify_configuration_response(l2cap_packets.ConfigurationResponseResult.SUCCESS)
364
365    @metadata(pts_test_id="L2CAP/COS/CFD/BV-14-C", pts_test_name="Unknown Mandatory Options Request")
366    def test_unknown_mandatory_options_request(self):
367        """
368        Verify that the IUT can give the appropriate error code when the Lower
369        Tester proposes any number of unknown options where at least one is
370        mandatory.
371        Note: GD stack doesn't support extended window size. For other stacks,
372              we may need to use some other config option
373        """
374        self._setup_link_from_cert()
375
376        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(scid=0x41, psm=0x33)
377
378        unknown_opt = l2cap_packets.ExtendedWindowSizeOption()
379        unknown_opt.max_window_size = 20
380
381        unknown_opt_hint = l2cap_packets.ExtendedWindowSizeOption()
382        unknown_opt_hint.max_window_size = 20
383        unknown_opt_hint.is_hint = l2cap_packets.ConfigurationOptionIsHint.OPTION_IS_A_HINT
384
385        configuration_option_attempts = [[unknown_opt], [unknown_opt, unknown_opt_hint], [
386            unknown_opt, unknown_opt, unknown_opt
387        ], [unknown_opt, unknown_opt_hint, unknown_opt_hint,
388            unknown_opt], [unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt], [
389                unknown_opt, unknown_opt_hint, unknown_opt_hint, unknown_opt, unknown_opt_hint, unknown_opt_hint
390            ], [unknown_opt, unknown_opt, unknown_opt, unknown_opt, unknown_opt, unknown_opt, unknown_opt], [
391                unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint,
392                unknown_opt_hint, unknown_opt_hint, unknown_opt
393            ], [
394                unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint,
395                unknown_opt_hint, unknown_opt_hint, unknown_opt, unknown_opt
396            ], [
397                unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint,
398                unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt
399            ]]
400
401        for option_list in configuration_option_attempts:
402            cert_channel.send_configure_request(option_list)
403            cert_channel.verify_configuration_response(l2cap_packets.ConfigurationResponseResult.UNKNOWN_OPTIONS)
404
405    @metadata(pts_test_id="L2CAP/COS/ECH/BV-01-C", pts_test_name="Respond to Echo Request")
406    def test_respond_to_echo_request(self):
407        """
408        Verify that the IUT responds to an echo request.
409        """
410        self._setup_link_from_cert()
411        echo_request = l2cap_packets.EchoRequestBuilder(100, RawBuilder([1, 2, 3]))
412        self.cert_l2cap.get_control_channel().send(echo_request)
413
414        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.EchoResponse())
415
416    @metadata(pts_test_id="L2CAP/COS/CED/BI-01-C", pts_test_name="Reject Unknown Command")
417    def test_reject_unknown_command(self):
418        """
419        Verify that the IUT rejects an unknown signaling command
420        """
421        self._setup_link_from_cert()
422
423        # Command code ff, Signal id 01, size 0000
424        invalid_command_packet = RawBuilder([0xff, 0x01, 0x00, 0x00])
425        self.cert_l2cap.get_control_channel().send(invalid_command_packet)
426
427        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.CommandReject())
428
429    @metadata(pts_test_id="L2CAP/COS/IEX/BV-01-C", pts_test_name="Query for 1.2 Features")
430    def test_query_for_1_2_features(self):
431        """
432        Verify that the IUT transmits an information request command to solicit
433        if the remote device supports Specification 1.2 features.
434        """
435        self._setup_link_from_cert()
436        assertThat(self.cert_l2cap.get_control_channel()).emits(
437            L2capMatchers.InformationRequestWithType(
438                l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED))
439
440    @metadata(pts_test_id="L2CAP/COS/IEX/BV-02-C", pts_test_name="Respond with 1.2 Features")
441    def test_respond_with_1_2_features(self):
442        """
443        Verify that the IUT responds to an information request command
444        soliciting for Specification 1.2 features
445        """
446        self._setup_link_from_cert()
447        control_channel = self.cert_l2cap.get_control_channel()
448
449        control_channel.send_extended_features_request()
450
451        assertThat(control_channel).emits(L2capMatchers.InformationResponseExtendedFeatures())
452
453    @metadata(
454        pts_test_id="L2CAP/EXF/BV-01-C",
455        pts_test_name="Extended Features Information Response for "
456        "Enhanced Retransmission Mode")
457    def test_extended_feature_info_response_ertm(self):
458        """
459        Verify the IUT can format an Information Response for the information
460        type of Extended Features that correctly identifies that Enhanced
461        Retransmission Mode is locally supported
462        """
463        self._setup_link_from_cert()
464        control_channel = self.cert_l2cap.get_control_channel()
465
466        control_channel.send_extended_features_request()
467
468        assertThat(control_channel).emits(L2capMatchers.InformationResponseExtendedFeatures(supports_ertm=True))
469
470    @metadata(
471        pts_test_id="L2CAP/EXF/BV-02-C", pts_test_name="Extended Features Information Response for "
472        "Streaming Mode")
473    def test_extended_feature_info_response_streaming(self):
474        """
475        Verify the IUT can format an Information Response for the information
476        type of Extended Features that correctly identifies that Streaming Mode
477        is locally supported
478        """
479        asserts.skip("Streaming not supported")
480        self._setup_link_from_cert()
481        control_channel = self.cert_l2cap.get_control_channel()
482
483        control_channel.send_extended_features_request()
484
485        assertThat(control_channel).emits(L2capMatchers.InformationResponseExtendedFeatures(supports_streaming=True))
486
487    @metadata(pts_test_id="L2CAP/EXF/BV-03-C", pts_test_name="Extended Features Information Response for FCS " "Option")
488    def test_extended_feature_info_response_fcs(self):
489        """
490        Verify the IUT can format an Information Response for the information
491        type of Extended Features that correctly identifies that the FCS Option
492        is locally supported.
493
494        Note: This is not mandated by L2CAP Spec
495        """
496        self._setup_link_from_cert()
497        control_channel = self.cert_l2cap.get_control_channel()
498
499        control_channel.send_extended_features_request()
500
501        assertThat(control_channel).emits(L2capMatchers.InformationResponseExtendedFeatures(supports_fcs=True))
502
503    @metadata(
504        pts_test_id="L2CAP/EXF/BV-05-C", pts_test_name="Extended Features Information Response for Fixed "
505        "Channels")
506    def test_extended_feature_info_response_fixed_channels(self):
507        """
508        Verify the IUT can format an Information Response for the information
509        type of Extended Features that correctly identifies that the Fixed
510        Channels option is locally supported
511
512        Note: This is not mandated by L2CAP Spec
513        """
514        self._setup_link_from_cert()
515        control_channel = self.cert_l2cap.get_control_channel()
516
517        control_channel.send_extended_features_request()
518
519        assertThat(control_channel).emits(
520            L2capMatchers.InformationResponseExtendedFeatures(supports_fixed_channels=True))
521
522    @metadata(pts_test_id="L2CAP/FIX/BV-01-C", pts_test_name="Fixed Channels Supported Information Request")
523    def test_fixed_channels_supported_information_request(self):
524        """
525        Verify that the IUT can send an Information Request for the information
526        type of Fixed Channels Supported.
527        """
528        self._setup_link_from_cert()
529        assertThat(self.cert_l2cap.get_control_channel()).emits(
530            L2capMatchers.InformationRequestWithType(l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED))
531
532    @metadata(pts_test_id="L2CAP/FOC/BV-01-C", pts_test_name="IUT Initiated Configuration of the FCS Option")
533    def test_config_channel_not_use_FCS(self):
534        """
535        Verify the IUT can configure a channel to not use FCS in I/S-frames.
536        """
537        self._setup_link_from_cert()
538
539        (dut_channel, cert_channel) = self._open_channel_from_cert(
540            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
541
542        dut_channel.send(b'abc')
543        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc'))
544
545    @metadata(pts_test_id="L2CAP/FOC/BV-02-C", pts_test_name="Lower Tester Explicitly Requests FCS should be " "Used")
546    def test_explicitly_request_use_FCS(self):
547        """
548        Verify the IUT will include the FCS in I/S-frames if the Lower Tester
549        explicitly requests that FCS should be used
550        """
551        self._setup_link_from_cert()
552
553        (dut_channel, cert_channel) = self._open_channel_from_cert(
554            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.DEFAULT)
555
556        dut_channel.send(b'abc')
557        assertThat(cert_channel).emits(L2capMatchers.IFrameWithFcs(payload=b"abc"))
558
559    @metadata(pts_test_id="L2CAP/FOC/BV-03-C", pts_test_name="Lower Tester Implicitly Requests FCS should be " "Used")
560    def test_implicitly_request_use_FCS(self):
561        """
562        Verify the IUT will include the FCS in I/S-frames if the Lower Tester
563        implicitly requests that FCS should be used.
564        """
565        self._setup_link_from_cert()
566
567        (dut_channel, cert_channel) = self._open_channel_from_cert(
568            mode=RetransmissionFlowControlMode.ERTM,
569            fcs=FcsType.DEFAULT,
570            req_config_options=CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS))
571
572        dut_channel.send(b'abc')
573        assertThat(cert_channel).emits(L2capMatchers.IFrameWithFcs(payload=b"abc"))
574
575    @metadata(pts_test_id="L2CAP/OFS/BV-01-C", pts_test_name="Sending I-Frames without FCS for ERTM")
576    def test_sending_i_frames_without_fcs_for_ertm(self):
577        """
578        Verify the IUT does not include the FCS in I-frames.
579        """
580        self._setup_link_from_cert()
581
582        (dut_channel, cert_channel) = self._open_channel_from_cert(
583            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
584
585        dut_channel.send(b'abc')
586        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b"abc"))
587
588    @metadata(pts_test_id="L2CAP/OFS/BV-02-C", pts_test_name="Receiving I-Frames without FCS for ERTM")
589    def test_receiving_i_frames_without_fcs_for_ertm(self):
590        """
591        Verify the IUT can handle I-frames that do not contain the FCS.
592        """
593        self._setup_link_from_cert()
594
595        (dut_channel, cert_channel) = self._open_channel_from_cert(
596            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
597
598        cert_channel.send_i_frame(tx_seq=0, req_seq=0, payload=SAMPLE_PACKET)
599        assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(SAMPLE_PACKET_DATA))
600
601    @metadata(pts_test_id="L2CAP/OFS/BV-05-C", pts_test_name="Sending I-Frames with FCS for ERTM")
602    def test_sending_i_frames_with_fcs_for_ertm(self):
603        """
604        Verify the IUT does include the FCS in I-frames.
605        """
606        self._setup_link_from_cert()
607
608        (dut_channel, cert_channel) = self._open_channel_from_cert(
609            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.DEFAULT)
610
611        dut_channel.send(b'abc')
612        assertThat(cert_channel).emits(L2capMatchers.IFrameWithFcs(tx_seq=0, payload=b"abc"))
613
614    @metadata(pts_test_id="L2CAP/OFS/BV-06-C", pts_test_name="Receiving I-Frames with FCS for ERTM")
615    def test_receiving_i_frames_with_fcs_for_ertm(self):
616        """
617        Verify the IUT can handle I-frames that do contain the FCS.
618        """
619        self._setup_link_from_cert()
620
621        (dut_channel, cert_channel) = self._open_channel_from_cert(
622            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.DEFAULT)
623
624        cert_channel.send_i_frame(tx_seq=0, req_seq=0, payload=SAMPLE_PACKET, fcs=FcsType.DEFAULT)
625        assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(SAMPLE_PACKET_DATA))
626
627    @metadata(pts_test_id="L2CAP/ERM/BV-01-C", pts_test_name="Transmit I-frames")
628    def test_transmit_i_frames(self):
629        """
630        Verify the IUT can send correctly formatted sequential I-frames with
631        valid values for the enhanced control fields (SAR, F-bit, ReqSeq,
632        TxSeq)
633        """
634        self._setup_link_from_cert()
635
636        (dut_channel, cert_channel) = self._open_channel_from_cert(
637            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
638
639        dut_channel.send(b'abc')
640        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b"abc"))
641
642        cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET)
643
644        dut_channel.send(b'abc')
645        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1, payload=b"abc"))
646
647        cert_channel.send_i_frame(tx_seq=1, req_seq=2, payload=SAMPLE_PACKET)
648
649        dut_channel.send(b'abc')
650        assertThat(cert_channel).emits(L2capMatchers.PartialData(b"abc"))
651
652        cert_channel.send_i_frame(tx_seq=2, req_seq=3, payload=SAMPLE_PACKET)
653
654    @metadata(pts_test_id="L2CAP/ERM/BV-02-C", pts_test_name="Receive I-Frames")
655    def test_receive_i_frames(self):
656        """
657        Verify the IUT can receive in-sequence valid I-frames and deliver L2CAP
658        SDUs to the Upper Tester
659        """
660        self._setup_link_from_cert()
661
662        (dut_channel, cert_channel) = self._open_channel_from_cert(
663            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
664
665        for i in range(3):
666            cert_channel.send_i_frame(tx_seq=i, req_seq=0, payload=SAMPLE_PACKET)
667            assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=i + 1))
668
669        cert_channel.send_i_frame(tx_seq=3, req_seq=0, sar=SegmentationAndReassembly.START, payload=SAMPLE_PACKET)
670        assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=4))
671
672        cert_channel.send_i_frame(
673            tx_seq=4, req_seq=0, sar=SegmentationAndReassembly.CONTINUATION, payload=SAMPLE_PACKET)
674        assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=5))
675
676        cert_channel.send_i_frame(tx_seq=5, req_seq=0, sar=SegmentationAndReassembly.END, payload=SAMPLE_PACKET)
677        assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=6))
678
679    @metadata(pts_test_id="L2CAP/ERM/BV-03-C", pts_test_name="Acknowledging Received I-Frames")
680    def test_acknowledging_received_i_frames(self):
681        """
682        Verify the IUT sends S-frame [RR] with the Poll bit not set to
683        acknowledge data received from the Lower Tester
684        """
685        self._setup_link_from_cert()
686
687        (dut_channel, cert_channel) = self._open_channel_from_cert(
688            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
689
690        for i in range(3):
691            cert_channel.send_i_frame(tx_seq=i, req_seq=0, payload=SAMPLE_PACKET)
692            assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=i + 1))
693
694        assertThat(cert_channel).emitsNone(L2capMatchers.SFrame(req_seq=4), timeout=timedelta(seconds=1))
695
696    @metadata(
697        pts_test_id="L2CAP/ERM/BV-05-C",
698        pts_test_name="Resume Transmitting I-Frames when an S-Frame [RR] "
699        "is Received")
700    def test_resume_transmitting_when_received_rr(self):
701        """
702        Verify the IUT will cease transmission of I-frames when the negotiated
703        TxWindow is full. Verify the IUT will resume transmission of I-frames
704        when an S-frame [RR] is received that acknowledges previously sent
705        I-frames
706        """
707        self._setup_link_from_cert()
708
709        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=1)
710
711        (dut_channel, cert_channel) = self._open_channel_from_cert(
712            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
713
714        dut_channel.send(b'abc')
715        dut_channel.send(b'def')
716
717        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc'))
718        assertThat(cert_channel).emitsNone(L2capMatchers.IFrame(tx_seq=1, payload=b'def'))
719
720        cert_channel.send_s_frame(req_seq=1, f=Final.POLL_RESPONSE)
721        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1))
722
723    @metadata(
724        pts_test_id="L2CAP/ERM/BV-06-C", pts_test_name="Resume Transmitting I-Frames when an I-Frame is "
725        "Received")
726    def test_resume_transmitting_when_acknowledge_previously_sent(self):
727        """
728        Verify the IUT will cease transmission of I-frames when the negotiated
729        TxWindow is full. Verify the IUT will resume transmission of I-frames
730        when an I-frame is received that acknowledges previously sent I-frames
731        """
732        self._setup_link_from_cert()
733
734        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=1)
735
736        (dut_channel, cert_channel) = self._open_channel_from_cert(
737            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
738
739        dut_channel.send(b'abc')
740        dut_channel.send(b'def')
741
742        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc'))
743        assertThat(cert_channel).emitsNone(
744            L2capMatchers.IFrame(tx_seq=1, payload=b'abc'), timeout=timedelta(seconds=0.5))
745
746        cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET)
747
748        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1, payload=b'def'))
749
750        cert_channel.send_i_frame(tx_seq=1, req_seq=2, payload=SAMPLE_PACKET)
751
752    @metadata(pts_test_id="L2CAP/ERM/BV-07-C", pts_test_name="Send S-Frame [RNR]")
753    def test_send_s_frame_rnr(self):
754        """
755        Verify the IUT sends an S-frame [RNR] when it detects local busy condition
756        NOTE: In GD stack, we enter local busy condition if client doesn't dequeue
757        and packets are accumulating in buffer
758        """
759        self._setup_link_from_cert()
760
761        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=10)
762
763        (dut_channel, cert_channel) = self._open_channel_from_cert(
764            mode=RetransmissionFlowControlMode.ERTM,
765            fcs=FcsType.NO_FCS,
766            req_config_options=config,
767            rsp_config_options=config)
768
769        dut_channel.send(b'abc')
770        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc'))
771        cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET)
772
773        dut_channel.set_traffic_paused(True)
774
775        # Allow 1 additional packet in channel queue buffer
776        buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() + 1
777
778        for i in range(buffer_size):
779            cert_channel.send_i_frame(tx_seq=i + 1, req_seq=1, payload=SAMPLE_PACKET)
780            assertThat(cert_channel).emits(L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.RECEIVER_READY))
781
782        cert_channel.send_i_frame(tx_seq=buffer_size + 1, req_seq=1, payload=SAMPLE_PACKET)
783        assertThat(cert_channel).emits(L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY))
784
785    @metadata(pts_test_id="L2CAP/ERM/BV-08-C", pts_test_name="Send S-Frame [RR] with Poll Bit Set")
786    def test_transmit_s_frame_rr_with_poll_bit_set(self):
787        """
788        Verify the IUT sends an S-frame [RR] with the Poll bit set when its
789        retransmission timer expires.
790        """
791        self._setup_link_from_cert()
792
793        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, retransmission_time_out=1500)
794
795        (dut_channel, cert_channel) = self._open_channel_from_cert(
796            mode=RetransmissionFlowControlMode.ERTM,
797            fcs=FcsType.NO_FCS,
798            req_config_options=config,
799            rsp_config_options=config)
800
801        dut_channel.send(b'abc')
802        assertThat(cert_channel).emits(L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))
803
804    @metadata(pts_test_id="L2CAP/ERM/BV-09-C", pts_test_name="Send S-Frame [RR] with Final Bit Set")
805    def test_transmit_s_frame_rr_with_final_bit_set(self):
806        """
807        Verify the IUT responds with an S-frame [RR] with the Final bit set
808        after receiving an S-frame [RR] with the Poll bit set
809        """
810        self._setup_link_from_cert()
811
812        (dut_channel, cert_channel) = self._open_channel_from_cert(
813            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
814
815        cert_channel.send_s_frame(req_seq=0, p=Poll.POLL)
816        assertThat(cert_channel).emits(L2capMatchers.SFrame(f=Final.POLL_RESPONSE))
817
818    @metadata(pts_test_id="L2CAP/ERM/BV-10-C", pts_test_name="Retransmit S-Frame [RR] with Final Bit Set")
819    def test_retransmit_s_frame_rr_with_poll_bit_set(self):
820        """
821        Verify the IUT will retransmit the S-frame [RR] with the Poll bit set
822        when the Monitor Timer expires
823        """
824        self._setup_link_from_cert()
825
826        (dut_channel, cert_channel) = self._open_channel_from_cert(
827            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
828        dut_channel.send(b'abc')
829
830        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc'))
831        assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=0, p=Poll.POLL, f=Final.NOT_SET))
832        cert_channel.send_s_frame(req_seq=1, f=Final.POLL_RESPONSE)
833
834    @metadata(pts_test_id="L2CAP/ERM/BV-11-C", pts_test_name="S-Frame Transmissions Exceed MaxTransmit")
835    def test_s_frame_transmissions_exceed_max_transmit(self):
836        """
837        Verify the IUT will close the channel when the Monitor Timer expires.
838        """
839        self._setup_link_from_cert()
840
841        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=1, max_transmit=1, monitor_time_out=10)
842
843        (dut_channel, cert_channel) = self._open_channel_from_cert(
844            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
845
846        dut_channel.send(b'abc')
847        cert_channel.verify_disconnect_request()
848
849    @metadata(pts_test_id="L2CAP/ERM/BV-12-C", pts_test_name="I-Frame Transmissions Exceed MaxTransmit")
850    def test_i_frame_transmissions_exceed_max_transmit(self):
851        """
852        Verify the IUT will close the channel when it receives an S-frame [RR]
853        with the final bit set that does not acknowledge the previous I-frame
854        sent by the IUT
855        """
856        self._setup_link_from_cert()
857
858        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=1, max_transmit=1)
859
860        (dut_channel, cert_channel) = self._open_channel_from_cert(
861            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
862
863        dut_channel.send(b'abc')
864        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0), L2capMatchers.SFrame(p=Poll.POLL)).inOrder()
865
866        cert_channel.send_s_frame(req_seq=0, f=Final.POLL_RESPONSE)
867        cert_channel.verify_disconnect_request()
868
869    @metadata(pts_test_id="L2CAP/ERM/BV-13-C", pts_test_name="Respond to S-Frame [REJ]")
870    def test_respond_to_rej(self):
871        """
872        Verify the IUT retransmits I-frames starting from the sequence number
873        specified in the S-frame [REJ]
874        """
875        self._setup_link_from_cert()
876
877        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=2, max_transmit=2)
878
879        (dut_channel, cert_channel) = self._open_channel_from_cert(
880            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
881
882        dut_channel.send(b'abc')
883        dut_channel.send(b'abc')
884        assertThat(cert_channel).emits(
885            L2capMatchers.IFrame(tx_seq=0, payload=b'abc'), L2capMatchers.IFrame(tx_seq=1, payload=b'abc')).inOrder()
886
887        cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT)
888
889        assertThat(cert_channel).emits(
890            L2capMatchers.IFrame(tx_seq=0, payload=b'abc'), L2capMatchers.IFrame(tx_seq=1, payload=b'abc')).inOrder()
891
892    @metadata(pts_test_id="L2CAP/ERM/BV-14-C", pts_test_name="Respond to S-Frame [SREJ] POLL Bit Set")
893    def test_respond_to_srej_p_set(self):
894        """
895        Verify the IUT responds with the correct I-frame when sent an SREJ
896        frame. Verify that the IUT processes the acknowledgment of previously
897        unacknowledged I-frames
898        """
899        self._setup_link_from_cert()
900
901        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, max_transmit=2, tx_window_size=3)
902
903        (dut_channel, cert_channel) = self._open_channel_from_cert(
904            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
905
906        for _ in range(4):
907            dut_channel.send(b'abc')
908        assertThat(cert_channel).emits(
909            L2capMatchers.IFrame(tx_seq=0, payload=b'abc'), L2capMatchers.IFrame(tx_seq=1, payload=b'abc'),
910            L2capMatchers.IFrame(tx_seq=2, payload=b'abc')).inOrder()
911
912        cert_channel.send_s_frame(req_seq=1, p=Poll.POLL, s=SupervisoryFunction.SELECT_REJECT)
913
914        assertThat(cert_channel).emits(
915            L2capMatchers.IFrame(tx_seq=1, payload=b'abc', f=Final.POLL_RESPONSE),
916            L2capMatchers.IFrame(tx_seq=3, payload=b'abc')).inOrder()
917
918    @metadata(pts_test_id="L2CAP/ERM/BV-15-C", pts_test_name="Respond to S-Frame [SREJ] POLL Bit Clear")
919    def test_respond_to_srej_p_clear(self):
920        """
921        Verify the IUT responds with the correct I-frame when sent an SREJ frame
922        """
923        self._setup_link_from_cert()
924
925        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, max_transmit=2, tx_window_size=3)
926        (dut_channel, cert_channel) = self._open_channel_from_cert(
927            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
928
929        for _ in range(4):
930            dut_channel.send(b'abc')
931        assertThat(cert_channel).emits(
932            L2capMatchers.IFrame(tx_seq=0, payload=b'abc'), L2capMatchers.IFrame(tx_seq=1, payload=b'abc'),
933            L2capMatchers.IFrame(tx_seq=2, payload=b'abc')).inOrder()
934
935        cert_channel.send_s_frame(req_seq=1, s=SupervisoryFunction.SELECT_REJECT)
936        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1, payload=b'abc', f=Final.NOT_SET))
937        cert_channel.send_s_frame(req_seq=3, s=SupervisoryFunction.RECEIVER_READY)
938        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=3, payload=b'abc', f=Final.NOT_SET))
939
940    @metadata(pts_test_id="L2CAP/ERM/BV-16-C", pts_test_name="Send S-Frame [REJ]")
941    def test_send_s_frame_rej(self):
942        """
943        Verify the IUT can send an S-Frame [REJ] after receiving out of sequence
944        I-Frames
945        """
946        self._setup_link_from_cert()
947        tx_window_size = 4
948
949        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=tx_window_size)
950        (dut_channel, cert_channel) = self._open_channel_from_cert(
951            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
952
953        cert_channel.send_i_frame(tx_seq=0, req_seq=0, f=Final.NOT_SET, payload=SAMPLE_PACKET)
954        cert_channel.send_i_frame(tx_seq=2, req_seq=0, f=Final.NOT_SET, payload=SAMPLE_PACKET)
955
956        assertThat(cert_channel).emits(
957            L2capMatchers.SFrame(req_seq=1, f=Final.NOT_SET, s=SupervisoryFunction.REJECT, p=Poll.NOT_SET))
958
959        for i in range(1, tx_window_size):
960            cert_channel.send_i_frame(tx_seq=i, req_seq=0, f=Final.NOT_SET, payload=SAMPLE_PACKET)
961        assertThat(cert_channel).emits(
962            L2capMatchers.SFrame(req_seq=i + 1, f=Final.NOT_SET, s=SupervisoryFunction.RECEIVER_READY))
963
964    @metadata(pts_test_id="L2CAP/ERM/BV-18-C", pts_test_name="Receive S-Frame [RR] Final Bit = 1")
965    def test_receive_s_frame_rr_final_bit_set(self):
966        """
967        Verify the IUT will retransmit any previously sent I-frames
968        unacknowledged by receipt of an S-Frame [RR] with the Final Bit set
969        """
970        self._setup_link_from_cert()
971
972        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, retransmission_time_out=1500)
973
974        (dut_channel, cert_channel) = self._open_channel_from_cert(
975            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
976
977        dut_channel.send(b'abc')
978
979        assertThat(cert_channel).emits(L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))
980
981        cert_channel.send_s_frame(req_seq=0, f=Final.POLL_RESPONSE)
982        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))
983
984    @metadata(pts_test_id="L2CAP/ERM/BV-19-C", pts_test_name="Receive I-Frame Final Bit = 1")
985    def test_receive_i_frame_final_bit_set(self):
986        """
987        Verify the IUT will retransmit any previously sent I-frames
988        unacknowledged by receipt of an I-frame with the final bit set
989        """
990        self._setup_link_from_cert()
991
992        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, retransmission_time_out=1500)
993        (dut_channel, cert_channel) = self._open_channel_from_cert(
994            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
995
996        dut_channel.send(b'abc')
997        assertThat(cert_channel).emits(L2capMatchers.SFrame(p=Poll.POLL))
998
999        cert_channel.send_i_frame(tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET)
1000        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))
1001
1002    @metadata(pts_test_id="L2CAP/ERM/BV-20-C", pts_test_name="Enter Remote Busy Condition")
1003    def test_receive_rnr(self):
1004        """
1005        Verify the IUT will not retransmit any I-frames when it receives a
1006        remote busy indication from the Lower Tester (S-frame [RNR])
1007        """
1008        self._setup_link_from_cert()
1009
1010        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, retransmission_time_out=1500)
1011        (dut_channel, cert_channel) = self._open_channel_from_cert(
1012            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
1013
1014        dut_channel.send(b'abc')
1015        assertThat(cert_channel).emits(L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))
1016
1017        cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.RECEIVER_NOT_READY, f=Final.POLL_RESPONSE)
1018        assertThat(cert_channel).emitsNone(L2capMatchers.IFrame(tx_seq=0))
1019
1020    @metadata(pts_test_id="L2CAP/ERM/BV-22-C", pts_test_name="Exit Local Busy Condition")
1021    def test_exit_local_busy_condition(self):
1022        """
1023        Verify the IUT sends an S-frame [RR] Poll = 1 when the local busy condition is cleared
1024        NOTE: In GD stack, we enter local busy condition if client doesn't dequeue
1025        and packets are accumulating in buffer
1026        """
1027        self._setup_link_from_cert()
1028
1029        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=10)
1030
1031        (dut_channel, cert_channel) = self._open_channel_from_cert(
1032            mode=RetransmissionFlowControlMode.ERTM,
1033            fcs=FcsType.NO_FCS,
1034            req_config_options=config,
1035            rsp_config_options=config)
1036
1037        dut_channel.send(b'abc')
1038        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc'))
1039        cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET)
1040
1041        dut_channel.set_traffic_paused(True)
1042
1043        # Allow 1 additional packet in channel queue buffer
1044        buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() + 1
1045
1046        for i in range(buffer_size):
1047            cert_channel.send_i_frame(tx_seq=i + 1, req_seq=1, payload=SAMPLE_PACKET)
1048            assertThat(cert_channel).emits(L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.RECEIVER_READY))
1049
1050        cert_channel.send_i_frame(tx_seq=buffer_size + 1, req_seq=1, payload=SAMPLE_PACKET)
1051        assertThat(cert_channel).emits(L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY))
1052
1053        dut_channel.set_traffic_paused(False)
1054        assertThat(cert_channel).emits(
1055            L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.RECEIVER_READY, p=l2cap_packets.Poll.POLL))
1056        cert_channel.send_s_frame(1, f=l2cap_packets.Final.POLL_RESPONSE)
1057
1058    @metadata(pts_test_id="L2CAP/ERM/BV-23-C", pts_test_name="Transmit I-Frames using SAR")
1059    def test_transmit_i_frames_using_sar(self):
1060        """
1061        Verify the IUT can send correctly formatted sequential I-frames with
1062        valid values for the enhanced control fields (SAR, F-bit, ReqSeq,
1063        TxSeq) when performing SAR.
1064        """
1065        self._setup_link_from_cert()
1066
1067        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, mps=11)
1068        (dut_channel, cert_channel) = self._open_channel_from_cert(
1069            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
1070
1071        dut_channel.send(b'abcabcabc')
1072        # First IFrame should contain SDU size after control field
1073        assertThat(cert_channel).emits(
1074            L2capMatchers.IFrameStart(tx_seq=0, payload=b'abc'), L2capMatchers.IFrame(tx_seq=1, payload=b'abc'),
1075            L2capMatchers.IFrame(tx_seq=2, payload=b'abc')).inOrder()
1076
1077        cert_channel.send_s_frame(req_seq=3, s=SupervisoryFunction.RECEIVER_READY)
1078
1079        dut_channel.send(b'defdefdef')
1080        # First IFrame should contain SDU size after control field
1081        assertThat(cert_channel).emits(
1082            L2capMatchers.IFrameStart(tx_seq=3, payload=b'def'), L2capMatchers.IFrame(tx_seq=4, payload=b'def'),
1083            L2capMatchers.IFrame(tx_seq=5, payload=b'def')).inOrder()
1084
1085    @metadata(pts_test_id="L2CAP/ERM/BI-01-C", pts_test_name="S-Frame [REJ] Lost or Corrupted")
1086    def test_sent_rej_lost(self):
1087        """
1088        Verify the IUT can handle receipt of an S-=frame [RR] Poll = 1 if the
1089        S-frame [REJ] sent from the IUT is lost
1090        """
1091        self._setup_link_from_cert()
1092        ertm_tx_window_size = 5
1093
1094        config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=ertm_tx_window_size)
1095        (dut_channel, cert_channel) = self._open_channel_from_cert(
1096            mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config)
1097
1098        cert_channel.send_i_frame(tx_seq=0, req_seq=0, payload=SAMPLE_PACKET)
1099        assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=1))
1100
1101        cert_channel.send_i_frame(tx_seq=ertm_tx_window_size - 1, req_seq=0, payload=SAMPLE_PACKET)
1102        assertThat(cert_channel).emits(L2capMatchers.SFrame(s=SupervisoryFunction.REJECT))
1103
1104        cert_channel.send_s_frame(req_seq=0, p=Poll.POLL)
1105
1106        assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=1, f=l2cap_packets.Final.POLL_RESPONSE))
1107        for i in range(1, ertm_tx_window_size):
1108            cert_channel.send_i_frame(tx_seq=i, req_seq=0, payload=SAMPLE_PACKET)
1109            assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=i + 1))
1110
1111    @metadata(pts_test_id="L2CAP/ERM/BI-03-C", pts_test_name="Handle Duplicate S-Frame [SREJ]")
1112    def test_handle_duplicate_srej(self):
1113        """
1114        Verify the IUT will only retransmit the requested I-frame once after
1115        receiving a duplicate SREJ
1116        """
1117        self._setup_link_from_cert()
1118
1119        (dut_channel, cert_channel) = self._open_channel_from_cert(
1120            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
1121
1122        dut_channel.send(b'abc')
1123        dut_channel.send(b'abc')
1124        assertThat(cert_channel).emits(
1125            L2capMatchers.IFrame(tx_seq=0), L2capMatchers.IFrame(tx_seq=1),
1126            L2capMatchers.SFrame(p=Poll.POLL)).inOrder()
1127
1128        cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.SELECT_REJECT)
1129        assertThat(cert_channel).emitsNone(timeout=timedelta(seconds=0.5))
1130
1131        cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.SELECT_REJECT, f=Final.POLL_RESPONSE)
1132        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))
1133
1134    @metadata(
1135        pts_test_id="L2CAP/ERM/BI-04-C",
1136        pts_test_name="Handle Receipt of S-Frame [REJ] and S-Frame "
1137        "[RR, F=1] that Both Require Retransmission of the "
1138        "Same I-Frames")
1139    def test_handle_receipt_rej_and_rr_with_f_set(self):
1140        """
1141        Verify the IUT will only retransmit the requested I-frames once after
1142        receiving an S-frame [REJ] followed by an S-frame [RR] with the Final
1143        bit set that indicates the same I-frames should be retransmitted
1144        """
1145        self._setup_link_from_cert()
1146
1147        (dut_channel, cert_channel) = self._open_channel_from_cert(
1148            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
1149
1150        dut_channel.send(b'abc')
1151        dut_channel.send(b'abc')
1152        assertThat(cert_channel).emits(
1153            L2capMatchers.IFrame(tx_seq=0),
1154            L2capMatchers.IFrame(tx_seq=1),
1155            L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)).inOrder()
1156
1157        cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT)
1158        assertThat(cert_channel).emitsNone(timeout=timedelta(seconds=0.5))
1159
1160        # Send RR with F set
1161        cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT, f=Final.POLL_RESPONSE)
1162        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))
1163        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1))
1164
1165    @metadata(
1166        pts_test_id="L2CAP/ERM/BI-05-C",
1167        pts_test_name="Handle receipt of S-Frame [REJ] and I-Frame [F=1] "
1168        "that Both Require Retransmission of the Same "
1169        "I-Frames")
1170    def test_handle_rej_and_i_frame_with_f_set(self):
1171        """
1172        Verify the IUT will only retransmit the requested I-frames once after
1173        receiving an S-frame [REJ] followed by an I-frame with the Final bit
1174        set that indicates the same I-frames should be retransmitted
1175        """
1176        self._setup_link_from_cert()
1177
1178        (dut_channel, cert_channel) = self._open_channel_from_cert(
1179            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)
1180
1181        dut_channel.send(b'abc')
1182        dut_channel.send(b'abc')
1183        assertThat(cert_channel).emits(
1184            L2capMatchers.IFrame(tx_seq=0),
1185            L2capMatchers.IFrame(tx_seq=1),
1186            L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)).inOrder()
1187
1188        # Send SREJ with F not set
1189        cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.SELECT_REJECT)
1190        assertThat(cert_channel).emitsNone(timeout=timedelta(seconds=0.5))
1191
1192        cert_channel.send_i_frame(tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET)
1193
1194        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))
1195        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1))
1196
1197    @metadata(
1198        pts_test_id="L2CAP/CMC/BV-01-C", pts_test_name="IUT Initiated Configuration of Enhanced "
1199        "Retransmission Mode")
1200    def test_initiated_configuration_request_ertm(self):
1201        """
1202        Verify the IUT can send a Configuration Request command containing the
1203        F&EC option that specifies Enhanced Retransmission Mode
1204        """
1205        self._setup_link_from_cert()
1206
1207        self._open_unconfigured_channel_from_cert(scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM)
1208
1209        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.ConfigurationRequestWithErtm())
1210
1211    @metadata(
1212        pts_test_id="L2CAP/CMC/BV-02-C",
1213        pts_test_name="Lower Tester Initiated Configuration of Enhanced "
1214        "Retransmission Mode")
1215    def test_respond_configuration_request_ertm(self):
1216        """
1217        Verify the IUT can accept a Configuration Request from the Lower Tester
1218        containing an F&EC option that specifies Enhanced Retransmission Mode
1219        """
1220        self._setup_link_from_cert()
1221
1222        self._open_channel_from_dut(psm=0x33, mode=RetransmissionFlowControlMode.ERTM)
1223
1224    @metadata(
1225        pts_test_id="L2CAP/CMC/BV-12-C",
1226        pts_test_name="ERTM Not Supported by Lower Tester for Mandatory "
1227        "ERTM channel")
1228    def test_respond_not_support_ertm_when_using_mandatory_ertm(self):
1229        """
1230        The IUT is initiating connection of an L2CAP channel that mandates use
1231        of ERTM. Verify the IUT will not attempt to configure the connection to
1232        ERTM if the Lower Tester has not indicated support for ERTM in the
1233        Information Response [Extended Features]
1234        """
1235        self._setup_link_from_cert()
1236        self.cert_l2cap.claim_ertm_unsupported()
1237        dut_channel_future = self.dut_l2cap.connect_dynamic_channel_to_cert(
1238            psm=0x33, mode=RetransmissionFlowControlMode.ERTM)
1239        assertThat(self.cert_l2cap.get_control_channel()).emitsNone(L2capMatchers.ConnectionRequest(0x33))
1240
1241    @metadata(
1242        pts_test_id="L2CAP/CMC/BI-01-C",
1243        pts_test_name="Failed Configuration of Enhanced Retransmission "
1244        "Mode when use of the Mode is Mandatory]")
1245    def test_config_respond_basic_mode_when_using_mandatory_ertm(self):
1246        """
1247        When creating a connection for a PSM that mandates the use of ERTM
1248        verify the IUT can handle receipt (close the channel in accordance with
1249        the specification) of a Configure Response indicating the peer L2CAP
1250        entity doesn’t wish to use Enhanced Retransmission Mode (Configure
1251        Response Result = Reject Unacceptable Parameters)
1252        """
1253
1254        self._setup_link_from_cert()
1255
1256        (dut_channel, cert_channel) = self._open_channel_from_cert(
1257            mode=RetransmissionFlowControlMode.ERTM,
1258            req_config_options=CertL2cap.config_option_ertm(),
1259            rsp_config_options=CertL2cap.config_option_basic_explicit())
1260
1261        cert_channel.verify_disconnect_request()
1262
1263    @metadata(
1264        pts_test_id="L2CAP/CMC/BI-02-C",
1265        pts_test_name="Configuration Mode mismatch when use of Enhanced "
1266        "Retransmission Mode is Mandatory")
1267    def test_config_request_basic_mode_when_using_mandatory_ertm(self):
1268        """
1269        When creating a connection for a PSM that mandates the use of ERTM,
1270        verify the IUT will close the channel if the Lower Tester attempts to
1271        configure Basic Mode.
1272        """
1273        self._setup_link_from_cert()
1274
1275        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(mode=RetransmissionFlowControlMode.ERTM)
1276        cert_channel.send_configure_request(CertL2cap.config_option_basic_explicit())
1277        cert_channel.verify_disconnect_request()
1278
1279    def test_initiate_connection_for_security(self):
1280        """
1281        This will test the PyL2cap API for initiating a connection for security
1282        via the security api
1283        """
1284        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
1285        self.cert.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
1286        self.dut_l2cap.initiate_connection_for_security()
1287        self.cert_l2cap.accept_incoming_connection()
1288        self.dut_l2cap.verify_security_connection()
1289