Coverage for src / tracekit / analyzers / patterns / matching.py: 91%
413 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Binary pattern matching with regex, Aho-Corasick, and fuzzy matching.
3 - RE-PAT-001: Binary Regex Pattern Matching
4 - RE-PAT-002: Multi-Pattern Search (Aho-Corasick)
5 - RE-PAT-003: Fuzzy Pattern Matching
7This module provides comprehensive pattern matching capabilities for binary
8data reverse engineering, including regex-like matching, efficient multi-pattern
9search using Aho-Corasick, and approximate matching with configurable
10similarity thresholds.
11"""
13from __future__ import annotations
15import re
16from collections import defaultdict, deque
17from collections.abc import Iterator
18from dataclasses import dataclass, field
@dataclass
class PatternMatchResult:
    """Result of a pattern match.

    Implements RE-PAT-001: Pattern match result.

    Attributes:
        pattern_name: Name or identifier of the pattern.
        offset: Byte offset of match in data.
        length: Length of matched bytes.
        matched_data: The matched bytes.
        pattern: Original pattern that matched.
        similarity: Similarity score for fuzzy matches (1.0 for exact).
    """

    pattern_name: str
    offset: int
    length: int
    matched_data: bytes
    # Kept as the caller supplied it: a bytes literal (Aho-Corasick) or a
    # pattern string (BinaryRegex).
    pattern: bytes | str
    # Exact matchers never set this, so it defaults to a perfect score.
    similarity: float = 1.0
@dataclass
class BinaryRegex:
    """Binary regex pattern for matching.

    Implements RE-PAT-001: Binary Regex specification.

    Supports:
    - Literal bytes: \\xAA\\xBB
    - Wildcards: ?? (any byte), ? (any nibble — simplified to any byte)
    - Ranges: [\\x00-\\x1F] (byte range)
    - Repetition: {n} {n,m} (repeat n to m times)
    - Alternation: (\\x00|\\xFF) (either byte)
    - Anchors: ^ (start), $ (end)

    Attributes:
        pattern: The pattern string.
        compiled: Compiled regex object (None when the pattern is invalid).
        name: Optional pattern name.
    """

    pattern: str
    compiled: re.Pattern[bytes] | None = None
    name: str = ""

    def __post_init__(self) -> None:
        """Compile the pattern; on invalid syntax leave ``compiled`` as None."""
        try:
            # Convert binary pattern to Python regex
            regex_pattern = self._convert_to_regex(self.pattern)
            self.compiled = re.compile(regex_pattern, re.DOTALL)
        except re.error:
            # Invalid pattern: match()/search()/findall() report no matches.
            self.compiled = None

    def _convert_to_regex(self, pattern: str) -> bytes:
        """Convert binary pattern syntax to Python regex.

        Args:
            pattern: Binary pattern string (bytes tolerated too).

        Returns:
            Python regex pattern as bytes.
        """
        result = []
        i = 0
        pattern_bytes = pattern.encode() if isinstance(pattern, str) else pattern

        while i < len(pattern_bytes):
            char = chr(pattern_bytes[i])

            if char == "\\":
                # Escape sequence
                if i + 1 < len(pattern_bytes):
                    next_char = chr(pattern_bytes[i + 1])
                    if next_char == "x":
                        # Hex byte \xAA
                        if i + 3 < len(pattern_bytes):
                            hex_str = chr(pattern_bytes[i + 2]) + chr(pattern_bytes[i + 3])
                            try:
                                byte_val = int(hex_str, 16)
                                # Escape special regex chars
                                if chr(byte_val) in ".^$*+?{}[]\\|()":
                                    result.append(b"\\" + bytes([byte_val]))
                                else:
                                    result.append(bytes([byte_val]))
                                i += 4
                                continue
                            except ValueError:
                                pass
                        # Malformed/truncated \x escape: pass the two chars
                        # through unchanged.
                        result.append(pattern_bytes[i : i + 2])
                        i += 2
                    else:
                        # BUG FIX: previously a non-hex escape (e.g. \d, \[)
                        # consumed no input, so this loop never terminated.
                        # Pass the escape through to the regex engine intact.
                        result.append(pattern_bytes[i : i + 2])
                        i += 2
                else:
                    # Trailing lone backslash (re.compile will reject it).
                    result.append(b"\\")
                    i += 1

            elif char == "?":
                # Wildcard
                if i + 1 < len(pattern_bytes) and chr(pattern_bytes[i + 1]) == "?":
                    # ?? = any byte
                    result.append(b".")
                    i += 2
                else:
                    # Single ? = any nibble (simplified to any byte)
                    result.append(b".")
                    i += 1

            elif char == "[":
                # Byte range [\x00-\x1F]; copied verbatim into the regex.
                end = pattern_bytes.find(b"]", i)
                if end != -1:
                    range_spec = pattern_bytes[i : end + 1]
                    result.append(range_spec)
                    i = end + 1
                else:
                    # Unterminated class: emit literally, let re.compile decide.
                    result.append(b"[")
                    i += 1

            elif char in "^$":
                # Anchors
                result.append(pattern_bytes[i : i + 1])
                i += 1

            elif char == "{":
                # Repetition {n} or {n,m}; copied verbatim.
                end = pattern_bytes.find(b"}", i)
                if end != -1:
                    rep_spec = pattern_bytes[i : end + 1]
                    result.append(rep_spec)
                    i = end + 1
                else:
                    result.append(b"{")
                    i += 1

            elif char == "(":
                # Grouping
                result.append(b"(")
                i += 1

            elif char == ")":
                result.append(b")")
                i += 1

            elif char == "|":
                # Alternation
                result.append(b"|")
                i += 1

            elif char == "*":
                result.append(b"*")
                i += 1

            elif char == "+":
                result.append(b"+")
                i += 1

            else:
                # Literal byte - escape if special
                byte_val = pattern_bytes[i]
                if chr(byte_val) in ".^$*+?{}[]\\|()":
                    result.append(b"\\" + bytes([byte_val]))
                else:
                    result.append(bytes([byte_val]))
                i += 1

        return b"".join(result)

    def match(self, data: bytes, start: int = 0) -> PatternMatchResult | None:
        """Try to match pattern at start of data (anchored at ``start``).

        Args:
            data: Data to match against.
            start: Starting offset.

        Returns:
            PatternMatchResult if matched, None otherwise.
        """
        if self.compiled is None:
            return None

        match = self.compiled.match(data, start)
        if match:
            return PatternMatchResult(
                pattern_name=self.name,
                offset=match.start(),
                length=match.end() - match.start(),
                matched_data=match.group(),
                pattern=self.pattern,
            )
        return None

    def search(self, data: bytes, start: int = 0) -> PatternMatchResult | None:
        """Search for pattern anywhere in data.

        Args:
            data: Data to search.
            start: Starting offset.

        Returns:
            PatternMatchResult for the first hit, None otherwise.
        """
        if self.compiled is None:
            return None

        match = self.compiled.search(data, start)
        if match:
            return PatternMatchResult(
                pattern_name=self.name,
                offset=match.start(),
                length=match.end() - match.start(),
                matched_data=match.group(),
                pattern=self.pattern,
            )
        return None

    def findall(self, data: bytes) -> list[PatternMatchResult]:
        """Find all (non-overlapping) occurrences of pattern in data.

        Args:
            data: Data to search.

        Returns:
            List of all matches.
        """
        if self.compiled is None:
            return []

        results = []
        for match in self.compiled.finditer(data):
            results.append(
                PatternMatchResult(
                    pattern_name=self.name,
                    offset=match.start(),
                    length=match.end() - match.start(),
                    matched_data=match.group(),
                    pattern=self.pattern,
                )
            )
        return results
class AhoCorasickMatcher:
    """Multi-pattern search using Aho-Corasick algorithm.

    Implements RE-PAT-002: Multi-Pattern Search.

    Efficiently searches for multiple patterns simultaneously in O(n + m + z)
    time where n is text length, m is total pattern length, and z is matches.

    Example:
        >>> matcher = AhoCorasickMatcher()
        >>> matcher.add_pattern(b'\\xAA\\x55', 'header')
        >>> matcher.add_pattern(b'\\xDE\\xAD', 'marker')
        >>> matcher.build()
        >>> matches = matcher.search(data)
    """

    def __init__(self) -> None:
        """Initialize Aho-Corasick automaton."""
        # Trie edges: _goto[state][byte] -> next state; state 0 is the root.
        self._goto: dict[int, dict[int, int]] = defaultdict(dict)
        # Failure links: _fail[state] -> longest-proper-suffix state.
        self._fail: dict[int, int] = {}
        # _output[state] -> (pattern, name) pairs ending at this state.
        self._output: dict[int, list[tuple[bytes, str]]] = defaultdict(list)
        self._patterns: list[tuple[bytes, str]] = []
        self._state_count = 0
        # Set by build(); search()/iter_search() refuse to run until then.
        self._built = False

    def add_pattern(self, pattern: bytes | str, name: str = "") -> None:
        """Add a pattern to the automaton.

        Args:
            pattern: Pattern bytes to search for (str is encoded as UTF-8).
            name: Optional name for the pattern; defaults to the pattern's
                hex digest when empty.
        """
        if isinstance(pattern, str):
            pattern = pattern.encode()
        if not name:
            name = pattern.hex()

        self._patterns.append((pattern, name))
        # Any mutation invalidates the automaton until build() is re-run.
        self._built = False

    def add_patterns(self, patterns: dict[str, bytes | str]) -> None:
        """Add multiple patterns at once.

        Args:
            patterns: Dictionary mapping names to patterns.
        """
        for name, pattern in patterns.items():
            self.add_pattern(pattern, name)

    def build(self) -> None:
        """Build the automaton from added patterns.

        Must be called after adding patterns and before searching.
        """
        # Reset automaton
        self._goto = defaultdict(dict)
        self._fail = {}
        self._output = defaultdict(list)
        self._state_count = 0

        # Build goto function (the pattern trie)
        for pattern, name in self._patterns:
            state = 0
            for byte in pattern:
                if byte not in self._goto[state]:
                    self._state_count += 1
                    self._goto[state][byte] = self._state_count
                state = self._goto[state][byte]
            self._output[state].append((pattern, name))

        # Build fail function using BFS
        queue: deque[int] = deque()

        # Initialize fail for depth 1 states (their only proper suffix is "")
        for state in self._goto[0].values():
            self._fail[state] = 0
            queue.append(state)

        # BFS to build fail function; parents are processed before children,
        # so self._fail[r] is always available here.
        while queue:
            r = queue.popleft()
            for byte, s in self._goto[r].items():
                queue.append(s)

                # Follow fail links to find fail state
                state = self._fail[r]
                while state != 0 and byte not in self._goto[state]:
                    state = self._fail.get(state, 0)

                self._fail[s] = self._goto[state].get(byte, 0)

                # Merge outputs so s also reports patterns ending at its
                # failure state (suffix matches). Membership test on the
                # defaultdict does not insert a key.
                if self._fail[s] in self._output:
                    self._output[s].extend(self._output[self._fail[s]])

        self._built = True

    def search(self, data: bytes) -> list[PatternMatchResult]:
        """Search for all patterns in data.

        Args:
            data: Data to search.

        Returns:
            List of all pattern matches.

        Raises:
            RuntimeError: If automaton not built.
        """
        if not self._built:
            raise RuntimeError("Must call build() before search()")

        results = []
        state = 0

        for i, byte in enumerate(data):
            # Follow fail links until match or root
            while state != 0 and byte not in self._goto[state]:
                state = self._fail.get(state, 0)

            state = self._goto[state].get(byte, 0)

            # Check for outputs (patterns ending at position i)
            if state in self._output:
                for pattern, name in self._output[state]:
                    offset = i - len(pattern) + 1
                    results.append(
                        PatternMatchResult(
                            pattern_name=name,
                            offset=offset,
                            length=len(pattern),
                            matched_data=data[offset : offset + len(pattern)],
                            pattern=pattern,
                        )
                    )

        return results

    def iter_search(self, data: bytes) -> Iterator[PatternMatchResult]:
        """Iterate over pattern matches (memory-efficient).

        Same traversal as search(), but yields matches lazily instead of
        accumulating a list.

        Args:
            data: Data to search.

        Yields:
            PatternMatchResult for each match.

        Raises:
            RuntimeError: If automaton not built.
        """
        if not self._built:
            raise RuntimeError("Must call build() before search()")

        state = 0

        for i, byte in enumerate(data):
            while state != 0 and byte not in self._goto[state]:
                state = self._fail.get(state, 0)

            state = self._goto[state].get(byte, 0)

            if state in self._output:
                for pattern, name in self._output[state]:
                    offset = i - len(pattern) + 1
                    yield PatternMatchResult(
                        pattern_name=name,
                        offset=offset,
                        length=len(pattern),
                        matched_data=data[offset : offset + len(pattern)],
                        pattern=pattern,
                    )
@dataclass
class FuzzyMatchResult:
    """Result of fuzzy pattern matching.

    Implements RE-PAT-003: Fuzzy match result.

    Attributes:
        pattern_name: Name of the pattern.
        offset: Byte offset of match.
        length: Length of matched region.
        matched_data: The matched bytes.
        pattern: Original pattern.
        similarity: Similarity score (0-1).
        edit_distance: Levenshtein edit distance.
        substitutions: List of (position, expected, actual) substitutions.
    """

    pattern_name: str
    offset: int
    length: int
    matched_data: bytes
    pattern: bytes
    similarity: float
    edit_distance: int
    # Each entry is (pattern_index, pattern_byte, actual_byte); may be empty
    # when the producing code path skips detailed backtracking.
    substitutions: list[tuple[int, int, int]] = field(default_factory=list)
class FuzzyMatcher:
    """Fuzzy pattern matching with configurable similarity.

    Implements RE-PAT-003: Fuzzy Pattern Matching.

    Supports approximate matching with edit distance thresholds and
    flexible match criteria.

    Example:
        >>> matcher = FuzzyMatcher(max_edit_distance=2)
        >>> matches = matcher.search(data, pattern=b'\\xAA\\x55\\x00')
    """

    def __init__(
        self,
        max_edit_distance: int = 2,
        min_similarity: float | None = None,
        allow_substitutions: bool = True,
        allow_insertions: bool = True,
        allow_deletions: bool = True,
    ) -> None:
        """Initialize fuzzy matcher.

        Args:
            max_edit_distance: Maximum allowed edit distance.
            min_similarity: Minimum similarity threshold (0-1). If None, no
                similarity filtering is applied (see the ``min_similarity``
                property) and only ``max_edit_distance`` limits matches.
            allow_substitutions: Allow byte substitutions.
            allow_insertions: Allow byte insertions.
            allow_deletions: Allow byte deletions.
        """
        self.max_edit_distance = max_edit_distance
        self._min_similarity = min_similarity  # Store original value
        self.allow_substitutions = allow_substitutions
        self.allow_insertions = allow_insertions
        self.allow_deletions = allow_deletions

    @property
    def min_similarity(self) -> float:
        """Get minimum similarity (explicit value, or 0.0 = no filtering)."""
        if self._min_similarity is not None:
            return self._min_similarity
        # Default: no similarity filtering when using edit distance
        return 0.0

    def search(
        self,
        data: bytes,
        pattern: bytes | str,
        pattern_name: str = "",
    ) -> list[FuzzyMatchResult]:
        """Search for fuzzy matches of pattern in data.

        Args:
            data: Data to search.
            pattern: Pattern to match (str is encoded as UTF-8).
            pattern_name: Optional pattern name; defaults to pattern hex.

        Returns:
            List of non-overlapping fuzzy matches meeting criteria.
        """
        if isinstance(pattern, str):
            pattern = pattern.encode()

        if not pattern_name:
            pattern_name = pattern.hex()

        results = []
        pattern_len = len(pattern)

        # Sliding window search; start offsets may extend slightly past the
        # last full-pattern window so shortened (deletion) matches are found.
        for i in range(len(data) - pattern_len + 1 + self.max_edit_distance):
            if i >= len(data):
                break
            # Check windows of varying sizes (pattern length ± max edits)
            for window_len in range(
                max(1, pattern_len - self.max_edit_distance),
                min(len(data) - i + 1, pattern_len + self.max_edit_distance + 1),
            ):
                if i + window_len > len(data):
                    continue

                window = data[i : i + window_len]
                distance, substitutions = self._edit_distance_detailed(pattern, window)

                if distance <= self.max_edit_distance:
                    # Normalize by the longer of pattern/window lengths.
                    similarity = 1.0 - (distance / max(pattern_len, window_len))

                    if similarity >= self.min_similarity:
                        results.append(
                            FuzzyMatchResult(
                                pattern_name=pattern_name,
                                offset=i,
                                length=window_len,
                                matched_data=window,
                                pattern=pattern,
                                similarity=similarity,
                                edit_distance=distance,
                                substitutions=substitutions,
                            )
                        )

        # Remove overlapping matches, keeping best
        return self._remove_overlapping(results)

    def match_with_wildcards(
        self,
        data: bytes,
        pattern: bytes,
        wildcard: int = 0xFF,
        pattern_name: str = "",
    ) -> list[FuzzyMatchResult]:
        """Match pattern with wildcard bytes.

        Non-wildcard positions may mismatch up to ``max_edit_distance`` times;
        wildcard bytes match anything and do not count toward similarity.

        Args:
            data: Data to search.
            pattern: Pattern with wildcards.
            wildcard: Byte value treated as wildcard.
            pattern_name: Optional pattern name.

        Returns:
            List of matches.
        """
        if not pattern_name:
            pattern_name = pattern.hex()

        results = []
        pattern_len = len(pattern)

        for i in range(len(data) - pattern_len + 1):
            window = data[i : i + pattern_len]
            matches = True
            mismatches = 0

            for j in range(pattern_len):
                if pattern[j] != wildcard and pattern[j] != window[j]:
                    mismatches += 1
                    if mismatches > self.max_edit_distance:
                        matches = False
                        break

            if matches:
                # Similarity counts only non-wildcard positions; an
                # all-wildcard pattern trivially scores 1.0.
                non_wildcard_count = sum(1 for b in pattern if b != wildcard)
                similarity = (
                    (non_wildcard_count - mismatches) / non_wildcard_count
                    if non_wildcard_count > 0
                    else 1.0
                )

                if similarity >= self.min_similarity:
                    results.append(
                        FuzzyMatchResult(
                            pattern_name=pattern_name,
                            offset=i,
                            length=pattern_len,
                            matched_data=window,
                            pattern=pattern,
                            similarity=similarity,
                            edit_distance=mismatches,
                        )
                    )

        return results

    def _edit_distance_detailed(
        self, pattern: bytes, text: bytes
    ) -> tuple[int, list[tuple[int, int, int]]]:
        """Calculate edit distance with substitution details.

        Full O(m*n) Levenshtein DP honoring the allow_* flags, followed by a
        backtrack that records substitutions only (insertions/deletions are
        not itemized).

        Args:
            pattern: Pattern bytes.
            text: Text to compare.

        Returns:
            Tuple of (distance, substitutions).
        """
        m, n = len(pattern), len(text)

        # Create DP table (using float to accommodate inf values)
        dp: list[list[float]] = [[0.0] * (n + 1) for _ in range(m + 1)]

        # Initialize base cases; disabled operations make the corresponding
        # border unreachable (inf).
        for i in range(m + 1):
            dp[i][0] = float(i) if self.allow_deletions else float("inf")
        for j in range(n + 1):
            dp[0][j] = float(j) if self.allow_insertions else float("inf")
        # The empty-vs-empty cell is always reachable at cost 0.
        dp[0][0] = 0.0

        # Fill DP table
        for i in range(1, m + 1):
            for j in range(1, n + 1):
                if pattern[i - 1] == text[j - 1]:
                    dp[i][j] = dp[i - 1][j - 1]
                else:
                    candidates = [float("inf")]
                    if self.allow_substitutions:
                        candidates.append(dp[i - 1][j - 1] + 1)
                    if self.allow_insertions:
                        candidates.append(dp[i][j - 1] + 1)
                    if self.allow_deletions:
                        candidates.append(dp[i - 1][j] + 1)
                    dp[i][j] = min(candidates)

        # Backtrack to find substitutions (positions are pattern indices)
        substitutions = []
        i, j = m, n
        while i > 0 and j > 0:
            if pattern[i - 1] == text[j - 1]:
                i -= 1
                j -= 1
            elif dp[i][j] == dp[i - 1][j - 1] + 1 and self.allow_substitutions:
                substitutions.append((i - 1, pattern[i - 1], text[j - 1]))
                i -= 1
                j -= 1
            elif dp[i][j] == dp[i - 1][j] + 1 and self.allow_deletions:
                i -= 1
            elif dp[i][j] == dp[i][j - 1] + 1 and self.allow_insertions:
                j -= 1
            else:
                # No consistent predecessor (can happen with inf borders).
                break

        return int(dp[m][n]), substitutions

    def _remove_overlapping(self, results: list[FuzzyMatchResult]) -> list[FuzzyMatchResult]:
        """Remove overlapping matches, keeping highest similarity.

        Greedy: best-similarity matches claim their byte ranges first; any
        later match touching a claimed byte is dropped.

        Args:
            results: List of fuzzy match results.

        Returns:
            Non-overlapping results, sorted by offset.
        """
        if not results:
            return []

        # Sort by similarity (descending) then offset
        sorted_results = sorted(results, key=lambda r: (-r.similarity, r.offset))

        kept = []
        covered: set[int] = set()

        for result in sorted_results:
            # Check if any position is already covered
            positions = set(range(result.offset, result.offset + result.length))
            if not positions & covered:
                kept.append(result)
                covered.update(positions)

        return sorted(kept, key=lambda r: r.offset)
714# =============================================================================
715# Convenience functions
716# =============================================================================
def binary_regex_search(
    data: bytes,
    pattern: str,
    name: str = "",
) -> list[PatternMatchResult]:
    """Search data using binary regex pattern.

    Implements RE-PAT-001: Binary Regex Pattern Matching.

    Thin convenience wrapper: compiles the pattern into a BinaryRegex and
    returns every occurrence found in ``data``.

    Args:
        data: Data to search.
        pattern: Binary regex pattern.
        name: Optional pattern name.

    Returns:
        List of all matches.

    Example:
        >>> matches = binary_regex_search(data, r'\\xAA.{4}\\x55')
    """
    return BinaryRegex(pattern=pattern, name=name).findall(data)
def multi_pattern_search(
    data: bytes,
    patterns: dict[str, bytes | str],
) -> dict[str, list[PatternMatchResult]]:
    """Search for multiple patterns simultaneously.

    Implements RE-PAT-002: Multi-Pattern Search.

    Args:
        data: Data to search.
        patterns: Dictionary mapping names to patterns.

    Returns:
        Dictionary mapping pattern names to match lists. Every input name
        appears as a key, even with zero matches.

    Example:
        >>> patterns = {'header': b'\\xAA\\x55', 'footer': b'\\x00\\x00'}
        >>> results = multi_pattern_search(data, patterns)
    """
    matcher = AhoCorasickMatcher()
    matcher.add_patterns(patterns)
    matcher.build()

    all_matches = matcher.search(data)

    # Group by pattern name, pre-seeding every requested name so patterns
    # with no hits still appear in the result.
    result: dict[str, list[PatternMatchResult]] = {name: [] for name in patterns}
    for match in all_matches:
        # setdefault guards against names the matcher rewrote (add_pattern
        # replaces an empty name with the pattern's hex digest), which
        # previously raised KeyError here.
        result.setdefault(match.pattern_name, []).append(match)

    return result
def fuzzy_search(
    data: bytes,
    pattern: bytes | str,
    max_distance: int = 2,
    min_similarity: float | None = None,
    name: str = "",
) -> list[FuzzyMatchResult]:
    """Search with fuzzy/approximate matching.

    Implements RE-PAT-003: Fuzzy Pattern Matching.

    Convenience wrapper around FuzzyMatcher.search with a one-shot matcher.

    Args:
        data: Data to search.
        pattern: Pattern to match.
        max_distance: Maximum edit distance.
        min_similarity: Minimum similarity threshold (None = no filtering).
        name: Optional pattern name.

    Returns:
        List of fuzzy matches.

    Example:
        >>> matches = fuzzy_search(data, b'\\xAA\\x55\\x00', max_distance=1)
    """
    return FuzzyMatcher(
        max_edit_distance=max_distance,
        min_similarity=min_similarity,
    ).search(data, pattern, pattern_name=name)
def find_similar_sequences(
    data: bytes,
    min_length: int = 4,
    max_distance: int = 1,
) -> list[tuple[int, int, float]]:
    """Find similar byte sequences within data.

    Implements RE-PAT-003: Fuzzy Pattern Matching.

    Identifies pairs of positions with similar byte sequences.

    Performance optimization: sequences are pre-grouped into length buckets
    and only sequences in the same/adjacent buckets are compared, with early
    termination when the edit-distance threshold is exceeded. (NOTE(review):
    the windows sampled below are all exactly ``min_length`` bytes, so with
    the current sampling everything lands in one bucket; the grouping only
    matters if sampling is ever changed to variable-length windows.)

    Args:
        data: Data to analyze.
        min_length: Minimum sequence length.
        max_distance: Maximum edit distance.

    Returns:
        List of (offset1, offset2, similarity) tuples.
    """
    results: list[tuple[int, int, float]] = []
    data_len = len(data)

    if data_len < min_length:
        return results

    matcher = FuzzyMatcher(max_edit_distance=max_distance)

    # Sample fixed-size windows at half-window steps. The stop bound is
    # data_len - min_length + 1 so the final valid window (starting at
    # data_len - min_length) is included; the previous exclusive bound
    # silently dropped it.
    step = max(1, min_length // 2)
    sequences = [
        (i, data[i : i + min_length])
        for i in range(0, data_len - min_length + 1, step)
    ]

    # OPTIMIZATION 1: Hash-based pre-grouping by length bucket (~10% width)
    length_groups: dict[int, list[tuple[int, bytes]]] = defaultdict(list)
    bucket_size = max(1, min_length // 10)

    for offset, seq in sequences:
        length_groups[len(seq) // bucket_size].append((offset, seq))

    # OPTIMIZATION 2: Only compare within same/adjacent buckets
    for bucket in sorted(length_groups.keys()):
        candidates = length_groups[bucket].copy()
        if bucket + 1 in length_groups:
            candidates.extend(length_groups[bucket + 1])

        # Compare all pairs within this group
        for i, (offset1, seq1) in enumerate(candidates):
            for offset2, seq2 in candidates[i + 1 :]:
                # Skip overlapping sequences
                if abs(offset1 - offset2) < min_length:
                    continue

                # OPTIMIZATION 3: Early termination on length ratio
                len1, len2 = len(seq1), len(seq2)
                len_diff = abs(len1 - len2)
                max_len = max(len1, len2)

                # Quick rejection: length difference alone exceeds max_distance
                if len_diff > max_distance:
                    continue

                # Reject pairs whose best-case similarity cannot reach the
                # threshold implied by max_distance.
                min_possible_similarity = 1.0 - (len_diff / max_len)
                threshold_similarity = 1.0 - (max_distance / min_length)

                if min_possible_similarity < threshold_similarity:
                    continue

                # OPTIMIZATION 4: Banded/thresholded edit distance
                distance, _ = _edit_distance_with_threshold(seq1, seq2, max_distance, matcher)

                if distance <= max_distance:
                    similarity = 1.0 - (distance / min_length)
                    results.append((offset1, offset2, similarity))

    return results
def _edit_distance_with_threshold(
    seq1: bytes, seq2: bytes, threshold: int, matcher: FuzzyMatcher
) -> tuple[int, list[tuple[int, int, int]]]:
    """Calculate edit distance with early termination.

    Dispatches between a banded DP (fast when the threshold is small relative
    to the sequence lengths) and the matcher's full DP computation.

    Args:
        seq1: First sequence.
        seq2: Second sequence.
        threshold: Maximum allowed edit distance.
        matcher: FuzzyMatcher instance for detailed computation.

    Returns:
        Tuple of (distance, substitutions). Distance may be > threshold
        if no solution exists within threshold.
    """
    m, n = len(seq1), len(seq2)

    # Guard: the length gap is a lower bound on edit distance, so a gap
    # beyond the threshold means no qualifying alignment exists.
    length_gap = abs(m - n)
    if length_gap > threshold:
        return (length_gap, [])

    # Large threshold relative to sequence length: the band would cover most
    # of the DP table anyway, so use the full computation (which also yields
    # substitution details).
    if threshold >= min(m, n) // 2:
        return matcher._edit_distance_detailed(seq1, seq2)

    # Small threshold: banded DP, O(threshold * min(m, n)).
    return _banded_edit_distance(seq1, seq2, threshold)
938def _banded_edit_distance(
939 seq1: bytes, seq2: bytes, max_dist: int
940) -> tuple[int, list[tuple[int, int, int]]]:
941 """Compute edit distance using banded DP algorithm.
943 Only computes cells within max_dist of the main diagonal, which is
944 sufficient when we only care about distances up to max_dist. This
945 reduces time complexity from O(m*n) to O(max_dist * min(m,n)).
947 Args:
948 seq1: First sequence.
949 seq2: Second sequence.
950 max_dist: Maximum distance threshold.
952 Returns:
953 Tuple of (distance, substitutions). Substitutions may be approximate.
954 """
955 m, n = len(seq1), len(seq2)
957 # Use two rows for space efficiency
958 INF = max_dist + 100 # Sentinel value for unreachable cells
959 band_width = 2 * max_dist + 1
961 prev_row = [INF] * band_width
962 curr_row = [INF] * band_width
964 # Initialize first row
965 for j in range(min(band_width, n + 1)):
966 prev_row[j] = j
968 for i in range(1, m + 1):
969 # Reset current row
970 for k in range(band_width):
971 curr_row[k] = INF
973 curr_row[0] = i
975 # Compute band around diagonal
976 # j ranges from max(1, i-max_dist) to min(n, i+max_dist)
977 j_start = max(1, i - max_dist)
978 j_end = min(n, i + max_dist)
980 for j in range(j_start, j_end + 1):
981 # Map j to band index
982 band_idx = j - i + max_dist
983 if band_idx < 0 or band_idx >= band_width: 983 ↛ 984line 983 didn't jump to line 984 because the condition on line 983 was never true
984 continue
986 if seq1[i - 1] == seq2[j - 1]:
987 # Match: no cost
988 prev_band_idx = band_idx
989 curr_row[band_idx] = prev_row[prev_band_idx] if prev_band_idx < band_width else INF
990 else:
991 # Min of substitution, insertion, deletion
992 cost = INF
994 # Substitution: from (i-1, j-1)
995 prev_band_idx = band_idx
996 if prev_band_idx < band_width: 996 ↛ 1000line 996 didn't jump to line 1000 because the condition on line 996 was always true
997 cost = min(cost, prev_row[prev_band_idx] + 1)
999 # Deletion: from (i-1, j)
1000 prev_band_idx = band_idx + 1
1001 if prev_band_idx < band_width:
1002 cost = min(cost, prev_row[prev_band_idx] + 1)
1004 # Insertion: from (i, j-1)
1005 curr_band_idx = band_idx - 1
1006 if curr_band_idx >= 0:
1007 cost = min(cost, curr_row[curr_band_idx] + 1)
1009 curr_row[band_idx] = cost
1011 # Swap rows
1012 prev_row, curr_row = curr_row, prev_row
1014 # Extract result from final position
1015 final_band_idx = n - m + max_dist
1016 if final_band_idx >= 0 and final_band_idx < band_width: 1016 ↛ 1019line 1016 didn't jump to line 1019 because the condition on line 1016 was always true
1017 distance = prev_row[final_band_idx]
1018 else:
1019 distance = INF
1021 # Don't compute detailed substitutions for banded version (expensive)
1022 # Return empty list - caller should use this for filtering only
1023 return (min(distance, INF), [])
def count_pattern_occurrences(
    data: bytes,
    patterns: dict[str, bytes | str],
) -> dict[str, int]:
    """Count occurrences of multiple patterns.

    Implements RE-PAT-002: Multi-Pattern Search.

    Runs a single multi-pattern search and reduces each match list to its
    length.

    Args:
        data: Data to search.
        patterns: Dictionary mapping names to patterns.

    Returns:
        Dictionary mapping pattern names to counts.
    """
    counts: dict[str, int] = {}
    for pattern_name, matches in multi_pattern_search(data, patterns).items():
        counts[pattern_name] = len(matches)
    return counts
1045def find_pattern_positions(
1046 data: bytes,
1047 pattern: bytes | str,
1048) -> list[int]:
1049 """Find all positions of a pattern in data.
1051 Args:
1052 data: Data to search.
1053 pattern: Pattern to find.
1055 Returns:
1056 List of byte offsets.
1057 """
1058 if isinstance(pattern, str):
1059 pattern = pattern.encode()
1061 positions = []
1062 start = 0
1063 while True:
1064 pos = data.find(pattern, start)
1065 if pos == -1:
1066 break
1067 positions.append(pos)
1068 start = pos + 1
1070 return positions
# Public API, alphabetically sorted. (The per-section labels previously
# embedded in this list had drifted away from the entries they described
# after sorting; see the RE-PAT-001..003 notes on each definition instead.)
__all__ = [
    "AhoCorasickMatcher",
    "BinaryRegex",
    "FuzzyMatchResult",
    "FuzzyMatcher",
    "PatternMatchResult",
    "binary_regex_search",
    "count_pattern_occurrences",
    "find_pattern_positions",
    "find_similar_sequences",
    "fuzzy_search",
    "multi_pattern_search",
]