Coverage for src / tracekit / analyzers / protocols / lin.py: 98%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""LIN protocol decoder. 

2 

3This module provides Local Interconnect Network (LIN) automotive protocol 

4decoding for LIN 1.x and 2.x frames. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.protocols.lin import LINDecoder 

9 >>> decoder = LINDecoder(baudrate=19200) 

10 >>> for packet in decoder.decode(trace): 

11 ... print(f"ID: 0x{packet.annotations['frame_id']:02X}") 

12 

13References: 

14 LIN Specification 1.3, 2.0, 2.1, 2.2A 

15""" 

16 

17from __future__ import annotations 

18 

19from enum import Enum 

20from typing import TYPE_CHECKING, Literal 

21 

22from tracekit.analyzers.protocols.base import ( 

23 AnnotationLevel, 

24 AsyncDecoder, 

25 ChannelDef, 

26 OptionDef, 

27) 

28from tracekit.core.types import DigitalTrace, ProtocolPacket, WaveformTrace 

29 

30if TYPE_CHECKING: 

31 from collections.abc import Iterator 

32 

33 import numpy as np 

34 from numpy.typing import NDArray 

35 

36 

37class LINVersion(Enum): 

38 """LIN protocol version.""" 

39 

40 LIN_1X = "1.x" 

41 LIN_2X = "2.x" 

42 

43 

44class LINDecoder(AsyncDecoder): 

45 """LIN protocol decoder. 

46 

47 Decodes LIN bus frames with sync field validation, identifier extraction, 

48 and checksum validation for both LIN 1.x (classic) and 2.x (enhanced). 

49 

50 Attributes: 

51 id: "lin" 

52 name: "LIN" 

53 channels: [bus] (required) 

54 

55 Example: 

56 >>> decoder = LINDecoder(baudrate=19200, version="2.x") 

57 >>> for packet in decoder.decode(trace): 

58 ... print(f"ID: 0x{packet.annotations['frame_id']:02X}") 

59 """ 

60 

61 id = "lin" 

62 name = "LIN" 

63 longname = "Local Interconnect Network" 

64 desc = "LIN automotive bus protocol decoder" 

65 

66 channels = [ # noqa: RUF012 

67 ChannelDef("bus", "BUS", "LIN bus signal", required=True), 

68 ] 

69 

70 optional_channels = [] # noqa: RUF012 

71 

72 options = [ # noqa: RUF012 

73 OptionDef( 

74 "baudrate", 

75 "Baud rate", 

76 "Bits per second", 

77 default=19200, 

78 values=[9600, 19200, 20000], 

79 ), 

80 OptionDef( 

81 "version", 

82 "LIN version", 

83 "Protocol version", 

84 default="2.x", 

85 values=["1.x", "2.x"], 

86 ), 

87 ] 

88 

89 annotations = [ # noqa: RUF012 

90 ("sync", "Sync field"), 

91 ("pid", "Protected identifier"), 

92 ("data", "Data bytes"), 

93 ("checksum", "Checksum"), 

94 ("error", "Error"), 

95 ] 

96 

97 def __init__( 

98 self, 

99 baudrate: int = 19200, 

100 version: Literal["1.x", "2.x"] = "2.x", 

101 ) -> None: 

102 """Initialize LIN decoder. 

103 

104 Args: 

105 baudrate: Baud rate in bps (9600, 19200, 20000). 

106 version: LIN version ("1.x" or "2.x"). 

107 """ 

108 super().__init__(baudrate=baudrate, version=version) 

109 self._version = LINVersion.LIN_1X if version == "1.x" else LINVersion.LIN_2X 

110 

111 def decode( 

112 self, 

113 trace: DigitalTrace | WaveformTrace, 

114 **channels: NDArray[np.bool_], 

115 ) -> Iterator[ProtocolPacket]: 

116 """Decode LIN frames from trace. 

117 

118 Args: 

119 trace: Input digital trace. 

120 **channels: Additional channel data. 

121 

122 Yields: 

123 Decoded LIN frames as ProtocolPacket objects. 

124 

125 Example: 

126 >>> decoder = LINDecoder(baudrate=19200) 

127 >>> for packet in decoder.decode(trace): 

128 ... print(f"Data: {packet.data.hex()}") 

129 """ 

130 # Convert to digital if needed 

131 if isinstance(trace, WaveformTrace): 

132 from tracekit.analyzers.digital.extraction import to_digital 

133 

134 digital_trace = to_digital(trace, threshold="auto") 

135 else: 

136 digital_trace = trace 

137 

138 data = digital_trace.data 

139 sample_rate = digital_trace.metadata.sample_rate 

140 

141 bit_period = sample_rate / self._baudrate 

142 half_bit = bit_period / 2 

143 

144 idx = 0 

145 frame_num = 0 

146 

147 while idx < len(data): 

148 # Look for break field (dominant for at least 13 bit times) 

149 break_start = self._find_break_field(data, idx, bit_period) 

150 if break_start is None: 

151 break 

152 

153 # After break field, find the end of the dominant period (break) 

154 # and skip the delimiter (recessive) to reach the sync byte 

155 sync_start_idx = break_start 

156 while sync_start_idx < len(data) and not data[sync_start_idx]: 

157 sync_start_idx += 1 

158 

159 # Skip delimiter (recessive period) to reach sync byte start bit 

160 # The delimiter is at least 1 bit time recessive 

161 while sync_start_idx < len(data) and data[sync_start_idx]: 

162 sync_start_idx += 1 

163 

164 if sync_start_idx >= len(data): 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true

165 break 

166 

167 # Sync field is 0x55 (01010101) 

168 sync_byte, sync_errors = self._decode_byte(data, sync_start_idx, bit_period, half_bit) 

169 

170 if sync_byte != 0x55: 

171 sync_errors.append(f"Invalid sync field: 0x{sync_byte:02X} (expected 0x55)") 

172 

173 # Protected identifier (PID) 

174 pid_start_idx = int(sync_start_idx + 10 * bit_period) 

175 if pid_start_idx >= len(data): 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true

176 break 

177 

178 pid_byte, pid_errors = self._decode_byte(data, pid_start_idx, bit_period, half_bit) 

179 

180 # Extract ID and parity 

181 frame_id = pid_byte & 0x3F 

182 parity = (pid_byte >> 6) & 0x03 

183 

184 # Validate parity 

185 expected_parity = self._compute_parity(frame_id) 

186 if parity != expected_parity: 

187 pid_errors.append(f"Parity error: {parity} (expected {expected_parity})") 

188 

189 # Data length depends on frame ID 

190 data_length = self._get_data_length(frame_id) 

191 

192 # Decode data bytes 

193 data_bytes = [] 

194 data_errors = [] 

195 data_start_idx = int(pid_start_idx + 10 * bit_period) 

196 

197 for i in range(data_length): 

198 byte_start_idx = int(data_start_idx + i * 10 * bit_period) 

199 if byte_start_idx >= len(data): 

200 break 

201 

202 byte_val, byte_errors = self._decode_byte( 

203 data, byte_start_idx, bit_period, half_bit 

204 ) 

205 data_bytes.append(byte_val) 

206 data_errors.extend(byte_errors) 

207 

208 # Decode checksum 

209 checksum_start_idx = int(data_start_idx + data_length * 10 * bit_period) 

210 if checksum_start_idx < len(data): 

211 checksum_byte, checksum_errors = self._decode_byte( 

212 data, checksum_start_idx, bit_period, half_bit 

213 ) 

214 

215 # Validate checksum 

216 expected_checksum = self._compute_checksum(frame_id, data_bytes) 

217 if checksum_byte != expected_checksum: 

218 checksum_errors.append( 

219 f"Checksum error: 0x{checksum_byte:02X} (expected 0x{expected_checksum:02X})" 

220 ) 

221 else: 

222 checksum_byte = 0 

223 checksum_errors = ["Missing checksum"] 

224 

225 # Calculate timestamps 

226 start_time = break_start / sample_rate 

227 end_time = (checksum_start_idx + 10 * bit_period) / sample_rate 

228 

229 # Collect all errors 

230 errors = sync_errors + pid_errors + data_errors + checksum_errors 

231 

232 # Add annotations 

233 self.put_annotation( 

234 start_time, 

235 end_time, 

236 AnnotationLevel.PACKETS, 

237 f"ID: 0x{frame_id:02X}", 

238 data=bytes(data_bytes), 

239 ) 

240 

241 # Create packet 

242 annotations = { 

243 "frame_num": frame_num, 

244 "frame_id": frame_id, 

245 "pid": pid_byte, 

246 "data_length": data_length, 

247 "checksum": checksum_byte, 

248 "version": self._version.value, 

249 } 

250 

251 packet = ProtocolPacket( 

252 timestamp=start_time, 

253 protocol="lin", 

254 data=bytes(data_bytes), 

255 annotations=annotations, 

256 errors=errors, 

257 ) 

258 

259 yield packet 

260 

261 frame_num += 1 

262 idx = int(checksum_start_idx + 10 * bit_period) 

263 

264 def _find_break_field( 

265 self, 

266 data: NDArray[np.bool_], 

267 start_idx: int, 

268 bit_period: float, 

269 ) -> int | None: 

270 """Find LIN break field (dominant for >= 13 bits). 

271 

272 Args: 

273 data: Digital data array. 

274 start_idx: Index to start searching. 

275 bit_period: Bit period in samples. 

276 

277 Returns: 

278 Index of break field start, or None if not found. 

279 """ 

280 # Use a slightly smaller threshold to account for rounding 

281 # LIN spec requires >= 13 bit times, use 12.5 to be tolerant 

282 min_break_samples = int(12.5 * bit_period) 

283 

284 idx = start_idx 

285 while idx < len(data) - min_break_samples: 

286 # Look for recessive-to-dominant transition 

287 if idx > 0 and data[idx - 1] and not data[idx]: 

288 # Check if dominant for at least 12.5 bit periods 

289 dominant_length = 0 

290 check_idx = idx 

291 while check_idx < len(data) and not data[check_idx]: 

292 dominant_length += 1 

293 check_idx += 1 

294 

295 if dominant_length >= min_break_samples: 

296 return idx 

297 

298 idx += 1 

299 

300 return None 

301 

302 def _decode_byte( 

303 self, 

304 data: NDArray[np.bool_], 

305 start_idx: int, 

306 bit_period: float, 

307 half_bit: float, 

308 ) -> tuple[int, list[str]]: 

309 """Decode UART-style byte (1 start, 8 data, 1 stop). 

310 

311 Args: 

312 data: Digital data array. 

313 start_idx: Start index (at start bit). 

314 bit_period: Bit period in samples. 

315 half_bit: Half bit period in samples. 

316 

317 Returns: 

318 (byte_value, errors) tuple. 

319 """ 

320 errors = [] 

321 

322 # Sample at center of each bit 

323 sample_points = [] 

324 for bit_num in range(10): # Start + 8 data + stop 

325 sample_idx = int(start_idx + half_bit + bit_num * bit_period) 

326 if sample_idx < len(data): 

327 sample_points.append(sample_idx) 

328 

329 if len(sample_points) < 10: 

330 return 0, ["Incomplete byte"] 

331 

332 # Verify start bit (should be 0) 

333 if data[sample_points[0]]: 

334 errors.append("Invalid start bit") 

335 

336 # Extract data bits (LSB first) 

337 byte_val = 0 

338 for i in range(8): 

339 bit = 1 if data[sample_points[1 + i]] else 0 

340 byte_val |= bit << i 

341 

342 # Verify stop bit (should be 1) 

343 if not data[sample_points[9]]: 

344 errors.append("Framing error") 

345 

346 return byte_val, errors 

347 

348 def _compute_parity(self, frame_id: int) -> int: 

349 """Compute LIN 2.x protected identifier parity. 

350 

351 Args: 

352 frame_id: 6-bit frame identifier. 

353 

354 Returns: 

355 2-bit parity value. 

356 """ 

357 # Extract ID bits 

358 id0 = (frame_id >> 0) & 1 

359 id1 = (frame_id >> 1) & 1 

360 id2 = (frame_id >> 2) & 1 

361 id3 = (frame_id >> 3) & 1 

362 id4 = (frame_id >> 4) & 1 

363 id5 = (frame_id >> 5) & 1 

364 

365 # P0 = ID0 ^ ID1 ^ ID2 ^ ID4 

366 p0 = id0 ^ id1 ^ id2 ^ id4 

367 

368 # P1 = !(ID1 ^ ID3 ^ ID4 ^ ID5) 

369 p1 = (id1 ^ id3 ^ id4 ^ id5) ^ 1 

370 

371 return (p1 << 1) | p0 

372 

373 def _get_data_length(self, frame_id: int) -> int: 

374 """Get data length for frame ID. 

375 

376 Args: 

377 frame_id: Frame identifier. 

378 

379 Returns: 

380 Data length in bytes (0-8). 

381 """ 

382 # Standard frame IDs have predefined lengths 

383 # For simplicity, assume 8 bytes (can be configured per application) 

384 return 8 

385 

386 def _compute_checksum(self, frame_id: int, data_bytes: list[int]) -> int: 

387 """Compute LIN checksum. 

388 

389 Args: 

390 frame_id: Frame identifier. 

391 data_bytes: Data bytes. 

392 

393 Returns: 

394 Checksum byte. 

395 """ 

396 if self._version == LINVersion.LIN_1X: 

397 # Classic checksum: sum of data bytes 

398 checksum = sum(data_bytes) 

399 else: 

400 # Enhanced checksum: sum of PID + data bytes 

401 pid = frame_id | (self._compute_parity(frame_id) << 6) 

402 checksum = pid + sum(data_bytes) 

403 

404 # Handle carries 

405 while checksum > 0xFF: 

406 checksum = (checksum & 0xFF) + (checksum >> 8) 

407 

408 # Invert 

409 return (~checksum) & 0xFF 

410 

411 

412def decode_lin( 

413 data: NDArray[np.bool_] | WaveformTrace | DigitalTrace, 

414 sample_rate: float = 1.0, 

415 baudrate: int = 19200, 

416 version: Literal["1.x", "2.x"] = "2.x", 

417) -> list[ProtocolPacket]: 

418 """Convenience function to decode LIN frames. 

419 

420 Args: 

421 data: LIN bus signal (digital array or trace). 

422 sample_rate: Sample rate in Hz. 

423 baudrate: Baud rate (9600, 19200, 20000). 

424 version: LIN version ("1.x" or "2.x"). 

425 

426 Returns: 

427 List of decoded LIN frames. 

428 

429 Example: 

430 >>> packets = decode_lin(signal, sample_rate=1e6, baudrate=19200) 

431 >>> for pkt in packets: 

432 ... print(f"ID: 0x{pkt.annotations['frame_id']:02X}") 

433 """ 

434 decoder = LINDecoder(baudrate=baudrate, version=version) 

435 if isinstance(data, WaveformTrace | DigitalTrace): 

436 return list(decoder.decode(data)) 

437 else: 

438 from tracekit.core.types import TraceMetadata 

439 

440 metadata = TraceMetadata(sample_rate=sample_rate) 

441 trace = DigitalTrace(data=data, metadata=metadata) 

442 return list(decoder.decode(trace)) 

443 

444 

445__all__ = ["LINDecoder", "LINVersion", "decode_lin"]