Coverage for src / tracekit / analyzers / protocols / lin.py: 98%
143 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""LIN protocol decoder.
3This module provides Local Interconnect Network (LIN) automotive protocol
4decoding for LIN 1.x and 2.x frames.
7Example:
8 >>> from tracekit.analyzers.protocols.lin import LINDecoder
9 >>> decoder = LINDecoder(baudrate=19200)
10 >>> for packet in decoder.decode(trace):
11 ... print(f"ID: 0x{packet.annotations['frame_id']:02X}")
13References:
14 LIN Specification 1.3, 2.0, 2.1, 2.2A
15"""
17from __future__ import annotations
19from enum import Enum
20from typing import TYPE_CHECKING, Literal
22from tracekit.analyzers.protocols.base import (
23 AnnotationLevel,
24 AsyncDecoder,
25 ChannelDef,
26 OptionDef,
27)
28from tracekit.core.types import DigitalTrace, ProtocolPacket, WaveformTrace
30if TYPE_CHECKING:
31 from collections.abc import Iterator
33 import numpy as np
34 from numpy.typing import NDArray
37class LINVersion(Enum):
38 """LIN protocol version."""
40 LIN_1X = "1.x"
41 LIN_2X = "2.x"
44class LINDecoder(AsyncDecoder):
45 """LIN protocol decoder.
47 Decodes LIN bus frames with sync field validation, identifier extraction,
48 and checksum validation for both LIN 1.x (classic) and 2.x (enhanced).
50 Attributes:
51 id: "lin"
52 name: "LIN"
53 channels: [bus] (required)
55 Example:
56 >>> decoder = LINDecoder(baudrate=19200, version="2.x")
57 >>> for packet in decoder.decode(trace):
58 ... print(f"ID: 0x{packet.annotations['frame_id']:02X}")
59 """
61 id = "lin"
62 name = "LIN"
63 longname = "Local Interconnect Network"
64 desc = "LIN automotive bus protocol decoder"
66 channels = [ # noqa: RUF012
67 ChannelDef("bus", "BUS", "LIN bus signal", required=True),
68 ]
70 optional_channels = [] # noqa: RUF012
72 options = [ # noqa: RUF012
73 OptionDef(
74 "baudrate",
75 "Baud rate",
76 "Bits per second",
77 default=19200,
78 values=[9600, 19200, 20000],
79 ),
80 OptionDef(
81 "version",
82 "LIN version",
83 "Protocol version",
84 default="2.x",
85 values=["1.x", "2.x"],
86 ),
87 ]
89 annotations = [ # noqa: RUF012
90 ("sync", "Sync field"),
91 ("pid", "Protected identifier"),
92 ("data", "Data bytes"),
93 ("checksum", "Checksum"),
94 ("error", "Error"),
95 ]
97 def __init__(
98 self,
99 baudrate: int = 19200,
100 version: Literal["1.x", "2.x"] = "2.x",
101 ) -> None:
102 """Initialize LIN decoder.
104 Args:
105 baudrate: Baud rate in bps (9600, 19200, 20000).
106 version: LIN version ("1.x" or "2.x").
107 """
108 super().__init__(baudrate=baudrate, version=version)
109 self._version = LINVersion.LIN_1X if version == "1.x" else LINVersion.LIN_2X
111 def decode(
112 self,
113 trace: DigitalTrace | WaveformTrace,
114 **channels: NDArray[np.bool_],
115 ) -> Iterator[ProtocolPacket]:
116 """Decode LIN frames from trace.
118 Args:
119 trace: Input digital trace.
120 **channels: Additional channel data.
122 Yields:
123 Decoded LIN frames as ProtocolPacket objects.
125 Example:
126 >>> decoder = LINDecoder(baudrate=19200)
127 >>> for packet in decoder.decode(trace):
128 ... print(f"Data: {packet.data.hex()}")
129 """
130 # Convert to digital if needed
131 if isinstance(trace, WaveformTrace):
132 from tracekit.analyzers.digital.extraction import to_digital
134 digital_trace = to_digital(trace, threshold="auto")
135 else:
136 digital_trace = trace
138 data = digital_trace.data
139 sample_rate = digital_trace.metadata.sample_rate
141 bit_period = sample_rate / self._baudrate
142 half_bit = bit_period / 2
144 idx = 0
145 frame_num = 0
147 while idx < len(data):
148 # Look for break field (dominant for at least 13 bit times)
149 break_start = self._find_break_field(data, idx, bit_period)
150 if break_start is None:
151 break
153 # After break field, find the end of the dominant period (break)
154 # and skip the delimiter (recessive) to reach the sync byte
155 sync_start_idx = break_start
156 while sync_start_idx < len(data) and not data[sync_start_idx]:
157 sync_start_idx += 1
159 # Skip delimiter (recessive period) to reach sync byte start bit
160 # The delimiter is at least 1 bit time recessive
161 while sync_start_idx < len(data) and data[sync_start_idx]:
162 sync_start_idx += 1
164 if sync_start_idx >= len(data): 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 break
167 # Sync field is 0x55 (01010101)
168 sync_byte, sync_errors = self._decode_byte(data, sync_start_idx, bit_period, half_bit)
170 if sync_byte != 0x55:
171 sync_errors.append(f"Invalid sync field: 0x{sync_byte:02X} (expected 0x55)")
173 # Protected identifier (PID)
174 pid_start_idx = int(sync_start_idx + 10 * bit_period)
175 if pid_start_idx >= len(data): 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true
176 break
178 pid_byte, pid_errors = self._decode_byte(data, pid_start_idx, bit_period, half_bit)
180 # Extract ID and parity
181 frame_id = pid_byte & 0x3F
182 parity = (pid_byte >> 6) & 0x03
184 # Validate parity
185 expected_parity = self._compute_parity(frame_id)
186 if parity != expected_parity:
187 pid_errors.append(f"Parity error: {parity} (expected {expected_parity})")
189 # Data length depends on frame ID
190 data_length = self._get_data_length(frame_id)
192 # Decode data bytes
193 data_bytes = []
194 data_errors = []
195 data_start_idx = int(pid_start_idx + 10 * bit_period)
197 for i in range(data_length):
198 byte_start_idx = int(data_start_idx + i * 10 * bit_period)
199 if byte_start_idx >= len(data):
200 break
202 byte_val, byte_errors = self._decode_byte(
203 data, byte_start_idx, bit_period, half_bit
204 )
205 data_bytes.append(byte_val)
206 data_errors.extend(byte_errors)
208 # Decode checksum
209 checksum_start_idx = int(data_start_idx + data_length * 10 * bit_period)
210 if checksum_start_idx < len(data):
211 checksum_byte, checksum_errors = self._decode_byte(
212 data, checksum_start_idx, bit_period, half_bit
213 )
215 # Validate checksum
216 expected_checksum = self._compute_checksum(frame_id, data_bytes)
217 if checksum_byte != expected_checksum:
218 checksum_errors.append(
219 f"Checksum error: 0x{checksum_byte:02X} (expected 0x{expected_checksum:02X})"
220 )
221 else:
222 checksum_byte = 0
223 checksum_errors = ["Missing checksum"]
225 # Calculate timestamps
226 start_time = break_start / sample_rate
227 end_time = (checksum_start_idx + 10 * bit_period) / sample_rate
229 # Collect all errors
230 errors = sync_errors + pid_errors + data_errors + checksum_errors
232 # Add annotations
233 self.put_annotation(
234 start_time,
235 end_time,
236 AnnotationLevel.PACKETS,
237 f"ID: 0x{frame_id:02X}",
238 data=bytes(data_bytes),
239 )
241 # Create packet
242 annotations = {
243 "frame_num": frame_num,
244 "frame_id": frame_id,
245 "pid": pid_byte,
246 "data_length": data_length,
247 "checksum": checksum_byte,
248 "version": self._version.value,
249 }
251 packet = ProtocolPacket(
252 timestamp=start_time,
253 protocol="lin",
254 data=bytes(data_bytes),
255 annotations=annotations,
256 errors=errors,
257 )
259 yield packet
261 frame_num += 1
262 idx = int(checksum_start_idx + 10 * bit_period)
264 def _find_break_field(
265 self,
266 data: NDArray[np.bool_],
267 start_idx: int,
268 bit_period: float,
269 ) -> int | None:
270 """Find LIN break field (dominant for >= 13 bits).
272 Args:
273 data: Digital data array.
274 start_idx: Index to start searching.
275 bit_period: Bit period in samples.
277 Returns:
278 Index of break field start, or None if not found.
279 """
280 # Use a slightly smaller threshold to account for rounding
281 # LIN spec requires >= 13 bit times, use 12.5 to be tolerant
282 min_break_samples = int(12.5 * bit_period)
284 idx = start_idx
285 while idx < len(data) - min_break_samples:
286 # Look for recessive-to-dominant transition
287 if idx > 0 and data[idx - 1] and not data[idx]:
288 # Check if dominant for at least 12.5 bit periods
289 dominant_length = 0
290 check_idx = idx
291 while check_idx < len(data) and not data[check_idx]:
292 dominant_length += 1
293 check_idx += 1
295 if dominant_length >= min_break_samples:
296 return idx
298 idx += 1
300 return None
302 def _decode_byte(
303 self,
304 data: NDArray[np.bool_],
305 start_idx: int,
306 bit_period: float,
307 half_bit: float,
308 ) -> tuple[int, list[str]]:
309 """Decode UART-style byte (1 start, 8 data, 1 stop).
311 Args:
312 data: Digital data array.
313 start_idx: Start index (at start bit).
314 bit_period: Bit period in samples.
315 half_bit: Half bit period in samples.
317 Returns:
318 (byte_value, errors) tuple.
319 """
320 errors = []
322 # Sample at center of each bit
323 sample_points = []
324 for bit_num in range(10): # Start + 8 data + stop
325 sample_idx = int(start_idx + half_bit + bit_num * bit_period)
326 if sample_idx < len(data):
327 sample_points.append(sample_idx)
329 if len(sample_points) < 10:
330 return 0, ["Incomplete byte"]
332 # Verify start bit (should be 0)
333 if data[sample_points[0]]:
334 errors.append("Invalid start bit")
336 # Extract data bits (LSB first)
337 byte_val = 0
338 for i in range(8):
339 bit = 1 if data[sample_points[1 + i]] else 0
340 byte_val |= bit << i
342 # Verify stop bit (should be 1)
343 if not data[sample_points[9]]:
344 errors.append("Framing error")
346 return byte_val, errors
348 def _compute_parity(self, frame_id: int) -> int:
349 """Compute LIN 2.x protected identifier parity.
351 Args:
352 frame_id: 6-bit frame identifier.
354 Returns:
355 2-bit parity value.
356 """
357 # Extract ID bits
358 id0 = (frame_id >> 0) & 1
359 id1 = (frame_id >> 1) & 1
360 id2 = (frame_id >> 2) & 1
361 id3 = (frame_id >> 3) & 1
362 id4 = (frame_id >> 4) & 1
363 id5 = (frame_id >> 5) & 1
365 # P0 = ID0 ^ ID1 ^ ID2 ^ ID4
366 p0 = id0 ^ id1 ^ id2 ^ id4
368 # P1 = !(ID1 ^ ID3 ^ ID4 ^ ID5)
369 p1 = (id1 ^ id3 ^ id4 ^ id5) ^ 1
371 return (p1 << 1) | p0
373 def _get_data_length(self, frame_id: int) -> int:
374 """Get data length for frame ID.
376 Args:
377 frame_id: Frame identifier.
379 Returns:
380 Data length in bytes (0-8).
381 """
382 # Standard frame IDs have predefined lengths
383 # For simplicity, assume 8 bytes (can be configured per application)
384 return 8
386 def _compute_checksum(self, frame_id: int, data_bytes: list[int]) -> int:
387 """Compute LIN checksum.
389 Args:
390 frame_id: Frame identifier.
391 data_bytes: Data bytes.
393 Returns:
394 Checksum byte.
395 """
396 if self._version == LINVersion.LIN_1X:
397 # Classic checksum: sum of data bytes
398 checksum = sum(data_bytes)
399 else:
400 # Enhanced checksum: sum of PID + data bytes
401 pid = frame_id | (self._compute_parity(frame_id) << 6)
402 checksum = pid + sum(data_bytes)
404 # Handle carries
405 while checksum > 0xFF:
406 checksum = (checksum & 0xFF) + (checksum >> 8)
408 # Invert
409 return (~checksum) & 0xFF
412def decode_lin(
413 data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
414 sample_rate: float = 1.0,
415 baudrate: int = 19200,
416 version: Literal["1.x", "2.x"] = "2.x",
417) -> list[ProtocolPacket]:
418 """Convenience function to decode LIN frames.
420 Args:
421 data: LIN bus signal (digital array or trace).
422 sample_rate: Sample rate in Hz.
423 baudrate: Baud rate (9600, 19200, 20000).
424 version: LIN version ("1.x" or "2.x").
426 Returns:
427 List of decoded LIN frames.
429 Example:
430 >>> packets = decode_lin(signal, sample_rate=1e6, baudrate=19200)
431 >>> for pkt in packets:
432 ... print(f"ID: 0x{pkt.annotations['frame_id']:02X}")
433 """
434 decoder = LINDecoder(baudrate=baudrate, version=version)
435 if isinstance(data, WaveformTrace | DigitalTrace):
436 return list(decoder.decode(data))
437 else:
438 from tracekit.core.types import TraceMetadata
440 metadata = TraceMetadata(sample_rate=sample_rate)
441 trace = DigitalTrace(data=data, metadata=metadata)
442 return list(decoder.decode(trace))
445__all__ = ["LINDecoder", "LINVersion", "decode_lin"]