Coverage for src / tracekit / analyzers / protocols / hdlc.py: 94%
131 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""HDLC protocol decoder.
3This module provides High-Level Data Link Control (HDLC) telecom protocol
4decoding with bit stuffing, FCS validation, and field extraction.
7Example:
8 >>> from tracekit.analyzers.protocols.hdlc import HDLCDecoder
9 >>> decoder = HDLCDecoder()
10 >>> for packet in decoder.decode(trace):
11 ... print(f"Address: 0x{packet.annotations['address']:02X}")
13References:
14 ISO/IEC 13239:2002 - HDLC Frame Structure
15"""
17from __future__ import annotations
19from typing import TYPE_CHECKING, Literal
21from tracekit.analyzers.protocols.base import (
22 AnnotationLevel,
23 AsyncDecoder,
24 ChannelDef,
25 OptionDef,
26)
27from tracekit.core.types import (
28 DigitalTrace,
29 ProtocolPacket,
30 TraceMetadata,
31 WaveformTrace,
32)
34if TYPE_CHECKING:
35 from collections.abc import Iterator
37 import numpy as np
38 from numpy.typing import NDArray
41class HDLCDecoder(AsyncDecoder):
42 """HDLC protocol decoder.
44 Decodes HDLC frames with flag detection, bit unstuffing,
45 and FCS (Frame Check Sequence) validation using CRC-16 or CRC-32.
47 Attributes:
48 id: "hdlc"
49 name: "HDLC"
50 channels: [data] (required)
52 Example:
53 >>> decoder = HDLCDecoder(baudrate=1000000, fcs="crc16")
54 >>> for packet in decoder.decode(trace):
55 ... print(f"Info: {packet.data.hex()}")
56 """
58 id = "hdlc"
59 name = "HDLC"
60 longname = "High-Level Data Link Control"
61 desc = "HDLC telecom protocol decoder"
63 channels = [ # noqa: RUF012
64 ChannelDef("data", "DATA", "HDLC data line", required=True),
65 ]
67 optional_channels = [] # noqa: RUF012
69 options = [ # noqa: RUF012
70 OptionDef("baudrate", "Baud rate", "Bits per second", default=1000000, values=None),
71 OptionDef(
72 "fcs",
73 "FCS type",
74 "Frame check sequence",
75 default="crc16",
76 values=["crc16", "crc32"],
77 ),
78 ]
80 annotations = [ # noqa: RUF012
81 ("flag", "Flag sequence"),
82 ("address", "Address field"),
83 ("control", "Control field"),
84 ("info", "Information field"),
85 ("fcs", "Frame check sequence"),
86 ("error", "Error"),
87 ]
89 # HDLC flag pattern
90 FLAG_PATTERN = 0b01111110 # 0x7E
92 def __init__(
93 self,
94 baudrate: int = 1000000,
95 fcs: Literal["crc16", "crc32"] = "crc16",
96 ) -> None:
97 """Initialize HDLC decoder.
99 Args:
100 baudrate: Baud rate in bps.
101 fcs: FCS type ("crc16" or "crc32").
102 """
103 super().__init__(baudrate=baudrate, fcs=fcs)
104 self._fcs = fcs
105 self._fcs_bytes = 2 if fcs == "crc16" else 4
107 def decode(
108 self,
109 trace: DigitalTrace | WaveformTrace,
110 **channels: NDArray[np.bool_],
111 ) -> Iterator[ProtocolPacket]:
112 """Decode HDLC frames from trace.
114 Args:
115 trace: Input digital trace.
116 **channels: Additional channel data.
118 Yields:
119 Decoded HDLC frames as ProtocolPacket objects.
121 Example:
122 >>> decoder = HDLCDecoder(baudrate=1000000)
123 >>> for packet in decoder.decode(trace):
124 ... print(f"Address: 0x{packet.annotations['address']:02X}")
125 """
126 # Convert to digital if needed
127 if isinstance(trace, WaveformTrace):
128 from tracekit.analyzers.digital.extraction import to_digital
130 digital_trace = to_digital(trace, threshold="auto")
131 else:
132 digital_trace = trace
134 data = digital_trace.data
135 sample_rate = digital_trace.metadata.sample_rate
137 bit_period = sample_rate / self._baudrate
139 # Extract bit stream
140 bits = self._sample_bits(data, bit_period)
142 # Find frames (between flag sequences)
143 frame_num = 0
144 idx = 0
146 while idx < len(bits):
147 # Look for opening flag
148 flag_idx = self._find_flag(bits, idx)
149 if flag_idx is None:
150 break
152 # Look for closing flag
153 next_flag_idx = self._find_flag(bits, flag_idx + 8)
154 if next_flag_idx is None:
155 break
157 # Extract frame bits (between flags, excluding flags)
158 frame_bits = bits[flag_idx + 8 : next_flag_idx]
160 if len(frame_bits) < 16: # Minimum: address(8) + control(8)
161 idx = next_flag_idx + 8
162 continue
164 # Bit unstuffing (remove 0 after five consecutive 1s)
165 unstuffed_bits, stuff_errors = self._unstuff_bits(frame_bits)
167 if len(unstuffed_bits) < 16 + self._fcs_bytes * 8: 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true
168 idx = next_flag_idx + 8
169 continue
171 # Extract fields
172 field_bytes = []
173 for i in range(0, len(unstuffed_bits), 8):
174 if i + 8 <= len(unstuffed_bits): 174 ↛ 173line 174 didn't jump to line 173 because the condition on line 174 was always true
175 byte_val = self._bits_to_byte(unstuffed_bits[i : i + 8])
176 field_bytes.append(byte_val)
178 if len(field_bytes) < 2 + self._fcs_bytes: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 idx = next_flag_idx + 8
180 continue
182 # Split into address, control, info, and FCS
183 address = field_bytes[0]
184 control = field_bytes[1]
185 info_bytes = field_bytes[2 : -self._fcs_bytes]
186 fcs_bytes = field_bytes[-self._fcs_bytes :]
188 # Validate FCS
189 errors = list(stuff_errors)
190 frame_data = field_bytes[: -self._fcs_bytes]
192 if self._fcs == "crc16":
193 computed_fcs = self._crc16_ccitt(bytes(frame_data))
194 received_fcs = (fcs_bytes[1] << 8) | fcs_bytes[0]
195 else:
196 computed_fcs = self._crc32(bytes(frame_data))
197 received_fcs = (
198 (fcs_bytes[3] << 24) | (fcs_bytes[2] << 16) | (fcs_bytes[1] << 8) | fcs_bytes[0]
199 )
201 if computed_fcs != received_fcs: 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true
202 errors.append("FCS mismatch")
204 # Calculate timing
205 start_time = (flag_idx * bit_period) / sample_rate
206 end_time = ((next_flag_idx + 8) * bit_period) / sample_rate
208 # Add annotation
209 self.put_annotation(
210 start_time,
211 end_time,
212 AnnotationLevel.PACKETS,
213 f"Addr: 0x{address:02X}, Ctrl: 0x{control:02X}",
214 )
216 # Create packet
217 annotations = {
218 "frame_num": frame_num,
219 "address": address,
220 "control": control,
221 "info_length": len(info_bytes),
222 "fcs_type": self._fcs,
223 }
225 packet = ProtocolPacket(
226 timestamp=start_time,
227 protocol="hdlc",
228 data=bytes(info_bytes),
229 annotations=annotations,
230 errors=errors,
231 )
233 yield packet
235 frame_num += 1
236 idx = next_flag_idx + 8
238 def _sample_bits(
239 self,
240 data: NDArray[np.bool_],
241 bit_period: float,
242 ) -> list[int]:
243 """Sample data at bit centers to extract bit stream.
245 Args:
246 data: Digital data array.
247 bit_period: Bit period in samples.
249 Returns:
250 List of bit values.
251 """
252 bits = []
253 idx = 0
254 while idx < len(data):
255 sample_idx = int(idx + bit_period / 2)
256 if sample_idx < len(data): 256 ↛ 258line 256 didn't jump to line 258 because the condition on line 256 was always true
257 bits.append(1 if data[sample_idx] else 0)
258 idx += bit_period # type: ignore[assignment]
259 return bits
261 def _find_flag(self, bits: list[int], start_idx: int) -> int | None:
262 """Find HDLC flag pattern (01111110).
264 Args:
265 bits: Bit stream.
266 start_idx: Start search index.
268 Returns:
269 Index of flag start, or None if not found.
270 """
271 for i in range(start_idx, len(bits) - 7):
272 if (
273 bits[i] == 0
274 and bits[i + 1] == 1
275 and bits[i + 2] == 1
276 and bits[i + 3] == 1
277 and bits[i + 4] == 1
278 and bits[i + 5] == 1
279 and bits[i + 6] == 1
280 and bits[i + 7] == 0
281 ):
282 return i
283 return None
285 def _unstuff_bits(self, bits: list[int]) -> tuple[list[int], list[str]]:
286 """Remove bit stuffing (0 after five consecutive 1s).
288 Args:
289 bits: Stuffed bit stream.
291 Returns:
292 (unstuffed_bits, errors) tuple.
293 """
294 unstuffed = []
295 errors = [] # type: ignore[var-annotated]
296 ones_count = 0
298 i = 0
299 while i < len(bits):
300 bit = bits[i]
302 if bit == 1:
303 ones_count += 1
304 unstuffed.append(bit)
305 elif ones_count == 5:
306 # This is a stuff bit, skip it
307 ones_count = 0
308 else:
309 unstuffed.append(bit)
310 ones_count = 0
312 i += 1
314 return unstuffed, errors
316 def _bits_to_byte(self, bits: list[int]) -> int:
317 """Convert 8 bits to byte (LSB first).
319 Args:
320 bits: List of 8 bits.
322 Returns:
323 Byte value.
324 """
325 value = 0
326 for i in range(min(8, len(bits))):
327 value |= bits[i] << i
328 return value
330 def _crc16_ccitt(self, data: bytes) -> int:
331 """Compute CRC-16-CCITT.
333 Args:
334 data: Input data bytes.
336 Returns:
337 16-bit CRC.
338 """
339 crc = 0xFFFF
340 for byte in data:
341 crc ^= byte << 8
342 for _ in range(8):
343 crc = (crc << 1 ^ 4129) & 65535 if crc & 32768 else crc << 1 & 65535
344 return crc ^ 0xFFFF
346 def _crc32(self, data: bytes) -> int:
347 """Compute CRC-32.
349 Args:
350 data: Input data bytes.
352 Returns:
353 32-bit CRC.
354 """
355 crc = 0xFFFFFFFF
356 for byte in data:
357 crc ^= byte
358 for _ in range(8):
359 if crc & 1:
360 crc = (crc >> 1) ^ 0xEDB88320
361 else:
362 crc >>= 1
363 return crc ^ 0xFFFFFFFF
366def decode_hdlc(
367 data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
368 sample_rate: float = 1.0,
369 baudrate: int = 1000000,
370 fcs: Literal["crc16", "crc32"] = "crc16",
371) -> list[ProtocolPacket]:
372 """Convenience function to decode HDLC frames.
374 Args:
375 data: HDLC data signal (digital array or trace).
376 sample_rate: Sample rate in Hz.
377 baudrate: Baud rate in bps.
378 fcs: FCS type ("crc16" or "crc32").
380 Returns:
381 List of decoded HDLC frames.
383 Example:
384 >>> packets = decode_hdlc(signal, sample_rate=10e6, baudrate=1000000)
385 >>> for pkt in packets:
386 ... print(f"Address: 0x{pkt.annotations['address']:02X}")
387 """
388 decoder = HDLCDecoder(baudrate=baudrate, fcs=fcs)
389 if isinstance(data, WaveformTrace | DigitalTrace):
390 return list(decoder.decode(data))
391 else:
392 trace = DigitalTrace(
393 data=data,
394 metadata=TraceMetadata(sample_rate=sample_rate),
395 )
396 return list(decoder.decode(trace))
399__all__ = ["HDLCDecoder", "decode_hdlc"]