Coverage for src / tracekit / analyzers / protocols / can_fd.py: 100%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

"""CAN-FD protocol decoder.

This module implements a decoder for CAN with Flexible Data-rate (CAN-FD),
supporting variable data rates and extended payloads of up to 64 bytes.

Example:
    >>> from tracekit.analyzers.protocols.can_fd import CANFDDecoder
    >>> decoder = CANFDDecoder(nominal_bitrate=500000, data_bitrate=2000000)
    >>> for packet in decoder.decode(trace):
    ...     print(f"ID: 0x{packet.annotations['arbitration_id']:03X}")

References:
    ISO 11898-1:2015 CAN-FD Specification
    Bosch CAN-FD Specification v1.0
"""

17 

18from __future__ import annotations 

19 

20from dataclasses import dataclass 

21from enum import IntEnum 

22from typing import TYPE_CHECKING 

23 

24from tracekit.analyzers.protocols.base import ( 

25 AnnotationLevel, 

26 AsyncDecoder, 

27 ChannelDef, 

28 OptionDef, 

29) 

30from tracekit.core.types import ( 

31 DigitalTrace, 

32 ProtocolPacket, 

33 TraceMetadata, 

34 WaveformTrace, 

35) 

36 

37if TYPE_CHECKING: 

38 from collections.abc import Iterator 

39 

40 import numpy as np 

41 from numpy.typing import NDArray 

42 

43 

class CANFDFrameType(IntEnum):
    """Kinds of frame distinguished by this module (integer-valued)."""

    # Frame that carries a payload.
    DATA = 0
    # Frame that requests data instead of carrying it.
    REMOTE = 1

49 

50 

@dataclass
class CANFDFrame:
    """Result of decoding a single CAN-FD frame.

    Attributes:
        arbitration_id: Identifier of the frame (11-bit base or 29-bit extended).
        is_extended: Whether the identifier is a 29-bit extended one.
        is_fd: Whether the frame was flagged as CAN-FD.
        brs: Bit Rate Switch flag of the frame.
        esi: Error State Indicator flag of the frame.
        dlc: Raw data length code, 0 through 15.
        data: Payload bytes, between 0 and 64 of them.
        crc: CRC value as received on the bus.
        timestamp: Time of the start of the frame, in seconds.
        errors: Messages describing problems found while decoding.
    """

    # Identifier and frame-format flags.
    arbitration_id: int
    is_extended: bool
    is_fd: bool
    brs: bool
    esi: bool

    # Payload description and contents.
    dlc: int
    data: bytes
    crc: int

    # Capture-related metadata.
    timestamp: float
    errors: list[str]

78 

79 

# Payload size in bytes for each CAN-FD data length code. The position of a
# value in the tuple is its DLC: codes 0-8 map to themselves, codes 9-15 map
# to 12, 16, 20, 24, 32, 48 and 64 bytes.
CANFD_DLC_TO_LENGTH: dict[int, int] = dict(
    enumerate((0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64))
)

99 

100 

class CANFDDecoder(AsyncDecoder):
    """CAN-FD protocol decoder.

    Decodes CAN-FD frames with dual bit rate support and extended payloads.
    The received CRC-17/CRC-21 value is extracted from the bit stream but is
    not verified against a computed checksum.

    The bit-level decoding is deliberately simplified: stuff bits are not
    removed and the control-field layout is approximate (see
    ``_decode_frame``).

    Attributes:
        id: "can_fd"
        name: "CAN-FD"
        channels: [can_h, can_l] (optional differential) or [can] (single-ended)

    Example:
        >>> decoder = CANFDDecoder(nominal_bitrate=500000, data_bitrate=2000000)
        >>> for packet in decoder.decode(trace):
        ...     print(f"Data ({len(packet.data)} bytes): {packet.data.hex()}")
    """

    id = "can_fd"
    name = "CAN-FD"
    longname = "CAN with Flexible Data-rate"
    desc = "CAN-FD protocol decoder"

    channels = [  # noqa: RUF012
        ChannelDef("can", "CAN", "CAN bus signal", required=True),
    ]

    optional_channels = [  # noqa: RUF012
        ChannelDef("can_h", "CAN_H", "CAN High differential signal", required=False),
        ChannelDef("can_l", "CAN_L", "CAN Low differential signal", required=False),
    ]

    options = [  # noqa: RUF012
        OptionDef(
            "nominal_bitrate",
            "Nominal bitrate",
            "Arbitration phase bitrate",
            default=500000,
            values=None,
        ),
        OptionDef(
            "data_bitrate",
            "Data bitrate",
            "Data phase bitrate",
            default=2000000,
            values=None,
        ),
    ]

    annotations = [  # noqa: RUF012
        ("sof", "Start of Frame"),
        ("arbitration", "Arbitration field"),
        ("control", "Control field"),
        ("data", "Data field"),
        ("crc", "CRC field"),
        ("ack", "Acknowledge"),
        ("eof", "End of Frame"),
        ("error", "Error"),
    ]

    def __init__(
        self,
        nominal_bitrate: int = 500000,
        data_bitrate: int = 2000000,
    ) -> None:
        """Initialize CAN-FD decoder.

        Args:
            nominal_bitrate: Nominal bitrate for arbitration phase (bps).
            data_bitrate: Data phase bitrate for BRS frames (bps).
        """
        super().__init__(
            baudrate=nominal_bitrate,
            nominal_bitrate=nominal_bitrate,
            data_bitrate=data_bitrate,
        )
        self._nominal_bitrate = nominal_bitrate
        self._data_bitrate = data_bitrate

    def decode(
        self,
        trace: DigitalTrace | WaveformTrace,
        **channels: NDArray[np.bool_],
    ) -> Iterator[ProtocolPacket]:
        """Decode CAN-FD frames from trace.

        A ``WaveformTrace`` is first converted to a digital trace with an
        automatic threshold. The decoder then repeatedly searches for a start
        of frame, decodes one frame from it and resumes the search at the
        index where that frame ended.

        Args:
            trace: Input digital or analog trace.
            **channels: Additional channel data (not used by this decoder).

        Yields:
            Decoded CAN-FD frames as ProtocolPacket objects. Problems found
            while decoding a frame (for example a capture that ends in the
            middle of the data or CRC field) are listed in the packet's
            ``errors``.

        Example:
            >>> decoder = CANFDDecoder(nominal_bitrate=500000)
            >>> for packet in decoder.decode(trace):
            ...     print(f"ID: 0x{packet.annotations['arbitration_id']:X}")
        """
        # Convert to digital if needed
        if isinstance(trace, WaveformTrace):
            from tracekit.analyzers.digital.extraction import to_digital

            digital_trace = to_digital(trace, threshold="auto")
        else:
            digital_trace = trace

        data = digital_trace.data
        sample_rate = digital_trace.metadata.sample_rate

        # Bit periods expressed in samples (may be fractional).
        nominal_bit_period = sample_rate / self._nominal_bitrate
        data_bit_period = sample_rate / self._data_bitrate

        frame_num = 0
        idx = 0
        total_samples = len(data)

        while idx < total_samples:
            # Look for SOF (dominant bit during idle)
            sof_idx = self._find_sof(data, idx)
            if sof_idx is None:
                break

            frame, end_idx = self._decode_frame(
                data, sof_idx, sample_rate, nominal_bit_period, data_bit_period
            )

            if frame is not None:
                start_time = frame.timestamp

                # The annotation spans the samples actually consumed by the
                # frame rather than a fixed-length guess.
                self.put_annotation(
                    start_time,
                    end_idx / sample_rate,
                    AnnotationLevel.PACKETS,
                    f"ID: 0x{frame.arbitration_id:X}, {len(frame.data)} bytes",
                )

                yield ProtocolPacket(
                    timestamp=start_time,
                    protocol="can_fd",
                    data=frame.data,
                    annotations={
                        "frame_num": frame_num,
                        "arbitration_id": frame.arbitration_id,
                        "is_extended": frame.is_extended,
                        "is_fd": frame.is_fd,
                        "brs": frame.brs,
                        "esi": frame.esi,
                        "dlc": frame.dlc,
                        "data_length": len(frame.data),
                        "crc": frame.crc,
                    },
                    errors=frame.errors,
                )
                frame_num += 1

            # _find_sof returns an index past ``idx``, so ``end_idx`` normally
            # moves the cursor forward; the fallback guarantees progress of at
            # least one sample even when a bit period is shorter than a sample.
            if end_idx > idx:
                idx = end_idx
            else:
                idx += max(1, int(nominal_bit_period))

    def _find_sof(self, data: NDArray[np.bool_], start_idx: int) -> int | None:
        """Find Start of Frame (dominant bit during recessive idle).

        Args:
            data: Digital data array (truthy = recessive, falsy = dominant).
            start_idx: Start search index.

        Returns:
            Index of the first dominant sample after a recessive one, or None
            if no such transition exists at or after ``start_idx``.
        """
        # Look for recessive-to-dominant transition (1 to 0)
        for idx in range(start_idx, len(data) - 1):
            if data[idx] and not data[idx + 1]:
                return idx + 1
        return None

    @staticmethod
    def _bits_to_int(bits: list[int], initial: int = 0) -> int:
        """Fold MSB-first ``bits`` into ``initial`` and return the result."""
        value = initial
        for bit in bits:
            value = (value << 1) | bit
        return value

    def _decode_frame(
        self,
        data: NDArray[np.bool_],
        sof_idx: int,
        sample_rate: float,
        nominal_bit_period: float,
        data_bit_period: float,
    ) -> tuple[CANFDFrame | None, int]:
        """Decode CAN-FD frame starting from SOF.

        Bits are sampled at the middle of each bit period, in this order:

        1. 11 bits, sampled from ``sof_idx`` onwards, form the base identifier.
        2. 1 bit is interpreted as the IDE flag.
        3. For extended frames, 18 more identifier bits are appended.
        4. 7 (standard) or 6 (extended) control bits: bit 0 is FDF, bit 2 is
           BRS, bit 3 is ESI, and the DLC is the last four bits.
        5. The data bytes, at the data bitrate when FDF and BRS are both set.
        6. 17 CRC bits for payloads up to 16 bytes, otherwise 21.
        7. 10 trailing bits (CRC delimiter, ACK slot, ACK delimiter, EOF) at
           the nominal bitrate.

        Bit stuffing is ignored.

        NOTE(review): sampling starts at ``sof_idx`` itself, so the first of
        the 11 "identifier" samples lies inside the SOF bit; dominant samples
        are reported as 1; and the DLC slice overlaps the ESI (and, for
        extended frames, BRS) positions. ISO 11898-1 defines dominant as
        logical 0 and a different control-field order - confirm against real
        captures before relying on decoded values.

        Args:
            data: Digital data array.
            sof_idx: SOF index.
            sample_rate: Sample rate in Hz.
            nominal_bit_period: Nominal bit period in samples.
            data_bit_period: Data bit period in samples.

        Returns:
            (frame, end_index) tuple. ``frame`` is None when the capture ends
            before the control field is complete. A capture that ends inside
            the data or CRC field still yields a frame, with the truncation
            recorded in ``frame.errors``.
        """
        errors: list[str] = []
        n_samples = len(data)
        bit_idx: float = sof_idx
        current_bit_period = nominal_bit_period

        # Sample bits (simplified - ignores bit stuffing for brevity)
        def sample_bits(count: int) -> list[int]:
            """Sample up to ``count`` bits; stops early at the end of data."""
            nonlocal bit_idx
            bits: list[int] = []
            for _ in range(count):
                sample_idx = int(bit_idx + current_bit_period / 2)
                if sample_idx >= n_samples:
                    break
                # A low (dominant) sample is reported as 1, a high
                # (recessive) sample as 0.
                bits.append(0 if data[sample_idx] else 1)
                bit_idx += current_bit_period
            return bits

        # Arbitration field (11 bits for standard, 29 for extended)
        arb_bits = sample_bits(11)
        if len(arb_bits) < 11:
            return None, int(bit_idx)
        arbitration_id = self._bits_to_int(arb_bits)

        # Check for extended frame (IDE bit)
        ide_bits = sample_bits(1)
        is_extended = bool(ide_bits) and ide_bits[0] == 1

        if is_extended:
            # Extended ID: read additional 18 bits
            arbitration_id = self._bits_to_int(sample_bits(18), arbitration_id)

        # Control field: FDF (EDL), res, BRS, ESI, DLC (4 bits)
        ctrl_len = 6 if is_extended else 7
        ctrl_bits = sample_bits(ctrl_len)
        if len(ctrl_bits) < ctrl_len:
            return None, int(bit_idx)

        # The length check above guarantees every index used below exists.
        is_fd = ctrl_bits[0] == 1
        brs = ctrl_bits[2] == 1
        esi = ctrl_bits[3] == 1

        # DLC occupies the last four control bits.
        dlc_start = 2 if is_extended else 3
        dlc = self._bits_to_int(ctrl_bits[dlc_start : dlc_start + 4])

        # Get data length from DLC
        data_length = CANFD_DLC_TO_LENGTH.get(dlc, 0)

        # Switch to data bitrate if BRS is set
        if is_fd and brs:
            current_bit_period = data_bit_period

        # Data field
        data_bytes = bytearray()
        for _ in range(data_length):
            byte_bits = sample_bits(8)
            if len(byte_bits) < 8:
                # The capture ended inside the payload; once sampling runs
                # out of data it cannot recover, so stop and report it
                # instead of silently returning a short frame.
                errors.append(
                    f"Truncated data field: expected {data_length} bytes, "
                    f"got {len(data_bytes)}"
                )
                break
            data_bytes.append(self._bits_to_int(byte_bits))

        # CRC field (CRC-17 for <=16 bytes, CRC-21 for >16 bytes)
        crc_length = 17 if data_length <= 16 else 21
        crc_bits = sample_bits(crc_length)
        if len(crc_bits) < crc_length:
            errors.append(
                f"Truncated CRC field: expected {crc_length} bits, "
                f"got {len(crc_bits)}"
            )
        crc = self._bits_to_int(crc_bits)

        # Switch back to nominal bitrate for CRC delimiter, ACK, EOF
        current_bit_period = nominal_bit_period

        # CRC delimiter (1), ACK slot (1), ACK delimiter (1), EOF (7 bits);
        # sampled only to advance past the end of the frame.
        sample_bits(10)

        frame = CANFDFrame(
            arbitration_id=arbitration_id,
            is_extended=is_extended,
            is_fd=is_fd,
            brs=brs,
            esi=esi,
            dlc=dlc,
            data=bytes(data_bytes),
            crc=crc,
            timestamp=sof_idx / sample_rate,
            errors=errors,
        )

        return frame, int(bit_idx)

407 

408 

def decode_can_fd(
    data: NDArray[np.bool_] | WaveformTrace | DigitalTrace,
    sample_rate: float = 1.0,
    nominal_bitrate: int = 500000,
    data_bitrate: int = 2000000,
) -> list[ProtocolPacket]:
    """Decode every CAN-FD frame in ``data`` and return them as a list.

    Trace objects are handed to the decoder unchanged. Anything else is
    treated as a raw digital sample array and wrapped in a ``DigitalTrace``
    that carries ``sample_rate``.

    Args:
        data: CAN bus signal (digital array or trace).
        sample_rate: Sample rate in Hz; only used when ``data`` is not
            already a trace.
        nominal_bitrate: Nominal bitrate in bps.
        data_bitrate: Data phase bitrate in bps.

    Returns:
        List of decoded CAN-FD frames.

    Example:
        >>> packets = decode_can_fd(signal, sample_rate=100e6, nominal_bitrate=500000)
        >>> for pkt in packets:
        ...     print(f"ID: 0x{pkt.annotations['arbitration_id']:X}")
    """
    fd_decoder = CANFDDecoder(nominal_bitrate=nominal_bitrate, data_bitrate=data_bitrate)

    # Traces already carry their own metadata and are decoded directly.
    if isinstance(data, (WaveformTrace, DigitalTrace)):
        return list(fd_decoder.decode(data))

    # Raw sample arrays need a trace wrapper that supplies the sample rate.
    metadata = TraceMetadata(sample_rate=sample_rate)
    wrapped = DigitalTrace(data=data, metadata=metadata)
    return list(fd_decoder.decode(wrapped))

440 

441 

# Names exported by ``from tracekit.analyzers.protocols.can_fd import *``.
__all__ = [
    "CANFD_DLC_TO_LENGTH",
    "CANFDDecoder",
    "CANFDFrame",
    "CANFDFrameType",
    "decode_can_fd",
]

448]