1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5
6import common
7from autotest_lib.client.bin import utils
8from autotest_lib.client.cros.cellular.mbim_compliance import mbim_channel
9from autotest_lib.client.cros.cellular.mbim_compliance \
10        import mbim_command_message
11from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
12from autotest_lib.client.cros.cellular.mbim_compliance \
13        import mbim_message_request
14from autotest_lib.client.cros.cellular.mbim_compliance \
15        import mbim_message_response
16from autotest_lib.client.cros.cellular.mbim_compliance \
17        import mbim_test_base
18from autotest_lib.client.cros.cellular.mbim_compliance.sequences \
19        import get_descriptors_sequence
20from autotest_lib.client.cros.cellular.mbim_compliance.sequences \
21        import mbim_open_generic_sequence
22
23
24class cellular_MbimComplianceCM05(mbim_test_base.MbimTestBase):
25    """
26    CM_05 Validation for modem's responses to two consecutive MBIM command
27    messages are correct with regards to |transaction_id|, |service_id| and
28    |cid|.
29
30    This test verifies that the function uses separate transactions to deliver
31    control message responses.
32
33    Reference:
34        [1] Universal Serial Bus Communication Class MBIM Compliance Testing: 39
35        http://www.usb.org/developers/docs/devclass_docs/MBIM-Compliance-1.0.pdf
36    """
37    version = 1
38
39    def run_internal(self):
40        """ Run CM_05 test. """
41        # Precondition
42        descriptors = get_descriptors_sequence.GetDescriptorsSequence(
43                self.device_context).run()
44        self.device_context.update_descriptor_cache(descriptors)
45        mbim_open_generic_sequence.MBIMOpenGenericSequence(
46                self.device_context).run()
47
48        device_context = self.device_context
49        descriptor_cache = device_context.descriptor_cache
50        self.channel = mbim_channel.MBIMChannel(
51                device_context._device,
52                descriptor_cache.mbim_communication_interface.bInterfaceNumber,
53                descriptor_cache.interrupt_endpoint.bEndpointAddress,
54                device_context.max_control_transfer_size)
55
56        # Step 1
57        caps_command_message = mbim_command_message.MBIMDeviceCapsQuery()
58        caps_packets = mbim_message_request.generate_request_packets(
59                caps_command_message,
60                device_context.max_control_transfer_size)
61        self.caps_transaction_id = caps_command_message.transaction_id
62        self.channel.unidirectional_transaction(*caps_packets)
63
64        # Step 2
65        services_command_message = (
66                mbim_command_message.MBIMDeviceServicesQuery())
67        services_packets = mbim_message_request.generate_request_packets(
68                services_command_message,
69                device_context.max_control_transfer_size)
70        self.services_transaction_id = services_command_message.transaction_id
71        self.channel.unidirectional_transaction(*services_packets)
72
73        # Step 3
74        utils.poll_for_condition(
75                self._get_response_packets,
76                timeout=5,
77                exception=mbim_errors.MBIMComplianceChannelError(
78                        'Failed to retrieve the response packets to specific '
79                        'control messages.'))
80        self.channel.close()
81
82        caps_response_message = self.caps_response
83        services_response_message = self.services_response
84        is_caps_message_valid = isinstance(
85                caps_response_message,
86                mbim_command_message.MBIMDeviceCapsInfo)
87        is_services_message_valid = isinstance(
88                services_response_message,
89                mbim_command_message.MBIMDeviceServicesInfo)
90        if not ((is_caps_message_valid and is_services_message_valid) and
91                (caps_response_message.transaction_id ==
92                 caps_command_message.transaction_id) and
93                (caps_response_message.device_service_id ==
94                 caps_command_message.device_service_id) and
95                caps_response_message.cid == caps_command_message.cid and
96                (services_command_message.transaction_id ==
97                 services_response_message.transaction_id) and
98                (services_command_message.device_service_id ==
99                 services_response_message.device_service_id) and
100                services_command_message.cid == services_response_message.cid):
101            mbim_errors.log_and_raise(mbim_errors.MBIMComplianceAssertionError,
102                                      'mbim1.0:8.1.2#2')
103
104
105    def _get_response_packets(self):
106        """
107        Condition method for |poll_for_condition| to check the retrieval of
108        target packets.
109
110        @returns True if both caps response packet and services response packet
111                are received, False otherwise.
112
113        """
114        packets = self.channel.get_outstanding_packets()
115        self.caps_response = None
116        self.services_response = None
117        for packet in packets:
118            message_response = mbim_message_response.parse_response_packets(
119                    packet)
120            if message_response.transaction_id == self.caps_transaction_id:
121                self.caps_response = message_response
122            elif message_response.transaction_id == self.services_transaction_id:
123                self.services_response = message_response
124            if self.caps_response and self.services_response:
125                return True
126        return False
127