1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Decoder class for decoding bytes using HDLC protocol"""
15
16import enum
17import logging
18from typing import Iterator, Optional
19import zlib
20
21from pw_hdlc import protocol
22
23_LOG = logging.getLogger('pw_hdlc')
24
25NO_ADDRESS = -1
26_MIN_FRAME_SIZE = 6  # 1 B address + 1 B control + 4 B CRC-32
27
28
29class FrameStatus(enum.Enum):
30    """Indicates that an error occurred."""
31    OK = 'OK'
32    FCS_MISMATCH = 'frame check sequence failure'
33    FRAMING_ERROR = 'invalid flag or escape characters'
34    BAD_ADDRESS = 'address field too long'
35
36
37class Frame:
38    """Represents an HDLC frame."""
39    def __init__(self,
40                 raw_encoded: bytes,
41                 raw_decoded: bytes,
42                 status: FrameStatus = FrameStatus.OK):
43        """Parses fields from an HDLC frame.
44
45        Arguments:
46            raw_encoded: The complete HDLC-encoded frame, excluding HDLC flag
47                characters.
48            raw_decoded: The complete decoded frame (address, control,
49                information, FCS).
50            status: Whether parsing the frame succeeded.
51        """
52        self.raw_encoded = raw_encoded
53        self.raw_decoded = raw_decoded
54        self.status = status
55
56        self.address: int = NO_ADDRESS
57        self.control: bytes = b''
58        self.data: bytes = b''
59
60        if status == FrameStatus.OK:
61            address, address_length = protocol.decode_address(raw_decoded)
62            if address_length == 0:
63                self.status = FrameStatus.BAD_ADDRESS
64                return
65
66            self.address = address
67            self.control = raw_decoded[address_length:address_length + 1]
68            self.data = raw_decoded[address_length + 1:-4]
69
70    def ok(self) -> bool:
71        """True if this represents a valid frame.
72
73        If false, then parsing failed. The status is set to indicate what type
74        of error occurred, and the data field contains all bytes parsed from the
75        frame (including bytes parsed as address or control bytes).
76        """
77        return self.status is FrameStatus.OK
78
79    def __repr__(self) -> str:
80        if self.ok():
81            body = (f'address={self.address}, control={self.control!r}, '
82                    f'data={self.data!r}')
83        else:
84            body = str(self.status)
85
86        return f'{type(self).__name__}({body})'
87
88
89class _State(enum.Enum):
90    INTERFRAME = 0
91    FRAME = 1
92    FRAME_ESCAPE = 2
93
94
95def _check_frame(frame_data: bytes) -> FrameStatus:
96    if len(frame_data) < _MIN_FRAME_SIZE:
97        return FrameStatus.FRAMING_ERROR
98
99    frame_crc = int.from_bytes(frame_data[-4:], 'little')
100    if zlib.crc32(frame_data[:-4]) != frame_crc:
101        return FrameStatus.FCS_MISMATCH
102
103    return FrameStatus.OK
104
105
106class FrameDecoder:
107    """Decodes one or more HDLC frames from a stream of data."""
108    def __init__(self):
109        self._decoded_data = bytearray()
110        self._raw_data = bytearray()
111        self._state = _State.INTERFRAME
112
113    def process(self, data: bytes) -> Iterator[Frame]:
114        """Decodes and yields HDLC frames, including corrupt frames.
115
116        The ok() method on Frame indicates whether it is valid or represents a
117        frame parsing error.
118
119        Yields:
120          Frames, which may be valid (frame.ok()) or corrupt (!frame.ok())
121        """
122        for byte in data:
123            frame = self._process_byte(byte)
124            if frame:
125                yield frame
126
127    def process_valid_frames(self, data: bytes) -> Iterator[Frame]:
128        """Decodes and yields valid HDLC frames, logging any errors."""
129        for frame in self.process(data):
130            if frame.ok():
131                yield frame
132            else:
133                _LOG.warning('Failed to decode frame: %s; discarded %d bytes',
134                             frame.status.value, len(frame.raw_encoded))
135                _LOG.debug('Discarded data: %s', frame.raw_encoded)
136
137    def _finish_frame(self, status: FrameStatus) -> Frame:
138        frame = Frame(bytes(self._raw_data), bytes(self._decoded_data), status)
139        self._raw_data.clear()
140        self._decoded_data.clear()
141        return frame
142
143    def _process_byte(self, byte: int) -> Optional[Frame]:
144        frame: Optional[Frame] = None
145
146        # Record every byte except the flag character.
147        if byte != protocol.FLAG:
148            self._raw_data.append(byte)
149
150        if self._state is _State.INTERFRAME:
151            if byte == protocol.FLAG:
152                if self._raw_data:
153                    frame = self._finish_frame(FrameStatus.FRAMING_ERROR)
154
155                self._state = _State.FRAME
156        elif self._state is _State.FRAME:
157            if byte == protocol.FLAG:
158                if self._raw_data:
159                    frame = self._finish_frame(_check_frame(
160                        self._decoded_data))
161
162                self._state = _State.FRAME
163            elif byte == protocol.ESCAPE:
164                self._state = _State.FRAME_ESCAPE
165            else:
166                self._decoded_data.append(byte)
167        elif self._state is _State.FRAME_ESCAPE:
168            if byte == protocol.FLAG:
169                frame = self._finish_frame(FrameStatus.FRAMING_ERROR)
170                self._state = _State.FRAME
171            elif byte in protocol.VALID_ESCAPED_BYTES:
172                self._state = _State.FRAME
173                self._decoded_data.append(protocol.escape(byte))
174            else:
175                self._state = _State.INTERFRAME
176        else:
177            raise AssertionError(f'Invalid decoder state: {self._state}')
178
179        return frame
180