1# Copyright 2020 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Decoder class for decoding bytes using HDLC protocol""" 15 16import enum 17import logging 18from typing import Iterator, Optional 19import zlib 20 21from pw_hdlc import protocol 22 23_LOG = logging.getLogger('pw_hdlc') 24 25NO_ADDRESS = -1 26_MIN_FRAME_SIZE = 6 # 1 B address + 1 B control + 4 B CRC-32 27 28 29class FrameStatus(enum.Enum): 30 """Indicates that an error occurred.""" 31 OK = 'OK' 32 FCS_MISMATCH = 'frame check sequence failure' 33 FRAMING_ERROR = 'invalid flag or escape characters' 34 BAD_ADDRESS = 'address field too long' 35 36 37class Frame: 38 """Represents an HDLC frame.""" 39 def __init__(self, 40 raw_encoded: bytes, 41 raw_decoded: bytes, 42 status: FrameStatus = FrameStatus.OK): 43 """Parses fields from an HDLC frame. 44 45 Arguments: 46 raw_encoded: The complete HDLC-encoded frame, excluding HDLC flag 47 characters. 48 raw_decoded: The complete decoded frame (address, control, 49 information, FCS). 50 status: Whether parsing the frame succeeded. 51 """ 52 self.raw_encoded = raw_encoded 53 self.raw_decoded = raw_decoded 54 self.status = status 55 56 self.address: int = NO_ADDRESS 57 self.control: bytes = b'' 58 self.data: bytes = b'' 59 60 if status == FrameStatus.OK: 61 address, address_length = protocol.decode_address(raw_decoded) 62 if address_length == 0: 63 self.status = FrameStatus.BAD_ADDRESS 64 return 65 66 self.address = address 67 self.control = raw_decoded[address_length:address_length + 1] 68 self.data = raw_decoded[address_length + 1:-4] 69 70 def ok(self) -> bool: 71 """True if this represents a valid frame. 72 73 If false, then parsing failed. The status is set to indicate what type 74 of error occurred, and the data field contains all bytes parsed from the 75 frame (including bytes parsed as address or control bytes). 76 """ 77 return self.status is FrameStatus.OK 78 79 def __repr__(self) -> str: 80 if self.ok(): 81 body = (f'address={self.address}, control={self.control!r}, ' 82 f'data={self.data!r}') 83 else: 84 body = str(self.status) 85 86 return f'{type(self).__name__}({body})' 87 88 89class _State(enum.Enum): 90 INTERFRAME = 0 91 FRAME = 1 92 FRAME_ESCAPE = 2 93 94 95def _check_frame(frame_data: bytes) -> FrameStatus: 96 if len(frame_data) < _MIN_FRAME_SIZE: 97 return FrameStatus.FRAMING_ERROR 98 99 frame_crc = int.from_bytes(frame_data[-4:], 'little') 100 if zlib.crc32(frame_data[:-4]) != frame_crc: 101 return FrameStatus.FCS_MISMATCH 102 103 return FrameStatus.OK 104 105 106class FrameDecoder: 107 """Decodes one or more HDLC frames from a stream of data.""" 108 def __init__(self): 109 self._decoded_data = bytearray() 110 self._raw_data = bytearray() 111 self._state = _State.INTERFRAME 112 113 def process(self, data: bytes) -> Iterator[Frame]: 114 """Decodes and yields HDLC frames, including corrupt frames. 115 116 The ok() method on Frame indicates whether it is valid or represents a 117 frame parsing error. 118 119 Yields: 120 Frames, which may be valid (frame.ok()) or corrupt (!frame.ok()) 121 """ 122 for byte in data: 123 frame = self._process_byte(byte) 124 if frame: 125 yield frame 126 127 def process_valid_frames(self, data: bytes) -> Iterator[Frame]: 128 """Decodes and yields valid HDLC frames, logging any errors.""" 129 for frame in self.process(data): 130 if frame.ok(): 131 yield frame 132 else: 133 _LOG.warning('Failed to decode frame: %s; discarded %d bytes', 134 frame.status.value, len(frame.raw_encoded)) 135 _LOG.debug('Discarded data: %s', frame.raw_encoded) 136 137 def _finish_frame(self, status: FrameStatus) -> Frame: 138 frame = Frame(bytes(self._raw_data), bytes(self._decoded_data), status) 139 self._raw_data.clear() 140 self._decoded_data.clear() 141 return frame 142 143 def _process_byte(self, byte: int) -> Optional[Frame]: 144 frame: Optional[Frame] = None 145 146 # Record every byte except the flag character. 147 if byte != protocol.FLAG: 148 self._raw_data.append(byte) 149 150 if self._state is _State.INTERFRAME: 151 if byte == protocol.FLAG: 152 if self._raw_data: 153 frame = self._finish_frame(FrameStatus.FRAMING_ERROR) 154 155 self._state = _State.FRAME 156 elif self._state is _State.FRAME: 157 if byte == protocol.FLAG: 158 if self._raw_data: 159 frame = self._finish_frame(_check_frame( 160 self._decoded_data)) 161 162 self._state = _State.FRAME 163 elif byte == protocol.ESCAPE: 164 self._state = _State.FRAME_ESCAPE 165 else: 166 self._decoded_data.append(byte) 167 elif self._state is _State.FRAME_ESCAPE: 168 if byte == protocol.FLAG: 169 frame = self._finish_frame(FrameStatus.FRAMING_ERROR) 170 self._state = _State.FRAME 171 elif byte in protocol.VALID_ESCAPED_BYTES: 172 self._state = _State.FRAME 173 self._decoded_data.append(protocol.escape(byte)) 174 else: 175 self._state = _State.INTERFRAME 176 else: 177 raise AssertionError(f'Invalid decoder state: {self._state}') 178 179 return frame 180