Coverage for src / tracekit / exploratory / fuzzy_advanced.py: 94%
290 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Advanced fuzzy matching for binary pattern analysis.
3This module provides advanced fuzzy matching capabilities including
4pattern variant characterization, consensus finding, and multiple
5binary sequence alignment.
8Example:
9 >>> from tracekit.exploratory.fuzzy_advanced import (
10 ... characterize_variants,
11 ... align_sequences,
12 ... )
13 >>> patterns = [b'\\x12\\x34\\x56', b'\\x12\\x35\\x56', b'\\x12\\x34\\x57']
14 >>> result = characterize_variants(patterns)
15 >>> print(f"Consensus: {result.consensus.hex()}")
16"""
from __future__ import annotations

import logging
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING

import numpy as np
27if TYPE_CHECKING:
28 from collections.abc import Sequence
30logger = logging.getLogger(__name__)
# Public API of this module (classes first, then functions, each sorted).
__all__ = [
    "AlignedSequence",
    "AlignmentResult",
    "PositionAnalysis",
    "VariantCharacterization",
    "VariationType",
    "align_sequences",
    "align_two_sequences",
    "characterize_variants",
    "compute_conservation_scores",
]
# =============================================================================
# Variant characterization (FUZZY-004)
# =============================================================================
class VariationType(Enum):
    """Classification of how much a byte position varies across patterns.

    References:
        FUZZY-004: Binary Pattern Variant Characterization and Consensus
    """

    # Entropy < 0.5 bits and consensus confidence >= 0.95
    CONSTANT = "constant"
    # Entropy between 0.5 and 2.0 bits
    LOW_VARIATION = "low_variation"
    # Entropy between 2.0 and 6.0 bits
    HIGH_VARIATION = "high_variation"
    # Entropy above 6.0 bits -- likely random or encrypted data
    RANDOM = "random"
@dataclass
class PositionAnalysis:
    """Per-position statistics for one byte offset across all patterns.

    Attributes:
        position: Zero-based byte offset that was analyzed.
        consensus_byte: Most frequent byte value at this offset.
        consensus_confidence: Fraction of patterns matching the consensus byte.
        entropy: Shannon entropy of the byte distribution, in bits.
        variation_type: Coarse classification of the observed variation.
        value_distribution: Map of byte value to occurrence count.
        is_error: True when the variation looks error-induced (e.g.
            single-bit flips) rather than an intentional variable field.

    References:
        FUZZY-004: Binary Pattern Variant Characterization and Consensus
    """

    position: int
    consensus_byte: int
    consensus_confidence: float
    entropy: float
    variation_type: VariationType
    value_distribution: dict[int, int]
    is_error: bool = False
@dataclass
class VariantCharacterization:
    """Aggregate result of analyzing a family of pattern variants.

    Attributes:
        consensus: Consensus pattern built from the modal byte per position.
        positions: One PositionAnalysis per analyzed byte offset.
        constant_positions: Offsets classified as constant.
        variable_positions: Offsets classified as variable.
        suggested_boundaries: Offsets where constant/variable state flips,
            suggesting likely field boundaries.
        pattern_count: Number of input patterns.
        min_length: Length of the shortest input pattern (analysis depth).

    References:
        FUZZY-004: Binary Pattern Variant Characterization and Consensus
    """

    consensus: bytes
    positions: list[PositionAnalysis]
    constant_positions: list[int]
    variable_positions: list[int]
    suggested_boundaries: list[int]
    pattern_count: int
    min_length: int
114def _compute_entropy(values: list[int]) -> float:
115 """Compute Shannon entropy of byte values.
117 Args:
118 values: List of byte values
120 Returns:
121 Entropy in bits
122 """
123 if not values: 123 ↛ 124line 123 didn't jump to line 124 because the condition on line 123 was never true
124 return 0.0
126 # Count frequencies
127 counts: dict[int, int] = {}
128 for v in values:
129 counts[v] = counts.get(v, 0) + 1
131 total = len(values)
132 entropy = 0.0
134 for count in counts.values():
135 if count > 0: 135 ↛ 134line 135 didn't jump to line 134 because the condition on line 135 was always true
136 p = count / total
137 entropy -= p * np.log2(p)
139 return entropy
def _classify_variation(entropy: float, confidence: float) -> VariationType:
    """Map entropy/confidence measurements onto a VariationType bucket.

    Args:
        entropy: Shannon entropy of the position, in bits.
        confidence: Frequency of the consensus byte at the position.

    Returns:
        The matching VariationType.
    """
    # Constant requires both near-zero entropy and a dominant consensus byte.
    if entropy < 0.5 and confidence >= 0.95:
        return VariationType.CONSTANT
    # Remaining buckets are pure entropy thresholds, checked low to high.
    if entropy < 2.0:
        return VariationType.LOW_VARIATION
    if entropy < 6.0:
        return VariationType.HIGH_VARIATION
    return VariationType.RANDOM
162def _detect_error_variation(
163 values: list[int],
164 consensus: int,
165 confidence: float,
166) -> bool:
167 """Detect if variation is likely from errors vs intentional.
169 Args:
170 values: List of byte values at position
171 consensus: Consensus byte
172 confidence: Consensus confidence
174 Returns:
175 True if variation appears to be from errors
176 """
177 if confidence >= 0.99:
178 # Very rare variations are likely errors
179 return True
181 # Check if variations are single-bit flips from consensus
182 variations = [v for v in set(values) if v != consensus]
183 if not variations: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true
184 return False
186 single_bit_flips = 0
187 for v in variations:
188 diff = v ^ consensus
189 # Check if exactly one bit different
190 if diff != 0 and (diff & (diff - 1)) == 0:
191 single_bit_flips += 1
193 # If most variations are single-bit flips, likely errors
194 return single_bit_flips >= len(variations) * 0.8
def characterize_variants(
    patterns: Sequence[bytes | bytearray],
    min_confidence: float = 0.95,
) -> VariantCharacterization:
    """Characterize variants in a collection of binary patterns.

    Analyzes a collection of similar patterns to find the consensus
    sequence, variable positions, and variant frequencies.

    Args:
        patterns: Collection of binary patterns.
        min_confidence: Minimum consensus confidence for a position to be
            reported as constant. The default of 0.95 matches the previous
            hard-coded threshold (the parameter used to be accepted but
            ignored; it now takes effect).

    Returns:
        VariantCharacterization with analysis results.

    Example:
        >>> patterns = [b'\\x12\\x34\\x56', b'\\x12\\x35\\x56', b'\\x12\\x34\\x57']
        >>> result = characterize_variants(patterns)
        >>> print(f"Consensus: {result.consensus.hex()}")
        >>> print(f"Variable positions: {result.variable_positions}")

    References:
        FUZZY-004: Binary Pattern Variant Characterization and Consensus
    """
    if not patterns:
        # Degenerate input: empty characterization.
        return VariantCharacterization(
            consensus=b"",
            positions=[],
            constant_positions=[],
            variable_positions=[],
            suggested_boundaries=[],
            pattern_count=0,
            min_length=0,
        )

    pattern_count = len(patterns)
    # Only offsets present in every pattern are analyzed.
    min_length = min(len(p) for p in patterns)

    positions: list[PositionAnalysis] = []
    consensus_bytes: list[int] = []
    constant_positions: list[int] = []
    variable_positions: list[int] = []
    constant_flags: list[bool] = []

    for pos in range(min_length):
        # Byte values across all patterns at this offset (pos < min_length,
        # so every pattern is long enough -- no bounds check needed).
        values = [p[pos] for p in patterns]

        # Histogram of observed values. Counter preserves first-seen order,
        # so max() below breaks ties the same way the old manual count did.
        distribution: dict[int, int] = dict(Counter(values))

        # Consensus = modal byte and its relative frequency.
        consensus_byte, consensus_count = max(
            distribution.items(), key=lambda kv: kv[1]
        )
        consensus_confidence = consensus_count / len(values)

        entropy = _compute_entropy(values)
        variation_type = _classify_variation(entropy, consensus_confidence)

        # Distinguish bit-error noise from intentional variable fields.
        is_error = _detect_error_variation(values, consensus_byte, consensus_confidence)

        positions.append(
            PositionAnalysis(
                position=pos,
                consensus_byte=consensus_byte,
                consensus_confidence=consensus_confidence,
                entropy=entropy,
                variation_type=variation_type,
                value_distribution=distribution,
                is_error=is_error,
            )
        )
        consensus_bytes.append(consensus_byte)

        # A position is constant only when classified CONSTANT *and* its
        # confidence meets the caller-supplied threshold.
        is_constant = (
            variation_type == VariationType.CONSTANT
            and consensus_confidence >= min_confidence
        )
        constant_flags.append(is_constant)
        if is_constant:
            constant_positions.append(pos)
        else:
            variable_positions.append(pos)

    # Suggest field boundaries wherever the constant/variable state flips
    # between adjacent positions (same predicate as the split above, so the
    # boundaries stay consistent with constant_positions).
    boundaries: list[int] = []
    prev_is_constant: bool | None = None
    for pos, is_constant in enumerate(constant_flags):
        if prev_is_constant is not None and is_constant != prev_is_constant:
            boundaries.append(pos)
        prev_is_constant = is_constant

    return VariantCharacterization(
        consensus=bytes(consensus_bytes),
        positions=positions,
        constant_positions=constant_positions,
        variable_positions=variable_positions,
        suggested_boundaries=boundaries,
        pattern_count=pattern_count,
        min_length=min_length,
    )
# =============================================================================
# Multiple sequence alignment (FUZZY-005)
# =============================================================================
@dataclass
class AlignedSequence:
    """One sequence from an alignment, together with its gapped form.

    Attributes:
        original: The sequence as originally provided.
        aligned: The sequence after alignment, with gap markers inserted.
        gaps: Indices of gap positions within ``aligned``.
        score: Alignment score for this sequence.

    References:
        FUZZY-005: Multiple Binary Sequence Alignment (MSA)
    """

    original: bytes
    aligned: bytes
    gaps: list[int]
    score: float
@dataclass
class AlignmentResult:
    """Outcome of a multiple sequence alignment.

    Attributes:
        sequences: The aligned sequences, one AlignedSequence per input.
        conservation_scores: Conservation score per alignment column.
        conserved_regions: (start, end) index pairs of highly conserved runs.
        gap_positions: Columns where the majority of sequences have a gap.
        alignment_score: Total score accumulated over the alignment.

    References:
        FUZZY-005: Multiple Binary Sequence Alignment (MSA)
    """

    sequences: list[AlignedSequence]
    conservation_scores: list[float]
    conserved_regions: list[tuple[int, int]]
    gap_positions: list[int]
    alignment_score: float
# Gap marker used inside aligned sequences.
# NOTE(review): 0xFF is also a legitimate byte value, so alignment output
# cannot distinguish a genuine 0xFF byte from an inserted gap -- confirm
# callers' data cannot contain 0xFF, or switch to an out-of-band encoding.
GAP_BYTE = 0xFF  # Using 0xFF as gap marker
def _needleman_wunsch(
    seq1: bytes,
    seq2: bytes,
    match_bonus: int = 1,
    mismatch_penalty: int = -1,
    gap_open: int = -2,
    gap_extend: int = -1,
) -> tuple[bytes, bytes, float]:
    """Needleman-Wunsch global alignment algorithm.

    Fills an (m+1) x (n+1) dynamic-programming score matrix plus a parallel
    traceback matrix, then walks the traceback from the bottom-right corner
    to reconstruct the aligned sequences. Gaps are encoded with ``GAP_BYTE``.
    Gap costs approximate affine penalties by charging ``gap_extend`` when
    the neighboring traceback cell already points in the same gap direction
    and ``gap_open`` otherwise (a simplified scheme, not the full
    three-matrix Gotoh formulation).

    Args:
        seq1: First sequence
        seq2: Second sequence
        match_bonus: Score for match
        mismatch_penalty: Score for mismatch
        gap_open: Gap opening penalty
        gap_extend: Gap extension penalty

    Returns:
        (aligned_seq1, aligned_seq2, score) -- both aligned sequences have
        equal length; ``score`` is the value of the bottom-right DP cell.
    """
    m, n = len(seq1), len(seq2)

    # DP score matrix and traceback matrix (int8 suffices for 3 directions).
    score = np.zeros((m + 1, n + 1), dtype=np.int32)
    traceback = np.zeros((m + 1, n + 1), dtype=np.int8)

    # Direction constants
    DIAG, UP, LEFT = 0, 1, 2

    # First row/column: leading gaps cost one open plus (len-1) extensions.
    for i in range(1, m + 1):
        score[i, 0] = gap_open + (i - 1) * gap_extend
        traceback[i, 0] = UP

    for j in range(1, n + 1):
        score[0, j] = gap_open + (j - 1) * gap_extend
        traceback[0, j] = LEFT

    # Fill matrix
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            # Match/mismatch (diagonal move)
            if seq1[i - 1] == seq2[j - 1]:
                diag_score = score[i - 1, j - 1] + match_bonus
            else:
                diag_score = score[i - 1, j - 1] + mismatch_penalty

            # Gap in seq2 (moving down); cheaper when extending an open gap
            if traceback[i - 1, j] == UP:
                up_score = score[i - 1, j] + gap_extend
            else:
                up_score = score[i - 1, j] + gap_open

            # Gap in seq1 (moving right); cheaper when extending an open gap
            if traceback[i, j - 1] == LEFT:
                left_score = score[i, j - 1] + gap_extend
            else:
                left_score = score[i, j - 1] + gap_open

            # Choose best direction; ties prefer DIAG, then UP, then LEFT
            best = max(diag_score, up_score, left_score)
            score[i, j] = best

            if best == diag_score:
                traceback[i, j] = DIAG
            elif best == up_score:
                traceback[i, j] = UP
            else:
                traceback[i, j] = LEFT

    # Traceback from the bottom-right corner
    aligned1: list[int] = []
    aligned2: list[int] = []
    i, j = m, n

    while i > 0 or j > 0:
        if i > 0 and j > 0 and traceback[i, j] == DIAG:
            aligned1.append(seq1[i - 1])
            aligned2.append(seq2[j - 1])
            i -= 1
            j -= 1
        elif i > 0 and traceback[i, j] == UP:
            aligned1.append(seq1[i - 1])
            aligned2.append(GAP_BYTE)
            i -= 1
        else:
            aligned1.append(GAP_BYTE)
            aligned2.append(seq2[j - 1])
            j -= 1

    # Reverse (traceback emits the alignment back-to-front)
    aligned1.reverse()
    aligned2.reverse()

    return bytes(aligned1), bytes(aligned2), float(score[m, n])
def _smith_waterman(
    seq1: bytes,
    seq2: bytes,
    match_bonus: int = 1,
    mismatch_penalty: int = -1,
    gap_penalty: int = -2,
) -> tuple[bytes, bytes, float, int, int]:
    """Smith-Waterman local alignment algorithm.

    Finds the highest-scoring local alignment using a DP matrix whose
    cells are floored at zero (a local alignment may restart anywhere).
    The traceback starts at the maximum-scoring cell and stops at a STOP
    cell or the matrix edge. Gaps use a single flat ``gap_penalty`` --
    unlike the global aligner, there is no open/extend distinction here.

    Args:
        seq1: First sequence
        seq2: Second sequence
        match_bonus: Score for match
        mismatch_penalty: Score for mismatch
        gap_penalty: Gap penalty

    Returns:
        (aligned_seq1, aligned_seq2, score, start1, start2) where
        ``start1``/``start2`` are the 0-based offsets in ``seq1``/``seq2``
        at which the local alignment begins.
    """
    m, n = len(seq1), len(seq2)

    # DP score matrix and traceback matrix
    score = np.zeros((m + 1, n + 1), dtype=np.int32)
    traceback = np.zeros((m + 1, n + 1), dtype=np.int8)

    # Best cell found so far; the traceback will start here.
    max_score = 0
    max_i, max_j = 0, 0

    # Direction constants (STOP marks a restart point of the alignment)
    DIAG, UP, LEFT, STOP = 0, 1, 2, 3

    # Fill matrix
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            # Match/mismatch (diagonal move)
            if seq1[i - 1] == seq2[j - 1]:
                diag_score = score[i - 1, j - 1] + match_bonus
            else:
                diag_score = score[i - 1, j - 1] + mismatch_penalty

            up_score = score[i - 1, j] + gap_penalty
            left_score = score[i, j - 1] + gap_penalty

            # Local alignment can restart (score 0)
            best = max(0, diag_score, up_score, left_score)
            score[i, j] = best

            if best == 0:
                traceback[i, j] = STOP
            elif best == diag_score:
                traceback[i, j] = DIAG
            elif best == up_score:
                traceback[i, j] = UP
            else:
                traceback[i, j] = LEFT

            if best > max_score:
                max_score = best
                max_i, max_j = i, j

    # Traceback from max score
    aligned1: list[int] = []
    aligned2: list[int] = []
    i, j = max_i, max_j

    while i > 0 and j > 0 and traceback[i, j] != STOP:
        if traceback[i, j] == DIAG:
            aligned1.append(seq1[i - 1])
            aligned2.append(seq2[j - 1])
            i -= 1
            j -= 1
        elif traceback[i, j] == UP:
            aligned1.append(seq1[i - 1])
            aligned2.append(GAP_BYTE)
            i -= 1
        else:
            aligned1.append(GAP_BYTE)
            aligned2.append(seq2[j - 1])
            j -= 1

    # The alignment was built back-to-front.
    aligned1.reverse()
    aligned2.reverse()

    # After the walk, (i, j) is the cell just before the alignment start.
    start1 = i
    start2 = j

    return bytes(aligned1), bytes(aligned2), float(max_score), start1, start2
def align_two_sequences(
    seq1: bytes,
    seq2: bytes,
    method: str = "global",
    gap_open: int = -2,
    gap_extend: int = -1,
    mismatch_penalty: int = -1,
    match_bonus: int = 1,
) -> tuple[bytes, bytes, float]:
    """Align a pair of binary sequences.

    Args:
        seq1: First sequence
        seq2: Second sequence
        method: 'global' (Needleman-Wunsch) or 'local' (Smith-Waterman)
        gap_open: Gap opening penalty
        gap_extend: Gap extension penalty (used by the global method only;
            the local method applies ``gap_open`` as a flat gap penalty)
        mismatch_penalty: Mismatch penalty
        match_bonus: Match bonus

    Returns:
        (aligned_seq1, aligned_seq2, score)

    Raises:
        ValueError: If method is unknown.

    Example:
        >>> seq1 = b'\\x12\\x34\\x56\\x78'
        >>> seq2 = b'\\x12\\x34\\x78'
        >>> aligned1, aligned2, score = align_two_sequences(seq1, seq2)

    References:
        FUZZY-005: Multiple Binary Sequence Alignment (MSA)
    """
    if method == "global":
        return _needleman_wunsch(
            seq1, seq2, match_bonus, mismatch_penalty, gap_open, gap_extend
        )

    if method == "local":
        # Local alignment also reports start offsets, which this pairwise
        # wrapper discards to present a uniform return shape.
        a1, a2, best, _start1, _start2 = _smith_waterman(
            seq1, seq2, match_bonus, mismatch_penalty, gap_open
        )
        return a1, a2, best

    raise ValueError(f"Unknown alignment method: {method}")
585def align_sequences(
586 sequences: Sequence[bytes],
587 method: str = "progressive",
588 gap_open: int = -2,
589 gap_extend: int = -1,
590) -> AlignmentResult:
591 """Align multiple binary sequences.
593 Performs multiple sequence alignment (MSA) on a collection of
594 binary sequences.
596 Args:
597 sequences: Sequences to align
598 method: 'progressive' or 'iterative'
599 gap_open: Gap opening penalty
600 gap_extend: Gap extension penalty
602 Returns:
603 AlignmentResult with aligned sequences
605 Raises:
606 ValueError: If method is unknown.
608 Example:
609 >>> seqs = [b'\\x12\\x34\\x56', b'\\x12\\x56', b'\\x12\\x34\\x78']
610 >>> result = align_sequences(seqs)
611 >>> for seq in result.sequences:
612 ... print(seq.aligned.hex())
614 References:
615 FUZZY-005: Multiple Binary Sequence Alignment (MSA)
616 """
617 # Validate method parameter first
618 if method not in ("progressive", "iterative"):
619 raise ValueError(f"Unknown alignment method: {method}")
621 if not sequences:
622 return AlignmentResult(
623 sequences=[],
624 conservation_scores=[],
625 conserved_regions=[],
626 gap_positions=[],
627 alignment_score=0.0,
628 )
630 if len(sequences) == 1:
631 return AlignmentResult(
632 sequences=[
633 AlignedSequence(
634 original=sequences[0],
635 aligned=sequences[0],
636 gaps=[],
637 score=0.0,
638 )
639 ],
640 conservation_scores=[1.0] * len(sequences[0]),
641 conserved_regions=[(0, len(sequences[0]) - 1)] if sequences[0] else [],
642 gap_positions=[],
643 alignment_score=0.0,
644 )
646 if method == "progressive":
647 return _progressive_alignment(sequences, gap_open, gap_extend)
648 elif method == "iterative": 648 ↛ 654line 648 didn't jump to line 654 because the condition on line 648 was always true
649 # Iterative refinement (simplified)
650 result = _progressive_alignment(sequences, gap_open, gap_extend)
651 # Could add refinement passes here
652 return result
653 else:
654 raise ValueError(f"Unknown alignment method: {method}")
def _progressive_alignment(
    sequences: Sequence[bytes],
    gap_open: int,
    gap_extend: int,
) -> AlignmentResult:
    """Progressive multiple sequence alignment.

    Builds the MSA by pairwise-aligning each remaining sequence against the
    current first aligned row (a simplification -- real progressive MSA
    aligns against a profile of all rows), then propagating any newly
    introduced gaps into the previously aligned rows.
    """
    # Start with first sequence as reference
    ref = sequences[0]
    aligned_seqs: list[bytes] = [ref]
    total_score = 0.0

    # Align each sequence to growing profile
    for seq in sequences[1:]:
        # Align to last aligned sequence (simplified - real MSA uses profiles)
        ref_aligned, seq_aligned, score = _needleman_wunsch(
            aligned_seqs[0],
            seq,
            gap_open=gap_open,
            gap_extend=gap_extend,
        )

        # Update all previous alignments to match new length.
        # NOTE(review): this maps each non-gap position of ref_aligned to the
        # next byte of every previous row. It assumes each previous row has at
        # least as many bytes as ref_aligned has non-gap positions, and it
        # cannot tell a genuine 0xFF byte in `prev` apart from a gap marker --
        # confirm behavior for inputs where later alignments insert gaps into
        # the reference row.
        new_aligned: list[bytes] = []
        for prev in aligned_seqs:
            # Insert gaps where ref got new gaps
            new_prev: list[int] = []
            prev_idx = 0
            for byte in ref_aligned:
                if byte == GAP_BYTE:
                    new_prev.append(GAP_BYTE)
                elif prev_idx < len(prev):
                    new_prev.append(prev[prev_idx])
                    prev_idx += 1
            new_aligned.append(bytes(new_prev))

        aligned_seqs = new_aligned
        aligned_seqs.append(seq_aligned)
        total_score += score

    # Wrap each aligned row with its gap indices.
    result_seqs: list[AlignedSequence] = []
    for orig, aligned in zip(sequences, aligned_seqs, strict=False):
        gaps = [i for i, b in enumerate(aligned) if b == GAP_BYTE]
        result_seqs.append(
            AlignedSequence(
                original=orig,
                aligned=aligned,
                gaps=gaps,
                score=0.0,  # Individual scores not tracked in progressive
            )
        )

    # Compute conservation scores
    alignment_length = len(aligned_seqs[0]) if aligned_seqs else 0
    conservation = compute_conservation_scores(aligned_seqs)

    # Find conserved regions: maximal runs of columns with score >= 0.8.
    conserved_regions: list[tuple[int, int]] = []
    in_region = False
    region_start = 0

    for i, score in enumerate(conservation):
        if score >= 0.8 and not in_region:
            in_region = True
            region_start = i
        elif score < 0.8 and in_region:
            in_region = False
            conserved_regions.append((region_start, i - 1))

    # Close a region that runs to the end of the alignment.
    if in_region:
        conserved_regions.append((region_start, len(conservation) - 1))

    # Find columns where more than half the rows carry a gap.
    # NOTE(review): indexes every row at `pos`; assumes all rows share
    # alignment_length -- see the gap-propagation note above.
    gap_positions: list[int] = []
    for pos in range(alignment_length):
        gap_count = sum(1 for seq in aligned_seqs if seq[pos] == GAP_BYTE)
        if gap_count > len(aligned_seqs) // 2:
            gap_positions.append(pos)

    return AlignmentResult(
        sequences=result_seqs,
        conservation_scores=conservation,
        conserved_regions=conserved_regions,
        gap_positions=gap_positions,
        alignment_score=total_score,
    )
def compute_conservation_scores(
    aligned_sequences: list[bytes],
) -> list[float]:
    """Compute per-position conservation scores.

    The score at position ``i`` is the relative frequency of the most
    common non-gap byte at that position (1.0 = fully conserved,
    <0.5 = poorly conserved).

    Args:
        aligned_sequences: Aligned sequences (same length)

    Returns:
        One conservation score per alignment column (0.0 for all-gap
        columns).

    References:
        FUZZY-005: Multiple Binary Sequence Alignment (MSA)
    """
    if not aligned_sequences:
        return []

    alignment_length = len(aligned_sequences[0])
    # (Removed a leftover no-op `len(aligned_sequences)` expression here.)
    scores: list[float] = []

    for pos in range(alignment_length):
        # Histogram of non-gap bytes in this column.
        counts: dict[int, int] = {}
        non_gap_count = 0

        for seq in aligned_sequences:
            byte = seq[pos]
            if byte != GAP_BYTE:
                counts[byte] = counts.get(byte, 0) + 1
                non_gap_count += 1

        if non_gap_count == 0:
            # All-gap column: no conservation signal.
            scores.append(0.0)
        else:
            scores.append(max(counts.values()) / non_gap_count)

    return scores