Coverage for src/tracekit/analyzers/patterns/learning.py: 89% (296 statements)
1"""Pattern learning and automatic discovery from binary data.
3 - RE-PAT-004: Pattern Learning and Discovery
5This module provides machine learning inspired approaches for discovering
6patterns in binary data without prior knowledge, including entropy-based
7segmentation, frequency analysis, and structural inference.
8"""
10from __future__ import annotations
12from collections import Counter, defaultdict
13from collections.abc import Sequence
14from dataclasses import dataclass, field
16import numpy as np


@dataclass
class LearnedPattern:
    """A pattern discovered through learning.

    Implements RE-PAT-004: Learned pattern representation.

    Attributes:
        pattern: The pattern bytes.
        frequency: Number of occurrences.
        confidence: Confidence score (0-1).
        positions: List of positions where found.
        context_before: Common bytes appearing before pattern.
        context_after: Common bytes appearing after pattern.
        is_structural: Whether pattern appears to be structural.
        is_delimiter: Whether pattern appears to be a delimiter.
    """

    pattern: bytes
    frequency: int
    confidence: float
    positions: list[int] = field(default_factory=list)
    context_before: bytes = b""
    context_after: bytes = b""
    is_structural: bool = False
    is_delimiter: bool = False


@dataclass
class StructureHypothesis:
    """Hypothesis about data structure.

    Implements RE-PAT-004: Structure hypothesis.

    Attributes:
        field_boundaries: Detected field boundaries.
        field_types: Inferred field types.
        header_size: Estimated header size.
        record_size: Estimated record size (if fixed).
        delimiters: Detected delimiters.
        confidence: Overall confidence.
    """

    field_boundaries: list[int]
    field_types: list[str]
    header_size: int
    record_size: int | None
    delimiters: list[bytes]
    confidence: float


@dataclass
class NgramModel:
    """N-gram language model for binary data.

    Implements RE-PAT-004: N-gram modeling.

    Attributes:
        n: N-gram size.
        counts: N-gram frequency counts.
        total: Total n-grams observed.
        vocabulary_size: Number of unique n-grams.
    """

    n: int
    counts: dict[bytes, int] = field(default_factory=dict)
    total: int = 0
    vocabulary_size: int = 0


class PatternLearner:
    """Learn patterns from binary data samples.

    Implements RE-PAT-004: Pattern Learning and Discovery.

    Uses entropy analysis, n-gram frequency, and positional statistics
    to discover recurring patterns without prior knowledge.

    Example:
        >>> learner = PatternLearner()
        >>> learner.add_sample(data1)
        >>> learner.add_sample(data2)
        >>> patterns = learner.learn_patterns()
    """

    def __init__(
        self,
        min_pattern_length: int = 2,
        max_pattern_length: int = 16,
        min_frequency: int = 3,
        min_confidence: float = 0.5,
    ) -> None:
        """Initialize pattern learner.

        Args:
            min_pattern_length: Minimum pattern length to consider.
            max_pattern_length: Maximum pattern length to consider.
            min_frequency: Minimum occurrences to consider a pattern.
            min_confidence: Minimum confidence threshold.
        """
        self.min_pattern_length = min_pattern_length
        self.max_pattern_length = max_pattern_length
        self.min_frequency = min_frequency
        self.min_confidence = min_confidence

        self._samples: list[bytes] = []
        self._ngram_models: dict[int, NgramModel] = {}
        self._position_stats: dict[bytes, list[int]] = defaultdict(list)

    def add_sample(self, data: bytes) -> None:
        """Add a data sample for learning.

        Args:
            data: Binary data sample.
        """
        self._samples.append(data)

    def add_samples(self, samples: Sequence[bytes]) -> None:
        """Add multiple data samples.

        Args:
            samples: List of binary data samples.
        """
        self._samples.extend(samples)

    def learn_patterns(self, top_k: int = 20) -> list[LearnedPattern]:
        """Learn patterns from accumulated samples.

        Implements RE-PAT-004: Pattern discovery workflow.

        Args:
            top_k: Maximum number of patterns to return.

        Returns:
            List of discovered patterns, sorted by confidence.
        """
        if not self._samples:
            return []

        # Build n-gram models
        self._build_ngram_models()

        # Find candidate patterns
        candidates = self._find_candidates()

        # Score and filter patterns
        scored = self._score_patterns(candidates)

        # Sort by confidence and return top K
        scored.sort(key=lambda p: -p.confidence)
        return scored[:top_k]

    def learn_structure(self) -> StructureHypothesis:
        """Learn structural patterns from samples.

        Implements RE-PAT-004: Structure inference.

        Returns:
            StructureHypothesis about data organization.
        """
        if not self._samples:
            return StructureHypothesis(
                field_boundaries=[],
                field_types=[],
                header_size=0,
                record_size=None,
                delimiters=[],
                confidence=0.0,
            )

        # Analyze entropy profile for field boundaries
        boundaries = self._detect_field_boundaries()

        # Infer field types
        field_types = self._infer_field_types(boundaries)

        # Estimate header size
        header_size = self._estimate_header_size(boundaries)

        # Check for fixed record size
        record_size = self._detect_record_size()

        # Find delimiters
        delimiters = self._find_delimiters()

        # Calculate confidence
        confidence = self._calculate_structure_confidence(
            boundaries, field_types, record_size, delimiters
        )

        return StructureHypothesis(
            field_boundaries=boundaries,
            field_types=field_types,
            header_size=header_size,
            record_size=record_size,
            delimiters=delimiters,
            confidence=confidence,
        )
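
    # Usage sketch (editorial illustration, not part of the original module;
    # `packet_samples` is a hypothetical list of bytes objects):
    #
    #     learner = PatternLearner()
    #     learner.add_samples(packet_samples)
    #     hypothesis = learner.learn_structure()
    #     print(hypothesis.header_size, hypothesis.record_size, hypothesis.delimiters)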

    def predict_next_bytes(
        self, context: bytes, n_predictions: int = 5
    ) -> list[tuple[bytes, float]]:
        """Predict likely next bytes given context.

        Implements RE-PAT-004: Byte prediction using n-gram models.

        Args:
            context: Context bytes.
            n_predictions: Number of predictions to return.

        Returns:
            List of (next_byte, probability) tuples.
        """
        predictions = []

        # Use largest n-gram model that fits context
        for n in range(min(len(context) + 1, self.max_pattern_length), 0, -1):
            if n not in self._ngram_models:
                continue

            model = self._ngram_models[n]
            prefix = context[-(n - 1) :] if n > 1 else b""

            # Find matching prefixes
            matching = {}
            for ngram, count in model.counts.items():
                if ngram[:-1] == prefix:
                    matching[ngram[-1:]] = count

            if matching:
                total = sum(matching.values())
                for byte_val, count in matching.items():
                    prob = count / total
                    predictions.append((byte_val, prob))
                break

        # Sort by probability
        predictions.sort(key=lambda x: -x[1])
        return predictions[:n_predictions]
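
    # Example (editorial illustration, not part of the original module): with two
    # samples that both start with b"\x00\x01", the 2-gram model predicts 0x01
    # after 0x00 with probability 1.0.
    #
    #     learner = PatternLearner()
    #     learner.add_sample(b"\x00\x01AAAA")
    #     learner.add_sample(b"\x00\x01BBBB")
    #     learner.build_ngram_model(2)
    #     learner.predict_next_bytes(b"\x00")   # -> [(b"\x01", 1.0)]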

    def build_ngram_model(self, n: int) -> NgramModel:
        """Build n-gram model from samples.

        Args:
            n: N-gram size.

        Returns:
            NgramModel with frequency statistics.
        """
        model = NgramModel(n=n)

        for sample in self._samples:
            for i in range(len(sample) - n + 1):
                ngram = sample[i : i + n]
                if ngram not in model.counts:
                    model.counts[ngram] = 0
                    model.vocabulary_size += 1
                model.counts[ngram] += 1
                model.total += 1

        self._ngram_models[n] = model
        return model
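
    # Illustration (editorial note, not executed): for a single sample b"ABAB" and
    # n = 2, the model holds counts {b"AB": 2, b"BA": 1}, total = 3 and
    # vocabulary_size = 2.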

    def _build_ngram_models(self) -> None:
        """Build n-gram models for all sizes."""
        for n in range(self.min_pattern_length, self.max_pattern_length + 1):
            self.build_ngram_model(n)

    def _find_candidates(self) -> dict[bytes, int]:
        """Find candidate patterns based on frequency.

        Returns:
            Dictionary mapping patterns to frequencies.
        """
        candidates = {}

        for n in range(self.min_pattern_length, self.max_pattern_length + 1):
            if n not in self._ngram_models:
                continue

            model = self._ngram_models[n]
            for pattern, count in model.counts.items():
                if count >= self.min_frequency:
                    candidates[pattern] = count

        return candidates

    def _score_patterns(self, candidates: dict[bytes, int]) -> list[LearnedPattern]:
        """Score candidate patterns.

        Args:
            candidates: Dictionary of pattern -> frequency.

        Returns:
            List of scored LearnedPattern objects.
        """
        patterns = []

        for pattern, frequency in candidates.items():
            # Find all positions across samples
            positions = []
            for sample_idx, sample in enumerate(self._samples):
                start = 0
                while True:
                    pos = sample.find(pattern, start)
                    if pos == -1:
                        break
                    positions.append((sample_idx, pos))
                    start = pos + 1

            # Calculate confidence based on distribution
            confidence = self._calculate_pattern_confidence(pattern, positions)

            if confidence < self.min_confidence:
                continue

            # Get context
            context_before, context_after = self._get_context(pattern, positions)

            # Check if structural
            is_structural = self._is_structural(pattern, positions)

            # Check if delimiter
            is_delimiter = self._is_delimiter(pattern, positions)

            patterns.append(
                LearnedPattern(
                    pattern=pattern,
                    frequency=frequency,
                    confidence=confidence,
                    positions=[p for _, p in positions],
                    context_before=context_before,
                    context_after=context_after,
                    is_structural=is_structural,
                    is_delimiter=is_delimiter,
                )
            )

        return patterns

    def _calculate_pattern_confidence(
        self, pattern: bytes, positions: list[tuple[int, int]]
    ) -> float:
        """Calculate confidence score for pattern.

        Args:
            pattern: The pattern.
            positions: List of (sample_idx, position) tuples.

        Returns:
            Confidence score (0-1).
        """
        if not positions:
            return 0.0

        # Factor 1: Frequency across samples
        samples_with_pattern = len({p[0] for p in positions})
        sample_coverage = samples_with_pattern / len(self._samples)

        # Factor 2: Positional consistency
        position_offsets = [p[1] for p in positions]
        if len(position_offsets) > 1:
            variance = float(np.var(position_offsets))
            max_pos = max(max(len(s) for s in self._samples), 1)
            position_consistency = 1.0 / (1.0 + variance / (max_pos**2))
        else:
            position_consistency = 0.5

        # Factor 3: Pattern complexity (non-trivial patterns)
        unique_bytes = len(set(pattern))
        complexity = unique_bytes / len(pattern) if pattern else 0

        # Combined score
        confidence = 0.4 * sample_coverage + 0.3 * position_consistency + 0.3 * complexity

        return float(min(1.0, confidence))
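
    # Worked illustration (editorial note, numbers only): a pattern b"\xff\xff" seen
    # in both of two samples (sample_coverage = 1.0), always at offset 0
    # (variance = 0, so position_consistency = 1.0), with 1 unique byte out of 2
    # (complexity = 0.5) scores 0.4 * 1.0 + 0.3 * 1.0 + 0.3 * 0.5 = 0.85.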

    def _get_context(self, pattern: bytes, positions: list[tuple[int, int]]) -> tuple[bytes, bytes]:
        """Get common context before and after pattern.

        Args:
            pattern: The pattern.
            positions: List of (sample_idx, position) tuples.

        Returns:
            Tuple of (context_before, context_after).
        """
        before_bytes = []
        after_bytes = []

        context_len = min(4, self.min_pattern_length)

        for sample_idx, pos in positions[:100]:  # Limit samples
            sample = self._samples[sample_idx]

            # Bytes before
            if pos >= context_len:
                before_bytes.append(sample[pos - context_len : pos])

            # Bytes after
            end_pos = pos + len(pattern)
            if end_pos + context_len <= len(sample):
                after_bytes.append(sample[end_pos : end_pos + context_len])

        # Find most common
        context_before = b""
        context_after = b""

        if before_bytes:
            counter = Counter(before_bytes)
            most_common = counter.most_common(1)
            if most_common and most_common[0][1] >= 2:
                context_before = most_common[0][0]

        if after_bytes:
            counter = Counter(after_bytes)
            most_common = counter.most_common(1)
            if most_common and most_common[0][1] >= 2:
                context_after = most_common[0][0]

        return context_before, context_after

    def _is_structural(self, pattern: bytes, positions: list[tuple[int, int]]) -> bool:
        """Check if pattern appears structural.

        Args:
            pattern: The pattern.
            positions: List of positions.

        Returns:
            True if pattern appears structural.
        """
        if not positions:
            return False

        # Structural patterns tend to appear at consistent offsets
        offsets = [p[1] for p in positions]
        if len(set(offsets)) == 1:
            return True

        # Or at regular intervals
        if len(offsets) > 2:
            diffs = [
                offsets[i + 1] - offsets[i]
                for i in range(len(offsets) - 1)
                if offsets[i + 1] > offsets[i]
            ]
            if diffs and len(set(diffs)) == 1:
                return True

        return False
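
    # Illustration (editorial note, not executed): offsets [0, 0, 0] across samples,
    # or offsets [0, 16, 32, 48] within one sample (constant stride of 16), both
    # count as structural; irregular offsets such as [3, 7, 20] do not.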

    def _is_delimiter(self, pattern: bytes, positions: list[tuple[int, int]]) -> bool:
        """Check if pattern appears to be a delimiter.

        Args:
            pattern: The pattern.
            positions: List of positions.

        Returns:
            True if pattern appears to be a delimiter.
        """
        # Delimiters often have regular spacing
        if not positions:
            return False

        # Group by sample
        by_sample = defaultdict(list)
        for sample_idx, pos in positions:
            by_sample[sample_idx].append(pos)

        regular_count = 0
        for sample_positions in by_sample.values():
            if len(sample_positions) >= 3:
                diffs = [
                    sample_positions[i + 1] - sample_positions[i]
                    for i in range(len(sample_positions) - 1)
                ]
                # Check for regular intervals
                if len(set(diffs)) == 1 or (diffs and max(diffs) - min(diffs) < 4):
                    regular_count += 1

        return regular_count >= len(by_sample) * 0.5
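
    # Illustration (editorial note, not executed): a pattern found at offsets 4, 9
    # and 14 within a single sample gives diffs [5, 5], which counts as regular
    # spacing; with one sample in by_sample, regular_count (1) >= 0.5 * 1, so the
    # pattern is treated as a delimiter.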

    def _detect_field_boundaries(self) -> list[int]:
        """Detect field boundaries using entropy transitions."""
        if not self._samples:
            return []

        # Use first sample or combined samples
        combined = b"".join(self._samples[:10])

        from tracekit.analyzers.statistical.entropy import detect_entropy_transitions

        try:
            transitions = detect_entropy_transitions(combined, window=64, threshold=0.8, min_gap=4)
            return [t.offset for t in transitions]
        except ValueError:
            return []

    def _infer_field_types(self, boundaries: list[int]) -> list[str]:
        """Infer field types based on content patterns.

        Args:
            boundaries: Field boundary offsets.

        Returns:
            List of inferred field types.
        """
        if not boundaries or not self._samples:
            return []

        field_types = []
        sample = self._samples[0]
        boundaries = [0] + boundaries + [len(sample)]

        for i in range(len(boundaries) - 1):
            start = boundaries[i]
            end = min(boundaries[i + 1], len(sample))
            field_data = sample[start:end]

            field_type = self._classify_field(field_data)
            field_types.append(field_type)

        return field_types

    def _classify_field(self, data: bytes) -> str:
        """Classify a field based on its content.

        Args:
            data: Field data.

        Returns:
            Field type string.
        """
        if not data:
            return "empty"

        # Check for constant
        if len(set(data)) == 1:
            return "constant"

        # Check for counter (monotonic)
        if len(data) <= 4:
            values = list(data)
            if all(values[i] <= values[i + 1] for i in range(len(values) - 1)):
                return "counter"

        # Check for printable text
        printable = sum(1 for b in data if 32 <= b <= 126)
        if printable / len(data) > 0.8:
            return "text"

        # Check for high entropy (random/encrypted)
        from tracekit.analyzers.statistical.entropy import shannon_entropy

        entropy = shannon_entropy(data)
        if entropy > 7.0:
            return "random"
        elif entropy > 5.0:
            return "binary"

        return "structured"
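
    # Illustration (editorial note, not executed): b"\x00\x00\x00\x00" is
    # "constant", b"\x01\x02\x03" is "counter" (short and monotonically
    # non-decreasing), and b"Hello, world" is "text" (more than 80% printable bytes).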

    def _estimate_header_size(self, boundaries: list[int]) -> int:
        """Estimate header size from boundaries.

        Args:
            boundaries: Field boundary offsets.

        Returns:
            Estimated header size.
        """
        if not boundaries:
            return 0

        # Header typically ends at first high-entropy transition
        for b in boundaries:
            if b > 0:
                return b

        return boundaries[0] if boundaries else 0

    def _detect_record_size(self) -> int | None:
        """Detect fixed record size if present.

        Returns:
            Record size or None if variable.
        """
        if len(self._samples) < 2:
            return None

        # Check if all samples have same length
        lengths = [len(s) for s in self._samples]
        if len(set(lengths)) == 1:
            return lengths[0]

        # Check for GCD of lengths (might indicate record size)
        from functools import reduce
        from math import gcd

        if all(length > 0 for length in lengths):
            common_div = reduce(gcd, lengths)
            if common_div > 1 and common_div != min(lengths):
                return common_div

        return None
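
    # Illustration (editorial note, not executed): sample lengths [96, 160, 224] are
    # not all equal, but their GCD is 32, which is greater than 1 and differs from
    # the shortest length, so 32 is reported as a plausible record size.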

    def _find_delimiters(self) -> list[bytes]:
        """Find delimiter patterns.

        Returns:
            List of likely delimiter bytes.
        """
        patterns = self.learn_patterns(top_k=50)
        return [p.pattern for p in patterns if p.is_delimiter][:5]

    def _calculate_structure_confidence(
        self,
        boundaries: list[int],
        field_types: list[str],
        record_size: int | None,
        delimiters: list[bytes],
    ) -> float:
        """Calculate confidence in structure hypothesis.

        Args:
            boundaries: Detected boundaries.
            field_types: Inferred types.
            record_size: Detected record size.
            delimiters: Found delimiters.

        Returns:
            Confidence score (0-1).
        """
        score = 0.0

        # Having boundaries adds confidence
        if boundaries:
            score += 0.3

        # Having non-unknown field types adds confidence
        known_types = sum(1 for t in field_types if t != "structured")
        if field_types:
            score += 0.2 * (known_types / len(field_types))

        # Fixed record size adds confidence
        if record_size is not None:
            score += 0.2

        # Delimiters add confidence
        if delimiters:
            score += 0.2

        # Multiple samples add confidence
        if len(self._samples) > 5:
            score += 0.1

        return min(1.0, score)
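
    # Worked illustration (editorial note, numbers only): with boundaries detected
    # (+0.3), 2 of 4 field types classified as something other than "structured"
    # (+0.2 * 0.5 = +0.1), a fixed record size (+0.2), no delimiters, and only 3
    # samples, the hypothesis scores 0.3 + 0.1 + 0.2 = 0.6.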


def learn_patterns_from_data(
    data: bytes | Sequence[bytes],
    min_length: int = 2,
    max_length: int = 16,
    min_frequency: int = 3,
    top_k: int = 20,
) -> list[LearnedPattern]:
    """Learn patterns from binary data.

    Implements RE-PAT-004: Pattern Learning and Discovery.

    Args:
        data: Single data sample or list of samples.
        min_length: Minimum pattern length.
        max_length: Maximum pattern length.
        min_frequency: Minimum occurrences.
        top_k: Number of patterns to return.

    Returns:
        List of discovered patterns.

    Example:
        >>> patterns = learn_patterns_from_data(binary_data)
        >>> for p in patterns:
        ...     print(f"Pattern: {p.pattern.hex()}, freq: {p.frequency}")
    """
    learner = PatternLearner(
        min_pattern_length=min_length,
        max_pattern_length=max_length,
        min_frequency=min_frequency,
    )

    if isinstance(data, bytes):
        learner.add_sample(data)
    else:
        learner.add_samples(data)

    return learner.learn_patterns(top_k=top_k)


def infer_structure(samples: Sequence[bytes]) -> StructureHypothesis:
    """Infer data structure from samples.

    Implements RE-PAT-004: Structure inference.

    Args:
        samples: List of binary data samples.

    Returns:
        StructureHypothesis about data organization.

    Example:
        >>> hypothesis = infer_structure(packet_samples)
        >>> print(f"Header size: {hypothesis.header_size}")
    """
    learner = PatternLearner()
    learner.add_samples(samples)
    return learner.learn_structure()


def find_recurring_structures(
    data: bytes,
    min_size: int = 8,
    max_size: int = 256,
) -> list[tuple[int, int, float]]:
    """Find recurring fixed-size structures in data.

    Implements RE-PAT-004: Structure detection.

    Args:
        data: Binary data.
        min_size: Minimum structure size.
        max_size: Maximum structure size.

    Returns:
        List of (size, offset, confidence) tuples for detected structures.
    """
    results = []

    for size in range(min_size, min(max_size, len(data) // 2) + 1):
        # Check if data divides evenly
        if len(data) % size != 0:
            continue

        num_records = len(data) // size
        if num_records < 2:
            continue

        # Compare records for similarity
        records = [data[i * size : (i + 1) * size] for i in range(num_records)]

        # Calculate similarity between consecutive records
        similarities = []
        for i in range(len(records) - 1):
            matching = sum(a == b for a, b in zip(records[i], records[i + 1], strict=True))
            similarities.append(matching / size)

        if similarities:
            avg_similarity = sum(similarities) / len(similarities)
            if avg_similarity > 0.3:  # Some structural similarity
                results.append((size, 0, avg_similarity))

    # Sort by confidence
    results.sort(key=lambda x: -x[2])
    return results[:5]
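
# Example (editorial illustration, not part of the original module): three identical
# 32-byte records should be reported as a 32-byte structure with full similarity.
#
#     data = bytes(range(32)) * 3
#     find_recurring_structures(data, min_size=16, max_size=32)
#     # -> [(32, 0, 1.0)]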


__all__ = [
    # Data classes
    "LearnedPattern",
    "NgramModel",
    "StructureHypothesis",
    # Classes
    "PatternLearner",
    # Functions
    "find_recurring_structures",
    "infer_structure",
    "learn_patterns_from_data",
]