Coverage for src / tracekit / analyzers / statistical / checksum.py: 88%
333 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Checksum and CRC field detection and identification.
4This module provides tools for detecting checksum and CRC fields in binary
5messages by analyzing field correlations and testing common algorithms.
6"""
8from dataclasses import dataclass, field
9from typing import TYPE_CHECKING, Any, Literal, Union
11import numpy as np
13if TYPE_CHECKING:
14 from numpy.typing import NDArray
16# Type alias for input data
17DataType = Union[bytes, bytearray, "NDArray[np.uint8]"]
@dataclass
class ChecksumCandidate:
    """A message field suspected of holding a checksum or CRC.

    Attributes:
        offset: Byte offset of the field inside the message.
        size: Width of the field in bytes (1, 2, or 4).
        position: Whether the field sits in the header or trailer part.
        correlation: Estimated correlation with the message content (0-1).
        likely_scope: ``(start, end)`` byte range the checksum probably covers.
    """

    # Where the field is and how wide it is.
    offset: int
    size: int
    position: Literal["header", "trailer"]
    # How strongly the field tracks content changes, and over which bytes.
    correlation: float
    likely_scope: tuple[int, int]
39@dataclass
40class ChecksumMatch:
41 """Identified checksum algorithm.
43 Attributes:
44 algorithm: Algorithm name
45 offset: Field offset in message
46 size: Field size in bytes
47 scope_start: Start of checksummed region
48 scope_end: End of checksummed region
49 match_rate: Fraction of messages that match (0-1)
50 polynomial: CRC polynomial (for CRC algorithms)
51 init_value: Initial value (for CRC algorithms)
52 xor_out: Final XOR value (for CRC algorithms)
53 """
55 algorithm: str
56 offset: int
57 size: int
58 scope_start: int
59 scope_end: int
60 match_rate: float
61 polynomial: int | None = None
62 init_value: int | None = None
63 xor_out: int | None = None
def detect_checksum_fields(
    messages: list[DataType], candidate_offsets: list[int] | None = None
) -> list[ChecksumCandidate]:
    """Detect fields whose value varies together with the message content.

    For every candidate offset and every field width (1, 2 and 4 bytes) the
    number of distinct field values is compared with the number of distinct
    "rest of message" contents. The ratio, capped at 1.0, serves as a rough
    correlation score; fields scoring below 0.3 are discarded.

    Args:
        messages: Messages to analyze. At least two messages, each at least
            two bytes long, are required; otherwise nothing is returned.
        candidate_offsets: Specific byte offsets to test. When ``None``, the
            offsets inside the first 16 and the last 16 bytes of the shortest
            message are tested (the final byte itself is never used as an
            offset). Negative offsets are ignored.

    Returns:
        Checksum candidates sorted by descending correlation. Candidates with
        equal correlation keep the order in which their offsets were tested
        (ascending offset for the default offsets).

    Example:
        >>> msgs = [b'\\x00\\x00DATA', b'\\x01\\x00DATA']
        >>> candidates = detect_checksum_fields(msgs)
        >>> len(candidates) >= 0
        True
    """
    if not messages:
        return []

    # Normalise every message to an immutable bytes object.
    byte_messages = []
    for msg in messages:
        if isinstance(msg, np.ndarray):
            byte_messages.append(msg.tobytes() if msg.dtype == np.uint8 else bytes(msg.flatten()))
        else:
            byte_messages.append(bytes(msg))

    # Offsets are only meaningful if they exist in every message, so all
    # bounds are derived from the shortest one.
    min_len = min(len(msg) for msg in byte_messages)

    # A correlation needs at least two observations and at least one byte of
    # content besides the field.
    if min_len < 2 or len(byte_messages) < 2:
        return []

    if candidate_offsets is None:
        header_positions = range(min(16, min_len - 1))
        trailer_positions = range(max(0, min_len - 16), min_len - 1)
        # sorted() gives a well-defined test order (and therefore a
        # well-defined order among equally correlated candidates).
        candidate_offsets = sorted(set(header_positions) | set(trailer_positions))

    candidates: list[ChecksumCandidate] = []

    for offset in candidate_offsets:
        # A negative offset would slice an empty field and produce a bogus
        # candidate, so it is skipped outright.
        if offset < 0:
            continue

        for size in (1, 2, 4):
            field_end = offset + size
            if field_end > min_len:
                continue

            # Every message is at least min_len long, so the field is present
            # in all of them.
            field_values = {
                int.from_bytes(msg[offset:field_end], byteorder="big") for msg in byte_messages
            }
            # Compare the content itself rather than its hash: identical
            # counting, but immune to hash collisions.
            contents = {msg[:offset] + msg[field_end:] for msg in byte_messages}

            # With a single distinct content there is nothing to correlate with.
            if len(contents) <= 1:
                continue

            # Correlation estimate: how much the field varies with the content.
            correlation = min(1.0, len(field_values) / len(contents))
            if correlation < 0.3:
                continue

            position: Literal["header", "trailer"] = (
                "header" if offset < min_len // 2 else "trailer"
            )
            # A header checksum presumably covers what follows it, a trailer
            # checksum what precedes it.
            likely_scope = (field_end, min_len) if position == "header" else (0, offset)

            candidates.append(
                ChecksumCandidate(
                    offset=offset,
                    size=size,
                    position=position,
                    correlation=correlation,
                    likely_scope=likely_scope,
                )
            )

    # Stable sort: ties keep their test order.
    candidates.sort(key=lambda c: c.correlation, reverse=True)
    return candidates
def _checksum_match_rate(
    byte_messages: list[bytes],
    accepted_values: list[tuple[int, int]],
    field_offset: int,
    field_end: int,
    scope_start: int,
    scope_end: int,
    algorithm: str,
    init_value: int | None,
) -> float:
    """Return the fraction of messages whose field equals the computed checksum."""
    extra = {} if init_value is None else {"init": init_value}
    # When the field lies strictly inside the scope it is cut out of the data.
    field_inside = scope_start < field_offset < scope_end

    matches = 0
    total = 0
    for msg, field_values in zip(byte_messages, accepted_values):
        if len(msg) < scope_end:
            continue
        total += 1

        if field_inside:
            data = msg[scope_start:field_offset] + msg[field_end:scope_end]
        else:
            data = msg[scope_start:scope_end]

        # The checksum does not depend on the field's byte order, so it is
        # computed once and compared against both interpretations.
        if compute_checksum(data, algorithm, **extra) in field_values:
            matches += 1

    return matches / total if total else 0.0


def identify_checksum_algorithm(
    messages: list[DataType], field_offset: int, field_size: int | None = None
) -> ChecksumMatch | None:
    """Identify which checksum algorithm produced the field at ``field_offset``.

    Tries the known algorithms for each field size (1 byte: XOR, 8-bit sum;
    2 bytes: 16-bit sums and CRC-16 variants with initial values 0x0000 and
    0xFFFF; 4 bytes: CRC-32) over several candidate scopes and reads the
    stored field both big- and little-endian. The candidate scope bounds are
    ``0`` / end-of-field for the start and start-of-field / length of the
    first message for the end. Messages shorter than a scope's end are left
    out of that scope's statistics.

    Args:
        messages: List of messages to analyze.
        field_offset: Offset of the checksum field. Negative offsets never
            match.
        field_size: Size of the field (1, 2, or 4 bytes); all three sizes are
            tried when ``None``.

    Returns:
        The first combination with the highest match rate, provided at least
        80% of the messages match; ``None`` otherwise.

    Example:
        >>> msgs = [b'\\x40ABC', b'\\x45BCD']  # leading byte is XOR of the rest
        >>> match = identify_checksum_algorithm(msgs, 0, 1)
        >>> match is not None
        True
    """
    if not messages or field_offset < 0:
        return None

    # Normalise every message to an immutable bytes object.
    byte_messages = []
    for msg in messages:
        if isinstance(msg, np.ndarray):
            byte_messages.append(msg.tobytes() if msg.dtype == np.uint8 else bytes(msg.flatten()))
        else:
            byte_messages.append(bytes(msg))

    # Each algorithm is listed once; aliases of the same computation could
    # never beat the original under the strict "better than" comparison below.
    algorithms_by_size = {
        1: ("xor", "sum8"),
        2: ("sum16_big", "sum16_little", "crc16_ccitt", "crc16_ibm"),
        4: ("crc32",),
    }
    field_sizes = (1, 2, 4) if field_size is None else (field_size,)
    first_len = len(byte_messages[0])

    best_match = None
    best_rate = 0.0

    for size in field_sizes:
        field_end = field_offset + size
        algorithms = algorithms_by_size.get(size)
        # The field must be fully present in every message.
        if algorithms is None or any(len(msg) < field_end for msg in byte_messages):
            continue

        # Stored field value per message, read in both byte orders.
        accepted_values = [
            (
                int.from_bytes(msg[field_offset:field_end], byteorder="big"),
                int.from_bytes(msg[field_offset:field_end], byteorder="little"),
            )
            for msg in byte_messages
        ]

        for algo in algorithms:
            # Only the CRC-16 variants are probed with explicit initial values.
            init_values: tuple[int | None, ...] = (
                (0x0000, 0xFFFF) if algo in ("crc16_ccitt", "crc16_ibm") else (None,)
            )

            for init_val in init_values:
                for scope_start in (0, field_end):
                    for scope_end in (field_offset, first_len):
                        if scope_end <= scope_start:
                            continue

                        match_rate = _checksum_match_rate(
                            byte_messages,
                            accepted_values,
                            field_offset,
                            field_end,
                            scope_start,
                            scope_end,
                            algo,
                            init_val,
                        )

                        # Consider it a match if >= 80% of messages match;
                        # earlier combinations win ties.
                        if match_rate >= 0.8 and match_rate > best_rate:
                            best_rate = match_rate
                            best_match = ChecksumMatch(
                                algorithm=algo,
                                offset=field_offset,
                                size=size,
                                scope_start=scope_start,
                                scope_end=scope_end,
                                match_rate=match_rate,
                                init_value=init_val,
                            )

    return best_match
def verify_checksums(
    messages: list[DataType],
    algorithm: str,
    field_offset: int,
    scope_start: int = 0,
    scope_end: int | None = None,
    init_value: int | None = None,
) -> tuple[int, int]:
    """Verify the checksum of every message with the given algorithm.

    The field width is derived from the algorithm name: 2 bytes for names
    starting with ``sum16`` or ``crc16``, 4 bytes for ``crc32`` and 1 byte
    for everything else. The stored field is accepted in either byte order.
    When the field lies strictly inside the scope it is excluded from the
    checksummed data.

    Args:
        messages: List of messages to verify.
        algorithm: Checksum algorithm name. An unknown name does not raise;
            every message is then counted as failed.
        field_offset: Offset of the checksum field. A negative offset counts
            the message as failed.
        scope_start: Start of checksummed data (default: 0).
        scope_end: End of checksummed data (``None`` = end of each message).
        init_value: Initial value for CRC algorithms (``None`` = use default).

    Returns:
        Tuple of ``(passed, failed)`` counts. Messages too short to contain
        the field or the scope are counted as failed.

    Example:
        >>> msgs = [b'\\x41ABC']
        >>> passed, failed = verify_checksums(msgs, 'xor', 0, 1)
        >>> passed + failed == len(msgs)
        True
    """
    if not messages:
        return (0, 0)

    # Determine field size from the algorithm name.
    if algorithm.startswith(("sum16", "crc16")):
        field_size = 2
    elif algorithm == "crc32":
        field_size = 4
    else:
        field_size = 1

    field_end = field_offset + field_size
    extra = {} if init_value is None else {"init": init_value}

    passed = 0
    failed = 0

    for msg in messages:
        if isinstance(msg, np.ndarray):
            msg = msg.tobytes() if msg.dtype == np.uint8 else bytes(msg.flatten())
        else:
            msg = bytes(msg)

        msg_scope_end = scope_end if scope_end is not None else len(msg)

        # A negative offset would slice an empty field (read as 0) and could
        # "pass" by accident, so it is rejected like a truncated message.
        if field_offset < 0 or len(msg) < field_end or len(msg) < msg_scope_end:
            failed += 1
            continue

        # Extract data to checksum, leaving out the field itself when it is
        # strictly inside the scope.
        if scope_start < field_offset < msg_scope_end:
            data = msg[scope_start:field_offset] + msg[field_end:msg_scope_end]
        else:
            data = msg[scope_start:msg_scope_end]

        try:
            computed = compute_checksum(data, algorithm, **extra)
        except ValueError:
            # Unknown algorithm: nothing can be verified.
            failed += 1
            continue

        # The checksum is independent of the field's byte order, so one
        # computation is compared against both interpretations.
        stored = msg[field_offset:field_end]
        if computed in (
            int.from_bytes(stored, byteorder="big"),
            int.from_bytes(stored, byteorder="little"),
        ):
            passed += 1
        else:
            failed += 1

    return (passed, failed)
def compute_checksum(data: bytes, algorithm: str, **kwargs: Any) -> int:
    """Compute a checksum using the algorithm selected by name.

    Supported names are ``xor``, ``sum8``, ``sum16_big``, ``sum16_little``,
    ``crc8``, ``crc16_ccitt``, ``crc16_ibm`` and ``crc32``.

    Args:
        data: Data to checksum.
        algorithm: Algorithm name.
        **kwargs: Algorithm-specific parameters: ``poly`` and ``init`` for
            ``crc8``; ``init`` for ``crc16_ccitt`` (default 0xFFFF) and
            ``crc16_ibm`` (default 0x0000). Other algorithms ignore them.

    Returns:
        Computed checksum value.

    Raises:
        ValueError: If algorithm is unknown.

    Example:
        >>> compute_checksum(b'ABC', 'xor')
        64
    """
    if algorithm == "xor":
        return xor_checksum(data)
    if algorithm == "sum8":
        return sum8(data)
    if algorithm == "sum16_big":
        return sum16(data, endian="big")
    if algorithm == "sum16_little":
        return sum16(data, endian="little")
    if algorithm == "crc8":
        return crc8(data, poly=kwargs.get("poly", 0x07), init=kwargs.get("init", 0x00))
    if algorithm == "crc16_ccitt":
        return crc16_ccitt(data, init=kwargs.get("init", 0xFFFF))
    if algorithm == "crc16_ibm":
        return crc16_ibm(data, init=kwargs.get("init", 0x0000))
    if algorithm == "crc32":
        return crc32(data)
    raise ValueError(f"Unknown checksum algorithm: {algorithm}")
def crc8(data: bytes, poly: int = 0x07, init: int = 0x00) -> int:
    """Compute an MSB-first CRC-8 over ``data``.

    Args:
        data: Data to checksum.
        poly: Generator polynomial (default: 0x07).
        init: Starting register value (default: 0x00).

    Returns:
        CRC-8 value (0-255)

    Example:
        >>> crc8(b'123456789')
        244
    """
    register = init
    for octet in data:
        register ^= octet
        for _bit in range(8):
            shifted = register << 1
            # The top bit decides whether the polynomial is folded in.
            if register & 0x80:
                shifted ^= poly
            register = shifted & 0xFF
    return register
def crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int:
    """Compute an MSB-first CRC-16 with the CCITT polynomial 0x1021.

    Args:
        data: Data to checksum.
        init: Starting register value (default: 0xFFFF).

    Returns:
        CRC-16 value (0-65535)

    Example:
        >>> crc16_ccitt(b'123456789')
        10673
    """
    register = init
    for octet in data:
        # Each input byte enters at the high end of the 16-bit register.
        register ^= octet << 8
        for _bit in range(8):
            shifted = register << 1
            if register & 0x8000:
                shifted ^= 0x1021
            register = shifted & 0xFFFF
    return register
def crc16_ibm(data: bytes, init: int = 0x0000) -> int:
    """Compute CRC-16-IBM (CRC-16-ANSI), processed LSB-first.

    Uses 0xA001, the bit-reversed form of the IBM polynomial 0x8005.

    Args:
        data: Data to checksum.
        init: Starting register value (default: 0x0000).

    Returns:
        CRC-16 value (0-65535)

    Example:
        >>> crc16_ibm(b'123456789')
        47933
    """
    register = init
    for octet in data:
        register ^= octet
        for _bit in range(8):
            low_bit_set = register & 0x0001
            register >>= 1
            if low_bit_set:
                register ^= 0xA001
    return register
def crc32(data: bytes) -> int:
    """Calculate CRC-32 (IEEE 802.3).

    Standard CRC-32 as used in Ethernet, ZIP, etc. The computation is
    delegated to :func:`zlib.crc32`, which implements the same CRC
    (reflected polynomial 0xEDB88320, initial value and final XOR
    0xFFFFFFFF) in native code.

    Args:
        data: Data to checksum (any bytes-like object).

    Returns:
        CRC-32 value (0-4294967295)

    Example:
        >>> crc32(b'123456789')
        3421780262
    """
    # Imported here so the block is self-contained.
    import zlib

    return zlib.crc32(data)
def sum8(data: bytes) -> int:
    """Add up all bytes and keep only the low 8 bits of the total.

    Args:
        data: Data to checksum.

    Returns:
        Sum modulo 256 (0-255)

    Example:
        >>> sum8(b'ABC')
        198
    """
    byte_total = sum(data)
    return byte_total % 256
def sum16(data: bytes, endian: Literal["big", "little"] = "big") -> int:
    """Calculate 16-bit sum checksum.

    The data is read as consecutive 16-bit words in the requested byte order
    and the words are added modulo 65536. A trailing odd byte is treated as
    a word whose missing second byte is zero: it becomes the high byte for
    big-endian and the low byte for little-endian.

    Args:
        data: Data to checksum.
        endian: Byte order ('big' or 'little', default: 'big'). Any value
            other than 'big' is handled as little-endian.

    Returns:
        Sum modulo 65536 (0-65535)

    Example:
        >>> sum16(b'ABCD', endian='big')
        33926
    """
    big = endian == "big"
    # Index offsets of the high and low byte within each two-byte word.
    hi, lo = (0, 1) if big else (1, 0)

    total = sum((data[i + hi] << 8) | data[i + lo] for i in range(0, len(data) - 1, 2))

    # Handle the odd trailing byte.
    if len(data) % 2 == 1:
        total += (data[-1] << 8) if big else data[-1]

    return total & 0xFFFF
def xor_checksum(data: bytes) -> int:
    """Calculate XOR checksum.

    XOR of all bytes; empty input yields 0.

    Args:
        data: Data to checksum

    Returns:
        XOR result (0-255)

    Example:
        >>> xor_checksum(b'ABC')
        64
    """
    accumulator = 0
    for octet in data:
        accumulator ^= octet
    return accumulator
@dataclass
class ChecksumDetectionResult:
    """Outcome of a checksum detection run.

    Attributes:
        has_checksum: True when a checksum field was detected.
        offset: Byte offset of the checksum field (None if not detected).
        size: Width of the checksum field in bytes (None if not detected).
        algorithm: Name of the identified algorithm (None if not identified).
        confidence: Detection confidence (0-1).
        candidates: Candidate positions that were found.
        scope_start: Start of the checksummed region (None if not identified).
        scope_end: End of the checksummed region (None if not identified).
        init_value: Initial value for CRC algorithms (None if not applicable).
    """

    has_checksum: bool
    # Location of the detected field.
    offset: int | None = None
    size: int | None = None
    # Identification details.
    algorithm: str | None = None
    confidence: float = 0.0
    # default_factory gives every result its own list.
    candidates: list[ChecksumCandidate] = field(default_factory=list)
    scope_start: int | None = None
    scope_end: int | None = None
    init_value: int | None = None
class ChecksumDetector:
    """Object-oriented wrapper for checksum detection functionality.

    Provides a class-based interface for checksum detection operations,
    wrapping the functional API. The result of the most recent
    :meth:`detect_checksum_field` call is remembered and used by
    :meth:`verify_checksum`.

    Example:
        >>> detector = ChecksumDetector()
        >>> result = detector.detect_checksum_field(messages)
        >>> if result.has_checksum:
        ...     print(f"Checksum at offset {result.offset}")
    """

    def __init__(self, correlation_threshold: float = 0.5):
        """Initialize checksum detector.

        Args:
            correlation_threshold: Minimum correlation for detection.
        """
        self.correlation_threshold = correlation_threshold
        self._detection_result: ChecksumDetectionResult | None = None
        self._messages: list[DataType] = []

    def detect_checksum_field(self, messages: list[DataType]) -> ChecksumDetectionResult:
        """Detect checksum field in messages.

        Candidates at or above the correlation threshold are considered. The
        best-correlated one is chosen, except that a trailer candidate within
        5% of a leading header candidate is preferred. If no algorithm can be
        identified for the chosen candidate, the other candidates within 10%
        of its correlation are tried in order.

        Args:
            messages: List of messages to analyze.

        Returns:
            ChecksumDetectionResult with detection results. The confidence is
            the candidate's correlation, scaled by 0.3 when no algorithm
            could be identified.

        Example:
            >>> detector = ChecksumDetector()
            >>> result = detector.detect_checksum_field(messages)
        """
        self._messages = messages
        candidates = detect_checksum_fields(messages)

        if not candidates:
            self._detection_result = ChecksumDetectionResult(has_checksum=False, confidence=0.0)
            return self._detection_result

        # Filter by correlation threshold
        good_candidates = [c for c in candidates if c.correlation >= self.correlation_threshold]

        if not good_candidates:
            # Candidates exist but none pass the threshold: report no
            # checksum, with the best correlation seen as confidence.
            self._detection_result = ChecksumDetectionResult(
                has_checksum=False,
                candidates=candidates,
                confidence=max(c.correlation for c in candidates),
            )
            return self._detection_result

        # Use best candidate, preferring the first trailer checksum whose
        # correlation is within 5% of a leading header candidate.
        best = good_candidates[0]
        if best.position == "header":
            cutoff = best.correlation * 0.95
            for candidate in good_candidates[1:]:
                if candidate.position == "trailer" and candidate.correlation >= cutoff:
                    best = candidate
                    break

        # Try to identify algorithm for best candidate
        algorithm_match = identify_checksum_algorithm(messages, best.offset, best.size)

        # If algorithm identification fails, try the other high-correlation
        # candidates. Every candidate except the one just tested is eligible,
        # including a header candidate displaced by the trailer preference.
        if algorithm_match is None:
            for candidate in good_candidates:
                if candidate is best:
                    continue
                # Candidates are sorted, so everything after this is too low.
                if candidate.correlation < best.correlation * 0.9:
                    break

                alt_match = identify_checksum_algorithm(messages, candidate.offset, candidate.size)
                if alt_match is not None:
                    best = candidate
                    algorithm_match = alt_match
                    break

        # High correlation but no identifiable algorithm suggests a false
        # positive, so the confidence is penalized in that case.
        final_confidence = best.correlation
        if algorithm_match is None:
            final_confidence = best.correlation * 0.3

        self._detection_result = ChecksumDetectionResult(
            has_checksum=True,
            offset=best.offset,
            size=best.size,
            algorithm=algorithm_match.algorithm if algorithm_match else None,
            confidence=final_confidence,
            candidates=good_candidates,
            scope_start=algorithm_match.scope_start if algorithm_match else None,
            scope_end=algorithm_match.scope_end if algorithm_match else None,
            init_value=algorithm_match.init_value if algorithm_match else None,
        )
        return self._detection_result

    def identify_algorithm(
        self, messages: list[DataType], offset: int, size: int | None = None
    ) -> ChecksumMatch | None:
        """Identify checksum algorithm at given offset.

        Args:
            messages: List of messages.
            offset: Checksum field offset.
            size: Field size (auto-detect if None).

        Returns:
            ChecksumMatch or None if no match found.
        """
        return identify_checksum_algorithm(messages, offset, size)

    def verify(
        self, messages: list[DataType], algorithm: str, offset: int, **kwargs: Any
    ) -> tuple[int, int]:
        """Verify checksums in messages.

        Args:
            messages: List of messages.
            algorithm: Checksum algorithm name.
            offset: Checksum field offset.
            **kwargs: Extra keyword arguments forwarded to ``verify_checksums``.

        Returns:
            Tuple of (passed, failed) counts.
        """
        return verify_checksums(messages, algorithm, offset, **kwargs)

    def verify_checksum(self, message: DataType) -> bool:
        """Verify checksum for a single message.

        Uses the parameters found by the last ``detect_checksum_field`` call.
        When that call found a field but no algorithm, the common algorithms
        for the field size are tried with default scope.

        Args:
            message: Single message to verify.

        Returns:
            True if checksum is valid, False otherwise (including when no
            checksum has been detected yet).

        Example:
            >>> detector = ChecksumDetector()
            >>> detector.detect_checksum_field(messages)
            >>> is_valid = detector.verify_checksum(messages[0])
        """
        result = self._detection_result
        if result is None or not result.has_checksum:
            # Nothing has been detected, so there is nothing to verify against.
            return False

        offset = result.offset
        size = result.size
        if offset is None or size is None:
            return False

        # Convert message to bytes
        if isinstance(message, np.ndarray):
            msg = message.tobytes() if message.dtype == np.uint8 else bytes(message.flatten())
        else:
            msg = bytes(message)

        if result.algorithm is None:
            # No algorithm identified - try the common ones for this size.
            if size == 1:
                algorithms = ["xor", "sum8"]
            elif size == 2:
                algorithms = ["crc16_ccitt", "crc16_ibm", "sum16_big", "sum16_little"]
            elif size == 4:
                algorithms = ["crc32"]
            else:
                algorithms = ["xor", "sum8", "crc16_ccitt", "crc16_ibm", "sum16_big", "crc32"]

            return any(verify_checksums([msg], algo, offset)[0] == 1 for algo in algorithms)

        # Use identified algorithm
        passed, _failed = verify_checksums(
            [msg],
            result.algorithm,
            offset,
            scope_start=result.scope_start or 0,
            scope_end=result.scope_end,
            init_value=result.init_value,
        )
        return passed == 1
# Names exported by ``from <module> import *``.
__all__ = [
    "ChecksumCandidate",
    "ChecksumDetectionResult",
    "ChecksumDetector",
    "ChecksumMatch",
    "compute_checksum",
    "crc8",
    "crc16_ccitt",
    "crc16_ibm",
    "crc32",
    "detect_checksum_fields",
    "identify_checksum_algorithm",
    "sum8",
    "sum16",
    "verify_checksums",
    "xor_checksum",
]