Coverage for src / tracekit / analyzers / protocols / can_fd.py: 100%
130 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""CAN-FD protocol decoder.

This module implements a CAN with Flexible Data-rate (CAN-FD) decoder
supporting variable data rates and extended payloads of up to 64 bytes.

Example:
    >>> from tracekit.analyzers.protocols.can_fd import CANFDDecoder
    >>> decoder = CANFDDecoder(nominal_bitrate=500000, data_bitrate=2000000)
    >>> for packet in decoder.decode(trace):
    ...     print(f"ID: 0x{packet.annotations['arbitration_id']:03X}")

References:
    ISO 11898-1:2015 CAN-FD Specification
    Bosch CAN-FD Specification v1.0
"""
18from __future__ import annotations
20from dataclasses import dataclass
21from enum import IntEnum
22from typing import TYPE_CHECKING
24from tracekit.analyzers.protocols.base import (
25 AnnotationLevel,
26 AsyncDecoder,
27 ChannelDef,
28 OptionDef,
29)
30from tracekit.core.types import (
31 DigitalTrace,
32 ProtocolPacket,
33 TraceMetadata,
34 WaveformTrace,
35)
37if TYPE_CHECKING:
38 from collections.abc import Iterator
40 import numpy as np
41 from numpy.typing import NDArray
class CANFDFrameType(IntEnum):
    """Kinds of CAN-FD frame, as integer-valued enum members."""

    # Frame that carries a payload.
    DATA = 0
    # Frame that requests data.
    REMOTE = 1
@dataclass
class CANFDFrame:
    """Container for the fields of one decoded CAN-FD frame.

    Attributes:
        arbitration_id: CAN identifier (11-bit base or 29-bit extended).
        is_extended: Whether the identifier is a 29-bit extended ID.
        is_fd: Whether the frame is a CAN-FD frame.
        brs: Bit Rate Switch flag.
        esi: Error State Indicator flag.
        dlc: Data length code, in the range 0-15.
        data: Payload bytes (0 to 64 of them).
        crc: CRC value as received on the bus.
        timestamp: Time of the frame start, in seconds.
        errors: Descriptions of any errors detected while decoding.
    """

    # Identification
    arbitration_id: int
    is_extended: bool

    # Control flags
    is_fd: bool
    brs: bool
    esi: bool

    # Payload
    dlc: int
    data: bytes

    # Integrity, timing and diagnostics
    crc: int
    timestamp: float
    errors: list[str]
# CAN-FD DLC to payload length (bytes) lookup.
# DLC codes 0-8 equal the byte count; codes 9-15 select the larger
# CAN-FD payload sizes listed explicitly below.
CANFD_DLC_TO_LENGTH = {
    **{code: code for code in range(9)},
    9: 12,
    10: 16,
    11: 20,
    12: 24,
    13: 32,
    14: 48,
    15: 64,
}
class CANFDDecoder(AsyncDecoder):
    """CAN-FD protocol decoder.

    Decodes CAN-FD frames with dual bit rate support and extended payloads.
    The received CRC field (17 bits for payloads of up to 16 bytes, 21 bits
    otherwise) is extracted and reported; it is not verified against the
    frame contents.

    Attributes:
        id: "can_fd"
        name: "CAN-FD"
        channels: [can] (single-ended, required); [can_h, can_l] are declared
            as optional differential channels.

    Example:
        >>> decoder = CANFDDecoder(nominal_bitrate=500000, data_bitrate=2000000)
        >>> for packet in decoder.decode(trace):
        ...     print(f"Data ({len(packet.data)} bytes): {packet.data.hex()}")
    """

    id = "can_fd"
    name = "CAN-FD"
    longname = "CAN with Flexible Data-rate"
    desc = "CAN-FD protocol decoder"

    channels = [  # noqa: RUF012
        ChannelDef("can", "CAN", "CAN bus signal", required=True),
    ]

    optional_channels = [  # noqa: RUF012
        ChannelDef("can_h", "CAN_H", "CAN High differential signal", required=False),
        ChannelDef("can_l", "CAN_L", "CAN Low differential signal", required=False),
    ]

    options = [  # noqa: RUF012
        OptionDef(
            "nominal_bitrate",
            "Nominal bitrate",
            "Arbitration phase bitrate",
            default=500000,
            values=None,
        ),
        OptionDef(
            "data_bitrate",
            "Data bitrate",
            "Data phase bitrate",
            default=2000000,
            values=None,
        ),
    ]

    annotations = [  # noqa: RUF012
        ("sof", "Start of Frame"),
        ("arbitration", "Arbitration field"),
        ("control", "Control field"),
        ("data", "Data field"),
        ("crc", "CRC field"),
        ("ack", "Acknowledge"),
        ("eof", "End of Frame"),
        ("error", "Error"),
    ]

    def __init__(
        self,
        nominal_bitrate: int = 500000,
        data_bitrate: int = 2000000,
    ) -> None:
        """Initialize CAN-FD decoder.

        Args:
            nominal_bitrate: Nominal bitrate for arbitration phase (bps).
            data_bitrate: Data phase bitrate for BRS frames (bps).
        """
        super().__init__(
            baudrate=nominal_bitrate,
            nominal_bitrate=nominal_bitrate,
            data_bitrate=data_bitrate,
        )
        self._nominal_bitrate = nominal_bitrate
        self._data_bitrate = data_bitrate

    def decode(
        self,
        trace: DigitalTrace | WaveformTrace,
        **channels: NDArray[np.bool_],
    ) -> Iterator[ProtocolPacket]:
        """Decode CAN-FD frames from trace.

        A ``WaveformTrace`` is first converted with ``to_digital`` using an
        automatic threshold. Each decoded frame is also reported through
        ``put_annotation`` with its start and end time.

        Args:
            trace: Input digital or waveform trace.
            **channels: Additional channel data (not used by this decoder).

        Yields:
            Decoded CAN-FD frames as ProtocolPacket objects.

        Example:
            >>> decoder = CANFDDecoder(nominal_bitrate=500000)
            >>> for packet in decoder.decode(trace):
            ...     print(f"ID: 0x{packet.annotations['arbitration_id']:X}")
        """
        # Convert to digital if needed
        if isinstance(trace, WaveformTrace):
            from tracekit.analyzers.digital.extraction import to_digital

            digital_trace = to_digital(trace, threshold="auto")
        else:
            digital_trace = trace

        data = digital_trace.data
        sample_rate = digital_trace.metadata.sample_rate

        # Bit periods expressed in samples (may be fractional).
        nominal_bit_period = sample_rate / self._nominal_bitrate
        data_bit_period = sample_rate / self._data_bitrate

        frame_num = 0
        idx = 0

        while idx < len(data):
            # Look for SOF (dominant bit during idle)
            sof_idx = self._find_sof(data, idx)
            if sof_idx is None:
                break

            # Decode frame starting from SOF
            frame, end_idx = self._decode_frame(
                data, sof_idx, sample_rate, nominal_bit_period, data_bit_period
            )

            if frame is not None:
                start_time = sof_idx / sample_rate
                # Use the index where decoding actually stopped rather than a
                # fixed 1 ms guess, so the annotation spans the real frame.
                end_time = end_idx / sample_rate

                self.put_annotation(
                    start_time,
                    end_time,
                    AnnotationLevel.PACKETS,
                    f"ID: 0x{frame.arbitration_id:X}, {len(frame.data)} bytes",
                )

                annotations = {
                    "frame_num": frame_num,
                    "arbitration_id": frame.arbitration_id,
                    "is_extended": frame.is_extended,
                    "is_fd": frame.is_fd,
                    "brs": frame.brs,
                    "esi": frame.esi,
                    "dlc": frame.dlc,
                    "data_length": len(frame.data),
                    "crc": frame.crc,
                }

                yield ProtocolPacket(
                    timestamp=start_time,
                    protocol="can_fd",
                    data=frame.data,
                    annotations=annotations,
                    errors=frame.errors,
                )
                frame_num += 1

            # Always make forward progress: when the sample rate is below the
            # bitrate, int(nominal_bit_period) is 0 and must not stall the scan.
            if end_idx > idx:
                idx = end_idx
            else:
                idx += max(1, int(nominal_bit_period))

    def _find_sof(self, data: NDArray[np.bool_], start_idx: int) -> int | None:
        """Find Start of Frame (dominant bit during recessive idle).

        Args:
            data: Digital data array.
            start_idx: Start search index.

        Returns:
            Index of the first low sample after a high sample at or after
            ``start_idx``, or None if no such transition exists.
        """
        # Look for recessive-to-dominant transition (1 to 0)
        for idx in range(start_idx, len(data) - 1):
            if data[idx] and not data[idx + 1]:
                return idx + 1
        return None

    @staticmethod
    def _bits_to_int(bits: list[int]) -> int:
        """Fold a most-significant-bit-first list of 0/1 values into an int."""
        value = 0
        for bit in bits:
            value = (value << 1) | bit
        return value

    def _decode_frame(
        self,
        data: NDArray[np.bool_],
        sof_idx: int,
        sample_rate: float,
        nominal_bit_period: float,
        data_bit_period: float,
    ) -> tuple[CANFDFrame | None, int]:
        """Decode CAN-FD frame starting from SOF.

        Bits are sampled at the middle of each bit period. The data field and
        CRC field are sampled at the data bitrate when both FDF and BRS are
        set; everything else uses the nominal bitrate. A data or CRC field cut
        short by the end of the trace is reported in the frame's ``errors``.

        NOTE(review): this is a simplified decoder and its field layout should
        be checked against ISO 11898-1 before use on real captures:
          * no bit destuffing is performed;
          * sampling starts at ``sof_idx``, so the SOF bit itself is the first
            of the 11 bits folded into the identifier;
          * the bit after the 11 identifier bits is taken as IDE (no RRS/SRR);
          * the DLC slice overlaps the ESI bit (and BRS for extended frames);
          * a high (recessive) sample is reported as 0 and a low (dominant)
            sample as 1.

        Args:
            data: Digital data array.
            sof_idx: SOF index.
            sample_rate: Sample rate in Hz.
            nominal_bit_period: Nominal bit period in samples.
            data_bit_period: Data bit period in samples.

        Returns:
            (frame, end_index) tuple. ``frame`` is None when the trace ends
            before the arbitration or control field is complete.
        """
        errors: list[str] = []
        bit_idx: float = sof_idx
        current_bit_period = nominal_bit_period

        # Sample bits (simplified - ignores bit stuffing for brevity)
        def sample_bits(count: int) -> list[int]:
            """Sample up to ``count`` bits; returns fewer at end of trace."""
            nonlocal bit_idx
            bits: list[int] = []
            for _ in range(count):
                sample_idx = int(bit_idx + current_bit_period / 2)
                if sample_idx >= len(data):
                    break
                bits.append(0 if data[sample_idx] else 1)  # Dominant=1, Recessive=0
                bit_idx += current_bit_period
            return bits

        # Arbitration field (11 bits for standard, 29 for extended)
        arb_bits = sample_bits(11)
        if len(arb_bits) < 11:
            return None, int(bit_idx)
        arbitration_id = self._bits_to_int(arb_bits)

        # Check for extended frame (IDE bit)
        ide_bits = sample_bits(1)
        is_extended = bool(ide_bits) and ide_bits[0] == 1

        if is_extended:
            # Extended ID: read additional 18 bits
            ext_bits = sample_bits(18)
            if len(ext_bits) < 18:
                # Trace ended inside the identifier; nothing more can be read.
                return None, int(bit_idx)
            arbitration_id = (arbitration_id << 18) | self._bits_to_int(ext_bits)

        # Control field
        # FDF (EDL), res, BRS, ESI, DLC (4 bits)
        ctrl_length = 6 if is_extended else 7
        ctrl_bits = sample_bits(ctrl_length)
        if len(ctrl_bits) < ctrl_length:
            return None, int(bit_idx)

        # FDF/EDL bit - first bit of control field regardless of frame type
        is_fd = ctrl_bits[0] == 1
        brs = ctrl_bits[2] == 1
        esi = ctrl_bits[3] == 1

        # DLC (4 bits); the control field is complete here, so the slice
        # always holds exactly four bits.
        dlc_start = 2 if is_extended else 3
        dlc = self._bits_to_int(ctrl_bits[dlc_start : dlc_start + 4])

        # Get data length from DLC
        data_length = CANFD_DLC_TO_LENGTH.get(dlc, 0)

        # Switch to data bitrate if BRS is set
        if is_fd and brs:
            current_bit_period = data_bit_period

        # Data field
        data_bytes = bytearray()
        for _ in range(data_length):
            byte_bits = sample_bits(8)
            if len(byte_bits) < 8:
                # The trace ended mid-payload; no later byte can be complete.
                errors.append(
                    f"Truncated data field: expected {data_length} bytes, "
                    f"got {len(data_bytes)}"
                )
                break
            data_bytes.append(self._bits_to_int(byte_bits))

        # CRC field (CRC-17 for <=16 bytes, CRC-21 for >16 bytes)
        crc_length = 17 if data_length <= 16 else 21
        crc_bits = sample_bits(crc_length)
        if len(crc_bits) < crc_length:
            errors.append(
                f"Truncated CRC field: expected {crc_length} bits, got {len(crc_bits)}"
            )
        crc = self._bits_to_int(crc_bits)

        # Switch back to nominal bitrate for CRC delimiter, ACK, EOF
        current_bit_period = nominal_bit_period

        # CRC delimiter, ACK slot, ACK delimiter, EOF (7 bits): skipped so the
        # returned end index lands after the frame; values are not checked.
        sample_bits(10)

        frame = CANFDFrame(
            arbitration_id=arbitration_id,
            is_extended=is_extended,
            is_fd=is_fd,
            brs=brs,
            esi=esi,
            dlc=dlc,
            data=bytes(data_bytes),
            crc=crc,
            timestamp=sof_idx / sample_rate,
            errors=errors,
        )

        return frame, int(bit_idx)
def decode_can_fd(
    data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
    sample_rate: float = 1.0,
    nominal_bitrate: int = 500000,
    data_bitrate: int = 2000000,
) -> list[ProtocolPacket]:
    """Decode every CAN-FD frame in a signal and return them as a list.

    A raw array is wrapped in a ``DigitalTrace`` using ``sample_rate``; a
    ``WaveformTrace`` or ``DigitalTrace`` is passed to the decoder as-is, in
    which case ``sample_rate`` is not used.

    Args:
        data: CAN bus signal (digital array or trace).
        sample_rate: Sample rate in Hz (only applied to raw arrays).
        nominal_bitrate: Nominal bitrate in bps.
        data_bitrate: Data phase bitrate in bps.

    Returns:
        List of decoded CAN-FD frames.

    Example:
        >>> packets = decode_can_fd(signal, sample_rate=100e6, nominal_bitrate=500000)
        >>> for pkt in packets:
        ...     print(f"ID: 0x{pkt.annotations['arbitration_id']:X}")
    """
    decoder = CANFDDecoder(nominal_bitrate=nominal_bitrate, data_bitrate=data_bitrate)

    already_a_trace = isinstance(data, (WaveformTrace, DigitalTrace))
    trace = (
        data
        if already_a_trace
        else DigitalTrace(data=data, metadata=TraceMetadata(sample_rate=sample_rate))
    )
    return list(decoder.decode(trace))
# Public API of this module.
__all__ = [
    "CANFD_DLC_TO_LENGTH",
    "CANFDDecoder",
    "CANFDFrame",
    "CANFDFrameType",
    "decode_can_fd",
]