Coverage for src / tracekit / analyzers / statistical / checksum.py: 88%

333 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Checksum and CRC field detection and identification. 

2 

3 

4This module provides tools for detecting checksum and CRC fields in binary 

5messages by analyzing field correlations and testing common algorithms. 

6""" 

7 

8from dataclasses import dataclass, field 

9from typing import TYPE_CHECKING, Any, Literal, Union 

10 

11import numpy as np 

12 

13if TYPE_CHECKING: 

14 from numpy.typing import NDArray 

15 

# Type alias for the message containers accepted by the public functions.
# "NDArray[np.uint8]" is quoted because NDArray is imported only under
# TYPE_CHECKING and therefore does not exist at runtime.
DataType = Union[bytes, bytearray, "NDArray[np.uint8]"]

18 

19 

@dataclass
class ChecksumCandidate:
    """A message field that may hold a checksum.

    Attributes:
        offset: Byte offset of the field within the message.
        size: Width of the field in bytes (1, 2, or 4).
        position: Whether the field sits in the "header" or "trailer" part
            of the message.
        correlation: Estimated correlation with the message content (0-1).
        likely_scope: ``(start, end)`` byte range the checksum probably covers.
    """

    offset: int
    size: int
    position: Literal["header", "trailer"]
    correlation: float
    likely_scope: tuple[int, int]

37 

38 

39@dataclass 

40class ChecksumMatch: 

41 """Identified checksum algorithm. 

42 

43 Attributes: 

44 algorithm: Algorithm name 

45 offset: Field offset in message 

46 size: Field size in bytes 

47 scope_start: Start of checksummed region 

48 scope_end: End of checksummed region 

49 match_rate: Fraction of messages that match (0-1) 

50 polynomial: CRC polynomial (for CRC algorithms) 

51 init_value: Initial value (for CRC algorithms) 

52 xor_out: Final XOR value (for CRC algorithms) 

53 """ 

54 

55 algorithm: str 

56 offset: int 

57 size: int 

58 scope_start: int 

59 scope_end: int 

60 match_rate: float 

61 polynomial: int | None = None 

62 init_value: int | None = None 

63 xor_out: int | None = None 

64 

65 

def detect_checksum_fields(
    messages: list[DataType], candidate_offsets: list[int] | None = None
) -> list[ChecksumCandidate]:
    """Detect fields whose value varies together with the rest of the message.

    For every candidate offset and each field width of 1, 2 and 4 bytes, the
    number of distinct field values is compared with the number of distinct
    "everything except the field" contents. The ratio (capped at 1.0) serves
    as a rough correlation estimate; candidates scoring below 0.3 are dropped.

    Args:
        messages: Messages to analyze (bytes, bytearray or NumPy arrays).
        candidate_offsets: Byte offsets to test. When None, the first 16 and
            the last 16 byte positions of the shortest message are tested.
            Negative offsets are ignored.

    Returns:
        Candidates sorted by correlation, highest first. Empty when there are
        no messages or the shortest message has fewer than 2 bytes.

    Example:
        >>> msgs = [b'\\x00\\x00DATA', b'\\x01\\x00DATA']
        >>> candidates = detect_checksum_fields(msgs)
        >>> len(candidates) >= 0
        True
    """
    if not messages:
        return []

    # Normalize every supported container type to immutable bytes.
    byte_messages = [
        (msg.tobytes() if msg.dtype == np.uint8 else bytes(msg.flatten()))
        if isinstance(msg, np.ndarray)
        else bytes(msg)
        for msg in messages
    ]

    # Offsets are validated against the shortest message so that every
    # message contains every tested field.
    min_len = min(len(msg) for msg in byte_messages)
    if min_len < 2:
        return []

    if candidate_offsets is None:
        # First 16 and last 16 byte positions, including the final byte so a
        # one-byte trailing checksum can be found. Sorted for a deterministic
        # order among candidates with equal correlation.
        header_offsets = range(min(16, min_len))
        trailer_offsets = range(max(0, min_len - 16), min_len)
        offsets = sorted(set(header_offsets) | set(trailer_offsets))
    else:
        # A negative offset would slice an empty or wrapped-around field.
        offsets = [offset for offset in candidate_offsets if offset >= 0]

    candidates: list[ChecksumCandidate] = []

    for offset in offsets:
        for size in (1, 2, 4):
            field_end = offset + size
            if field_end > min_len:
                continue

            # Distinct field values versus distinct remaining content.
            field_values = {msg[offset:field_end] for msg in byte_messages}
            contents = {msg[:offset] + msg[field_end:] for msg in byte_messages}

            # With a single distinct content nothing can be correlated
            # (this also covers a single input message).
            if len(contents) <= 1:
                continue

            correlation = min(1.0, len(field_values) / len(contents))
            if correlation < 0.3:
                continue

            # Fields in the first half are treated as header checksums that
            # cover what follows; the rest as trailer checksums that cover
            # what precedes them.
            position: Literal["header", "trailer"]
            if offset < min_len // 2:
                position = "header"
                likely_scope = (field_end, min_len)
            else:
                position = "trailer"
                likely_scope = (0, offset)

            candidates.append(
                ChecksumCandidate(
                    offset=offset,
                    size=size,
                    position=position,
                    correlation=correlation,
                    likely_scope=likely_scope,
                )
            )

    # Stable sort: equal correlations keep the offset/size iteration order.
    candidates.sort(key=lambda c: c.correlation, reverse=True)
    return candidates

183 

184 

def identify_checksum_algorithm(
    messages: list[DataType], field_offset: int, field_size: int | None = None
) -> ChecksumMatch | None:
    """Identify which checksum algorithm reproduces the field at an offset.

    For each field width, every applicable algorithm is evaluated over a few
    plausible data scopes. A field matches when the computed value equals the
    field read as either big- or little-endian. The first combination with
    the highest match rate of at least 80% is returned.

    Args:
        messages: Messages to analyze (bytes, bytearray or NumPy arrays).
        field_offset: Byte offset of the suspected checksum field.
        field_size: Field width in bytes (1, 2, or 4). When None, all three
            widths are tried. Any other width yields no match.

    Returns:
        The best ChecksumMatch, or None when nothing reaches an 80% match
        rate (or ``messages`` is empty).

    Example:
        >>> msgs = [b'\\x40ABC', b'\\x45BCD']  # first byte = XOR of the rest
        >>> match = identify_checksum_algorithm(msgs, 0, 1)
        >>> match is not None
        True
    """
    if not messages:
        return None

    # Normalize every supported container type to immutable bytes.
    byte_messages = [
        (msg.tobytes() if msg.dtype == np.uint8 else bytes(msg.flatten()))
        if isinstance(msg, np.ndarray)
        else bytes(msg)
        for msg in messages
    ]

    # Algorithms to try per field width, in priority order (earlier wins ties).
    algorithms_by_size: dict[int, tuple[str, ...]] = {
        1: ("xor", "sum8"),
        2: ("sum16_big", "sum16_little", "crc16_ccitt", "crc16_ibm", "crc16", "checksum"),
        4: ("crc32",),
    }
    # Generic names that are evaluated with a concrete implementation.
    aliases = {"crc16": "crc16_ccitt", "checksum": "sum16_big"}
    # CRC-16 variants are tried with both common initial values.
    seeded_algorithms = ("crc16_ccitt", "crc16_ibm")

    sizes = (1, 2, 4) if field_size is None else (field_size,)
    first_len = len(byte_messages[0])

    best_match: ChecksumMatch | None = None
    best_rate = 0.0

    for size in sizes:
        algorithms = algorithms_by_size.get(size)
        field_end = field_offset + size
        # Skip unsupported widths and widths that do not fit every message.
        if algorithms is None or any(len(msg) < field_end for msg in byte_messages):
            continue

        # Scopes: start at message start or right after the field; end right
        # before the field or at the length of the first message.
        scopes = [
            (start, end)
            for start in (0, field_end)
            for end in (field_offset, first_len)
            if end > start
        ]

        # The payload and the expected values depend only on the scope, so
        # build them once instead of per algorithm / init value / endianness.
        samples_by_scope: dict[tuple[int, int], list[tuple[bytes, set[int]]]] = {}
        for start, end in scopes:
            samples = []
            for msg in byte_messages:
                if len(msg) < end:
                    continue
                field = msg[field_offset:field_end]
                # NOTE: the field is cut out only when it lies strictly after
                # the scope start; with field_offset == start its bytes stay
                # part of the data.
                if start < field_offset < end:
                    data = msg[start:field_offset] + msg[field_end:end]
                else:
                    data = msg[start:end]
                expected = {
                    int.from_bytes(field, byteorder="big"),
                    int.from_bytes(field, byteorder="little"),
                }
                samples.append((data, expected))
            samples_by_scope[(start, end)] = samples

        for algo in algorithms:
            actual_algo = aliases.get(algo, algo)
            init_values: tuple[int | None, ...] = (
                (0x0000, 0xFFFF) if actual_algo in seeded_algorithms else (None,)
            )

            for init_val in init_values:
                extra = {} if init_val is None else {"init": init_val}

                for scope_start, scope_end in scopes:
                    samples = samples_by_scope[(scope_start, scope_end)]
                    if not samples:
                        continue

                    matches = 0
                    for data, expected in samples:
                        try:
                            computed = compute_checksum(data, actual_algo, **extra)
                        except ValueError:
                            # Algorithm unknown to compute_checksum: no match.
                            continue
                        if computed in expected:
                            matches += 1

                    match_rate = matches / len(samples)

                    # Require >= 80% and a strictly better rate, so the first
                    # combination reaching a given rate is kept.
                    if match_rate >= 0.8 and match_rate > best_rate:
                        best_rate = match_rate
                        best_match = ChecksumMatch(
                            algorithm=algo,
                            offset=field_offset,
                            size=size,
                            scope_start=scope_start,
                            scope_end=scope_end,
                            match_rate=match_rate,
                            init_value=init_val,
                        )

    return best_match

333 

334 

def verify_checksums(
    messages: list[DataType],
    algorithm: str,
    field_offset: int,
    scope_start: int = 0,
    scope_end: int | None = None,
    init_value: int | None = None,
) -> tuple[int, int]:
    """Verify the checksum field of each message with the given algorithm.

    The field width is derived from the algorithm name: 1 byte for "xor" and
    "sum8", 2 bytes for names starting with "sum16" or "crc16", 4 bytes for
    "crc32", and 1 byte for anything else. A message passes when the computed
    value equals the field read as either big- or little-endian. The bytes of
    the checksum field are removed from the data when the field starts inside
    ``[scope_start, scope_end)``.

    Args:
        messages: Messages to verify (bytes, bytearray or NumPy arrays).
        algorithm: Checksum algorithm name.
        field_offset: Byte offset of the checksum field.
        scope_start: Start of the checksummed data (default: 0).
        scope_end: End of the checksummed data (None = end of each message).
        init_value: Initial value forwarded as ``init`` to compute_checksum
            (None = use the algorithm's default).

    Returns:
        Tuple of (passed, failed) counts. Messages too short for the field or
        the scope, and messages whose checksum cannot be computed, count as
        failed.

    Example:
        >>> msgs = [b'\\x41ABC']
        >>> passed, failed = verify_checksums(msgs, 'xor', 0, 1)
        >>> passed + failed == len(msgs)
        True
    """
    if not messages:
        return (0, 0)

    # Field width implied by the algorithm name.
    if algorithm in ("xor", "sum8"):
        field_size = 1
    elif algorithm.startswith(("sum16", "crc16")):
        field_size = 2
    elif algorithm == "crc32":
        field_size = 4
    else:
        # Any other name is treated as a single-byte checksum.
        field_size = 1

    field_end = field_offset + field_size
    extra = {} if init_value is None else {"init": init_value}

    passed = 0
    failed = 0

    for raw in messages:
        if isinstance(raw, np.ndarray):
            msg = raw.tobytes() if raw.dtype == np.uint8 else bytes(raw.flatten())
        else:
            msg = bytes(raw)

        msg_scope_end = len(msg) if scope_end is None else scope_end

        if len(msg) < field_end or len(msg) < msg_scope_end:
            failed += 1
            continue

        # Never feed the checksum field into its own computation. "<=" also
        # covers a field at the very start of the scope, e.g. a checksum at
        # offset 0 verified with the default scope.
        if scope_start <= field_offset < msg_scope_end:
            data = msg[scope_start:field_offset] + msg[field_end:msg_scope_end]
        else:
            data = msg[scope_start:msg_scope_end]

        # The computed value does not depend on how the field is read, so it
        # is computed once and compared against both byte orders.
        try:
            computed = compute_checksum(data, algorithm, **extra)
        except (ValueError, TypeError):
            # Unknown algorithm or unusable init value: counts as a failure.
            failed += 1
            continue

        field = msg[field_offset:field_end]
        if computed in (
            int.from_bytes(field, byteorder="big"),
            int.from_bytes(field, byteorder="little"),
        ):
            passed += 1
        else:
            failed += 1

    return (passed, failed)

429 

430 

def compute_checksum(data: bytes, algorithm: str, **kwargs: Any) -> int:
    """Compute a checksum of ``data`` with the named algorithm.

    Args:
        data: Data to checksum.
        algorithm: One of "xor", "sum8", "sum16_big", "sum16_little", "crc8",
            "crc16_ccitt", "crc16_ibm" or "crc32".
        **kwargs: Algorithm-specific parameters. "crc8" reads ``poly``
            (default 0x07) and ``init`` (default 0x00); "crc16_ccitt" reads
            ``init`` (default 0xFFFF); "crc16_ibm" reads ``init`` (default
            0x0000). Other algorithms ignore them.

    Returns:
        Computed checksum value.

    Raises:
        ValueError: If algorithm is unknown.

    Example:
        >>> compute_checksum(b'ABC', 'xor')
        64
    """
    if algorithm == "xor":
        return xor_checksum(data)
    if algorithm == "sum8":
        return sum8(data)
    if algorithm == "sum16_big":
        return sum16(data, endian="big")
    if algorithm == "sum16_little":
        return sum16(data, endian="little")
    if algorithm == "crc8":
        return crc8(data, poly=kwargs.get("poly", 0x07), init=kwargs.get("init", 0x00))
    if algorithm == "crc16_ccitt":
        return crc16_ccitt(data, init=kwargs.get("init", 0xFFFF))
    if algorithm == "crc16_ibm":
        return crc16_ibm(data, init=kwargs.get("init", 0x0000))
    if algorithm == "crc32":
        return crc32(data)
    raise ValueError(f"Unknown checksum algorithm: {algorithm}")

473 

474 

def crc8(data: bytes, poly: int = 0x07, init: int = 0x00) -> int:
    """Calculate an MSB-first CRC-8 with a configurable polynomial.

    Args:
        data: Data to checksum.
        poly: Polynomial (default: 0x07).
        init: Initial register value (default: 0x00).

    Returns:
        CRC-8 value (0-255 for the default ``init``).

    Example:
        >>> crc8(b'123456789')
        244
    """
    register = init
    for octet in data:
        register ^= octet
        for _ in range(8):
            # Shift one bit out at the top; fold the polynomial back in
            # whenever the bit shifted out was set.
            shifted = register << 1
            if register & 0x80:
                shifted ^= poly
            register = shifted & 0xFF
    return register

504 

505 

def crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int:
    """Calculate an MSB-first CRC-16 with the CCITT polynomial 0x1021.

    Args:
        data: Data to checksum.
        init: Initial register value (default: 0xFFFF).

    Returns:
        CRC-16 value (0-65535 for the default ``init``).

    Example:
        >>> crc16_ccitt(b'123456789')
        10673
    """
    ccitt_poly = 0x1021
    register = init

    for octet in data:
        # Each input byte enters at the top of the 16-bit register.
        register ^= octet << 8
        for _ in range(8):
            shifted = register << 1
            if register & 0x8000:
                shifted ^= ccitt_poly
            register = shifted & 0xFFFF

    return register

537 

538 

def crc16_ibm(data: bytes, init: int = 0x0000) -> int:
    """Calculate CRC-16-IBM (also known as CRC-16-ANSI), LSB-first.

    Uses the IBM polynomial 0x8005 in its reversed form 0xA001.

    Args:
        data: Data to checksum.
        init: Initial register value (default: 0x0000).

    Returns:
        CRC-16 value (0-65535 for a 16-bit ``init``).

    Example:
        >>> crc16_ibm(b'123456789')
        47933
    """
    reversed_poly = 0xA001
    register = init

    for octet in data:
        register ^= octet
        for _ in range(8):
            # Shift one bit out at the bottom; fold the polynomial in
            # whenever the bit shifted out was set.
            low_bit_set = register & 0x0001
            register >>= 1
            if low_bit_set:
                register ^= reversed_poly

    return register

569 

570 

def crc32(data: bytes) -> int:
    """Calculate CRC-32 (IEEE 802.3).

    Standard CRC-32 as used in Ethernet, ZIP, etc. Delegates to
    ``zlib.crc32``, which implements the same reflected polynomial
    (0xEDB88320) with initial value and final XOR of 0xFFFFFFFF.

    Args:
        data: Data to checksum.

    Returns:
        CRC-32 value (0-4294967295).

    Example:
        >>> crc32(b'123456789')
        3421780262
    """
    # Function-scope import keeps this block self-contained.
    import zlib

    # bytes() is a no-op for bytes input and keeps other int iterables working.
    return zlib.crc32(bytes(data)) & 0xFFFFFFFF

600 

601 

def sum8(data: bytes) -> int:
    """Calculate the 8-bit additive checksum of ``data``.

    All bytes are added up and the result is reduced modulo 256.

    Args:
        data: Data to checksum.

    Returns:
        Sum modulo 256 (0-255).

    Example:
        >>> sum8(b'ABC')
        198
    """
    total = sum(data)
    # Keep only the low eight bits.
    return total & 0xFF

620 

621 

def sum16(data: bytes, endian: Literal["big", "little"] = "big") -> int:
    """Calculate a 16-bit additive checksum over 16-bit words.

    The data is read as consecutive 16-bit words in the given byte order. A
    trailing odd byte is treated as a word padded with a zero byte: it
    becomes the high byte for "big" and the low byte otherwise.

    Args:
        data: Data to checksum.
        endian: Byte order ('big' or 'little', default: 'big'). Any value
            other than 'big' is handled as little-endian.

    Returns:
        Sum modulo 65536 (0-65535).

    Example:
        >>> sum16(b'ABCD', endian='big')
        33926
    """
    big_endian = endian == "big"

    # zip() stops at the shorter slice, so an odd trailing byte is left out
    # here and handled separately below.
    byte_pairs = zip(data[0::2], data[1::2])
    if big_endian:
        total = sum((first << 8) | second for first, second in byte_pairs)
    else:
        total = sum((second << 8) | first for first, second in byte_pairs)

    if len(data) % 2 == 1:
        total += data[-1] << 8 if big_endian else data[-1]

    return total & 0xFFFF

658 

659 

def xor_checksum(data: bytes) -> int:
    """Calculate the XOR of all bytes in ``data``.

    Args:
        data: Data to checksum.

    Returns:
        XOR result (0-255); 0 for empty data.

    Example:
        >>> xor_checksum(b'ABC')
        64
    """
    accumulator = 0
    for octet in data:
        accumulator ^= octet
    return accumulator

681 

682 

@dataclass
class ChecksumDetectionResult:
    """Outcome of a checksum detection run.

    Attributes:
        has_checksum: True when a checksum field was detected.
        offset: Byte offset of the checksum field (None if not detected).
        size: Width of the checksum field in bytes (None if not detected).
        algorithm: Name of the identified algorithm (None if not identified).
        confidence: Detection confidence (0-1).
        candidates: Candidate positions that were found.
        scope_start: Start of the checksummed region (None if not identified).
        scope_end: End of the checksummed region (None if not identified).
        init_value: Initial value for CRC algorithms (None if not applicable).
    """

    has_checksum: bool
    offset: int | None = None
    size: int | None = None
    algorithm: str | None = None
    confidence: float = 0.0
    # default_factory gives every instance its own list.
    candidates: list[ChecksumCandidate] = field(default_factory=list)
    scope_start: int | None = None
    scope_end: int | None = None
    init_value: int | None = None

708 

709 

class ChecksumDetector:
    """Object-oriented wrapper around the checksum detection functions.

    Keeps the result of the most recent ``detect_checksum_field`` call so
    that ``verify_checksum`` can check single messages against it.

    Example:
        >>> detector = ChecksumDetector()
        >>> result = detector.detect_checksum_field(messages)
        >>> if result.has_checksum:
        ...     print(f"Checksum at offset {result.offset}")
    """

    def __init__(self, correlation_threshold: float = 0.5):
        """Initialize checksum detector.

        Args:
            correlation_threshold: Minimum correlation for detection.
        """
        self.correlation_threshold = correlation_threshold
        self._detection_result: ChecksumDetectionResult | None = None
        self._messages: list[DataType] = []

    def detect_checksum_field(self, messages: list[DataType]) -> ChecksumDetectionResult:
        """Detect the most likely checksum field in ``messages``.

        Candidates below ``correlation_threshold`` are discarded. The highest
        correlated candidate is used, except that a trailer candidate within
        5% of a header candidate's correlation is preferred. When no
        algorithm can be identified for that candidate, the other candidates
        within 10% of its correlation are tried in order. If no algorithm is
        identified at all, the reported confidence is reduced to 30% of the
        correlation.

        Args:
            messages: List of messages to analyze.

        Returns:
            ChecksumDetectionResult with detection results. The result is
            also remembered for later ``verify_checksum`` calls.

        Example:
            >>> detector = ChecksumDetector()
            >>> result = detector.detect_checksum_field(messages)
        """
        self._messages = messages
        candidates = detect_checksum_fields(messages)

        if not candidates:
            self._detection_result = ChecksumDetectionResult(has_checksum=False, confidence=0.0)
            return self._detection_result

        good_candidates = [c for c in candidates if c.correlation >= self.correlation_threshold]

        if not good_candidates:
            # Candidates exist but none passes the threshold: report no
            # checksum, with the best correlation seen as the confidence.
            self._detection_result = ChecksumDetectionResult(
                has_checksum=False,
                candidates=candidates,
                confidence=max(c.correlation for c in candidates),
            )
            return self._detection_result

        # Start from the top candidate, but prefer a trailer checksum whose
        # correlation is within 5% of a header candidate's.
        best = good_candidates[0]
        if best.position == "header":
            for candidate in good_candidates[1:]:
                if (
                    candidate.position == "trailer"
                    and candidate.correlation >= best.correlation * 0.95
                ):
                    best = candidate
                    break

        algorithm_match = identify_checksum_algorithm(messages, best.offset, best.size)

        if algorithm_match is None:
            # Fall back to the other strong candidates. All of them are
            # considered, including the top-ranked one when the trailer
            # preference above replaced it.
            for candidate in good_candidates:
                if candidate is best:
                    continue
                # The list is sorted by correlation, so everything from here
                # on is too weak as well.
                if candidate.correlation < best.correlation * 0.9:
                    break

                alt_match = identify_checksum_algorithm(
                    messages, candidate.offset, candidate.size
                )
                if alt_match is not None:
                    best = candidate
                    algorithm_match = alt_match
                    break

        # High correlation without an identifiable algorithm suggests a false
        # positive, so the confidence is penalized in that case.
        final_confidence = best.correlation
        if algorithm_match is None:
            final_confidence = best.correlation * 0.3

        self._detection_result = ChecksumDetectionResult(
            has_checksum=True,
            offset=best.offset,
            size=best.size,
            algorithm=algorithm_match.algorithm if algorithm_match else None,
            confidence=final_confidence,
            candidates=good_candidates,
            scope_start=algorithm_match.scope_start if algorithm_match else None,
            scope_end=algorithm_match.scope_end if algorithm_match else None,
            init_value=algorithm_match.init_value if algorithm_match else None,
        )
        return self._detection_result

    def identify_algorithm(
        self, messages: list[DataType], offset: int, size: int | None = None
    ) -> ChecksumMatch | None:
        """Identify checksum algorithm at given offset.

        Args:
            messages: List of messages.
            offset: Checksum field offset.
            size: Field size (auto-detect if None).

        Returns:
            ChecksumMatch or None if no match found.
        """
        return identify_checksum_algorithm(messages, offset, size)

    def verify(
        self, messages: list[DataType], algorithm: str, offset: int, **kwargs: Any
    ) -> tuple[int, int]:
        """Verify checksums in messages.

        Args:
            messages: List of messages.
            algorithm: Checksum algorithm name.
            offset: Checksum field offset.
            **kwargs: Extra keyword arguments forwarded to verify_checksums.

        Returns:
            Tuple of (passed, failed) counts.
        """
        return verify_checksums(messages, algorithm, offset, **kwargs)

    def verify_checksum(self, message: DataType) -> bool:
        """Verify the checksum of a single message.

        Uses the parameters found by the last ``detect_checksum_field`` call.
        When that call located a field but identified no algorithm, the
        common algorithms for the field size are tried with their default
        scope.

        Args:
            message: Single message to verify.

        Returns:
            True if the checksum is valid; False otherwise, including when no
            checksum has been detected yet.

        Example:
            >>> detector = ChecksumDetector()
            >>> detector.detect_checksum_field(messages)
            >>> is_valid = detector.verify_checksum(messages[0])
        """
        detection = self._detection_result
        if detection is None or not detection.has_checksum:
            # Nothing detected so far; no detection is attempted here.
            return False

        offset = detection.offset
        size = detection.size
        if offset is None or size is None:
            return False

        if isinstance(message, np.ndarray):
            msg = message.tobytes() if message.dtype == np.uint8 else bytes(message.flatten())
        else:
            msg = bytes(message)

        if detection.algorithm is None:
            # No algorithm identified - try the common ones for this size.
            if size == 1:
                algorithms = ["xor", "sum8"]
            elif size == 2:
                algorithms = ["crc16_ccitt", "crc16_ibm", "sum16_big", "sum16_little"]
            elif size == 4:
                algorithms = ["crc32"]
            else:
                algorithms = ["xor", "sum8", "crc16_ccitt", "crc16_ibm", "sum16_big", "crc32"]

            for algo in algorithms:
                passed, _failed = verify_checksums([msg], algo, offset)
                if passed == 1:
                    return True
            return False

        passed, _failed = verify_checksums(
            [msg],
            detection.algorithm,
            offset,
            scope_start=detection.scope_start or 0,
            scope_end=detection.scope_end,
            init_value=detection.init_value,
        )
        return passed == 1

905 

906 

# Public API of this module.
__all__ = [
    "ChecksumCandidate",
    "ChecksumDetectionResult",
    "ChecksumDetector",
    "ChecksumMatch",
    "compute_checksum",
    "crc8",
    "crc16_ccitt",
    "crc16_ibm",
    "crc32",
    "detect_checksum_fields",
    "identify_checksum_algorithm",
    "sum8",
    "sum16",
    "verify_checksums",
    "xor_checksum",
]

923]