Coverage for src / tracekit / analyzers / protocols / hdlc.py: 94%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""HDLC protocol decoder. 

2 

3This module provides High-Level Data Link Control (HDLC) telecom protocol 

4decoding with bit stuffing, FCS validation, and field extraction. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.protocols.hdlc import HDLCDecoder 

9 >>> decoder = HDLCDecoder() 

10 >>> for packet in decoder.decode(trace): 

11 ... print(f"Address: 0x{packet.annotations['address']:02X}") 

12 

13References: 

14 ISO/IEC 13239:2002 - HDLC Frame Structure 

15""" 

16 

17from __future__ import annotations 

18 

19from typing import TYPE_CHECKING, Literal 

20 

21from tracekit.analyzers.protocols.base import ( 

22 AnnotationLevel, 

23 AsyncDecoder, 

24 ChannelDef, 

25 OptionDef, 

26) 

27from tracekit.core.types import ( 

28 DigitalTrace, 

29 ProtocolPacket, 

30 TraceMetadata, 

31 WaveformTrace, 

32) 

33 

34if TYPE_CHECKING: 

35 from collections.abc import Iterator 

36 

37 import numpy as np 

38 from numpy.typing import NDArray 

39 

40 

41class HDLCDecoder(AsyncDecoder): 

42 """HDLC protocol decoder. 

43 

44 Decodes HDLC frames with flag detection, bit unstuffing, 

45 and FCS (Frame Check Sequence) validation using CRC-16 or CRC-32. 

46 

47 Attributes: 

48 id: "hdlc" 

49 name: "HDLC" 

50 channels: [data] (required) 

51 

52 Example: 

53 >>> decoder = HDLCDecoder(baudrate=1000000, fcs="crc16") 

54 >>> for packet in decoder.decode(trace): 

55 ... print(f"Info: {packet.data.hex()}") 

56 """ 

57 

58 id = "hdlc" 

59 name = "HDLC" 

60 longname = "High-Level Data Link Control" 

61 desc = "HDLC telecom protocol decoder" 

62 

63 channels = [ # noqa: RUF012 

64 ChannelDef("data", "DATA", "HDLC data line", required=True), 

65 ] 

66 

67 optional_channels = [] # noqa: RUF012 

68 

69 options = [ # noqa: RUF012 

70 OptionDef("baudrate", "Baud rate", "Bits per second", default=1000000, values=None), 

71 OptionDef( 

72 "fcs", 

73 "FCS type", 

74 "Frame check sequence", 

75 default="crc16", 

76 values=["crc16", "crc32"], 

77 ), 

78 ] 

79 

80 annotations = [ # noqa: RUF012 

81 ("flag", "Flag sequence"), 

82 ("address", "Address field"), 

83 ("control", "Control field"), 

84 ("info", "Information field"), 

85 ("fcs", "Frame check sequence"), 

86 ("error", "Error"), 

87 ] 

88 

89 # HDLC flag pattern 

90 FLAG_PATTERN = 0b01111110 # 0x7E 

91 

def __init__(
    self,
    baudrate: int = 1000000,
    fcs: Literal["crc16", "crc32"] = "crc16",
) -> None:
    """Initialize HDLC decoder.

    Args:
        baudrate: Baud rate in bps. Must be positive.
        fcs: FCS type ("crc16" or "crc32").

    Raises:
        ValueError: If ``baudrate`` is not positive or ``fcs`` is not one
            of the supported FCS types.
    """
    # Width of the trailing FCS field, in bytes, for each supported type.
    fcs_widths = {"crc16": 2, "crc32": 4}

    # Validate up front: an unknown FCS name used to fall through to the
    # CRC-32 path silently, and a non-positive baud rate yields a bit
    # period that the bit sampler can never step past.
    if baudrate <= 0:
        raise ValueError(f"baudrate must be positive, got {baudrate!r}")
    if fcs not in fcs_widths:
        raise ValueError(
            f"fcs must be one of {sorted(fcs_widths)}, got {fcs!r}"
        )

    super().__init__(baudrate=baudrate, fcs=fcs)
    self._fcs = fcs
    self._fcs_bytes = fcs_widths[fcs]

106 

def decode(
    self,
    trace: DigitalTrace | WaveformTrace,
    **channels: NDArray[np.bool_],
) -> Iterator[ProtocolPacket]:
    """Decode HDLC frames from trace.

    The trace is sampled once per bit, split into frames at flag
    sequences (01111110), bit-unstuffed, packed into LSB-first octets and
    split into address, control, information and FCS fields. A frame
    whose FCS does not match is still yielded, with "FCS mismatch" in its
    errors.

    A closing flag is also treated as the opening flag of the following
    frame, so back-to-back frames separated by a single flag are all
    decoded. Spans between flags that are shorter than address + control,
    or that never change level (idle line), are skipped.

    Args:
        trace: Input digital trace. A WaveformTrace is thresholded to
            digital first.
        **channels: Additional channel data (not used by this decoder).

    Yields:
        Decoded HDLC frames as ProtocolPacket objects.

    Example:
        >>> decoder = HDLCDecoder(baudrate=1000000)
        >>> for packet in decoder.decode(trace):
        ...     print(f"Address: 0x{packet.annotations['address']:02X}")
    """
    # Convert to digital if needed
    if isinstance(trace, WaveformTrace):
        from tracekit.analyzers.digital.extraction import to_digital

        digital_trace = to_digital(trace, threshold="auto")
    else:
        digital_trace = trace

    sample_rate = digital_trace.metadata.sample_rate
    bit_period = sample_rate / self._baudrate  # samples per bit

    # Extract bit stream (one entry per bit period)
    bits = self._sample_bits(digital_trace.data, bit_period)

    fcs_len = self._fcs_bytes
    # Smallest decodable frame: address(8) + control(8) + FCS
    min_unstuffed_bits = 16 + fcs_len * 8
    frame_num = 0

    flag_idx = self._find_flag(bits, 0)
    while flag_idx is not None:
        # Look for closing flag
        next_flag_idx = self._find_flag(bits, flag_idx + 8)
        if next_flag_idx is None:
            break

        # Frame bits lie between the flags, excluding the flags themselves
        opening_idx = flag_idx
        frame_bits = bits[opening_idx + 8 : next_flag_idx]

        # The closing flag doubles as the opening flag of the next frame;
        # advancing here keeps every `continue` below safe.
        flag_idx = next_flag_idx

        if len(frame_bits) < 16:  # Minimum: address(8) + control(8)
            continue
        if len(set(frame_bits)) == 1:
            # Constant level between flags is idle line, not a frame.
            continue

        # Bit unstuffing (remove 0 after five consecutive 1s)
        unstuffed_bits, stuff_errors = self._unstuff_bits(frame_bits)
        if len(unstuffed_bits) < min_unstuffed_bits:
            continue

        # Pack complete octets; trailing bits that do not fill an octet
        # are ignored.
        field_bytes = [
            self._bits_to_byte(unstuffed_bits[i : i + 8])
            for i in range(0, len(unstuffed_bits) - 7, 8)
        ]

        # Split into address, control, info, and FCS
        address = field_bytes[0]
        control = field_bytes[1]
        frame_data = field_bytes[:-fcs_len]  # everything covered by the FCS
        info_bytes = frame_data[2:]
        fcs_bytes = field_bytes[-fcs_len:]

        # Validate FCS (sent least-significant octet first)
        errors = list(stuff_errors)
        protected = bytes(frame_data)
        if self._fcs == "crc16":
            computed_fcs = self._crc16_ccitt(protected)
        else:
            computed_fcs = self._crc32(protected)
        received_fcs = int.from_bytes(bytes(fcs_bytes), "little")

        if computed_fcs != received_fcs:
            errors.append("FCS mismatch")

        # Timing: bit index -> sample index -> seconds, spanning both flags
        start_time = (opening_idx * bit_period) / sample_rate
        end_time = ((next_flag_idx + 8) * bit_period) / sample_rate

        # Add annotation
        self.put_annotation(
            start_time,
            end_time,
            AnnotationLevel.PACKETS,
            f"Addr: 0x{address:02X}, Ctrl: 0x{control:02X}",
        )

        # Create packet
        annotations = {
            "frame_num": frame_num,
            "address": address,
            "control": control,
            "info_length": len(info_bytes),
            "fcs_type": self._fcs,
        }

        yield ProtocolPacket(
            timestamp=start_time,
            protocol="hdlc",
            data=bytes(info_bytes),
            annotations=annotations,
            errors=errors,
        )

        frame_num += 1

237 

238 def _sample_bits( 

239 self, 

240 data: NDArray[np.bool_], 

241 bit_period: float, 

242 ) -> list[int]: 

243 """Sample data at bit centers to extract bit stream. 

244 

245 Args: 

246 data: Digital data array. 

247 bit_period: Bit period in samples. 

248 

249 Returns: 

250 List of bit values. 

251 """ 

252 bits = [] 

253 idx = 0 

254 while idx < len(data): 

255 sample_idx = int(idx + bit_period / 2) 

256 if sample_idx < len(data): 256 ↛ 258line 256 didn't jump to line 258 because the condition on line 256 was always true

257 bits.append(1 if data[sample_idx] else 0) 

258 idx += bit_period # type: ignore[assignment] 

259 return bits 

260 

261 def _find_flag(self, bits: list[int], start_idx: int) -> int | None: 

262 """Find HDLC flag pattern (01111110). 

263 

264 Args: 

265 bits: Bit stream. 

266 start_idx: Start search index. 

267 

268 Returns: 

269 Index of flag start, or None if not found. 

270 """ 

271 for i in range(start_idx, len(bits) - 7): 

272 if ( 

273 bits[i] == 0 

274 and bits[i + 1] == 1 

275 and bits[i + 2] == 1 

276 and bits[i + 3] == 1 

277 and bits[i + 4] == 1 

278 and bits[i + 5] == 1 

279 and bits[i + 6] == 1 

280 and bits[i + 7] == 0 

281 ): 

282 return i 

283 return None 

284 

285 def _unstuff_bits(self, bits: list[int]) -> tuple[list[int], list[str]]: 

286 """Remove bit stuffing (0 after five consecutive 1s). 

287 

288 Args: 

289 bits: Stuffed bit stream. 

290 

291 Returns: 

292 (unstuffed_bits, errors) tuple. 

293 """ 

294 unstuffed = [] 

295 errors = [] # type: ignore[var-annotated] 

296 ones_count = 0 

297 

298 i = 0 

299 while i < len(bits): 

300 bit = bits[i] 

301 

302 if bit == 1: 

303 ones_count += 1 

304 unstuffed.append(bit) 

305 elif ones_count == 5: 

306 # This is a stuff bit, skip it 

307 ones_count = 0 

308 else: 

309 unstuffed.append(bit) 

310 ones_count = 0 

311 

312 i += 1 

313 

314 return unstuffed, errors 

315 

316 def _bits_to_byte(self, bits: list[int]) -> int: 

317 """Convert 8 bits to byte (LSB first). 

318 

319 Args: 

320 bits: List of 8 bits. 

321 

322 Returns: 

323 Byte value. 

324 """ 

325 value = 0 

326 for i in range(min(8, len(bits))): 

327 value |= bits[i] << i 

328 return value 

329 

330 def _crc16_ccitt(self, data: bytes) -> int: 

331 """Compute CRC-16-CCITT. 

332 

333 Args: 

334 data: Input data bytes. 

335 

336 Returns: 

337 16-bit CRC. 

338 """ 

339 crc = 0xFFFF 

340 for byte in data: 

341 crc ^= byte << 8 

342 for _ in range(8): 

343 crc = (crc << 1 ^ 4129) & 65535 if crc & 32768 else crc << 1 & 65535 

344 return crc ^ 0xFFFF 

345 

346 def _crc32(self, data: bytes) -> int: 

347 """Compute CRC-32. 

348 

349 Args: 

350 data: Input data bytes. 

351 

352 Returns: 

353 32-bit CRC. 

354 """ 

355 crc = 0xFFFFFFFF 

356 for byte in data: 

357 crc ^= byte 

358 for _ in range(8): 

359 if crc & 1: 

360 crc = (crc >> 1) ^ 0xEDB88320 

361 else: 

362 crc >>= 1 

363 return crc ^ 0xFFFFFFFF 

364 

365 

def decode_hdlc(
    data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
    sample_rate: float = 1.0,
    baudrate: int = 1000000,
    fcs: Literal["crc16", "crc32"] = "crc16",
) -> list[ProtocolPacket]:
    """Decode every HDLC frame in a signal and return them as a list.

    Builds an HDLCDecoder and runs it over ``data``. A raw array is first
    wrapped in a DigitalTrace; an existing trace is decoded as-is.

    Args:
        data: HDLC data signal (digital array or trace).
        sample_rate: Sample rate in Hz. Only used when ``data`` is a raw
            array; a trace is passed to the decoder unchanged.
        baudrate: Baud rate in bps.
        fcs: FCS type ("crc16" or "crc32").

    Returns:
        List of decoded HDLC frames.

    Example:
        >>> packets = decode_hdlc(signal, sample_rate=10e6, baudrate=1000000)
        >>> for pkt in packets:
        ...     print(f"Address: 0x{pkt.annotations['address']:02X}")
    """
    hdlc = HDLCDecoder(baudrate=baudrate, fcs=fcs)

    already_trace = isinstance(data, WaveformTrace | DigitalTrace)
    source = (
        data
        if already_trace
        else DigitalTrace(
            data=data,
            metadata=TraceMetadata(sample_rate=sample_rate),
        )
    )
    return list(hdlc.decode(source))

397 

398 

399__all__ = ["HDLCDecoder", "decode_hdlc"]