Coverage for src / tracekit / analyzers / protocols / can.py: 94%
222 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""CAN 2.0A/B protocol decoder.
3This module implements a CAN (Controller Area Network) protocol decoder
4supporting both standard (11-bit ID) and extended (29-bit ID) frames.
7Example:
8 >>> from tracekit.analyzers.protocols.can import CANDecoder
9 >>> decoder = CANDecoder(bitrate=500000)
10 >>> for packet in decoder.decode(trace):
11 ... print(f"ID: {packet.annotations['arbitration_id']:03X}")
12 ... print(f"Data: {packet.data.hex()}")
14References:
15 ISO 11898-1:2015 Road vehicles - CAN - Part 1: Data link layer
16 CAN Specification Version 2.0 (Bosch, 1991)
17"""
19from __future__ import annotations
21from dataclasses import dataclass
22from enum import IntEnum
23from typing import TYPE_CHECKING
25import numpy as np
27from tracekit.analyzers.protocols.base import (
28 AnnotationLevel,
29 AsyncDecoder,
30 ChannelDef,
31 DecoderState,
32 OptionDef,
33)
34from tracekit.core.types import DigitalTrace, ProtocolPacket
36if TYPE_CHECKING:
37 from collections.abc import Iterator
39 from numpy.typing import NDArray
class CANFrameType(IntEnum):
    """Integer codes naming the kinds of CAN frame.

    Members compare equal to their plain integer values because the
    enumeration derives from ``IntEnum``.
    """

    DATA = 0      # data frame
    REMOTE = 1    # remote frame
    ERROR = 2     # error frame
    OVERLOAD = 3  # overload frame
@dataclass
class CANFrame:
    """Container for the fields of one decoded CAN frame.

    Attributes:
        arbitration_id: Frame identifier (11-bit standard or 29-bit extended).
        is_extended: Whether the identifier is the 29-bit extended form.
        is_remote: Whether the frame is a remote transmission request.
        dlc: Data length code taken from the frame (0-8).
        data: Payload bytes.
        crc: CRC value read from the frame.
        crc_computed: CRC value calculated by the decoder.
        timestamp: Time of the frame start, in seconds.
        end_timestamp: Time of the frame end, in seconds.
        errors: Messages describing problems found while decoding.
    """

    arbitration_id: int
    is_extended: bool
    is_remote: bool
    dlc: int
    data: bytes
    crc: int
    crc_computed: int
    timestamp: float
    end_timestamp: float
    errors: list[str]

    @property
    def crc_valid(self) -> bool:
        """Return True when the received and the computed CRC are equal."""
        return self.crc_computed == self.crc
class CANDecoderState(DecoderState):
    """Bookkeeping fields for the CAN decoder's state machine."""

    def reset(self) -> None:
        """Put every tracking field back to its initial value."""
        # Integer counters all restart from zero (assigned left to right).
        self.bit_position = self.stuff_count = self.last_five_bits = 0
        # A fresh list per reset, so no bit buffer is shared between resets.
        self.frame_bits: list[int] = []
        # No frame is in progress after a reset.
        self.in_frame = False
        self.frame_start_time = 0.0
# Bit rates (in bit/s) mapped to a human-readable label. The keys are the
# values offered for the decoder's "bitrate" option.
CAN_BITRATES = {
    10_000: "10 kbps",
    20_000: "20 kbps",
    50_000: "50 kbps",
    100_000: "100 kbps",
    125_000: "125 kbps",
    250_000: "250 kbps",
    500_000: "500 kbps",
    800_000: "800 kbps",
    1_000_000: "1 Mbps",
}

# CAN CRC-15 generator polynomial
#   x^15 + x^14 + x^10 + x^8 + x^7 + x^4 + x^3 + 1
# stored without the implicit x^15 term (bits 14, 10, 8, 7, 4, 3, 0).
CAN_CRC_POLY = 0x4599
# Starting value of the CRC register.
CAN_CRC_INIT = 0x0000
class CANDecoder(AsyncDecoder):
    """CAN 2.0A/B protocol decoder.

    Decodes CAN frames from digital signal captures, supporting:
    - CAN 2.0A: Standard 11-bit identifiers
    - CAN 2.0B: Extended 29-bit identifiers
    - Bit stuffing detection and removal
    - CRC checking
    - Error detection

    Attributes:
        id: Decoder identifier.
        name: Human-readable name.
        channels: Required input channels.
        options: Configurable decoder options.

    Example:
        >>> decoder = CANDecoder(bitrate=500000)
        >>> frames = list(decoder.decode(trace))
        >>> for frame in frames:
        ...     print(f"CAN ID: 0x{frame.annotations['arbitration_id']:03X}")
    """

    id = "can"
    name = "CAN"
    longname = "Controller Area Network"
    desc = "CAN 2.0A/B bus decoder"
    license = "MIT"

    channels = [  # noqa: RUF012
        ChannelDef("can", "CAN", "CAN bus signal (CAN_H - CAN_L or single-ended)"),
    ]

    options = [  # noqa: RUF012
        OptionDef(
            "bitrate",
            "Bit Rate",
            "CAN bit rate in bps",
            default=500000,
            values=list(CAN_BITRATES.keys()),
        ),
        OptionDef(
            "sample_point",
            "Sample Point",
            "Sample point as fraction of bit time",
            default=0.75,
        ),
    ]

    def __init__(  # type: ignore[no-untyped-def]
        self,
        bitrate: int = 500000,
        sample_point: float = 0.75,
        **options,
    ) -> None:
        """Initialize CAN decoder.

        Args:
            bitrate: CAN bus bit rate in bps.
            sample_point: Sample point as fraction of bit time (0.5-0.9).
                The value is stored as given; no range check is applied here.
            **options: Additional decoder options, forwarded to the base class.
        """
        super().__init__(baudrate=bitrate, **options)
        self._bitrate = bitrate
        self._sample_point = sample_point
        self._state = CANDecoderState()

    @property
    def bitrate(self) -> int:
        """Get CAN bit rate."""
        return self._bitrate

    @bitrate.setter
    def bitrate(self, value: int) -> None:
        """Set CAN bit rate (also mirrored into ``_baudrate``)."""
        self._bitrate = value
        self._baudrate = value

    def decode(
        self,
        trace: DigitalTrace,
        **channels: NDArray[np.bool_],
    ) -> Iterator[ProtocolPacket]:
        """Decode CAN frames from digital trace.

        The decoder is reset, candidate frame starts are located, and each
        candidate is decoded independently. Every successfully parsed frame is
        appended to ``self._packets`` and yielded.

        If fewer than two samples are available per bit, a single
        "sample rate too low" annotation is emitted and nothing is yielded.

        Args:
            trace: Digital trace containing CAN signal.
            **channels: Additional channel data (not used for single-wire CAN).

        Yields:
            ProtocolPacket for each decoded CAN frame. Its annotations carry
            ``arbitration_id``, ``is_extended``, ``is_remote``, ``dlc``,
            ``crc``, ``crc_computed`` and ``crc_valid``.

        Example:
            >>> decoder = CANDecoder(bitrate=500000)
            >>> for packet in decoder.decode(trace):
            ...     can_id = packet.annotations['arbitration_id']
            ...     print(f"ID: 0x{can_id:03X}, Data: {packet.data.hex()}")
        """
        self.reset()

        data = trace.data
        sample_rate = trace.metadata.sample_rate

        # Number of capture samples covering one CAN bit.
        samples_per_bit = round(sample_rate / self._bitrate)

        if samples_per_bit < 2:
            self.put_annotation(
                0,
                trace.duration,
                AnnotationLevel.MESSAGES,
                "Error: Sample rate too low for CAN decoding",
            )
            return

        # Offset, in samples, from the start of a bit to where it is sampled.
        sample_offset = int(samples_per_bit * self._sample_point)

        # In CAN, recessive = 1 and dominant = 0; a frame begins with a
        # falling edge out of bus idle.
        frame_starts = self._find_frame_starts(data, samples_per_bit)

        for frame_start_idx in frame_starts:
            frame = self._decode_frame(
                data,
                frame_start_idx,
                sample_rate,
                samples_per_bit,
                sample_offset,
            )
            if frame is None:
                continue

            packet = ProtocolPacket(
                timestamp=frame.timestamp,
                protocol="can",
                data=frame.data,
                annotations={
                    "arbitration_id": frame.arbitration_id,
                    "is_extended": frame.is_extended,
                    "is_remote": frame.is_remote,
                    "dlc": frame.dlc,
                    "crc": frame.crc,
                    # Published so consumers can rebuild the frame without
                    # losing the CRC verdict.
                    "crc_computed": frame.crc_computed,
                    "crc_valid": frame.crc_valid,
                },
                errors=frame.errors,
                end_timestamp=frame.end_timestamp,
            )

            self._packets.append(packet)
            yield packet

    def _find_frame_starts(
        self,
        data: NDArray[np.bool_],
        samples_per_bit: int,
    ) -> list[int]:
        """Find potential frame start positions.

        CAN frames start with a Start of Frame (SOF) bit, which is a
        dominant (0) bit following bus idle (recessive/1). A sample index is
        accepted when it is a falling edge and more than 80% of the preceding
        three bit times are high.

        Args:
            data: Digital signal data.
            samples_per_bit: Samples per CAN bit.

        Returns:
            List of sample indices for potential frame starts.
        """
        frame_starts: list[int] = []

        min_idle_bits = 3  # Minimum idle time before frame
        min_idle_samples = min_idle_bits * samples_per_bit
        search_end = len(data) - samples_per_bit

        i = min_idle_samples
        while i < search_end:
            # Test the cheap edge condition first so the windowed mean is only
            # computed at falling edges instead of at every sample.
            if data[i - 1] and not data[i]:
                idle_region = data[max(0, i - min_idle_samples) : i]
                if np.mean(idle_region) > 0.8:  # Mostly recessive
                    frame_starts.append(i)
                    # Skip ahead so edges inside this frame are not reported
                    # as further frame starts.
                    i += samples_per_bit * 20
                    continue
            i += 1

        return frame_starts

    def _decode_frame(
        self,
        data: NDArray[np.bool_],
        start_idx: int,
        sample_rate: float,
        samples_per_bit: int,
        sample_offset: int,
    ) -> CANFrame | None:
        """Decode a single CAN frame.

        Samples one value per bit time starting at ``start_idx``, removes
        stuff bits, and hands the destuffed bit list to
        ``_parse_frame_bits``.

        Args:
            data: Digital signal data.
            start_idx: Sample index of frame start (SOF).
            sample_rate: Sample rate in Hz.
            samples_per_bit: Samples per CAN bit.
            sample_offset: Offset within bit for sampling.

        Returns:
            Decoded CANFrame or None if decode fails (fewer than 20 destuffed
            bits were available, or the bits could not be parsed).
        """
        sample_period = 1.0 / sample_rate
        frame_start_time = start_idx * sample_period

        bits: list[int] = []
        run_value: int | None = None  # polarity of the current run of equal bits
        run_length = 0  # length of that run, stuff bits included

        raw_bit_count = 0
        max_frame_bits = 150  # Maximum bits in extended frame with stuffing
        total_samples = len(data)
        current_idx = start_idx

        while len(bits) < 128 and raw_bit_count < max_frame_bits:
            sample_pos = current_idx + sample_offset
            if sample_pos >= total_samples:
                break

            bit = int(data[sample_pos])
            current_idx += samples_per_bit
            raw_bit_count += 1

            if run_length == 5:
                # Five equal bits were just seen, so this one is a stuff bit:
                # drop it from the payload. It still opens the next run.
                # A stuff bit with the same polarity is a stuff error, but it
                # is tolerated silently because sampling continues past the
                # stuffed part of the frame, where long recessive runs (EOF,
                # bus idle) are legal.
                run_value = bit
                run_length = 1
                continue

            if bit == run_value:
                run_length += 1
            else:
                run_value = bit
                run_length = 1

            bits.append(bit)

        if len(bits) < 20:  # Minimum frame length
            return None

        return self._parse_frame_bits(bits, frame_start_time, sample_period, current_idx)

    @staticmethod
    def _bits_to_int(bits: list[int], start: int, count: int) -> int:
        """Pack ``count`` bits beginning at ``start`` into an int, MSB first."""
        value = 0
        for bit in bits[start : start + count]:
            value = (value << 1) | bit
        return value

    def _parse_frame_bits(
        self,
        bits: list[int],
        start_time: float,
        sample_period: float,
        end_idx: int,
    ) -> CANFrame | None:
        """Parse decoded bits into CAN frame.

        Field order for a standard frame is SOF, 11-bit ID, RTR, IDE, r0,
        DLC, data, CRC. For an extended frame it is SOF, 11-bit base ID, SRR,
        IDE, 18-bit ID extension, RTR, r1, r0, DLC, data, CRC.

        Args:
            bits: List of bit values (after stuff bit removal).
            start_time: Frame start time.
            sample_period: Sample period (currently unused).
            end_idx: End sample index (currently unused).

        Returns:
            Parsed CANFrame or None if invalid (too few bits for a field).
            SOF, CRC and CRC-delimiter problems do not reject the frame; they
            are reported in its ``errors`` list.
        """
        errors: list[str] = []
        total = len(bits)
        read = self._bits_to_int

        try:
            # SOF (1) + base ID (11) + RTR/SRR (1) + IDE (1)
            if total < 14:
                return None

            pos = 0

            # SOF (should be 0)
            if bits[pos] != 0:
                errors.append("Invalid SOF")
            pos += 1

            # First 11 bits of ID
            arb_id = read(bits, pos, 11)
            pos += 11

            # RTR bit (for standard) or SRR bit (for extended)
            rtr_or_srr = bits[pos]
            pos += 1

            # IDE bit selects the extended format
            is_extended = bool(bits[pos])
            pos += 1

            if is_extended:
                # ID extension (18 bits) is appended below the base ID
                if pos + 18 > total:
                    return None
                arb_id = (arb_id << 18) | read(bits, pos, 18)
                pos += 18

                # RTR bit
                if pos >= total:
                    return None
                is_remote = bool(bits[pos])
                pos += 1

                # r1, r0 reserved bits
                pos += 2
            else:
                is_remote = bool(rtr_or_srr)
                # r0 reserved bit
                pos += 1

            # DLC (4 bits)
            if pos + 4 > total:
                return None
            dlc = read(bits, pos, 4)
            pos += 4

            # A DLC above 8 still means 8 data bytes
            data_len = min(dlc, 8)

            # Data field (0-8 bytes); remote frames carry no data
            if is_remote:
                data = b""
            else:
                if pos + data_len * 8 > total:
                    return None
                data = bytes(read(bits, pos + 8 * byte_idx, 8) for byte_idx in range(data_len))
                pos += data_len * 8

            # CRC field (15 bits)
            if pos + 15 > total:
                return None
            crc_received = read(bits, pos, 15)

            # CRC covers SOF through the end of the data field
            crc_computed = self._compute_crc(bits[:pos])
            pos += 15

            if crc_received != crc_computed:
                errors.append(
                    f"CRC error: received 0x{crc_received:04X}, computed 0x{crc_computed:04X}"
                )

            # CRC delimiter (should be 1)
            if pos < total and bits[pos] != 1:
                errors.append("CRC delimiter error")
            pos += 1

            # ACK slot and delimiter
            pos += 2

            # EOF (7 recessive bits) is not checked.

            # End time is estimated from the destuffed bit count; stuff bits
            # are not included in this estimate.
            end_time = start_time + pos * (1.0 / self._bitrate)

            return CANFrame(
                arbitration_id=arb_id,
                is_extended=is_extended,
                is_remote=is_remote,
                dlc=dlc,
                data=data,
                crc=crc_received,
                crc_computed=crc_computed,
                timestamp=start_time,
                end_timestamp=end_time,
                errors=errors,
            )

        except (IndexError, ValueError):
            # Malformed bit lists (e.g. values other than 0/1) yield no frame.
            return None

    def _compute_crc(self, bits: list[int]) -> int:
        """Compute CAN CRC-15.

        Processes the bits in order through a 15-bit shift register that
        starts at ``CAN_CRC_INIT`` and is XORed with ``CAN_CRC_POLY`` whenever
        the incoming bit differs from the register's top bit.

        Args:
            bits: Input bits for CRC calculation.

        Returns:
            15-bit CRC value.
        """
        crc = CAN_CRC_INIT

        for bit in bits:
            crc_next = (crc >> 14) & 1
            crc = (crc << 1) & 0x7FFF

            if bit ^ crc_next:
                crc ^= CAN_CRC_POLY

        return crc
def decode_can(
    trace: DigitalTrace,
    *,
    bitrate: int = 500000,
    sample_point: float = 0.75,
) -> list[CANFrame]:
    """Convenience function to decode CAN frames.

    Runs a fresh ``CANDecoder`` over ``trace`` and converts every yielded
    packet back into a ``CANFrame``.

    Args:
        trace: Digital trace containing CAN signal.
        bitrate: CAN bit rate in bps (default 500000).
        sample_point: Sample point as fraction of bit time.

    Returns:
        List of decoded CANFrame objects. ``crc_valid`` on each frame agrees
        with the packet's ``crc_valid`` annotation. When a packet carries no
        ``crc_computed`` annotation and its CRC check failed,
        ``crc_computed`` is set to ``-1`` (a value no 15-bit CRC can take).

    Example:
        >>> frames = decode_can(trace, bitrate=500000)
        >>> for frame in frames:
        ...     print(f"ID: 0x{frame.arbitration_id:03X}")
    """
    decoder = CANDecoder(bitrate=bitrate, sample_point=sample_point)
    frames: list[CANFrame] = []

    for packet in decoder.decode(trace):
        annotations = packet.annotations
        crc = annotations["crc"]

        # Rebuild the computed CRC without overriding the decoder's verdict:
        # copying the received CRC unconditionally would make every
        # reconstructed frame report crc_valid == True.
        if "crc_computed" in annotations:
            crc_computed = annotations["crc_computed"]
        elif annotations["crc_valid"]:
            crc_computed = crc
        else:
            crc_computed = -1  # unknown, but must differ from the received CRC

        end_timestamp = packet.end_timestamp
        if end_timestamp is None:
            end_timestamp = packet.timestamp

        frames.append(
            CANFrame(
                arbitration_id=annotations["arbitration_id"],
                is_extended=annotations["is_extended"],
                is_remote=annotations["is_remote"],
                dlc=annotations["dlc"],
                data=packet.data,
                crc=crc,
                crc_computed=crc_computed,
                timestamp=packet.timestamp,
                end_timestamp=end_timestamp,
                errors=packet.errors,
            )
        )

    return frames
# Names made available by ``from tracekit.analyzers.protocols.can import *``.
__all__ = [
    "CAN_BITRATES",
    "CANDecoder",
    "CANFrame",
    "CANFrameType",
    "decode_can",
]