Coverage for src / tracekit / analyzers / protocols / can.py: 94%

222 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""CAN 2.0A/B protocol decoder. 

2 

3This module implements a CAN (Controller Area Network) protocol decoder 

4supporting both standard (11-bit ID) and extended (29-bit ID) frames. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.protocols.can import CANDecoder 

9 >>> decoder = CANDecoder(bitrate=500000) 

10 >>> for packet in decoder.decode(trace): 

11 ... print(f"ID: {packet.annotations['arbitration_id']:03X}") 

12 ... print(f"Data: {packet.data.hex()}") 

13 

14References: 

15 ISO 11898-1:2015 Road vehicles - CAN - Part 1: Data link layer 

16 CAN Specification Version 2.0 (Bosch, 1991) 

17""" 

18 

19from __future__ import annotations 

20 

21from dataclasses import dataclass 

22from enum import IntEnum 

23from typing import TYPE_CHECKING 

24 

25import numpy as np 

26 

27from tracekit.analyzers.protocols.base import ( 

28 AnnotationLevel, 

29 AsyncDecoder, 

30 ChannelDef, 

31 DecoderState, 

32 OptionDef, 

33) 

34from tracekit.core.types import DigitalTrace, ProtocolPacket 

35 

36if TYPE_CHECKING: 

37 from collections.abc import Iterator 

38 

39 from numpy.typing import NDArray 

40 

41 

class CANFrameType(IntEnum):
    """Enumeration of the CAN frame categories, as integer codes 0-3."""

    # Members are listed in ascending numeric order; the integer values are
    # part of the public interface and must not change.
    DATA = 0
    REMOTE = 1
    ERROR = 2
    OVERLOAD = 3

49 

50 

@dataclass
class CANFrame:
    """Result record for one decoded CAN frame.

    Attributes:
        arbitration_id: Frame identifier (11-bit standard or 29-bit extended).
        is_extended: Whether the frame uses the 29-bit extended identifier.
        is_remote: Whether the frame is a remote transmission request.
        dlc: Data length code as read from the frame (0-8).
        data: Payload bytes.
        crc: CRC value received in the frame.
        crc_computed: CRC value calculated by the decoder.
        timestamp: Start time of the frame, in seconds.
        end_timestamp: End time of the frame, in seconds.
        errors: Human-readable descriptions of the errors found.
    """

    # Identification
    arbitration_id: int
    is_extended: bool
    is_remote: bool

    # Payload
    dlc: int
    data: bytes

    # Integrity check values
    crc: int
    crc_computed: int

    # Timing (seconds)
    timestamp: float
    end_timestamp: float

    # Diagnostics
    errors: list[str]

    @property
    def crc_valid(self) -> bool:
        """Return True when the received CRC equals the computed CRC."""
        return self.crc_computed == self.crc

83 

84 

class CANDecoderState(DecoderState):
    """Mutable bookkeeping used while walking through a CAN bit stream."""

    def reset(self) -> None:
        """Return every tracked field to its initial (idle) value."""
        # Frame-level tracking
        self.in_frame = False
        self.frame_start_time = 0.0
        self.frame_bits: list[int] = []

        # Bit-level counters
        self.bit_position = 0
        self.stuff_count = 0
        self.last_five_bits = 0

96 

97 

# Selectable CAN bit rates (bits per second) mapped to their display labels.
CAN_BITRATES = {
    10_000: "10 kbps",
    20_000: "20 kbps",
    50_000: "50 kbps",
    100_000: "100 kbps",
    125_000: "125 kbps",
    250_000: "250 kbps",
    500_000: "500 kbps",
    800_000: "800 kbps",
    1_000_000: "1 Mbps",
}

# CAN CRC-15 generator polynomial:
#   x^15 + x^14 + x^10 + x^8 + x^7 + x^4 + x^3 + 1
# The x^15 term is implicit; the constant holds the low 15 coefficients.
CAN_CRC_POLY = 0x4599
# Starting value of the CRC register.
CAN_CRC_INIT = 0x0000

114 

115 

class CANDecoder(AsyncDecoder):
    """CAN 2.0A/B protocol decoder.

    Decodes CAN frames from digital signal captures, supporting:
    - CAN 2.0A: Standard 11-bit identifiers
    - CAN 2.0B: Extended 29-bit identifiers
    - Bit stuffing detection and removal
    - CRC checking
    - Error detection

    Attributes:
        id: Decoder identifier.
        name: Human-readable name.
        channels: Required input channels.
        options: Configurable decoder options.

    Example:
        >>> decoder = CANDecoder(bitrate=500000)
        >>> frames = list(decoder.decode(trace))
        >>> for frame in frames:
        ...     print(f"CAN ID: 0x{frame.annotations['arbitration_id']:03X}")
    """

    id = "can"
    name = "CAN"
    longname = "Controller Area Network"
    desc = "CAN 2.0A/B bus decoder"
    license = "MIT"

    channels = [  # noqa: RUF012
        ChannelDef("can", "CAN", "CAN bus signal (CAN_H - CAN_L or single-ended)"),
    ]

    options = [  # noqa: RUF012
        OptionDef(
            "bitrate",
            "Bit Rate",
            "CAN bit rate in bps",
            default=500000,
            values=list(CAN_BITRATES.keys()),
        ),
        OptionDef(
            "sample_point",
            "Sample Point",
            "Sample point as fraction of bit time",
            default=0.75,
        ),
    ]

    def __init__(  # type: ignore[no-untyped-def]
        self,
        bitrate: int = 500000,
        sample_point: float = 0.75,
        **options,
    ) -> None:
        """Initialize CAN decoder.

        Args:
            bitrate: CAN bus bit rate in bps.
            sample_point: Sample point as fraction of bit time (0.5-0.9).
                The value is stored as given; it is not range-checked here.
            **options: Additional decoder options, forwarded to the base class.
        """
        super().__init__(baudrate=bitrate, **options)
        self._bitrate = bitrate
        self._sample_point = sample_point
        self._state = CANDecoderState()

    @property
    def bitrate(self) -> int:
        """Get CAN bit rate."""
        return self._bitrate

    @bitrate.setter
    def bitrate(self, value: int) -> None:
        """Set CAN bit rate (also mirrored into ``_baudrate``)."""
        self._bitrate = value
        self._baudrate = value

    def decode(
        self,
        trace: DigitalTrace,
        **channels: NDArray[np.bool_],
    ) -> Iterator[ProtocolPacket]:
        """Decode CAN frames from digital trace.

        Args:
            trace: Digital trace containing CAN signal.
            **channels: Additional channel data (not used for single-wire CAN).

        Yields:
            ProtocolPacket for each decoded CAN frame. Each yielded packet is
            also appended to ``self._packets``.

        Example:
            >>> decoder = CANDecoder(bitrate=500000)
            >>> for packet in decoder.decode(trace):
            ...     can_id = packet.annotations['arbitration_id']
            ...     print(f"ID: 0x{can_id:03X}, Data: {packet.data.hex()}")
        """
        self.reset()

        data = trace.data
        sample_rate = trace.metadata.sample_rate

        # Number of capture samples that make up one CAN bit.
        samples_per_bit = round(sample_rate / self._bitrate)

        if samples_per_bit < 2:
            # Fewer than two samples per bit cannot be decoded; report and stop.
            self.put_annotation(
                0,
                trace.duration,
                AnnotationLevel.MESSAGES,
                "Error: Sample rate too low for CAN decoding",
            )
            return

        # Offset from the start of a bit to the point where it is sampled.
        sample_offset = int(samples_per_bit * self._sample_point)

        # In CAN, recessive = 1 and dominant = 0; a frame begins with a
        # recessive-to-dominant (falling) edge after bus idle.
        for frame_start_idx in self._find_frame_starts(data, samples_per_bit):
            frame = self._decode_frame(
                data,
                frame_start_idx,
                sample_rate,
                samples_per_bit,
                sample_offset,
            )
            if frame is None:
                continue

            packet = ProtocolPacket(
                timestamp=frame.timestamp,
                protocol="can",
                data=frame.data,
                annotations={
                    "arbitration_id": frame.arbitration_id,
                    "is_extended": frame.is_extended,
                    "is_remote": frame.is_remote,
                    "dlc": frame.dlc,
                    "crc": frame.crc,
                    "crc_valid": frame.crc_valid,
                },
                errors=frame.errors,
                end_timestamp=frame.end_timestamp,
            )

            self._packets.append(packet)
            yield packet

    def _find_frame_starts(
        self,
        data: NDArray[np.bool_],
        samples_per_bit: int,
    ) -> list[int]:
        """Find potential frame start positions.

        CAN frames start with a Start of Frame (SOF) bit, which is a
        dominant (0) bit following bus idle (recessive/1). A falling edge is
        accepted as SOF when more than 80% of the three bit times before it
        are high. After a hit, the next 20 bit times are skipped so the same
        frame is not detected twice.

        Args:
            data: Digital signal data (interpreted as boolean levels).
            samples_per_bit: Samples per CAN bit.

        Returns:
            List of sample indices for potential frame starts.
        """
        min_idle_bits = 3  # Minimum idle time before a frame
        min_idle_samples = min_idle_bits * samples_per_bit
        skip_samples = samples_per_bit * 20
        # Candidates must leave at least one full bit of data after them.
        last_candidate = len(data) - samples_per_bit

        levels = np.asarray(data, dtype=bool)
        # Index i is a falling edge when sample i-1 is high and sample i is low.
        falling_edges = np.flatnonzero(levels[:-1] & ~levels[1:]) + 1

        frame_starts: list[int] = []
        next_allowed = min_idle_samples
        for edge in falling_edges:
            idx = int(edge)
            if idx >= last_candidate:
                break
            if idx < next_allowed:
                continue
            # Only edges preceded by a mostly-recessive window count as SOF.
            if np.mean(levels[idx - min_idle_samples : idx]) > 0.8:
                frame_starts.append(idx)
                next_allowed = idx + skip_samples

        return frame_starts

    def _decode_frame(
        self,
        data: NDArray[np.bool_],
        start_idx: int,
        sample_rate: float,
        samples_per_bit: int,
        sample_offset: int,
    ) -> CANFrame | None:
        """Decode a single CAN frame.

        Samples one value per bit time starting at ``start_idx``, removes
        stuff bits, and hands the resulting bit list to
        ``_parse_frame_bits``.

        Args:
            data: Digital signal data.
            start_idx: Sample index of frame start (SOF).
            sample_rate: Sample rate in Hz.
            samples_per_bit: Samples per CAN bit.
            sample_offset: Offset within bit for sampling.

        Returns:
            Decoded CANFrame or None if decode fails (fewer than 20 payload
            bits could be collected, or parsing failed).
        """
        sample_period = 1.0 / sample_rate
        frame_start_time = start_idx * sample_period

        max_payload_bits = 128  # De-stuffed bits collected per frame
        max_frame_bits = 150  # Maximum raw bits examined (with stuffing)
        total_samples = len(data)

        bits: list[int] = []
        run_value: int | None = None  # Level of the current run of equal bits
        run_length = 0  # Length of that run, counted on the raw bit stream
        current_idx = start_idx

        for _ in range(max_frame_bits):
            if len(bits) >= max_payload_bits:
                break

            sample_pos = current_idx + sample_offset
            if sample_pos >= total_samples:
                break

            bit = int(bool(data[sample_pos]))
            current_idx += samples_per_bit

            if run_length == 5:
                # Five equal bits were just seen, so this bit is a stuff bit:
                # drop it from the payload. It still starts the next run,
                # because stuff bits count towards the following sequence.
                # A stuff bit equal to the run is a stuffing violation. It is
                # not reported: the frame length is unknown here, so sampling
                # continues into the unstuffed end-of-frame/idle region where
                # such runs are normal.
                run_value = bit
                run_length = 1
                continue

            if bit == run_value:
                run_length += 1
            else:
                run_value = bit
                run_length = 1

            bits.append(bit)

        if len(bits) < 20:  # Minimum frame length
            return None

        return self._parse_frame_bits(bits, frame_start_time, sample_period, current_idx)

    @staticmethod
    def _read_field(bits: list[int], start: int, width: int) -> int:
        """Return ``width`` bits starting at ``start`` as an MSB-first integer."""
        value = 0
        for bit in bits[start : start + width]:
            value = (value << 1) | bit
        return value

    def _parse_frame_bits(
        self,
        bits: list[int],
        start_time: float,
        sample_period: float,
        end_idx: int,
    ) -> CANFrame | None:
        """Parse decoded bits into CAN frame.

        Args:
            bits: List of bit values (after stuff bit removal).
            start_time: Frame start time.
            sample_period: Sample period (currently unused).
            end_idx: End sample index (currently unused).

        Returns:
            Parsed CANFrame or None if invalid (the bit list is too short for
            a mandatory field, or an IndexError/ValueError occurred).
            ``end_timestamp`` is computed from the number of de-stuffed bits
            parsed up to and including the ACK delimiter.
        """
        errors: list[str] = []
        total = len(bits)

        try:
            pos = 0

            # SOF (should be 0)
            if pos >= total:
                return None
            if bits[pos] != 0:
                errors.append("Invalid SOF")
            pos += 1

            # Arbitration field: first 11 bits of the identifier
            if pos + 11 > total:
                return None
            arb_id = self._read_field(bits, pos, 11)
            pos += 11

            # RTR bit (for standard) or SRR bit (for extended)
            if pos >= total:
                return None
            rtr_or_srr = bits[pos]
            pos += 1

            # IDE bit selects the extended format
            if pos >= total:
                return None
            is_extended = bool(bits[pos])
            pos += 1

            if is_extended:
                # Extended frame: 18 more identifier bits
                if pos + 18 > total:
                    return None
                arb_id = (arb_id << 18) | self._read_field(bits, pos, 18)
                pos += 18

                # RTR bit
                if pos >= total:
                    return None
                is_remote = bool(bits[pos])
                pos += 1

                # r1, r0 reserved bits
                pos += 2
            else:
                # Standard frame: the bit after the ID is RTR
                is_remote = bool(rtr_or_srr)
                # r0 reserved bit
                pos += 1

            # DLC (4 bits)
            if pos + 4 > total:
                return None
            dlc = self._read_field(bits, pos, 4)
            pos += 4

            # A DLC above 8 still means 8 data bytes.
            data_len = min(dlc, 8)

            # Data field (0-8 bytes); remote frames carry no data.
            if is_remote:
                data = b""
            else:
                if pos + data_len * 8 > total:
                    return None
                data = bytes(
                    self._read_field(bits, pos + 8 * byte_idx, 8)
                    for byte_idx in range(data_len)
                )
                pos += data_len * 8

            # CRC field (15 bits)
            if pos + 15 > total:
                return None
            crc_received = self._read_field(bits, pos, 15)

            # The CRC covers everything from SOF through the data field.
            crc_computed = self._compute_crc(bits[:pos])
            pos += 15

            if crc_received != crc_computed:
                errors.append(
                    f"CRC error: received 0x{crc_received:04X}, computed 0x{crc_computed:04X}"
                )

            # CRC delimiter (should be 1)
            if pos < total and bits[pos] != 1:
                errors.append("CRC delimiter error")
            pos += 1

            # ACK slot and delimiter
            pos += 2

            # EOF (7 recessive bits) is not checked.

            end_time = start_time + pos * (1.0 / self._bitrate)

            return CANFrame(
                arbitration_id=arb_id,
                is_extended=is_extended,
                is_remote=is_remote,
                dlc=dlc,
                data=data,
                crc=crc_received,
                crc_computed=crc_computed,
                timestamp=start_time,
                end_timestamp=end_time,
                errors=errors,
            )

        except (IndexError, ValueError):
            # Malformed bit lists (e.g. values other than 0/1) are treated as
            # an undecodable frame rather than an error for the caller.
            return None

    def _compute_crc(self, bits: list[int]) -> int:
        """Compute CAN CRC-15.

        Args:
            bits: Input bits for CRC calculation.

        Returns:
            15-bit CRC value.
        """
        crc = CAN_CRC_INIT

        for bit in bits:
            # Feedback is the incoming bit XOR the register's top bit (bit 14).
            feedback = bit ^ ((crc >> 14) & 1)
            crc = (crc << 1) & 0x7FFF
            if feedback:
                crc ^= CAN_CRC_POLY

        return crc

569 

570 

def decode_can(
    trace: DigitalTrace,
    *,
    bitrate: int = 500000,
    sample_point: float = 0.75,
) -> list[CANFrame]:
    """Convenience function to decode CAN frames.

    Runs a ``CANDecoder`` over the trace and rebuilds a ``CANFrame`` from each
    packet it yields.

    Args:
        trace: Digital trace containing CAN signal.
        bitrate: CAN bit rate in bps (default 500000).
        sample_point: Sample point as fraction of bit time.

    Returns:
        List of decoded CANFrame objects.

    Example:
        >>> frames = decode_can(trace, bitrate=500000)
        >>> for frame in frames:
        ...     print(f"ID: 0x{frame.arbitration_id:03X}")
    """
    import re  # Local import keeps this function self-contained.

    # Packets do not carry the computed CRC as an annotation. On a mismatch
    # the decoder records it in an error message of this exact form, so it is
    # recovered from there. Without this, crc_valid would always be True on
    # the rebuilt frames.
    crc_error_pattern = re.compile(
        r"CRC error: received 0x[0-9A-Fa-f]+, computed 0x([0-9A-Fa-f]+)"
    )

    decoder = CANDecoder(bitrate=bitrate, sample_point=sample_point)
    frames: list[CANFrame] = []

    for packet in decoder.decode(trace):
        annotations = packet.annotations
        crc_received = annotations["crc"]

        # No CRC error message means the CRC matched: computed == received.
        crc_computed = crc_received
        for message in packet.errors or ():
            found = crc_error_pattern.search(message)
            if found:
                crc_computed = int(found.group(1), 16)
                break

        frames.append(
            CANFrame(
                arbitration_id=annotations["arbitration_id"],
                is_extended=annotations["is_extended"],
                is_remote=annotations["is_remote"],
                dlc=annotations["dlc"],
                data=packet.data,
                crc=crc_received,
                crc_computed=crc_computed,
                timestamp=packet.timestamp,
                end_timestamp=packet.end_timestamp or packet.timestamp,
                errors=packet.errors,
            )
        )

    return frames

612 

613 

# Names exported by ``from ... import *``.
__all__ = [
    "CAN_BITRATES",
    "CANDecoder",
    "CANFrame",
    "CANFrameType",
    "decode_can",
]

620]