# Coverage for src/tracekit/analyzers/patterns/matching.py: 91%
# 413 statements
# coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Binary pattern matching with regex, Aho-Corasick, and fuzzy matching. 

2 

3 - RE-PAT-001: Binary Regex Pattern Matching 

4 - RE-PAT-002: Multi-Pattern Search (Aho-Corasick) 

5 - RE-PAT-003: Fuzzy Pattern Matching 

6 

7This module provides comprehensive pattern matching capabilities for binary 

8data reverse engineering, including regex-like matching, efficient multi-pattern 

9search using Aho-Corasick, and approximate matching with configurable 

10similarity thresholds. 

11""" 

12 

13from __future__ import annotations 

14 

15import re 

16from collections import defaultdict, deque 

17from collections.abc import Iterator 

18from dataclasses import dataclass, field 

19 

20 

@dataclass
class PatternMatchResult:
    """A single hit produced by one of the pattern matchers.

    Implements RE-PAT-001: Pattern match result.

    Attributes:
        pattern_name: Name or identifier of the pattern that fired.
        offset: Byte offset of the hit within the scanned data.
        length: Number of bytes covered by the hit.
        matched_data: The raw bytes that were matched.
        pattern: The original pattern definition that produced the hit.
        similarity: Similarity score; exact matchers always report 1.0.
    """

    pattern_name: str
    offset: int
    length: int
    matched_data: bytes
    pattern: bytes | str
    similarity: float = 1.0

42 

43 

@dataclass
class BinaryRegex:
    """Binary regex pattern for matching.

    Implements RE-PAT-001: Binary Regex specification.

    Supports:
    - Literal bytes: \\xAA\\xBB
    - Wildcards: ?? (any byte), ?0 (nibble match)
    - Ranges: [\\x00-\\x1F] (byte range)
    - Repetition: {n} {n,m} (repeat n to m times)
    - Alternation: (\\x00|\\xFF) (either byte)
    - Anchors: ^ (start), $ (end)

    Attributes:
        pattern: The pattern string.
        compiled: Compiled regex object; None when the pattern failed to compile.
        name: Optional pattern name.
    """

    pattern: str
    compiled: re.Pattern[bytes] | None = None
    name: str = ""

    def __post_init__(self) -> None:
        """Compile the pattern at construction time."""
        try:
            # Convert binary pattern syntax to an equivalent Python bytes regex.
            regex_pattern = self._convert_to_regex(self.pattern)
            # DOTALL so '.' (emitted for '??' wildcards) also matches 0x0A.
            self.compiled = re.compile(regex_pattern, re.DOTALL)
        except re.error:
            # Invalid patterns are not fatal: match/search/findall simply
            # report no matches while compiled is None.
            self.compiled = None

    def _convert_to_regex(self, pattern: str) -> bytes:
        """Convert binary pattern syntax to Python regex.

        Args:
            pattern: Binary pattern string.

        Returns:
            Python regex pattern as bytes.
        """
        result = []
        i = 0
        pattern_bytes = pattern.encode() if isinstance(pattern, str) else pattern

        while i < len(pattern_bytes):
            char = chr(pattern_bytes[i])

            if char == "\\":
                # Escape sequence
                if i + 1 < len(pattern_bytes):
                    next_char = chr(pattern_bytes[i + 1])
                    if next_char == "x":
                        # Hex byte \xAA -> emit the literal byte value.
                        if i + 3 < len(pattern_bytes):
                            hex_str = chr(pattern_bytes[i + 2]) + chr(pattern_bytes[i + 3])
                            try:
                                byte_val = int(hex_str, 16)
                                # Escape bytes that are regex metacharacters.
                                if chr(byte_val) in ".^$*+?{}[]\\|()":
                                    result.append(b"\\" + bytes([byte_val]))
                                else:
                                    result.append(bytes([byte_val]))
                                i += 4
                                continue
                            except ValueError:
                                # Not valid hex digits: fall through and copy
                                # the escape verbatim below.
                                pass
                    # Any other (or malformed) escape is passed through as a
                    # two-character sequence, unchanged.
                    result.append(pattern_bytes[i : i + 2])
                    i += 2
                else:
                    # Trailing lone backslash.
                    result.append(b"\\")
                    i += 1

            elif char == "?":
                # Wildcard
                if i + 1 < len(pattern_bytes) and chr(pattern_bytes[i + 1]) == "?":
                    # ?? = any byte
                    result.append(b".")
                    i += 2
                else:
                    # Single ? = any nibble (simplified to any byte)
                    result.append(b".")
                    i += 1

            elif char == "[":
                # Byte range [\x00-\x1F], copied verbatim; re itself
                # interprets the \xNN escapes inside the character class.
                end = pattern_bytes.find(b"]", i)
                if end != -1:
                    range_spec = pattern_bytes[i : end + 1]
                    result.append(range_spec)
                    i = end + 1
                else:
                    # Unterminated class: emit '[' as-is.
                    result.append(b"[")
                    i += 1

            elif char in "^$":
                # Anchors pass straight through.
                result.append(pattern_bytes[i : i + 1])
                i += 1

            elif char == "{":
                # Repetition {n} or {n,m}, copied verbatim.
                end = pattern_bytes.find(b"}", i)
                if end != -1:
                    rep_spec = pattern_bytes[i : end + 1]
                    result.append(rep_spec)
                    i = end + 1
                else:
                    # Unterminated repetition: emit '{' as-is.
                    result.append(b"{")
                    i += 1

            elif char == "(":
                # Grouping
                result.append(b"(")
                i += 1

            elif char == ")":
                result.append(b")")
                i += 1

            elif char == "|":
                # Alternation
                result.append(b"|")
                i += 1

            elif char == "*":
                result.append(b"*")
                i += 1

            elif char == "+":
                result.append(b"+")
                i += 1

            else:
                # Literal byte - escape if it is a regex metacharacter.
                byte_val = pattern_bytes[i]
                if chr(byte_val) in ".^$*+?{}[]\\|()":
                    result.append(b"\\" + bytes([byte_val]))
                else:
                    result.append(bytes([byte_val]))
                i += 1

        return b"".join(result)

    def match(self, data: bytes, start: int = 0) -> PatternMatchResult | None:
        """Try to match pattern at start of data (anchored at `start`).

        Args:
            data: Data to match against.
            start: Starting offset.

        Returns:
            PatternMatchResult if matched, None otherwise (also None when
            the pattern failed to compile).
        """
        if self.compiled is None:
            return None

        match = self.compiled.match(data, start)
        if match:
            return PatternMatchResult(
                pattern_name=self.name,
                offset=match.start(),
                length=match.end() - match.start(),
                matched_data=match.group(),
                pattern=self.pattern,
            )
        return None

    def search(self, data: bytes, start: int = 0) -> PatternMatchResult | None:
        """Search for pattern anywhere in data from `start` onward.

        Args:
            data: Data to search.
            start: Starting offset.

        Returns:
            First PatternMatchResult if found, None otherwise.
        """
        if self.compiled is None:
            return None

        match = self.compiled.search(data, start)
        if match:
            return PatternMatchResult(
                pattern_name=self.name,
                offset=match.start(),
                length=match.end() - match.start(),
                matched_data=match.group(),
                pattern=self.pattern,
            )
        return None

    def findall(self, data: bytes) -> list[PatternMatchResult]:
        """Find all (non-overlapping) occurrences of pattern in data.

        Args:
            data: Data to search.

        Returns:
            List of all matches; empty if none or if the pattern is invalid.
        """
        if self.compiled is None:
            return []

        results = []
        for match in self.compiled.finditer(data):
            results.append(
                PatternMatchResult(
                    pattern_name=self.name,
                    offset=match.start(),
                    length=match.end() - match.start(),
                    matched_data=match.group(),
                    pattern=self.pattern,
                )
            )
        return results

261 

262 

class AhoCorasickMatcher:
    """Multi-pattern search using Aho-Corasick algorithm.

    Implements RE-PAT-002: Multi-Pattern Search.

    Efficiently searches for multiple patterns simultaneously in O(n + m + z)
    time where n is text length, m is total pattern length, and z is matches.

    Example:
        >>> matcher = AhoCorasickMatcher()
        >>> matcher.add_pattern(b'\\xAA\\x55', 'header')
        >>> matcher.add_pattern(b'\\xDE\\xAD', 'marker')
        >>> matcher.build()
        >>> matches = matcher.search(data)
    """

    def __init__(self) -> None:
        """Initialize Aho-Corasick automaton."""
        # Trie edges: _goto[state][byte] -> next state. State 0 is the root.
        self._goto: dict[int, dict[int, int]] = defaultdict(dict)
        # Failure links: _fail[state] -> state of the longest proper suffix.
        self._fail: dict[int, int] = {}
        # _output[state] -> patterns ending at this state (suffix outputs
        # get merged in during build()).
        self._output: dict[int, list[tuple[bytes, str]]] = defaultdict(list)
        self._patterns: list[tuple[bytes, str]] = []
        self._state_count = 0
        self._built = False

    def add_pattern(self, pattern: bytes | str, name: str = "") -> None:
        """Add a pattern to the automaton.

        Args:
            pattern: Pattern bytes to search for (str is encoded as UTF-8).
            name: Optional name for the pattern; defaults to its hex form.
        """
        if isinstance(pattern, str):
            pattern = pattern.encode()
        if not name:
            name = pattern.hex()

        self._patterns.append((pattern, name))
        # Adding a pattern invalidates any previously built automaton.
        self._built = False

    def add_patterns(self, patterns: dict[str, bytes | str]) -> None:
        """Add multiple patterns at once.

        Args:
            patterns: Dictionary mapping names to patterns.
        """
        for name, pattern in patterns.items():
            self.add_pattern(pattern, name)

    def build(self) -> None:
        """Build the automaton from added patterns.

        Must be called after adding patterns and before searching.
        """
        # Reset automaton
        self._goto = defaultdict(dict)
        self._fail = {}
        self._output = defaultdict(list)
        self._state_count = 0

        # Build goto function: a trie over all patterns.
        for pattern, name in self._patterns:
            state = 0
            for byte in pattern:
                if byte not in self._goto[state]:
                    self._state_count += 1
                    self._goto[state][byte] = self._state_count
                state = self._goto[state][byte]
            self._output[state].append((pattern, name))

        # Build fail function using BFS
        queue: deque[int] = deque()

        # Depth-1 states always fail back to the root.
        for state in self._goto[0].values():
            self._fail[state] = 0
            queue.append(state)

        # BFS to build fail function
        while queue:
            r = queue.popleft()
            for byte, s in self._goto[r].items():
                queue.append(s)

                # Follow fail links until a state with an edge on this byte
                # (or the root) is found.
                state = self._fail[r]
                while state != 0 and byte not in self._goto[state]:
                    state = self._fail.get(state, 0)

                self._fail[s] = self._goto[state].get(byte, 0)

                # Merge outputs so patterns that are suffixes of longer
                # patterns are also reported at this state.
                if self._fail[s] in self._output:
                    self._output[s].extend(self._output[self._fail[s]])

        self._built = True

    def search(self, data: bytes) -> list[PatternMatchResult]:
        """Search for all patterns in data.

        Args:
            data: Data to search.

        Returns:
            List of all pattern matches (overlaps included).

        Raises:
            RuntimeError: If automaton not built.
        """
        if not self._built:
            raise RuntimeError("Must call build() before search()")

        results = []
        state = 0

        for i, byte in enumerate(data):
            # Follow fail links until match or root
            while state != 0 and byte not in self._goto[state]:
                state = self._fail.get(state, 0)

            state = self._goto[state].get(byte, 0)

            # Check for outputs
            if state in self._output:
                for pattern, name in self._output[state]:
                    # i is the index of the match's LAST byte.
                    offset = i - len(pattern) + 1
                    results.append(
                        PatternMatchResult(
                            pattern_name=name,
                            offset=offset,
                            length=len(pattern),
                            matched_data=data[offset : offset + len(pattern)],
                            pattern=pattern,
                        )
                    )

        return results

    def iter_search(self, data: bytes) -> Iterator[PatternMatchResult]:
        """Iterate over pattern matches (memory-efficient).

        Args:
            data: Data to search.

        Yields:
            PatternMatchResult for each match.

        Raises:
            RuntimeError: If automaton not built.
        """
        if not self._built:
            raise RuntimeError("Must call build() before search()")

        state = 0

        for i, byte in enumerate(data):
            while state != 0 and byte not in self._goto[state]:
                state = self._fail.get(state, 0)

            state = self._goto[state].get(byte, 0)

            if state in self._output:
                for pattern, name in self._output[state]:
                    offset = i - len(pattern) + 1
                    yield PatternMatchResult(
                        pattern_name=name,
                        offset=offset,
                        length=len(pattern),
                        matched_data=data[offset : offset + len(pattern)],
                        pattern=pattern,
                    )

434 

435 

@dataclass
class FuzzyMatchResult:
    """A single approximate match reported by the fuzzy matcher.

    Implements RE-PAT-003: Fuzzy match result.

    Attributes:
        pattern_name: Name of the pattern that produced this match.
        offset: Byte offset where the matched region begins.
        length: Length of the matched region in bytes.
        matched_data: The bytes of the matched region.
        pattern: The original pattern searched for.
        similarity: Similarity score in [0, 1].
        edit_distance: Levenshtein edit distance between pattern and region.
        substitutions: (position, expected_byte, actual_byte) triples for
            each substitution found while aligning pattern and region.
    """

    pattern_name: str
    offset: int
    length: int
    matched_data: bytes
    pattern: bytes
    similarity: float
    edit_distance: int
    substitutions: list[tuple[int, int, int]] = field(default_factory=list)

461 

462 

class FuzzyMatcher:
    """Fuzzy pattern matching with configurable similarity.

    Implements RE-PAT-003: Fuzzy Pattern Matching.

    Supports approximate matching with edit distance thresholds and
    flexible match criteria.

    Example:
        >>> matcher = FuzzyMatcher(max_edit_distance=2)
        >>> matches = matcher.search(data, pattern=b'\\xAA\\x55\\x00')
    """

    def __init__(
        self,
        max_edit_distance: int = 2,
        min_similarity: float | None = None,
        allow_substitutions: bool = True,
        allow_insertions: bool = True,
        allow_deletions: bool = True,
    ) -> None:
        """Initialize fuzzy matcher.

        Args:
            max_edit_distance: Maximum allowed edit distance.
            min_similarity: Minimum similarity threshold (0-1). If None, no
                similarity filtering is applied and max_edit_distance alone
                decides what counts as a match.
            allow_substitutions: Allow byte substitutions.
            allow_insertions: Allow byte insertions.
            allow_deletions: Allow byte deletions.
        """
        self.max_edit_distance = max_edit_distance
        self._min_similarity = min_similarity  # Store original value
        self.allow_substitutions = allow_substitutions
        self.allow_insertions = allow_insertions
        self.allow_deletions = allow_deletions

    @property
    def min_similarity(self) -> float:
        """Get minimum similarity (explicit value, or 0.0 if unset)."""
        if self._min_similarity is not None:
            return self._min_similarity
        # Default: no similarity filtering when using edit distance
        return 0.0

    def search(
        self,
        data: bytes,
        pattern: bytes | str,
        pattern_name: str = "",
    ) -> list[FuzzyMatchResult]:
        """Search for fuzzy matches of pattern in data.

        Args:
            data: Data to search.
            pattern: Pattern to match (str is encoded as UTF-8).
            pattern_name: Optional pattern name; defaults to the hex form.

        Returns:
            List of non-overlapping fuzzy matches meeting criteria.
        """
        if isinstance(pattern, str):
            pattern = pattern.encode()

        if not pattern_name:
            pattern_name = pattern.hex()

        results = []
        pattern_len = len(pattern)

        # Sliding window search; extra iterations past the exact-fit limit
        # allow shorter-than-pattern windows near the end of the data.
        for i in range(len(data) - pattern_len + 1 + self.max_edit_distance):
            if i >= len(data):
                break
            # Check windows of varying sizes (pattern length +/- allowed edits)
            for window_len in range(
                max(1, pattern_len - self.max_edit_distance),
                min(len(data) - i + 1, pattern_len + self.max_edit_distance + 1),
            ):
                if i + window_len > len(data):
                    continue

                window = data[i : i + window_len]
                distance, substitutions = self._edit_distance_detailed(pattern, window)

                if distance <= self.max_edit_distance:
                    # Normalize by the longer of the two lengths.
                    similarity = 1.0 - (distance / max(pattern_len, window_len))

                    if similarity >= self.min_similarity:
                        results.append(
                            FuzzyMatchResult(
                                pattern_name=pattern_name,
                                offset=i,
                                length=window_len,
                                matched_data=window,
                                pattern=pattern,
                                similarity=similarity,
                                edit_distance=distance,
                                substitutions=substitutions,
                            )
                        )

        # Remove overlapping matches, keeping best
        return self._remove_overlapping(results)

    def match_with_wildcards(
        self,
        data: bytes,
        pattern: bytes,
        wildcard: int = 0xFF,
        pattern_name: str = "",
    ) -> list[FuzzyMatchResult]:
        """Match pattern with wildcard bytes.

        Wildcard positions always match; up to max_edit_distance mismatches
        are tolerated at non-wildcard positions.

        Args:
            data: Data to search.
            pattern: Pattern with wildcards.
            wildcard: Byte value treated as wildcard.
            pattern_name: Optional pattern name; defaults to the hex form.

        Returns:
            List of matches (one per qualifying offset; may overlap).
        """
        if not pattern_name:
            pattern_name = pattern.hex()

        results = []
        pattern_len = len(pattern)

        for i in range(len(data) - pattern_len + 1):
            window = data[i : i + pattern_len]
            matches = True
            mismatches = 0

            for j in range(pattern_len):
                if pattern[j] != wildcard and pattern[j] != window[j]:
                    mismatches += 1
                    if mismatches > self.max_edit_distance:
                        matches = False
                        break

            if matches:
                # Similarity counts only non-wildcard positions.
                non_wildcard_count = sum(1 for b in pattern if b != wildcard)
                similarity = (
                    (non_wildcard_count - mismatches) / non_wildcard_count
                    if non_wildcard_count > 0
                    else 1.0
                )

                if similarity >= self.min_similarity:
                    results.append(
                        FuzzyMatchResult(
                            pattern_name=pattern_name,
                            offset=i,
                            length=pattern_len,
                            matched_data=window,
                            pattern=pattern,
                            similarity=similarity,
                            edit_distance=mismatches,
                        )
                    )

        return results

    def _edit_distance_detailed(
        self, pattern: bytes, text: bytes
    ) -> tuple[int, list[tuple[int, int, int]]]:
        """Calculate edit distance with substitution details.

        Full O(m*n) dynamic program honoring the allow_* flags, followed by
        a backtrack that records substitution positions.

        Args:
            pattern: Pattern bytes.
            text: Text to compare.

        Returns:
            Tuple of (distance, substitutions).
        """
        m, n = len(pattern), len(text)

        # Create DP table (using float to accommodate inf values)
        dp: list[list[float]] = [[0.0] * (n + 1) for _ in range(m + 1)]

        # Initialize base cases; inf marks moves disabled by the allow_* flags.
        for i in range(m + 1):
            dp[i][0] = float(i) if self.allow_deletions else float("inf")
        for j in range(n + 1):
            dp[0][j] = float(j) if self.allow_insertions else float("inf")
        dp[0][0] = 0.0

        # Fill DP table
        for i in range(1, m + 1):
            for j in range(1, n + 1):
                if pattern[i - 1] == text[j - 1]:
                    dp[i][j] = dp[i - 1][j - 1]
                else:
                    candidates = [float("inf")]
                    if self.allow_substitutions:
                        candidates.append(dp[i - 1][j - 1] + 1)
                    if self.allow_insertions:
                        candidates.append(dp[i][j - 1] + 1)
                    if self.allow_deletions:
                        candidates.append(dp[i - 1][j] + 1)
                    dp[i][j] = min(candidates)

        # Backtrack to find substitutions (one optimal alignment; when
        # several moves tie, substitution is preferred, then deletion).
        substitutions = []
        i, j = m, n
        while i > 0 and j > 0:
            if pattern[i - 1] == text[j - 1]:
                i -= 1
                j -= 1
            elif dp[i][j] == dp[i - 1][j - 1] + 1 and self.allow_substitutions:
                substitutions.append((i - 1, pattern[i - 1], text[j - 1]))
                i -= 1
                j -= 1
            elif dp[i][j] == dp[i - 1][j] + 1 and self.allow_deletions:
                i -= 1
            elif dp[i][j] == dp[i][j - 1] + 1 and self.allow_insertions:
                j -= 1
            else:
                # No consistent predecessor (can happen when moves are
                # disabled); stop collecting details.
                break

        return int(dp[m][n]), substitutions

    def _remove_overlapping(self, results: list[FuzzyMatchResult]) -> list[FuzzyMatchResult]:
        """Remove overlapping matches, keeping highest similarity.

        Args:
            results: List of fuzzy match results.

        Returns:
            Non-overlapping results, sorted by offset.
        """
        if not results:
            return []

        # Sort by similarity (descending) then offset, so greedy selection
        # keeps the best match of each overlapping cluster.
        sorted_results = sorted(results, key=lambda r: (-r.similarity, r.offset))

        kept = []
        covered: set[int] = set()

        for result in sorted_results:
            # Check if any position is already covered
            positions = set(range(result.offset, result.offset + result.length))
            if not positions & covered:
                kept.append(result)
                covered.update(positions)

        return sorted(kept, key=lambda r: r.offset)

712 

713 

714# ============================================================================= 

715# Convenience functions 

716# ============================================================================= 

717 

718 

def binary_regex_search(
    data: bytes,
    pattern: str,
    name: str = "",
) -> list[PatternMatchResult]:
    """Search data using binary regex pattern.

    Implements RE-PAT-001: Binary Regex Pattern Matching.

    Args:
        data: Data to search.
        pattern: Binary regex pattern.
        name: Optional pattern name.

    Returns:
        List of all matches.

    Example:
        >>> matches = binary_regex_search(data, r'\\xAA.{4}\\x55')
    """
    return BinaryRegex(pattern=pattern, name=name).findall(data)

741 

742 

def multi_pattern_search(
    data: bytes,
    patterns: dict[str, bytes | str],
) -> dict[str, list[PatternMatchResult]]:
    """Search for multiple patterns simultaneously.

    Implements RE-PAT-002: Multi-Pattern Search.

    Args:
        data: Data to search.
        patterns: Dictionary mapping names to patterns.

    Returns:
        Dictionary mapping pattern names to match lists.

    Example:
        >>> patterns = {'header': b'\\xAA\\x55', 'footer': b'\\x00\\x00'}
        >>> results = multi_pattern_search(data, patterns)
    """
    automaton = AhoCorasickMatcher()
    automaton.add_patterns(patterns)
    automaton.build()

    # Every requested pattern gets an entry, even with zero hits.
    grouped: dict[str, list[PatternMatchResult]] = {name: [] for name in patterns}
    for hit in automaton.search(data):
        grouped[hit.pattern_name].append(hit)

    return grouped

774 

775 

def fuzzy_search(
    data: bytes,
    pattern: bytes | str,
    max_distance: int = 2,
    min_similarity: float | None = None,
    name: str = "",
) -> list[FuzzyMatchResult]:
    """Search with fuzzy/approximate matching.

    Implements RE-PAT-003: Fuzzy Pattern Matching.

    Args:
        data: Data to search.
        pattern: Pattern to match.
        max_distance: Maximum edit distance.
        min_similarity: Minimum similarity threshold (None = no filtering).
        name: Optional pattern name.

    Returns:
        List of fuzzy matches.

    Example:
        >>> matches = fuzzy_search(data, b'\\xAA\\x55\\x00', max_distance=1)
    """
    engine = FuzzyMatcher(
        max_edit_distance=max_distance,
        min_similarity=min_similarity,
    )
    return engine.search(data, pattern, pattern_name=name)

805 

806 

def find_similar_sequences(
    data: bytes,
    min_length: int = 4,
    max_distance: int = 1,
) -> list[tuple[int, int, float]]:
    """Find similar byte sequences within data.

    Implements RE-PAT-003: Fuzzy Pattern Matching.

    Identifies pairs of positions with similar byte sequences.

    Performance: sampled windows are bucketed by length and only same/adjacent
    buckets are compared; cheap length-gap checks reject pairs before the
    (banded) edit-distance computation runs.

    Args:
        data: Data to analyze.
        min_length: Minimum sequence length (window size sampled).
        max_distance: Maximum edit distance.

    Returns:
        List of (offset1, offset2, similarity) tuples.
    """
    results: list[tuple[int, int, float]] = []
    data_len = len(data)

    if data_len < min_length:
        return results

    matcher = FuzzyMatcher(max_edit_distance=max_distance)

    # Sample fixed-size windows at half-window stride.
    # BUGFIX: use "+ 1" so the final window starting at data_len - min_length
    # is included; the original range stopped one start position short.
    step = max(1, min_length // 2)
    sequences = []
    for i in range(0, data_len - min_length + 1, step):
        sequences.append((i, data[i : i + min_length]))

    # Group sequences by length bucket (similar sequences have similar
    # lengths), so only same/adjacent buckets need pairwise comparison.
    length_groups: dict[int, list[tuple[int, bytes]]] = defaultdict(list)
    bucket_size = max(1, min_length // 10)  # ~10% bucket width

    for offset, seq in sequences:
        length_groups[len(seq) // bucket_size].append((offset, seq))

    for bucket in sorted(length_groups.keys()):
        # Candidates from the current and the next-higher bucket.
        candidates = length_groups[bucket].copy()
        if bucket + 1 in length_groups:
            candidates.extend(length_groups[bucket + 1])

        for i, (offset1, seq1) in enumerate(candidates):
            for offset2, seq2 in candidates[i + 1 :]:
                # Skip overlapping windows.
                if abs(offset1 - offset2) < min_length:
                    continue

                len1, len2 = len(seq1), len(seq2)
                len_diff = abs(len1 - len2)
                max_len = max(len1, len2)

                # The length gap is a lower bound on edit distance.
                if len_diff > max_distance:
                    continue

                # Reject pairs whose best-possible similarity cannot reach
                # the threshold implied by max_distance.
                min_possible_similarity = 1.0 - (len_diff / max_len)
                threshold_similarity = 1.0 - (max_distance / min_length)
                if min_possible_similarity < threshold_similarity:
                    continue

                # Banded edit distance with early cutoff for the survivors.
                distance, _ = _edit_distance_with_threshold(seq1, seq2, max_distance, matcher)

                if distance <= max_distance:
                    similarity = 1.0 - (distance / min_length)
                    results.append((offset1, offset2, similarity))

    return results

897 

898 

def _edit_distance_with_threshold(
    seq1: bytes, seq2: bytes, threshold: int, matcher: FuzzyMatcher
) -> tuple[int, list[tuple[int, int, int]]]:
    """Calculate edit distance with early termination.

    Dispatches to a banded dynamic program when the threshold is small
    relative to the inputs (only cells near the diagonal matter), and falls
    back to the matcher's full DP otherwise.

    Args:
        seq1: First sequence.
        seq2: Second sequence.
        threshold: Maximum allowed edit distance.
        matcher: FuzzyMatcher instance for detailed computation.

    Returns:
        Tuple of (distance, substitutions). Distance may be > threshold
        if no solution exists within threshold.
    """
    len_a, len_b = len(seq1), len(seq2)
    length_gap = abs(len_a - len_b)

    # The length gap is a lower bound on edit distance; reject early.
    if length_gap > threshold:
        return (length_gap, [])

    # Banded DP pays off only when the band is narrow relative to the inputs.
    if threshold < min(len_a, len_b) // 2:
        return _banded_edit_distance(seq1, seq2, threshold)

    # Full computation (also produces substitution details).
    return matcher._edit_distance_detailed(seq1, seq2)

936 

937 

938def _banded_edit_distance( 

939 seq1: bytes, seq2: bytes, max_dist: int 

940) -> tuple[int, list[tuple[int, int, int]]]: 

941 """Compute edit distance using banded DP algorithm. 

942 

943 Only computes cells within max_dist of the main diagonal, which is 

944 sufficient when we only care about distances up to max_dist. This 

945 reduces time complexity from O(m*n) to O(max_dist * min(m,n)). 

946 

947 Args: 

948 seq1: First sequence. 

949 seq2: Second sequence. 

950 max_dist: Maximum distance threshold. 

951 

952 Returns: 

953 Tuple of (distance, substitutions). Substitutions may be approximate. 

954 """ 

955 m, n = len(seq1), len(seq2) 

956 

957 # Use two rows for space efficiency 

958 INF = max_dist + 100 # Sentinel value for unreachable cells 

959 band_width = 2 * max_dist + 1 

960 

961 prev_row = [INF] * band_width 

962 curr_row = [INF] * band_width 

963 

964 # Initialize first row 

965 for j in range(min(band_width, n + 1)): 

966 prev_row[j] = j 

967 

968 for i in range(1, m + 1): 

969 # Reset current row 

970 for k in range(band_width): 

971 curr_row[k] = INF 

972 

973 curr_row[0] = i 

974 

975 # Compute band around diagonal 

976 # j ranges from max(1, i-max_dist) to min(n, i+max_dist) 

977 j_start = max(1, i - max_dist) 

978 j_end = min(n, i + max_dist) 

979 

980 for j in range(j_start, j_end + 1): 

981 # Map j to band index 

982 band_idx = j - i + max_dist 

983 if band_idx < 0 or band_idx >= band_width: 983 ↛ 984line 983 didn't jump to line 984 because the condition on line 983 was never true

984 continue 

985 

986 if seq1[i - 1] == seq2[j - 1]: 

987 # Match: no cost 

988 prev_band_idx = band_idx 

989 curr_row[band_idx] = prev_row[prev_band_idx] if prev_band_idx < band_width else INF 

990 else: 

991 # Min of substitution, insertion, deletion 

992 cost = INF 

993 

994 # Substitution: from (i-1, j-1) 

995 prev_band_idx = band_idx 

996 if prev_band_idx < band_width: 996 ↛ 1000line 996 didn't jump to line 1000 because the condition on line 996 was always true

997 cost = min(cost, prev_row[prev_band_idx] + 1) 

998 

999 # Deletion: from (i-1, j) 

1000 prev_band_idx = band_idx + 1 

1001 if prev_band_idx < band_width: 

1002 cost = min(cost, prev_row[prev_band_idx] + 1) 

1003 

1004 # Insertion: from (i, j-1) 

1005 curr_band_idx = band_idx - 1 

1006 if curr_band_idx >= 0: 

1007 cost = min(cost, curr_row[curr_band_idx] + 1) 

1008 

1009 curr_row[band_idx] = cost 

1010 

1011 # Swap rows 

1012 prev_row, curr_row = curr_row, prev_row 

1013 

1014 # Extract result from final position 

1015 final_band_idx = n - m + max_dist 

1016 if final_band_idx >= 0 and final_band_idx < band_width: 1016 ↛ 1019line 1016 didn't jump to line 1019 because the condition on line 1016 was always true

1017 distance = prev_row[final_band_idx] 

1018 else: 

1019 distance = INF 

1020 

1021 # Don't compute detailed substitutions for banded version (expensive) 

1022 # Return empty list - caller should use this for filtering only 

1023 return (min(distance, INF), []) 

1024 

1025 

def count_pattern_occurrences(
    data: bytes,
    patterns: dict[str, bytes | str],
) -> dict[str, int]:
    """Count occurrences of multiple patterns.

    Implements RE-PAT-002: Multi-Pattern Search.

    Args:
        data: Data to search.
        patterns: Dictionary mapping names to patterns.

    Returns:
        Dictionary mapping pattern names to counts.
    """
    counts: dict[str, int] = {}
    for name, matches in multi_pattern_search(data, patterns).items():
        counts[name] = len(matches)
    return counts

1043 

1044 

def find_pattern_positions(
    data: bytes,
    pattern: bytes | str,
) -> list[int]:
    """Find all positions of a pattern in data.

    Overlapping occurrences are included (the scan resumes one byte past
    each hit, not past its end).

    Args:
        data: Data to search.
        pattern: Pattern to find (str is encoded as UTF-8).

    Returns:
        List of byte offsets.
    """
    needle = pattern.encode() if isinstance(pattern, str) else pattern

    offsets: list[int] = []
    cursor = 0
    while (hit := data.find(needle, cursor)) != -1:
        offsets.append(hit)
        cursor = hit + 1

    return offsets

1071 

1072 

# Public API. NOTE: entries are alphabetized, so the category comments below
# label the entry that FOLLOWS them, not a contiguous group.
__all__ = [
    "AhoCorasickMatcher",
    # Classes
    "BinaryRegex",
    "FuzzyMatchResult",
    "FuzzyMatcher",
    # Data classes
    "PatternMatchResult",
    # RE-PAT-001: Binary Regex
    "binary_regex_search",
    "count_pattern_occurrences",
    # Utilities
    "find_pattern_positions",
    "find_similar_sequences",
    # RE-PAT-003: Fuzzy Matching
    "fuzzy_search",
    # RE-PAT-002: Multi-Pattern Search
    "multi_pattern_search",
]