1# 2# Copyright 2019 - The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16from datetime import timedelta 17 18from mobly import asserts 19 20from bluetooth_packets_python3 import l2cap_packets 21from bluetooth_packets_python3 import RawBuilder 22from bluetooth_packets_python3.l2cap_packets import FcsType 23from bluetooth_packets_python3.l2cap_packets import Final 24from bluetooth_packets_python3.l2cap_packets import Poll 25from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly 26from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction 27from cert.behavior import when, anything, wait_until 28from cert.event_stream import EventStream 29from cert.gd_base_test import GdBaseTestClass 30from cert.matchers import L2capMatchers 31from cert.metadata import metadata 32from cert.py_l2cap import PyL2cap 33from cert.truth import assertThat 34from facade import common_pb2 35from google.protobuf import empty_pb2 as empty_proto 36from l2cap.classic.cert.cert_l2cap import CertL2cap 37from l2cap.classic.facade_pb2 import RetransmissionFlowControlMode 38from neighbor.facade import facade_pb2 as neighbor_facade 39 40# Assemble a sample packet. 
SAMPLE_PACKET_DATA = b"\x19\x26\x08\x17"
# RawBuilder is handed the payload as a list of integer byte values.
SAMPLE_PACKET = RawBuilder(list(SAMPLE_PACKET_DATA))


class L2capTestBase(GdBaseTestClass):
    """
    Shared fixture for the classic L2CAP cert tests: creates the DUT-side
    PyL2cap and cert-side CertL2cap helpers for every test and offers helpers
    to bring up an ACL link and open dynamic channels.
    """

    def setup_class(self):
        """
        Start the DUT with the 'L2CAP' module and the cert device with the
        'HCI_INTERFACES' module.
        """
        super().setup_class(dut_module='L2CAP', cert_module='HCI_INTERFACES')

    def setup_test(self):
        """
        Read both device addresses and create the per-test L2CAP helpers
        (self.dut_l2cap and self.cert_l2cap).
        """
        super().setup_test()

        self.dut_address = self.dut.hci_controller.GetMacAddressSimple()
        cert_address = common_pb2.BluetoothAddress(
            address=self.cert.controller_read_only_property.ReadLocalAddress(empty_proto.Empty()).address)

        self.dut_l2cap = PyL2cap(self.dut, cert_address)
        self.cert_l2cap = CertL2cap(self.cert)

    def teardown_test(self):
        """
        Close both L2CAP helpers and run the base-class teardown.

        Every step runs even if an earlier one raises, so a failing close()
        on one side cannot leak the other side or skip the base teardown. The
        exception still propagates once all steps have been attempted.
        """
        try:
            self.cert_l2cap.close()
        finally:
            try:
                self.dut_l2cap.close()
            finally:
                super().teardown_test()

    def _setup_link_from_cert(self):
        """
        Enable page scan on the DUT and connect an ACL link from the cert
        device to the DUT address.
        """
        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
        self.cert_l2cap.connect_acl(self.dut_address)

    def _open_unconfigured_channel_from_cert(self,
                                             signal_id=1,
                                             scid=0x0101,
                                             psm=0x33,
                                             mode=RetransmissionFlowControlMode.BASIC,
                                             fcs=None):
        """
        Register a dynamic channel for (psm, mode) on the DUT and open a
        channel from the cert side, without sending any configuration request
        from the cert.

        Returns:
            A (dut_channel, cert_channel) tuple.
        """

        dut_channel = self.dut_l2cap.register_dynamic_channel(psm, mode)
        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid, fcs=fcs)

        return (dut_channel, cert_channel)

    def _open_channel_from_cert(self,
                                signal_id=1,
                                scid=0x0101,
                                psm=0x33,
                                mode=RetransmissionFlowControlMode.BASIC,
                                fcs=None,
                                req_config_options=None,
                                rsp_config_options=None):
        """
        Open a channel from the cert side and drive configuration until the
        cert channel reports itself configured.

        Args:
            signal_id, scid, psm, mode, fcs: forwarded to
                _open_unconfigured_channel_from_cert.
            req_config_options: options sent in the cert's configuration
                request. When None, ERTM options built from fcs are used for
                ERTM mode and an empty list otherwise.
            rsp_config_options: options the cert answers the DUT's
                configuration request with. When None and fcs is given, ERTM
                options built from fcs are used; when both are None no custom
                response behavior is installed.

        Returns:
            A (dut_channel, cert_channel) tuple.
        """
        request_matcher = L2capMatchers.ConfigurationRequestView(scid)
        # The response behavior is installed before the channel is opened.
        if rsp_config_options is not None:
            when(self.cert_l2cap).on_config_req(request_matcher).then().send_configuration_response(
                options=rsp_config_options)
        elif fcs is not None:
            when(self.cert_l2cap).on_config_req(request_matcher).then().send_configuration_response(
                options=CertL2cap.config_option_ertm(fcs=fcs))

        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(signal_id, scid, psm, mode, fcs)
        if req_config_options is None:
            req_config_options = CertL2cap.config_option_ertm(
                fcs=fcs) if mode == RetransmissionFlowControlMode.ERTM else []

        cert_channel.send_configure_request(req_config_options)

        cert_channel.verify_configuration_response()

        # Wait for the config-request behavior to have fired once
        # (presumably meaning the DUT's own request was seen -- confirm in cert.behavior).
        wait_until(self.cert_l2cap).on_config_req(request_matcher).times(1)

        assertThat(cert_channel.is_configured()).isTrue()

        return (dut_channel, cert_channel)

    def _open_channel_from_dut(self, psm=0x33, mode=RetransmissionFlowControlMode.BASIC):
        """
        Open a channel initiated by the DUT, accept it on the cert side and
        exchange configuration in both directions.

        Returns:
            A (dut_channel, cert_channel) tuple.
        """
        dut_channel_future = self.dut_l2cap.connect_dynamic_channel_to_cert(psm, mode)
        cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(psm)
        dut_channel = dut_channel_future.get_channel()

        cert_channel.verify_configuration_request_and_respond()
        outgoing_config = []
        if mode == RetransmissionFlowControlMode.ERTM:
            outgoing_config = CertL2cap.config_option_ertm()
        cert_channel.send_configure_request(outgoing_config)
        cert_channel.verify_configuration_response()

        return (dut_channel, cert_channel)


class L2capTest(L2capTestBase):
    """
    Classic L2CAP tests run between the DUT and the cert device. Tests that
    correspond to a PTS test case carry its id and name in @metadata.
    """

    def test_connect_dynamic_channel_and_send_data(self):
        """
        Open a channel from the cert and check that data sent by the DUT is
        observed on the cert channel.
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_channel_from_cert()

        dut_channel.send(b'abc')
        assertThat(cert_channel).emits(L2capMatchers.Data(b'abc'))

    def test_receive_packet_from_unknown_channel(self):
        """
        Send an I-frame built with 0x99 as its first argument (presumably a
        channel id that was never opened -- confirm against the builder) and
        check that no S-frame with req_seq=4 shows up on the opened channel
        within one second.
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_channel_from_cert(scid=0x41, psm=0x33)

        i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
            0x99, 0, Final.NOT_SET, 1, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
        self.cert_l2cap.send_acl(i_frame)
        assertThat(cert_channel).emitsNone(L2capMatchers.SFrame(req_seq=4), timeout=timedelta(seconds=1))

    def test_open_two_channels(self):
        """
        Open two channels on the same link, using distinct signal ids, source
        channel ids and PSMs.
        """
        self._setup_link_from_cert()

        self._open_channel_from_cert(signal_id=1, scid=0x41, psm=0x41)
        self._open_channel_from_cert(signal_id=2, scid=0x43, psm=0x43)

    def test_connect_and_send_data_ertm_no_segmentation(self):
        """
        Open an ERTM channel without FCS and check that a 102-byte SDU sent by
        the DUT arrives as a single I-frame with tx_seq=0.
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_channel_from_cert(
            mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS)

        dut_channel.send(b'abc' * 34)
        assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc' * 34))

        cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET)
        # todo verify received?

    @metadata(pts_test_id="L2CAP/COS/CED/BV-01-C", pts_test_name="Request Connection")
    def test_basic_operation_request_connection(self):
        """
        Verify that the IUT is able to request the connection establishment for
        an L2CAP data channel and initiate the configuration procedure.
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_dut()

    @metadata(pts_test_id="L2CAP/COS/CED/BV-03-C", pts_test_name="Send data")
    def test_send_data(self):
        """
        Verify that the IUT is able to send DATA
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_channel_from_cert()
        dut_channel.send(b'hello')
        assertThat(cert_channel).emits(L2capMatchers.Data(b'hello'))

    @metadata(pts_test_id="L2CAP/COS/CED/BV-04-C", pts_test_name="Disconnect")
    def test_disconnect(self):
        """
        Verify that the IUT is able to disconnect the data channel
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_channel_from_cert()
        dut_channel.close_channel()
        cert_channel.verify_disconnect_request()

    @metadata(pts_test_id="L2CAP/COS/CED/BV-05-C", pts_test_name="Accept connection")
    def test_accept_connection(self):
        """
        Also verify that DUT can send 48 bytes PDU (minimal MTU)
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_channel_from_cert()
        dut_channel.send(b'a' * 48)
        assertThat(cert_channel).emits(L2capMatchers.Data(b'a' * 48))

    @metadata(pts_test_id="L2CAP/COS/CED/BV-07-C", pts_test_name="Accept Disconnect")
    def test_accept_disconnect(self):
        """
        Verify that the IUT is able to respond to the request to disconnect the
        data channel
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_channel_from_cert()
        cert_channel.disconnect_and_verify()

    @metadata(pts_test_id="L2CAP/COS/CED/BV-08-C", pts_test_name="Disconnect on Timeout")
    def test_disconnect_on_timeout(self):
        """
        Verify that the IUT disconnects the data channel and shuts down this
        channel if no response occurs
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert()

        assertThat(self.cert_l2cap.get_control_channel()).emitsNone(L2capMatchers.ConfigurationResponse())
        # TODO: Verify that IUT sends disconnect request (not mandated)

    @metadata(pts_test_id="L2CAP/COS/CED/BV-09-C", pts_test_name="Receive Multi-Command Packet")
    def test_receive_multi_command_packet(self):
        """
        Verify that the IUT is able to receive more than one signaling command in one L2CAP
        packet.
        """
        self._setup_link_from_cert()

        psm = 0x33
        self.dut_l2cap.connect_dynamic_channel_to_cert(psm)
        self.cert_l2cap.verify_and_respond_open_channel_from_remote_and_send_config_req(psm)

        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.ConfigurationResponse())

    @metadata(pts_test_id="L2CAP/COS/CFD/BV-11-C", pts_test_name="Configure MTU size")
    def test_configure_mtu_size(self):
        """
        Verify that the IUT is able to configure the supported MTU size
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert()
        cert_channel.send_configure_request(CertL2cap.config_option_mtu_explicit(672))
        cert_channel.verify_configuration_request_and_respond()
        # TODO: Probably remove verify_configuration_request_and_respond

    @metadata(pts_test_id="L2CAP/COS/CFD/BV-01-C", pts_test_name="Continuation Flag")
    def test_continuation_flag(self):
        """
        Verify the IUT is able to receive configuration requests that have the
        continuation flag set
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert()

        # Send configuration request with CONTINUE
        mtu_opt = l2cap_packets.MtuConfigurationOption()
        mtu_opt.mtu = 0x1234
        cert_channel.send_configure_request([mtu_opt], 2, l2cap_packets.Continuation.CONTINUE)

        flush_timeout_option = l2cap_packets.FlushTimeoutConfigurationOption()
        flush_timeout_option.flush_timeout = 65535
        cert_channel.send_configure_request([flush_timeout_option], 3, l2cap_packets.Continuation.END)

        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.ConfigurationResponse(), at_least_times=2)

    @metadata(pts_test_id="L2CAP/COS/CFD/BV-02-C", pts_test_name="Negotiation with Reject")
    def test_retry_config_after_rejection(self):
        """
        Verify that the IUT is able to perform negotiation while the Lower
        Tester rejects the proposed configuration parameter values
        """
        self._setup_link_from_cert()
        scid = 0x41
        # Two chained responses: an UNACCEPTABLE_PARAMETERS rejection
        # proposing MTU 200, followed by a response with no options.
        when(self.cert_l2cap).on_config_req(
            L2capMatchers.ConfigurationRequestView(scid)).then().send_configuration_response(
                result=l2cap_packets.ConfigurationResponseResult.UNACCEPTABLE_PARAMETERS,
                options=CertL2cap.config_option_mtu_explicit(200)).send_configuration_response(options=[])
        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(scid=scid)

        assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.ConfigurationRequest(), at_least_times=2)

    @metadata(pts_test_id="L2CAP/COS/CFD/BV-03-C", pts_test_name="Send Requested Options")
    def test_send_requested_options(self):
        """
        Verify that the IUT can receive a configuration request with no options
        and send the requested options to the Lower Tester
        """
        self._setup_link_from_cert()
        (dut_channel, cert_channel) = self._open_channel_from_cert(scid=0x41, psm=0x33)

        # TODO(hsz) implement me!

    @metadata(pts_test_id="L2CAP/COS/CFD/BV-08-C", pts_test_name="Non-blocking Config Response")
    def test_non_blocking_config_response(self):
        """
        Verify that the IUT does not block transmitting L2CAP_ConfigRsp while
        waiting for L2CAP_ConfigRsp from the Lower Tester
        """
        self._setup_link_from_cert()

        (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert()

        cert_channel.send_configure_request([])
        cert_channel.verify_configuration_response()
        cert_channel.verify_configuration_request_and_respond()

        # TODO(hsz) implement me!
319 320 @metadata(pts_test_id="L2CAP/COS/CFD/BV-09-C", pts_test_name="Mandatory 48 Byte MTU") 321 def test_mandatory_48_byte_mtu(self): 322 """ 323 Verify that the IUT can support mandatory 48 byte MTU 324 """ 325 self._setup_link_from_cert() 326 (dut_channel, 327 cert_channel) = self._open_channel_from_cert(req_config_options=CertL2cap.config_option_mtu_explicit(48)) 328 329 dut_channel.send(b"a" * 44) 330 assertThat(cert_channel).emits(L2capMatchers.Data(b"a" * 44)) 331 332 @metadata(pts_test_id="L2CAP/COS/CFD/BV-11-C", pts_test_name="Negotiation of Unsupported Parameter") 333 def test_negotiation_of_unsupported_parameter(self): 334 """ 335 Verify that the IUT can negotiate when the Lower Tester proposes an unsupported configuration 336 parameter value. 337 """ 338 self._setup_link_from_cert() 339 (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert() 340 341 cert_channel.send_configure_request(CertL2cap.config_option_mtu_explicit(20)) 342 # Invalid because minimum is 48 343 344 cert_channel.verify_configuration_response(l2cap_packets.ConfigurationResponseResult.UNACCEPTABLE_PARAMETERS) 345 346 @metadata(pts_test_id="L2CAP/COS/CFD/BV-12-C", pts_test_name="Unknown Option Response") 347 def test_config_unknown_options_with_hint(self): 348 """ 349 Verify that the IUT can give the appropriate error code when the Lower 350 Tester proposes any number of unknown options that are optional 351 NOTE: In GD stack, ExtendedWindowSizeOption in unsupported 352 """ 353 self._setup_link_from_cert() 354 355 (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert() 356 357 unknown_opt_hint = l2cap_packets.ExtendedWindowSizeOption() 358 unknown_opt_hint.max_window_size = 20 359 unknown_opt_hint.is_hint = l2cap_packets.ConfigurationOptionIsHint.OPTION_IS_A_HINT 360 361 for i in range(10): 362 cert_channel.send_configure_request([unknown_opt_hint] * i) 363 cert_channel.verify_configuration_response(l2cap_packets.ConfigurationResponseResult.SUCCESS) 
364 365 @metadata(pts_test_id="L2CAP/COS/CFD/BV-14-C", pts_test_name="Unknown Mandatory Options Request") 366 def test_unknown_mandatory_options_request(self): 367 """ 368 Verify that the IUT can give the appropriate error code when the Lower 369 Tester proposes any number of unknown options where at least one is 370 mandatory. 371 Note: GD stack doesn't support extended window size. For other stacks, 372 we may need to use some other config option 373 """ 374 self._setup_link_from_cert() 375 376 (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(scid=0x41, psm=0x33) 377 378 unknown_opt = l2cap_packets.ExtendedWindowSizeOption() 379 unknown_opt.max_window_size = 20 380 381 unknown_opt_hint = l2cap_packets.ExtendedWindowSizeOption() 382 unknown_opt_hint.max_window_size = 20 383 unknown_opt_hint.is_hint = l2cap_packets.ConfigurationOptionIsHint.OPTION_IS_A_HINT 384 385 configuration_option_attempts = [[unknown_opt], [unknown_opt, unknown_opt_hint], [ 386 unknown_opt, unknown_opt, unknown_opt 387 ], [unknown_opt, unknown_opt_hint, unknown_opt_hint, 388 unknown_opt], [unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt], [ 389 unknown_opt, unknown_opt_hint, unknown_opt_hint, unknown_opt, unknown_opt_hint, unknown_opt_hint 390 ], [unknown_opt, unknown_opt, unknown_opt, unknown_opt, unknown_opt, unknown_opt, unknown_opt], [ 391 unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, 392 unknown_opt_hint, unknown_opt_hint, unknown_opt 393 ], [ 394 unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, 395 unknown_opt_hint, unknown_opt_hint, unknown_opt, unknown_opt 396 ], [ 397 unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, 398 unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt_hint, unknown_opt 399 ]] 400 401 for option_list in configuration_option_attempts: 402 
cert_channel.send_configure_request(option_list) 403 cert_channel.verify_configuration_response(l2cap_packets.ConfigurationResponseResult.UNKNOWN_OPTIONS) 404 405 @metadata(pts_test_id="L2CAP/COS/ECH/BV-01-C", pts_test_name="Respond to Echo Request") 406 def test_respond_to_echo_request(self): 407 """ 408 Verify that the IUT responds to an echo request. 409 """ 410 self._setup_link_from_cert() 411 echo_request = l2cap_packets.EchoRequestBuilder(100, RawBuilder([1, 2, 3])) 412 self.cert_l2cap.get_control_channel().send(echo_request) 413 414 assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.EchoResponse()) 415 416 @metadata(pts_test_id="L2CAP/COS/CED/BI-01-C", pts_test_name="Reject Unknown Command") 417 def test_reject_unknown_command(self): 418 """ 419 Verify that the IUT rejects an unknown signaling command 420 """ 421 self._setup_link_from_cert() 422 423 # Command code ff, Signal id 01, size 0000 424 invalid_command_packet = RawBuilder([0xff, 0x01, 0x00, 0x00]) 425 self.cert_l2cap.get_control_channel().send(invalid_command_packet) 426 427 assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.CommandReject()) 428 429 @metadata(pts_test_id="L2CAP/COS/IEX/BV-01-C", pts_test_name="Query for 1.2 Features") 430 def test_query_for_1_2_features(self): 431 """ 432 Verify that the IUT transmits an information request command to solicit 433 if the remote device supports Specification 1.2 features. 
434 """ 435 self._setup_link_from_cert() 436 assertThat(self.cert_l2cap.get_control_channel()).emits( 437 L2capMatchers.InformationRequestWithType( 438 l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)) 439 440 @metadata(pts_test_id="L2CAP/COS/IEX/BV-02-C", pts_test_name="Respond with 1.2 Features") 441 def test_respond_with_1_2_features(self): 442 """ 443 Verify that the IUT responds to an information request command 444 soliciting for Specification 1.2 features 445 """ 446 self._setup_link_from_cert() 447 control_channel = self.cert_l2cap.get_control_channel() 448 449 control_channel.send_extended_features_request() 450 451 assertThat(control_channel).emits(L2capMatchers.InformationResponseExtendedFeatures()) 452 453 @metadata( 454 pts_test_id="L2CAP/EXF/BV-01-C", 455 pts_test_name="Extended Features Information Response for " 456 "Enhanced Retransmission Mode") 457 def test_extended_feature_info_response_ertm(self): 458 """ 459 Verify the IUT can format an Information Response for the information 460 type of Extended Features that correctly identifies that Enhanced 461 Retransmission Mode is locally supported 462 """ 463 self._setup_link_from_cert() 464 control_channel = self.cert_l2cap.get_control_channel() 465 466 control_channel.send_extended_features_request() 467 468 assertThat(control_channel).emits(L2capMatchers.InformationResponseExtendedFeatures(supports_ertm=True)) 469 470 @metadata( 471 pts_test_id="L2CAP/EXF/BV-02-C", pts_test_name="Extended Features Information Response for " 472 "Streaming Mode") 473 def test_extended_feature_info_response_streaming(self): 474 """ 475 Verify the IUT can format an Information Response for the information 476 type of Extended Features that correctly identifies that Streaming Mode 477 is locally supported 478 """ 479 asserts.skip("Streaming not supported") 480 self._setup_link_from_cert() 481 control_channel = self.cert_l2cap.get_control_channel() 482 483 
control_channel.send_extended_features_request() 484 485 assertThat(control_channel).emits(L2capMatchers.InformationResponseExtendedFeatures(supports_streaming=True)) 486 487 @metadata(pts_test_id="L2CAP/EXF/BV-03-C", pts_test_name="Extended Features Information Response for FCS " "Option") 488 def test_extended_feature_info_response_fcs(self): 489 """ 490 Verify the IUT can format an Information Response for the information 491 type of Extended Features that correctly identifies that the FCS Option 492 is locally supported. 493 494 Note: This is not mandated by L2CAP Spec 495 """ 496 self._setup_link_from_cert() 497 control_channel = self.cert_l2cap.get_control_channel() 498 499 control_channel.send_extended_features_request() 500 501 assertThat(control_channel).emits(L2capMatchers.InformationResponseExtendedFeatures(supports_fcs=True)) 502 503 @metadata( 504 pts_test_id="L2CAP/EXF/BV-05-C", pts_test_name="Extended Features Information Response for Fixed " 505 "Channels") 506 def test_extended_feature_info_response_fixed_channels(self): 507 """ 508 Verify the IUT can format an Information Response for the information 509 type of Extended Features that correctly identifies that the Fixed 510 Channels option is locally supported 511 512 Note: This is not mandated by L2CAP Spec 513 """ 514 self._setup_link_from_cert() 515 control_channel = self.cert_l2cap.get_control_channel() 516 517 control_channel.send_extended_features_request() 518 519 assertThat(control_channel).emits( 520 L2capMatchers.InformationResponseExtendedFeatures(supports_fixed_channels=True)) 521 522 @metadata(pts_test_id="L2CAP/FIX/BV-01-C", pts_test_name="Fixed Channels Supported Information Request") 523 def test_fixed_channels_supported_information_request(self): 524 """ 525 Verify that the IUT can send an Information Request for the information 526 type of Fixed Channels Supported. 
527 """ 528 self._setup_link_from_cert() 529 assertThat(self.cert_l2cap.get_control_channel()).emits( 530 L2capMatchers.InformationRequestWithType(l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED)) 531 532 @metadata(pts_test_id="L2CAP/FOC/BV-01-C", pts_test_name="IUT Initiated Configuration of the FCS Option") 533 def test_config_channel_not_use_FCS(self): 534 """ 535 Verify the IUT can configure a channel to not use FCS in I/S-frames. 536 """ 537 self._setup_link_from_cert() 538 539 (dut_channel, cert_channel) = self._open_channel_from_cert( 540 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 541 542 dut_channel.send(b'abc') 543 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc')) 544 545 @metadata(pts_test_id="L2CAP/FOC/BV-02-C", pts_test_name="Lower Tester Explicitly Requests FCS should be " "Used") 546 def test_explicitly_request_use_FCS(self): 547 """ 548 Verify the IUT will include the FCS in I/S-frames if the Lower Tester 549 explicitly requests that FCS should be used 550 """ 551 self._setup_link_from_cert() 552 553 (dut_channel, cert_channel) = self._open_channel_from_cert( 554 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.DEFAULT) 555 556 dut_channel.send(b'abc') 557 assertThat(cert_channel).emits(L2capMatchers.IFrameWithFcs(payload=b"abc")) 558 559 @metadata(pts_test_id="L2CAP/FOC/BV-03-C", pts_test_name="Lower Tester Implicitly Requests FCS should be " "Used") 560 def test_implicitly_request_use_FCS(self): 561 """ 562 Verify the IUT will include the FCS in I/S-frames if the Lower Tester 563 implicitly requests that FCS should be used. 
564 """ 565 self._setup_link_from_cert() 566 567 (dut_channel, cert_channel) = self._open_channel_from_cert( 568 mode=RetransmissionFlowControlMode.ERTM, 569 fcs=FcsType.DEFAULT, 570 req_config_options=CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS)) 571 572 dut_channel.send(b'abc') 573 assertThat(cert_channel).emits(L2capMatchers.IFrameWithFcs(payload=b"abc")) 574 575 @metadata(pts_test_id="L2CAP/OFS/BV-01-C", pts_test_name="Sending I-Frames without FCS for ERTM") 576 def test_sending_i_frames_without_fcs_for_ertm(self): 577 """ 578 Verify the IUT does not include the FCS in I-frames. 579 """ 580 self._setup_link_from_cert() 581 582 (dut_channel, cert_channel) = self._open_channel_from_cert( 583 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 584 585 dut_channel.send(b'abc') 586 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) 587 588 @metadata(pts_test_id="L2CAP/OFS/BV-02-C", pts_test_name="Receiving I-Frames without FCS for ERTM") 589 def test_receiving_i_frames_without_fcs_for_ertm(self): 590 """ 591 Verify the IUT can handle I-frames that do not contain the FCS. 592 """ 593 self._setup_link_from_cert() 594 595 (dut_channel, cert_channel) = self._open_channel_from_cert( 596 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 597 598 cert_channel.send_i_frame(tx_seq=0, req_seq=0, payload=SAMPLE_PACKET) 599 assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(SAMPLE_PACKET_DATA)) 600 601 @metadata(pts_test_id="L2CAP/OFS/BV-05-C", pts_test_name="Sending I-Frames with FCS for ERTM") 602 def test_sending_i_frames_with_fcs_for_ertm(self): 603 """ 604 Verify the IUT does include the FCS in I-frames. 
605 """ 606 self._setup_link_from_cert() 607 608 (dut_channel, cert_channel) = self._open_channel_from_cert( 609 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.DEFAULT) 610 611 dut_channel.send(b'abc') 612 assertThat(cert_channel).emits(L2capMatchers.IFrameWithFcs(tx_seq=0, payload=b"abc")) 613 614 @metadata(pts_test_id="L2CAP/OFS/BV-06-C", pts_test_name="Receiving I-Frames with FCS for ERTM") 615 def test_receiving_i_frames_with_fcs_for_ertm(self): 616 """ 617 Verify the IUT can handle I-frames that do contain the FCS. 618 """ 619 self._setup_link_from_cert() 620 621 (dut_channel, cert_channel) = self._open_channel_from_cert( 622 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.DEFAULT) 623 624 cert_channel.send_i_frame(tx_seq=0, req_seq=0, payload=SAMPLE_PACKET, fcs=FcsType.DEFAULT) 625 assertThat(dut_channel).emits(L2capMatchers.PacketPayloadRawData(SAMPLE_PACKET_DATA)) 626 627 @metadata(pts_test_id="L2CAP/ERM/BV-01-C", pts_test_name="Transmit I-frames") 628 def test_transmit_i_frames(self): 629 """ 630 Verify the IUT can send correctly formatted sequential I-frames with 631 valid values for the enhanced control fields (SAR, F-bit, ReqSeq, 632 TxSeq) 633 """ 634 self._setup_link_from_cert() 635 636 (dut_channel, cert_channel) = self._open_channel_from_cert( 637 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 638 639 dut_channel.send(b'abc') 640 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b"abc")) 641 642 cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) 643 644 dut_channel.send(b'abc') 645 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1, payload=b"abc")) 646 647 cert_channel.send_i_frame(tx_seq=1, req_seq=2, payload=SAMPLE_PACKET) 648 649 dut_channel.send(b'abc') 650 assertThat(cert_channel).emits(L2capMatchers.PartialData(b"abc")) 651 652 cert_channel.send_i_frame(tx_seq=2, req_seq=3, payload=SAMPLE_PACKET) 653 654 @metadata(pts_test_id="L2CAP/ERM/BV-02-C", pts_test_name="Receive 
I-Frames") 655 def test_receive_i_frames(self): 656 """ 657 Verify the IUT can receive in-sequence valid I-frames and deliver L2CAP 658 SDUs to the Upper Tester 659 """ 660 self._setup_link_from_cert() 661 662 (dut_channel, cert_channel) = self._open_channel_from_cert( 663 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 664 665 for i in range(3): 666 cert_channel.send_i_frame(tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) 667 assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=i + 1)) 668 669 cert_channel.send_i_frame(tx_seq=3, req_seq=0, sar=SegmentationAndReassembly.START, payload=SAMPLE_PACKET) 670 assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=4)) 671 672 cert_channel.send_i_frame( 673 tx_seq=4, req_seq=0, sar=SegmentationAndReassembly.CONTINUATION, payload=SAMPLE_PACKET) 674 assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=5)) 675 676 cert_channel.send_i_frame(tx_seq=5, req_seq=0, sar=SegmentationAndReassembly.END, payload=SAMPLE_PACKET) 677 assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=6)) 678 679 @metadata(pts_test_id="L2CAP/ERM/BV-03-C", pts_test_name="Acknowledging Received I-Frames") 680 def test_acknowledging_received_i_frames(self): 681 """ 682 Verify the IUT sends S-frame [RR] with the Poll bit not set to 683 acknowledge data received from the Lower Tester 684 """ 685 self._setup_link_from_cert() 686 687 (dut_channel, cert_channel) = self._open_channel_from_cert( 688 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 689 690 for i in range(3): 691 cert_channel.send_i_frame(tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) 692 assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=i + 1)) 693 694 assertThat(cert_channel).emitsNone(L2capMatchers.SFrame(req_seq=4), timeout=timedelta(seconds=1)) 695 696 @metadata( 697 pts_test_id="L2CAP/ERM/BV-05-C", 698 pts_test_name="Resume Transmitting I-Frames when an S-Frame [RR] " 699 "is Received") 700 def 
test_resume_transmitting_when_received_rr(self): 701 """ 702 Verify the IUT will cease transmission of I-frames when the negotiated 703 TxWindow is full. Verify the IUT will resume transmission of I-frames 704 when an S-frame [RR] is received that acknowledges previously sent 705 I-frames 706 """ 707 self._setup_link_from_cert() 708 709 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=1) 710 711 (dut_channel, cert_channel) = self._open_channel_from_cert( 712 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 713 714 dut_channel.send(b'abc') 715 dut_channel.send(b'def') 716 717 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc')) 718 assertThat(cert_channel).emitsNone(L2capMatchers.IFrame(tx_seq=1, payload=b'def')) 719 720 cert_channel.send_s_frame(req_seq=1, f=Final.POLL_RESPONSE) 721 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1)) 722 723 @metadata( 724 pts_test_id="L2CAP/ERM/BV-06-C", pts_test_name="Resume Transmitting I-Frames when an I-Frame is " 725 "Received") 726 def test_resume_transmitting_when_acknowledge_previously_sent(self): 727 """ 728 Verify the IUT will cease transmission of I-frames when the negotiated 729 TxWindow is full. 
Verify the IUT will resume transmission of I-frames 730 when an I-frame is received that acknowledges previously sent I-frames 731 """ 732 self._setup_link_from_cert() 733 734 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=1) 735 736 (dut_channel, cert_channel) = self._open_channel_from_cert( 737 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 738 739 dut_channel.send(b'abc') 740 dut_channel.send(b'def') 741 742 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc')) 743 assertThat(cert_channel).emitsNone( 744 L2capMatchers.IFrame(tx_seq=1, payload=b'abc'), timeout=timedelta(seconds=0.5)) 745 746 cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) 747 748 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1, payload=b'def')) 749 750 cert_channel.send_i_frame(tx_seq=1, req_seq=2, payload=SAMPLE_PACKET) 751 752 @metadata(pts_test_id="L2CAP/ERM/BV-07-C", pts_test_name="Send S-Frame [RNR]") 753 def test_send_s_frame_rnr(self): 754 """ 755 Verify the IUT sends an S-frame [RNR] when it detects local busy condition 756 NOTE: In GD stack, we enter local busy condition if client doesn't dequeue 757 and packets are accumulating in buffer 758 """ 759 self._setup_link_from_cert() 760 761 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=10) 762 763 (dut_channel, cert_channel) = self._open_channel_from_cert( 764 mode=RetransmissionFlowControlMode.ERTM, 765 fcs=FcsType.NO_FCS, 766 req_config_options=config, 767 rsp_config_options=config) 768 769 dut_channel.send(b'abc') 770 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc')) 771 cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) 772 773 dut_channel.set_traffic_paused(True) 774 775 # Allow 1 additional packet in channel queue buffer 776 buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() + 1 777 778 for i in range(buffer_size): 779 
cert_channel.send_i_frame(tx_seq=i + 1, req_seq=1, payload=SAMPLE_PACKET) 780 assertThat(cert_channel).emits(L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.RECEIVER_READY)) 781 782 cert_channel.send_i_frame(tx_seq=buffer_size + 1, req_seq=1, payload=SAMPLE_PACKET) 783 assertThat(cert_channel).emits(L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY)) 784 785 @metadata(pts_test_id="L2CAP/ERM/BV-08-C", pts_test_name="Send S-Frame [RR] with Poll Bit Set") 786 def test_transmit_s_frame_rr_with_poll_bit_set(self): 787 """ 788 Verify the IUT sends an S-frame [RR] with the Poll bit set when its 789 retransmission timer expires. 790 """ 791 self._setup_link_from_cert() 792 793 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, retransmission_time_out=1500) 794 795 (dut_channel, cert_channel) = self._open_channel_from_cert( 796 mode=RetransmissionFlowControlMode.ERTM, 797 fcs=FcsType.NO_FCS, 798 req_config_options=config, 799 rsp_config_options=config) 800 801 dut_channel.send(b'abc') 802 assertThat(cert_channel).emits(L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)) 803 804 @metadata(pts_test_id="L2CAP/ERM/BV-09-C", pts_test_name="Send S-Frame [RR] with Final Bit Set") 805 def test_transmit_s_frame_rr_with_final_bit_set(self): 806 """ 807 Verify the IUT responds with an S-frame [RR] with the Final bit set 808 after receiving an S-frame [RR] with the Poll bit set 809 """ 810 self._setup_link_from_cert() 811 812 (dut_channel, cert_channel) = self._open_channel_from_cert( 813 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 814 815 cert_channel.send_s_frame(req_seq=0, p=Poll.POLL) 816 assertThat(cert_channel).emits(L2capMatchers.SFrame(f=Final.POLL_RESPONSE)) 817 818 @metadata(pts_test_id="L2CAP/ERM/BV-10-C", pts_test_name="Retransmit S-Frame [RR] with Final Bit Set") 819 def test_retransmit_s_frame_rr_with_poll_bit_set(self): 820 """ 821 Verify the IUT will retransmit the S-frame [RR] with the Poll bit set 822 when the 
Monitor Timer expires 823 """ 824 self._setup_link_from_cert() 825 826 (dut_channel, cert_channel) = self._open_channel_from_cert( 827 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 828 dut_channel.send(b'abc') 829 830 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc')) 831 assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=0, p=Poll.POLL, f=Final.NOT_SET)) 832 cert_channel.send_s_frame(req_seq=1, f=Final.POLL_RESPONSE) 833 834 @metadata(pts_test_id="L2CAP/ERM/BV-11-C", pts_test_name="S-Frame Transmissions Exceed MaxTransmit") 835 def test_s_frame_transmissions_exceed_max_transmit(self): 836 """ 837 Verify the IUT will close the channel when the Monitor Timer expires. 838 """ 839 self._setup_link_from_cert() 840 841 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=1, max_transmit=1, monitor_time_out=10) 842 843 (dut_channel, cert_channel) = self._open_channel_from_cert( 844 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 845 846 dut_channel.send(b'abc') 847 cert_channel.verify_disconnect_request() 848 849 @metadata(pts_test_id="L2CAP/ERM/BV-12-C", pts_test_name="I-Frame Transmissions Exceed MaxTransmit") 850 def test_i_frame_transmissions_exceed_max_transmit(self): 851 """ 852 Verify the IUT will close the channel when it receives an S-frame [RR] 853 with the final bit set that does not acknowledge the previous I-frame 854 sent by the IUT 855 """ 856 self._setup_link_from_cert() 857 858 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=1, max_transmit=1) 859 860 (dut_channel, cert_channel) = self._open_channel_from_cert( 861 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 862 863 dut_channel.send(b'abc') 864 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0), L2capMatchers.SFrame(p=Poll.POLL)).inOrder() 865 866 cert_channel.send_s_frame(req_seq=0, 
f=Final.POLL_RESPONSE) 867 cert_channel.verify_disconnect_request() 868 869 @metadata(pts_test_id="L2CAP/ERM/BV-13-C", pts_test_name="Respond to S-Frame [REJ]") 870 def test_respond_to_rej(self): 871 """ 872 Verify the IUT retransmits I-frames starting from the sequence number 873 specified in the S-frame [REJ] 874 """ 875 self._setup_link_from_cert() 876 877 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=2, max_transmit=2) 878 879 (dut_channel, cert_channel) = self._open_channel_from_cert( 880 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 881 882 dut_channel.send(b'abc') 883 dut_channel.send(b'abc') 884 assertThat(cert_channel).emits( 885 L2capMatchers.IFrame(tx_seq=0, payload=b'abc'), L2capMatchers.IFrame(tx_seq=1, payload=b'abc')).inOrder() 886 887 cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT) 888 889 assertThat(cert_channel).emits( 890 L2capMatchers.IFrame(tx_seq=0, payload=b'abc'), L2capMatchers.IFrame(tx_seq=1, payload=b'abc')).inOrder() 891 892 @metadata(pts_test_id="L2CAP/ERM/BV-14-C", pts_test_name="Respond to S-Frame [SREJ] POLL Bit Set") 893 def test_respond_to_srej_p_set(self): 894 """ 895 Verify the IUT responds with the correct I-frame when sent an SREJ 896 frame. 
Verify that the IUT processes the acknowledgment of previously 897 unacknowledged I-frames 898 """ 899 self._setup_link_from_cert() 900 901 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, max_transmit=2, tx_window_size=3) 902 903 (dut_channel, cert_channel) = self._open_channel_from_cert( 904 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 905 906 for _ in range(4): 907 dut_channel.send(b'abc') 908 assertThat(cert_channel).emits( 909 L2capMatchers.IFrame(tx_seq=0, payload=b'abc'), L2capMatchers.IFrame(tx_seq=1, payload=b'abc'), 910 L2capMatchers.IFrame(tx_seq=2, payload=b'abc')).inOrder() 911 912 cert_channel.send_s_frame(req_seq=1, p=Poll.POLL, s=SupervisoryFunction.SELECT_REJECT) 913 914 assertThat(cert_channel).emits( 915 L2capMatchers.IFrame(tx_seq=1, payload=b'abc', f=Final.POLL_RESPONSE), 916 L2capMatchers.IFrame(tx_seq=3, payload=b'abc')).inOrder() 917 918 @metadata(pts_test_id="L2CAP/ERM/BV-15-C", pts_test_name="Respond to S-Frame [SREJ] POLL Bit Clear") 919 def test_respond_to_srej_p_clear(self): 920 """ 921 Verify the IUT responds with the correct I-frame when sent an SREJ frame 922 """ 923 self._setup_link_from_cert() 924 925 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, max_transmit=2, tx_window_size=3) 926 (dut_channel, cert_channel) = self._open_channel_from_cert( 927 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 928 929 for _ in range(4): 930 dut_channel.send(b'abc') 931 assertThat(cert_channel).emits( 932 L2capMatchers.IFrame(tx_seq=0, payload=b'abc'), L2capMatchers.IFrame(tx_seq=1, payload=b'abc'), 933 L2capMatchers.IFrame(tx_seq=2, payload=b'abc')).inOrder() 934 935 cert_channel.send_s_frame(req_seq=1, s=SupervisoryFunction.SELECT_REJECT) 936 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1, payload=b'abc', f=Final.NOT_SET)) 937 cert_channel.send_s_frame(req_seq=3, s=SupervisoryFunction.RECEIVER_READY) 938 
assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=3, payload=b'abc', f=Final.NOT_SET)) 939 940 @metadata(pts_test_id="L2CAP/ERM/BV-16-C", pts_test_name="Send S-Frame [REJ]") 941 def test_send_s_frame_rej(self): 942 """ 943 Verify the IUT can send an S-Frame [REJ] after receiving out of sequence 944 I-Frames 945 """ 946 self._setup_link_from_cert() 947 tx_window_size = 4 948 949 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=tx_window_size) 950 (dut_channel, cert_channel) = self._open_channel_from_cert( 951 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 952 953 cert_channel.send_i_frame(tx_seq=0, req_seq=0, f=Final.NOT_SET, payload=SAMPLE_PACKET) 954 cert_channel.send_i_frame(tx_seq=2, req_seq=0, f=Final.NOT_SET, payload=SAMPLE_PACKET) 955 956 assertThat(cert_channel).emits( 957 L2capMatchers.SFrame(req_seq=1, f=Final.NOT_SET, s=SupervisoryFunction.REJECT, p=Poll.NOT_SET)) 958 959 for i in range(1, tx_window_size): 960 cert_channel.send_i_frame(tx_seq=i, req_seq=0, f=Final.NOT_SET, payload=SAMPLE_PACKET) 961 assertThat(cert_channel).emits( 962 L2capMatchers.SFrame(req_seq=i + 1, f=Final.NOT_SET, s=SupervisoryFunction.RECEIVER_READY)) 963 964 @metadata(pts_test_id="L2CAP/ERM/BV-18-C", pts_test_name="Receive S-Frame [RR] Final Bit = 1") 965 def test_receive_s_frame_rr_final_bit_set(self): 966 """ 967 Verify the IUT will retransmit any previously sent I-frames 968 unacknowledged by receipt of an S-Frame [RR] with the Final Bit set 969 """ 970 self._setup_link_from_cert() 971 972 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, retransmission_time_out=1500) 973 974 (dut_channel, cert_channel) = self._open_channel_from_cert( 975 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 976 977 dut_channel.send(b'abc') 978 979 assertThat(cert_channel).emits(L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)) 980 981 
cert_channel.send_s_frame(req_seq=0, f=Final.POLL_RESPONSE) 982 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) 983 984 @metadata(pts_test_id="L2CAP/ERM/BV-19-C", pts_test_name="Receive I-Frame Final Bit = 1") 985 def test_receive_i_frame_final_bit_set(self): 986 """ 987 Verify the IUT will retransmit any previously sent I-frames 988 unacknowledged by receipt of an I-frame with the final bit set 989 """ 990 self._setup_link_from_cert() 991 992 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, retransmission_time_out=1500) 993 (dut_channel, cert_channel) = self._open_channel_from_cert( 994 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 995 996 dut_channel.send(b'abc') 997 assertThat(cert_channel).emits(L2capMatchers.SFrame(p=Poll.POLL)) 998 999 cert_channel.send_i_frame(tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET) 1000 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) 1001 1002 @metadata(pts_test_id="L2CAP/ERM/BV-20-C", pts_test_name="Enter Remote Busy Condition") 1003 def test_receive_rnr(self): 1004 """ 1005 Verify the IUT will not retransmit any I-frames when it receives a 1006 remote busy indication from the Lower Tester (S-frame [RNR]) 1007 """ 1008 self._setup_link_from_cert() 1009 1010 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, retransmission_time_out=1500) 1011 (dut_channel, cert_channel) = self._open_channel_from_cert( 1012 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 1013 1014 dut_channel.send(b'abc') 1015 assertThat(cert_channel).emits(L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)) 1016 1017 cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.RECEIVER_NOT_READY, f=Final.POLL_RESPONSE) 1018 assertThat(cert_channel).emitsNone(L2capMatchers.IFrame(tx_seq=0)) 1019 1020 @metadata(pts_test_id="L2CAP/ERM/BV-22-C", pts_test_name="Exit Local Busy Condition") 1021 def 
test_exit_local_busy_condition(self): 1022 """ 1023 Verify the IUT sends an S-frame [RR] Poll = 1 when the local busy condition is cleared 1024 NOTE: In GD stack, we enter local busy condition if client doesn't dequeue 1025 and packets are accumulating in buffer 1026 """ 1027 self._setup_link_from_cert() 1028 1029 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=10) 1030 1031 (dut_channel, cert_channel) = self._open_channel_from_cert( 1032 mode=RetransmissionFlowControlMode.ERTM, 1033 fcs=FcsType.NO_FCS, 1034 req_config_options=config, 1035 rsp_config_options=config) 1036 1037 dut_channel.send(b'abc') 1038 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0, payload=b'abc')) 1039 cert_channel.send_i_frame(tx_seq=0, req_seq=1, payload=SAMPLE_PACKET) 1040 1041 dut_channel.set_traffic_paused(True) 1042 1043 # Allow 1 additional packet in channel queue buffer 1044 buffer_size = self.dut_l2cap.get_channel_queue_buffer_size() + 1 1045 1046 for i in range(buffer_size): 1047 cert_channel.send_i_frame(tx_seq=i + 1, req_seq=1, payload=SAMPLE_PACKET) 1048 assertThat(cert_channel).emits(L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.RECEIVER_READY)) 1049 1050 cert_channel.send_i_frame(tx_seq=buffer_size + 1, req_seq=1, payload=SAMPLE_PACKET) 1051 assertThat(cert_channel).emits(L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY)) 1052 1053 dut_channel.set_traffic_paused(False) 1054 assertThat(cert_channel).emits( 1055 L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.RECEIVER_READY, p=l2cap_packets.Poll.POLL)) 1056 cert_channel.send_s_frame(1, f=l2cap_packets.Final.POLL_RESPONSE) 1057 1058 @metadata(pts_test_id="L2CAP/ERM/BV-23-C", pts_test_name="Transmit I-Frames using SAR") 1059 def test_transmit_i_frames_using_sar(self): 1060 """ 1061 Verify the IUT can send correctly formatted sequential I-frames with 1062 valid values for the enhanced control fields (SAR, F-bit, ReqSeq, 1063 TxSeq) when performing 
SAR. 1064 """ 1065 self._setup_link_from_cert() 1066 1067 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, mps=11) 1068 (dut_channel, cert_channel) = self._open_channel_from_cert( 1069 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 1070 1071 dut_channel.send(b'abcabcabc') 1072 # First IFrame should contain SDU size after control field 1073 assertThat(cert_channel).emits( 1074 L2capMatchers.IFrameStart(tx_seq=0, payload=b'abc'), L2capMatchers.IFrame(tx_seq=1, payload=b'abc'), 1075 L2capMatchers.IFrame(tx_seq=2, payload=b'abc')).inOrder() 1076 1077 cert_channel.send_s_frame(req_seq=3, s=SupervisoryFunction.RECEIVER_READY) 1078 1079 dut_channel.send(b'defdefdef') 1080 # First IFrame should contain SDU size after control field 1081 assertThat(cert_channel).emits( 1082 L2capMatchers.IFrameStart(tx_seq=3, payload=b'def'), L2capMatchers.IFrame(tx_seq=4, payload=b'def'), 1083 L2capMatchers.IFrame(tx_seq=5, payload=b'def')).inOrder() 1084 1085 @metadata(pts_test_id="L2CAP/ERM/BI-01-C", pts_test_name="S-Frame [REJ] Lost or Corrupted") 1086 def test_sent_rej_lost(self): 1087 """ 1088 Verify the IUT can handle receipt of an S-=frame [RR] Poll = 1 if the 1089 S-frame [REJ] sent from the IUT is lost 1090 """ 1091 self._setup_link_from_cert() 1092 ertm_tx_window_size = 5 1093 1094 config = CertL2cap.config_option_ertm(fcs=FcsType.NO_FCS, tx_window_size=ertm_tx_window_size) 1095 (dut_channel, cert_channel) = self._open_channel_from_cert( 1096 mode=RetransmissionFlowControlMode.ERTM, req_config_options=config, rsp_config_options=config) 1097 1098 cert_channel.send_i_frame(tx_seq=0, req_seq=0, payload=SAMPLE_PACKET) 1099 assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=1)) 1100 1101 cert_channel.send_i_frame(tx_seq=ertm_tx_window_size - 1, req_seq=0, payload=SAMPLE_PACKET) 1102 assertThat(cert_channel).emits(L2capMatchers.SFrame(s=SupervisoryFunction.REJECT)) 1103 1104 cert_channel.send_s_frame(req_seq=0, 
p=Poll.POLL) 1105 1106 assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=1, f=l2cap_packets.Final.POLL_RESPONSE)) 1107 for i in range(1, ertm_tx_window_size): 1108 cert_channel.send_i_frame(tx_seq=i, req_seq=0, payload=SAMPLE_PACKET) 1109 assertThat(cert_channel).emits(L2capMatchers.SFrame(req_seq=i + 1)) 1110 1111 @metadata(pts_test_id="L2CAP/ERM/BI-03-C", pts_test_name="Handle Duplicate S-Frame [SREJ]") 1112 def test_handle_duplicate_srej(self): 1113 """ 1114 Verify the IUT will only retransmit the requested I-frame once after 1115 receiving a duplicate SREJ 1116 """ 1117 self._setup_link_from_cert() 1118 1119 (dut_channel, cert_channel) = self._open_channel_from_cert( 1120 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 1121 1122 dut_channel.send(b'abc') 1123 dut_channel.send(b'abc') 1124 assertThat(cert_channel).emits( 1125 L2capMatchers.IFrame(tx_seq=0), L2capMatchers.IFrame(tx_seq=1), 1126 L2capMatchers.SFrame(p=Poll.POLL)).inOrder() 1127 1128 cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.SELECT_REJECT) 1129 assertThat(cert_channel).emitsNone(timeout=timedelta(seconds=0.5)) 1130 1131 cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.SELECT_REJECT, f=Final.POLL_RESPONSE) 1132 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) 1133 1134 @metadata( 1135 pts_test_id="L2CAP/ERM/BI-04-C", 1136 pts_test_name="Handle Receipt of S-Frame [REJ] and S-Frame " 1137 "[RR, F=1] that Both Require Retransmission of the " 1138 "Same I-Frames") 1139 def test_handle_receipt_rej_and_rr_with_f_set(self): 1140 """ 1141 Verify the IUT will only retransmit the requested I-frames once after 1142 receiving an S-frame [REJ] followed by an S-frame [RR] with the Final 1143 bit set that indicates the same I-frames should be retransmitted 1144 """ 1145 self._setup_link_from_cert() 1146 1147 (dut_channel, cert_channel) = self._open_channel_from_cert( 1148 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 1149 1150 
dut_channel.send(b'abc') 1151 dut_channel.send(b'abc') 1152 assertThat(cert_channel).emits( 1153 L2capMatchers.IFrame(tx_seq=0), 1154 L2capMatchers.IFrame(tx_seq=1), 1155 L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)).inOrder() 1156 1157 cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT) 1158 assertThat(cert_channel).emitsNone(timeout=timedelta(seconds=0.5)) 1159 1160 # Send RR with F set 1161 cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT, f=Final.POLL_RESPONSE) 1162 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) 1163 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1)) 1164 1165 @metadata( 1166 pts_test_id="L2CAP/ERM/BI-05-C", 1167 pts_test_name="Handle receipt of S-Frame [REJ] and I-Frame [F=1] " 1168 "that Both Require Retransmission of the Same " 1169 "I-Frames") 1170 def test_handle_rej_and_i_frame_with_f_set(self): 1171 """ 1172 Verify the IUT will only retransmit the requested I-frames once after 1173 receiving an S-frame [REJ] followed by an I-frame with the Final bit 1174 set that indicates the same I-frames should be retransmitted 1175 """ 1176 self._setup_link_from_cert() 1177 1178 (dut_channel, cert_channel) = self._open_channel_from_cert( 1179 mode=RetransmissionFlowControlMode.ERTM, fcs=FcsType.NO_FCS) 1180 1181 dut_channel.send(b'abc') 1182 dut_channel.send(b'abc') 1183 assertThat(cert_channel).emits( 1184 L2capMatchers.IFrame(tx_seq=0), 1185 L2capMatchers.IFrame(tx_seq=1), 1186 L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL)).inOrder() 1187 1188 # Send SREJ with F not set 1189 cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.SELECT_REJECT) 1190 assertThat(cert_channel).emitsNone(timeout=timedelta(seconds=0.5)) 1191 1192 cert_channel.send_i_frame(tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET) 1193 1194 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=0)) 1195 assertThat(cert_channel).emits(L2capMatchers.IFrame(tx_seq=1)) 1196 1197 @metadata( 1198 
pts_test_id="L2CAP/CMC/BV-01-C", pts_test_name="IUT Initiated Configuration of Enhanced " 1199 "Retransmission Mode") 1200 def test_initiated_configuration_request_ertm(self): 1201 """ 1202 Verify the IUT can send a Configuration Request command containing the 1203 F&EC option that specifies Enhanced Retransmission Mode 1204 """ 1205 self._setup_link_from_cert() 1206 1207 self._open_unconfigured_channel_from_cert(scid=0x41, psm=0x33, mode=RetransmissionFlowControlMode.ERTM) 1208 1209 assertThat(self.cert_l2cap.get_control_channel()).emits(L2capMatchers.ConfigurationRequestWithErtm()) 1210 1211 @metadata( 1212 pts_test_id="L2CAP/CMC/BV-02-C", 1213 pts_test_name="Lower Tester Initiated Configuration of Enhanced " 1214 "Retransmission Mode") 1215 def test_respond_configuration_request_ertm(self): 1216 """ 1217 Verify the IUT can accept a Configuration Request from the Lower Tester 1218 containing an F&EC option that specifies Enhanced Retransmission Mode 1219 """ 1220 self._setup_link_from_cert() 1221 1222 self._open_channel_from_dut(psm=0x33, mode=RetransmissionFlowControlMode.ERTM) 1223 1224 @metadata( 1225 pts_test_id="L2CAP/CMC/BV-12-C", 1226 pts_test_name="ERTM Not Supported by Lower Tester for Mandatory " 1227 "ERTM channel") 1228 def test_respond_not_support_ertm_when_using_mandatory_ertm(self): 1229 """ 1230 The IUT is initiating connection of an L2CAP channel that mandates use 1231 of ERTM. 
Verify the IUT will not attempt to configure the connection to 1232 ERTM if the Lower Tester has not indicated support for ERTM in the 1233 Information Response [Extended Features] 1234 """ 1235 self._setup_link_from_cert() 1236 self.cert_l2cap.claim_ertm_unsupported() 1237 dut_channel_future = self.dut_l2cap.connect_dynamic_channel_to_cert( 1238 psm=0x33, mode=RetransmissionFlowControlMode.ERTM) 1239 assertThat(self.cert_l2cap.get_control_channel()).emitsNone(L2capMatchers.ConnectionRequest(0x33)) 1240 1241 @metadata( 1242 pts_test_id="L2CAP/CMC/BI-01-C", 1243 pts_test_name="Failed Configuration of Enhanced Retransmission " 1244 "Mode when use of the Mode is Mandatory]") 1245 def test_config_respond_basic_mode_when_using_mandatory_ertm(self): 1246 """ 1247 When creating a connection for a PSM that mandates the use of ERTM 1248 verify the IUT can handle receipt (close the channel in accordance with 1249 the specification) of a Configure Response indicating the peer L2CAP 1250 entity doesn’t wish to use Enhanced Retransmission Mode (Configure 1251 Response Result = Reject Unacceptable Parameters) 1252 """ 1253 1254 self._setup_link_from_cert() 1255 1256 (dut_channel, cert_channel) = self._open_channel_from_cert( 1257 mode=RetransmissionFlowControlMode.ERTM, 1258 req_config_options=CertL2cap.config_option_ertm(), 1259 rsp_config_options=CertL2cap.config_option_basic_explicit()) 1260 1261 cert_channel.verify_disconnect_request() 1262 1263 @metadata( 1264 pts_test_id="L2CAP/CMC/BI-02-C", 1265 pts_test_name="Configuration Mode mismatch when use of Enhanced " 1266 "Retransmission Mode is Mandatory") 1267 def test_config_request_basic_mode_when_using_mandatory_ertm(self): 1268 """ 1269 When creating a connection for a PSM that mandates the use of ERTM, 1270 verify the IUT will close the channel if the Lower Tester attempts to 1271 configure Basic Mode. 
1272 """ 1273 self._setup_link_from_cert() 1274 1275 (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(mode=RetransmissionFlowControlMode.ERTM) 1276 cert_channel.send_configure_request(CertL2cap.config_option_basic_explicit()) 1277 cert_channel.verify_disconnect_request() 1278 1279 def test_initiate_connection_for_security(self): 1280 """ 1281 This will test the PyL2cap API for initiating a connection for security 1282 via the security api 1283 """ 1284 self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) 1285 self.cert.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) 1286 self.dut_l2cap.initiate_connection_for_security() 1287 self.cert_l2cap.accept_incoming_connection() 1288 self.dut_l2cap.verify_security_connection() 1289