1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import multiprocessing
7import Queue
8import struct
9import time
10
11import common
12from autotest_lib.client.bin import utils
13from autotest_lib.client.cros.cellular.mbim_compliance \
14        import mbim_channel_endpoint
15from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
16
17
class MBIMChannel(object):
    """
    Provide synchronous access to the modem with MBIM command level interaction.

    This object should simplify your interaction over the MBIM channel as
    follows:
    - Use |bidirectional_transaction| to send MBIM packets that are part of a
      transaction. This function will block until the transaction completes and
      return the MBIM packets received in response.
    - |bidirectional_transaction| will filter out packets that do not correspond
      to your transaction. This way, you don't have to worry about unsolicited
      notifications and/or stale packets when interacting with the modem.
    - All filtered out packets can be grabbed using the
      |get_outstanding_packets| function. Use this function to receive error
      notifications, status notifications, etc.
    - Use |unidirectional_transaction| to send MBIM packets for which you don't
      expect a response.
    - Use |flush| to clean out all pipes before starting a new transaction.

    Note that "MBIM packets" here really means MBIM fragments. This object does
    not (de)fragment packets for you. Out of necessity, it does check that
    received fragments are contiguous and in-order.

    So, this object houses the minimum information necessary about the MBIM
    fragments to provide you a comfortable synchronous packet level channel.

    """

    ENDPOINT_JOIN_TIMEOUT_S = 5
    FRAGMENT_TIMEOUT_S = 3
    # TODO(pprabhu) Consider allowing each transaction to specify its own
    # timeout.
    TRANSACTION_TIMEOUT_S = 5
    # Upper bound on the number of transaction timeouts |flush| will sleep for
    # while waiting for the response queue to settle down.
    MAX_FLUSH_TRANSACTIONS = 5

    # Message header: three little-endian 32-bit fields. The first one is the
    # message type and the third one the transaction id; the second one is not
    # interpreted by this class.
    MESSAGE_HEADER_FORMAT = '<LLL'
    # Fragment header: total number of fragments, then the index of this
    # fragment (both little-endian 32-bit).
    FRAGMENT_HEADER_FORMAT = '<LL'
    # Message types that carry a fragment header after the message header.
    MBIM_FRAGMENTED_MESSAGES = [
            0x00000003,  # MBIM_COMMAND_MSG
            0x80000003,  # MBIM_COMMAND_DONE
            0x80000007]  # MBIM_INDICATE_STATUS

    def __init__(self,
                 device,
                 interface_number,
                 interrupt_endpoint_address,
                 in_buffer_size,
                 process_class=None):
        """
        @param device: Device handle returned by PyUSB for the modem to test.
        @param interface_number: |bInterfaceNumber| of the MBIM interface.
        @param interrupt_endpoint_address: |bEndpointAddress| for the usb
                INTERRUPT IN endpoint for notifications.
        @param in_buffer_size: The (fixed) buffer size to use for in control
                transfers.
        @param process_class: The class to instantiate to create a subprocess.
                This is used by tests only, to easily mock out the process
                creation.

        """
        # Set first so that |close| / |__del__| are safe even if anything
        # below raises before the subprocess has been created.
        self._endpoint_process = None
        self._stop_request_event = multiprocessing.Event()
        self._request_queue = multiprocessing.Queue()
        self._response_queue = multiprocessing.Queue()
        self._outstanding_packets = []
        self._last_response = []
        self._stashed_first_fragment = None
        if process_class is None:
            process_class = multiprocessing.Process
        self._endpoint_process = process_class(
                target=mbim_channel_endpoint.MBIMChannelEndpoint,
                args=(device,
                      interface_number,
                      interrupt_endpoint_address,
                      in_buffer_size,
                      self._request_queue,
                      self._response_queue,
                      self._stop_request_event))
        self._endpoint_process.start()


    def __del__(self):
        """
        The destructor.

        Note that it is not guaranteed that |__del__| is called for objects that
        exist when the interpreter exits. It is recommended to call |close|
        explicitly.

        """
        self.close()


    def close(self):
        """
        Cleanly close the MBIMChannel.

        MBIMChannel forks a subprocess to communicate with the USB device. It is
        recommended that |close| be called explicitly.

        Calling |close| more than once, or on an object whose construction did
        not get as far as creating the subprocess, is a no-op.

        """
        # |getattr| rather than plain attribute access: |__del__| calls us even
        # when |__init__| never got to assign the attribute.
        endpoint_process = getattr(self, '_endpoint_process', None)
        if endpoint_process is None:
            return

        if endpoint_process.is_alive():
            # Ask the endpoint to stop, give it some time to comply, and only
            # then resort to terminating it.
            self._stop_request_event.set()
            endpoint_process.join(self.ENDPOINT_JOIN_TIMEOUT_S)
            if endpoint_process.is_alive():
                endpoint_process.terminate()

        self._endpoint_process = None


    def bidirectional_transaction(self, *args):
        """
        Execute a synchronous bidirectional transaction.

        @param *args: Fragments of a single MBIM transaction. An MBIM
                transaction may consist of multiple fragments - each fragment is
                the payload for a USB control message. It should be an
                |array.array| object.  It is your responsibility (and choice) to
                keep the fragments in-order, and to send all the fragments.
                For more details, see "Fragmentation of messages" in the MBIM
                spec.
        @returns: A list of fragments in the same order as received that
                correspond to the given transaction. If we receive less
                fragments than claimed, we will return what we get. If we
                receive non-contiguous / out-of-order fragments, we'll complain.
        @raises: MBIMComplianceChannelError if no fragments are given, if no
                timely response is received, or if received fragments are
                out-of-order or non-contiguous.

        """
        self._verify_endpoint_open()
        if not args:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'No data given to |bidirectional_transaction|.')

        # Parse the first fragment before queueing anything, so that a
        # malformed request is rejected without sending part of it.
        transaction_id, _, _ = self._fragment_metadata(args[0])
        for fragment in args:
            self._request_queue.put_nowait(fragment)
        return self._get_response_fragments(transaction_id)


    def unidirectional_transaction(self, *args):
        """
        Execute a synchronous unidirectional transaction. No return value.

        @param *args: Fragments of a single MBIM transaction. An MBIM
                transaction may consist of multiple fragments - each fragment is
                the payload for a USB control message. It should be an
                |array.array| object.  It is your responsibility (and choice) to
                keep the fragments in-order, and to send all the fragments.
                For more details, see "Fragmentation of messages" in the MBIM
                spec.
        @raises: MBIMComplianceChannelError if no fragments are given.

        """
        self._verify_endpoint_open()
        if not args:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'No data given to |unidirectional_transaction|.')

        for fragment in args:
            self._request_queue.put_nowait(fragment)


    def flush(self):
        """
        Clean out all queues.

        This waits till all outgoing packets have been sent, and then waits some
        more to give the channel time to settle down. Any packets received in
        the meantime are logged and discarded.

        @raises: MBIMComplianceChannelError if things don't settle down fast
                enough.
        """
        self._verify_endpoint_open()
        num_remaining_fragments = self._request_queue.qsize()
        try:
            utils.poll_for_condition(
                    self._request_queue.empty,
                    timeout=self.FRAGMENT_TIMEOUT_S * num_remaining_fragments)
        except utils.TimeoutError:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Could not flush request queue.')

        # Now wait for the response queue to settle down.
        time.sleep(self._flush_settle_time_s(num_remaining_fragments))
        for packet in self.get_outstanding_packets():
            logging.debug('flush: discarding packet: %s', packet)


    def _flush_settle_time_s(self, num_remaining_fragments):
        """
        Compute how long |flush| should wait for responses to trickle in.

        In the worst case, each request fragment that was remaining at the time
        flush was called belonged to a different transaction, and each of these
        transactions would serially timeout in |TRANSACTION_TIMEOUT_S|. To
        avoid sleeping for long times, the number of transactions waited for is
        capped (arbitrarily) to |MAX_FLUSH_TRANSACTIONS|.

        @param num_remaining_fragments: Number of request fragments that were
                still queued when |flush| was called.
        @returns: The time to sleep, in seconds.

        """
        num_remaining_transactions = min(self.MAX_FLUSH_TRANSACTIONS,
                                         num_remaining_fragments)
        return num_remaining_transactions * self.TRANSACTION_TIMEOUT_S


    def get_outstanding_packets(self):
        """
        Get all received packets that were not part of an explicit transaction.

        The returned packets are handed over to the caller: a subsequent call
        will not return them again.

        @returns: A list of packets. Each packet is a list of fragments, so you
        perhaps want to do something like:
            for packet in channel.get_outstanding_packets():
                for fragment in packet:
                    # handle fragment.

        """
        self._verify_endpoint_open()
        # Try to get more packets from the response queue.
        # This can block forever if the modem keeps spewing trash at us.
        while True:
            packet = self._get_packet_fragments()
            if not packet:
                break
            self._outstanding_packets.append(packet)

        packets = self._outstanding_packets
        self._outstanding_packets = []
        return packets


    def _get_response_fragments(self, transaction_id):
        """
        Get response for the given |transaction_id|.

        Packets received for other transactions while waiting are kept and can
        be retrieved later through |get_outstanding_packets|.

        @param transaction_id: Transaction id the response must carry.
        @returns: A list of fragments.
        @raises: MBIMComplianceChannelError if response is not received.

        """
        # Never hand back the response of an earlier transaction.
        self._last_response = []

        def _poll_response():
            packet = self._get_packet_fragments()
            if not packet:
                return False
            response_id, _, _ = self._fragment_metadata(packet[0])
            if response_id == transaction_id:
                # Stored on |self| because the closure can not rebind a local
                # of the enclosing function.
                self._last_response = packet
                return True
            self._outstanding_packets.append(packet)
            return False

        try:
            utils.poll_for_condition(
                    _poll_response,
                    timeout=self.TRANSACTION_TIMEOUT_S)
        except utils.TimeoutError:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Did not receive timely reply to transaction %d' %
                    transaction_id)
        return self._last_response


    def _get_packet_fragments(self):
        """
        Get all fragments of the next packet from the modem.

        This function is responsible for putting together fragments of one
        packet, and checking that fragments are contiguous and in-order.

        @returns: A list of the fragments of the next packet, in order. The
                list is empty if no fragment arrives within
                |FRAGMENT_TIMEOUT_S|, and shorter than announced if the
                remaining fragments do not arrive in time or a fragment of a
                different transaction shows up first.
        @raises: MBIMComplianceChannelError if the fragments are malformed,
                inconsistent or out-of-order.

        """
        fragments = []
        if self._stashed_first_fragment is not None:
            # A previous call already pulled the first fragment of this packet
            # off the queue while collecting the previous packet.
            first_fragment = self._stashed_first_fragment
            self._stashed_first_fragment = None
        else:
            try:
                first_fragment = self._response_queue.get(
                        True, self.FRAGMENT_TIMEOUT_S)
            except Queue.Empty:
                # *Don't fail* Just return nothing.
                return fragments

        transaction_id, total_fragments, current_fragment = (
                self._fragment_metadata(first_fragment))
        if current_fragment != 0:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'First fragment reports fragment number %d' %
                    current_fragment)

        fragments.append(first_fragment)

        # |last_fragment| is the index of the most recently accepted fragment.
        last_fragment = 0
        while last_fragment < total_fragments - 1:
            try:
                fragment = self._response_queue.get(True,
                                                    self.FRAGMENT_TIMEOUT_S)
            except Queue.Empty:
                # *Don't fail* Just return the fragments we got so far.
                break

            fragment_id, fragment_total, fragment_current = (
                    self._fragment_metadata(fragment))
            if fragment_id != transaction_id:
                # *Don't fail* Treat a different transaction id as indicating
                # that the next packet has already arrived.
                logging.warning('Recieved only %d out of %d fragments for '
                                'transaction %d.',
                                len(fragments),
                                total_fragments,
                                transaction_id)
                # Keep the fragment around: it starts the next packet.
                self._stashed_first_fragment = fragment
                break

            if fragment_total != total_fragments:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Fragment number %d reports incorrect total (%d/%d)' %
                        (last_fragment + 1, fragment_total, total_fragments))

            if fragment_current != last_fragment + 1:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Received reordered fragments. Expected %d, got %d' %
                        (last_fragment + 1, fragment_current))

            last_fragment += 1
            fragments.append(fragment)

        return fragments


    def _fragment_metadata(self, fragment):
        """
        This function houses all the MBIM packet knowledge.

        @param fragment: The raw bytes of one fragment, starting with the
                message header.
        @returns: A tuple (transaction_id, total_fragments, current_fragment).
                Message types that are not listed in
                |MBIM_FRAGMENTED_MESSAGES| are reported as a single fragment
                with index 0.
        @raises: MBIMComplianceChannelError if |fragment| is too short to hold
                the headers its message type requires.

        """
        message_header_size = struct.calcsize(self.MESSAGE_HEADER_FORMAT)
        # All packets have a message header.
        if len(fragment) < message_header_size:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Corrupted fragment |%s| does not have an MBIM header.' %
                    fragment)

        message_type, _, transaction_id = struct.unpack_from(
                self.MESSAGE_HEADER_FORMAT,
                fragment)

        if message_type not in self.MBIM_FRAGMENTED_MESSAGES:
            # For other types, there is only one 'fragment'.
            return transaction_id, 1, 0

        # The fragment header directly follows the message header.
        fragment_header_size = struct.calcsize(self.FRAGMENT_HEADER_FORMAT)
        if len(fragment) - message_header_size < fragment_header_size:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Corrupted fragment |%s| does not have a fragment '
                    'header. ' %
                    fragment[message_header_size:])

        total_fragments, current_fragment = struct.unpack_from(
                self.FRAGMENT_HEADER_FORMAT,
                fragment,
                message_header_size)
        return transaction_id, total_fragments, current_fragment


    def _verify_endpoint_open(self):
        """
        Check that the endpoint subprocess is available for use.

        @raises: MBIMComplianceChannelError if the channel has been closed or
                the endpoint subprocess is no longer alive.

        """
        endpoint_process = getattr(self, '_endpoint_process', None)
        if endpoint_process is None:
            # |close| drops the process handle; report that as a channel error
            # instead of failing with an AttributeError on |None|.
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'MBIMChannel is closed. The endpoint subprocess is not '
                    'running.')

        if not endpoint_process.is_alive():
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'MBIMChannelEndpoint died unexpectedly. '
                    'The actual exception can be found in log entries from the '
                    'subprocess.')
384