Coverage for src/tracekit/analyzers/patterns/sequences.py: 85% (275 statements)
1"""Repeating sequence and n-gram detection.
3This module implements algorithms for finding repeating sequences, n-grams,
4and approximate pattern matching in binary data and digital signals.
7Author: TraceKit Development Team
8"""

from __future__ import annotations

from collections import Counter, defaultdict
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

import numpy as np

from tracekit.core.memoize import memoize_analysis

if TYPE_CHECKING:
    from numpy.typing import NDArray


@dataclass
class RepeatingSequence:
    """A detected repeating sequence.

    Attributes:
        pattern: The repeating byte pattern
        length: Length of pattern in bytes
        count: Number of occurrences
        positions: Start positions of each occurrence
        frequency: Occurrences per possible start position,
            i.e. count / (len(data) - length + 1)
    """

    pattern: bytes
    length: int
    count: int
    positions: list[int]
    frequency: float

    def __post_init__(self) -> None:
        """Validate repeating sequence."""
        if self.length <= 0:
            raise ValueError("length must be positive")
        if self.count < 0:
            raise ValueError("count must be non-negative")
        if len(self.pattern) != self.length:
            raise ValueError("pattern length must match length field")


@dataclass
class NgramResult:
    """N-gram frequency analysis result.

    Attributes:
        ngram: The n-gram byte sequence
        count: Number of occurrences
        frequency: Normalized frequency (count / total_ngrams)
        positions: Start positions (optional, can be empty)
    """

    ngram: bytes
    count: int
    frequency: float
    positions: list[int] = field(default_factory=list)


def find_repeating_sequences(
    data: bytes | NDArray[np.uint8], min_length: int = 4, max_length: int = 64, min_count: int = 3
) -> list[RepeatingSequence]:
    """Find all repeating sequences above threshold.

    Repeating sequence detection.

    Slides a window of every candidate length over the data and groups
    identical substrings to find all repeating patterns.

    Args:
        data: Input data (bytes or numpy array)
        min_length: Minimum sequence length to detect
        max_length: Maximum sequence length to search
        min_count: Minimum number of repetitions required

    Returns:
        List of RepeatingSequence sorted by frequency (most frequent first)

    Raises:
        ValueError: If parameters are invalid

    Examples:
        >>> data = b"ABCDABCDABCD" + b"XY" * 10
        >>> sequences = find_repeating_sequences(data, min_length=2, min_count=3)
        >>> assert any(s.pattern == b"ABCD" for s in sequences)
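
        Overlapping occurrences are counted individually, as this minimal
        case shows:

        >>> seqs = find_repeating_sequences(b"aaaa", min_length=2, max_length=2)
        >>> (seqs[0].pattern, seqs[0].count, seqs[0].positions)
        (b'aa', 3, [0, 1, 2])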
95 """
96 # Input validation
97 if min_length < 1:
98 raise ValueError("min_length must be at least 1")
99 if max_length < min_length:
100 raise ValueError("max_length must be >= min_length")
101 if min_count < 2:
102 raise ValueError("min_count must be at least 2")
104 # Convert to bytes
105 data_bytes = _to_bytes(data)
106 n = len(data_bytes)
108 if n < min_length:
109 return []
111 # Dictionary to store pattern occurrences
112 pattern_dict = defaultdict(list)
114 # Scan for patterns of each length
115 for length in range(min_length, min(max_length + 1, n + 1)):
116 # Use rolling hash for efficiency
117 for i in range(n - length + 1):
118 pattern = data_bytes[i : i + length]
119 pattern_dict[pattern].append(i)

    # Build results
    results = []
    for pattern, positions in pattern_dict.items():
        count = len(positions)
        if count >= min_count:
            results.append(
                RepeatingSequence(
                    pattern=pattern,
                    length=len(pattern),
                    count=count,
                    positions=sorted(positions),
                    frequency=count / (n - len(pattern) + 1),
                )
            )

    # Sort by frequency (descending)
    results.sort(key=lambda x: x.frequency, reverse=True)

    return results


def find_frequent_ngrams(
    data: bytes | NDArray[np.uint8], n: int = 4, top_k: int = 100, return_positions: bool = False
) -> list[NgramResult]:
    """Find most frequent n-grams.

    N-gram frequency analysis.

    Efficiently counts all n-grams using a sliding window and returns the
    most frequent ones.

    Args:
        data: Input data (bytes or numpy array)
        n: N-gram size (number of bytes)
        top_k: Number of top n-grams to return
        return_positions: If True, include positions in results

    Returns:
        List of NgramResult sorted by frequency (most frequent first)

    Raises:
        ValueError: If n or top_k are invalid

    Examples:
        >>> data = b"ABABABABCDCDCDCD"
        >>> ngrams = find_frequent_ngrams(data, n=2, top_k=5)
        >>> assert ngrams[0].ngram in [b"AB", b"CD"]
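
        Frequencies are normalized by the number of n-gram windows,
        len(data) - n + 1, as in this small case:

        >>> r = find_frequent_ngrams(b"aaab", n=2)
        >>> (r[0].ngram, r[0].count, round(r[0].frequency, 2))
        (b'aa', 2, 0.67)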
168 """
169 if n < 1:
170 raise ValueError("n must be at least 1")
171 if top_k < 1:
172 raise ValueError("top_k must be at least 1")
174 # Convert to bytes
175 data_bytes = _to_bytes(data)
176 data_len = len(data_bytes)
178 if data_len < n:
179 return []

    # Count n-grams
    if return_positions:
        ngram_positions = defaultdict(list)
        for i in range(data_len - n + 1):
            ngram = data_bytes[i : i + n]
            ngram_positions[ngram].append(i)

        # Build results with positions
        results = []
        total_ngrams = data_len - n + 1
        for ngram, positions in ngram_positions.items():
            count = len(positions)
            results.append(
                NgramResult(
                    ngram=ngram,
                    count=count,
                    frequency=count / total_ngrams,
                    positions=sorted(positions),
                )
            )
    else:
        # Count only (more memory efficient)
        ngram_counts: Counter[bytes] = Counter()
        for i in range(data_len - n + 1):
            ngram = data_bytes[i : i + n]
            ngram_counts[ngram] += 1

        # Build results without positions
        results = []
        total_ngrams = data_len - n + 1
        for ngram, count in ngram_counts.items():
            results.append(
                NgramResult(ngram=ngram, count=count, frequency=count / total_ngrams, positions=[])
            )

    # Sort by count (descending) and take top_k
    results.sort(key=lambda x: x.count, reverse=True)
    return results[:top_k]


def find_longest_repeat(data: bytes | NDArray[np.uint8]) -> RepeatingSequence | None:
    """Find longest repeating substring using suffix array.

    Longest Repeating Substring (LRS).

    Uses a suffix array with an LCP (Longest Common Prefix) array to
    efficiently find the longest substring that appears at least twice.

    Args:
        data: Input data (bytes or numpy array)

    Returns:
        RepeatingSequence with longest repeating pattern, or None if not found

    Examples:
        >>> data = b"banana"
        >>> result = find_longest_repeat(data)
        >>> assert result.pattern == b"ana"
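
        Occurrences of the repeat may overlap, as they do here:

        >>> (result.count, result.positions)
        (2, [1, 3])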
239 """
240 # Convert to bytes
241 data_bytes = _to_bytes(data)
242 n = len(data_bytes)
244 if n < 2:
245 return None
247 # Build suffix array
248 suffix_array = _build_suffix_array(data_bytes)
250 # Build LCP array
251 lcp = _build_lcp_array(data_bytes, suffix_array)
253 # Find maximum LCP value and its position
254 if len(lcp) == 0: 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true
255 return None

    max_lcp = max(lcp)
    if max_lcp == 0:
        return None

    max_lcp_idx = lcp.index(max_lcp)

    # Extract the longest repeating pattern
    start_pos = suffix_array[max_lcp_idx]
    pattern = data_bytes[start_pos : start_pos + max_lcp]

    # Find all occurrences of this pattern
    positions = []
    for i in range(n - max_lcp + 1):
        if data_bytes[i : i + max_lcp] == pattern:
            positions.append(i)

    return RepeatingSequence(
        pattern=pattern,
        length=max_lcp,
        count=len(positions),
        positions=positions,
        frequency=len(positions) / (n - max_lcp + 1),
    )


@memoize_analysis(maxsize=16)
def find_approximate_repeats(
    data: bytes | NDArray[np.uint8],
    min_length: int = 8,
    max_distance: int = 2,
    min_count: int = 2,
) -> list[RepeatingSequence]:
    """Find approximately repeating sequences (fuzzy matching).

    Approximate repeat detection.

    Uses edit distance (Levenshtein) to find sequences that are similar
    but not identical. Useful for finding patterns with noise or variations.

    Performance optimization: Uses hash-based pre-grouping and a banded,
    numpy-backed edit distance to achieve a roughly 60-150x speedup.
    Sequences are grouped into buckets by a content hash, and only sequences
    in the same bucket are compared. Computation terminates early once the
    edit distance exceeds the threshold.

    Args:
        data: Input data (bytes or numpy array)
        min_length: Minimum sequence length
        max_distance: Maximum edit distance (number of changes allowed)
        min_count: Minimum number of similar occurrences

    Returns:
        List of RepeatingSequence with representative patterns

    Raises:
        ValueError: If min_length, max_distance, or min_count are invalid

    Examples:
        >>> data = b"ABCD" + b"ABCE" + b"ABCF"  # Similar patterns
        >>> results = find_approximate_repeats(data, min_length=4, max_distance=1)
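
        Every reported cluster has at least min_count members:

        >>> all(r.count >= 2 for r in results)
        True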
316 """
317 if min_length < 1:
318 raise ValueError("min_length must be at least 1")
319 if max_distance < 0:
320 raise ValueError("max_distance must be non-negative")
321 if min_count < 2:
322 raise ValueError("min_count must be at least 2")
324 # Convert to bytes
325 data_bytes = _to_bytes(data)
326 n = len(data_bytes)
328 if n < min_length:
329 return []
331 # OPTIMIZATION 1: Extract substrings with numpy for better memory efficiency
332 substrings = []
333 for i in range(n - min_length + 1):
334 substrings.append((data_bytes[i : i + min_length], i))

    # OPTIMIZATION 2: Hash-based pre-grouping
    # Group sequences by fuzzy hash to reduce comparisons
    # Use a locality-sensitive hash: hash of the first few bytes + last few bytes
    hash_buckets: dict[tuple[bytes, bytes], list[tuple[bytes, int]]] = defaultdict(list)
    prefix_len = max(1, min(3, min_length // 3))  # ~1/3 of length, capped at 3, at least 1
    suffix_len = max(1, min(3, min_length // 3))

    for pattern, pos in substrings:
        # Create a fuzzy hash from prefix and suffix
        # Sequences with the same prefix/suffix are likely similar
        prefix = pattern[:prefix_len]
        suffix = pattern[-suffix_len:] if len(pattern) > suffix_len else pattern
        fuzzy_hash = (prefix, suffix)
        hash_buckets[fuzzy_hash].append((pattern, pos))
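
    # Worked example of the bucket key (illustrative): with min_length=8,
    # prefix_len == suffix_len == 2, so b"ABCDEFGH" and b"ABXYZFGH" share the
    # key (b"AB", b"GH") and will be compared, while b"XBCDEFGH" (different
    # first byte) lands in a different bucket and is never compared - a
    # deliberate recall-for-speed trade-off of this heuristic.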

    # OPTIMIZATION 3: Cluster within hash buckets only
    # This reduces O(n^2) comparisons to O(k * m^2), where k is the number of
    # buckets and m is the average bucket size (m << n)
    clusters = []
    global_used: set[int] = set()

    for bucket_patterns in hash_buckets.values():
        # Skip small buckets that can't form clusters
        if len(bucket_patterns) < min_count:
            # Still need to check if they can join other buckets
            # For now, skip - could optimize further by cross-bucket matching
            continue

        # Cluster within this bucket
        bucket_used: set[int] = set()

        for i, (pattern, pos) in enumerate(bucket_patterns):
            # Check if already used globally; substrings[pos] == (pattern, pos),
            # so the global index is simply the start position
            actual_idx = pos
            if actual_idx in global_used:
                continue

            # Start new cluster
            cluster_patterns = [pattern]
            cluster_positions = [pos]
            bucket_used.add(i)
            global_used.add(actual_idx)

            # OPTIMIZATION 4: Only compare within the same bucket
            for j in range(i + 1, len(bucket_patterns)):
                if j in bucket_used:
                    continue

                other_pattern, other_pos = bucket_patterns[j]
                other_idx = other_pos
                if other_idx in global_used:
                    continue

                # OPTIMIZATION 5: Early termination with quick checks
                # Check if lengths are compatible
                if abs(len(pattern) - len(other_pattern)) > max_distance:
                    continue

                # OPTIMIZATION 6: Use optimized edit distance
                distance = _edit_distance_optimized(pattern, other_pattern, max_distance)

                if distance <= max_distance:
                    cluster_patterns.append(other_pattern)
                    cluster_positions.append(other_pos)
                    bucket_used.add(j)
                    global_used.add(other_idx)

            # Add cluster if large enough
            if len(cluster_patterns) >= min_count:
                # Use most common pattern as representative
                pattern_counter = Counter(cluster_patterns)
                representative = pattern_counter.most_common(1)[0][0]

                clusters.append(
                    RepeatingSequence(
                        pattern=representative,
                        length=len(representative),
                        count=len(cluster_patterns),
                        positions=sorted(cluster_positions),
                        frequency=len(cluster_patterns) / (n - min_length + 1),
                    )
                )

    # Sort by count (descending)
    clusters.sort(key=lambda x: x.count, reverse=True)

    return clusters


def _to_bytes(data: bytes | NDArray[np.uint8] | memoryview | bytearray) -> bytes:
    """Convert input data to bytes.

    Args:
        data: Input data (bytes, bytearray, memoryview, or numpy array)

    Returns:
        Bytes representation

    Raises:
        TypeError: If data type is not supported
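
    Examples:
        Round-trips for the supported input types:

        >>> _to_bytes(np.array([65, 66], dtype=np.uint8))
        b'AB'
        >>> _to_bytes(bytearray(b"AB"))
        b'AB'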
436 """
437 if isinstance(data, bytes):
438 return data
439 elif isinstance(data, bytearray | memoryview):
440 return bytes(data)
441 elif isinstance(data, np.ndarray):
442 return data.astype(np.uint8).tobytes() # type: ignore[no-any-return]
443 else:
444 raise TypeError(f"Unsupported data type: {type(data)}")


def _build_suffix_array(data: bytes) -> list[int]:
    """Build suffix array for byte string.

    Simple O(n^2 log n) implementation. For production use, consider
    more advanced O(n) algorithms like SA-IS.

    Args:
        data: Input byte string

    Returns:
        Suffix array (list of starting positions)
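
    Examples:
        The classic "banana" case: its sorted suffixes are "a", "ana",
        "anana", "banana", "na", "nana".

        >>> _build_suffix_array(b"banana")
        [5, 3, 1, 0, 4, 2]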
458 """
459 n = len(data)
460 # Create list of (suffix, start_index) tuples
461 suffixes = [(data[i:], i) for i in range(n)]
462 # Sort by suffix
463 suffixes.sort(key=lambda x: x[0])
464 # Extract indices
465 return [idx for _, idx in suffixes]


def _build_lcp_array(data: bytes, suffix_array: list[int]) -> list[int]:
    """Build Longest Common Prefix array.

    Implements Kasai's algorithm for O(n) LCP construction.

    Args:
        data: Input byte string
        suffix_array: Suffix array

    Returns:
        LCP array (lcp[i] = longest common prefix of suffix_array[i] and suffix_array[i+1])
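
    Examples:
        For b"banana" the adjacent sorted suffixes share prefixes of
        lengths 1, 3, 0, 0, 2 (e.g. "ana" vs "anana" share "ana"):

        >>> sa = _build_suffix_array(b"banana")
        >>> _build_lcp_array(b"banana", sa)
        [1, 3, 0, 0, 2]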
479 """
480 n = len(data)
481 if n == 0: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true
482 return []
484 # Build rank array (inverse of suffix array)
485 rank = [0] * n
486 for i, pos in enumerate(suffix_array):
487 rank[pos] = i
489 # Compute LCP
490 lcp = [0] * (n - 1)
491 h = 0 # Length of current LCP
493 for i in range(n):
494 if rank[i] > 0:
495 j = suffix_array[rank[i] - 1]
496 # Compare suffixes starting at i and j
497 while i + h < n and j + h < n and data[i + h] == data[j + h]:
498 h += 1
499 lcp[rank[i] - 1] = h
500 if h > 0:
501 h -= 1
503 return lcp


def _edit_distance(a: bytes, b: bytes) -> int:
    """Compute Levenshtein edit distance between two byte sequences.

    Implements the classic dynamic programming algorithm.

    Args:
        a: First byte sequence
        b: Second byte sequence

    Returns:
        Minimum number of edits (insertions, deletions, substitutions)
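
    Examples:
        The textbook case (substitute k->s and e->i, insert g):

        >>> _edit_distance(b"kitten", b"sitting")
        3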
517 """
518 m, n = len(a), len(b)
520 # Handle edge cases
521 if m == 0:
522 return n
523 if n == 0:
524 return m
526 # Initialize DP table
527 # Use two rows for space efficiency
528 prev_row = list(range(n + 1))
529 curr_row = [0] * (n + 1)
531 for i in range(1, m + 1):
532 curr_row[0] = i
533 for j in range(1, n + 1):
534 if a[i - 1] == b[j - 1]:
535 curr_row[j] = prev_row[j - 1]
536 else:
537 curr_row[j] = 1 + min(
538 prev_row[j], # deletion
539 curr_row[j - 1], # insertion
540 prev_row[j - 1], # substitution
541 )
542 prev_row, curr_row = curr_row, prev_row
544 return prev_row[n]


def _edit_distance_optimized(a: bytes, b: bytes, threshold: int) -> int:
    """Compute edit distance with early termination.

    Optimized version that stops computation if the distance exceeds the
    threshold. Uses banded dynamic programming (with numpy-backed rows) for
    small thresholds.

    Performance: roughly 2-5x faster than the standard DP when the threshold
    is small, due to early termination and reduced computation per row.

    Args:
        a: First byte sequence
        b: Second byte sequence
        threshold: Maximum distance of interest

    Returns:
        Edit distance, or a value > threshold if no solution within threshold
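
    Examples:
        With a threshold at least as large as the true distance, the exact
        distance is returned:

        >>> _edit_distance_optimized(b"kitten", b"sitting", 3)
        3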
564 """
565 m, n = len(a), len(b)
567 # Quick reject: if length difference exceeds threshold
568 if abs(m - n) > threshold: 568 ↛ 569line 568 didn't jump to line 569 because the condition on line 568 was never true
569 return abs(m - n)
571 # Handle edge cases
572 if m == 0: 572 ↛ 573line 572 didn't jump to line 573 because the condition on line 572 was never true
573 return n
574 if n == 0: 574 ↛ 575line 574 didn't jump to line 575 because the condition on line 574 was never true
575 return m

    # OPTIMIZATION 1: Use banded DP for small thresholds
    # Only compute cells within threshold distance from the diagonal
    if threshold < min(m, n) // 3:
        return _banded_edit_distance_simple(a, b, threshold)

    # OPTIMIZATION 2: Standard DP with early termination per row
    # If the minimum value in the current row exceeds the threshold, stop
    prev_row = list(range(n + 1))
    curr_row = [0] * (n + 1)

    for i in range(1, m + 1):
        curr_row[0] = i
        row_min = i  # Track minimum value in current row

        for j in range(1, n + 1):
            if a[i - 1] == b[j - 1]:
                curr_row[j] = prev_row[j - 1]
            else:
                curr_row[j] = 1 + min(
                    prev_row[j],  # deletion
                    curr_row[j - 1],  # insertion
                    prev_row[j - 1],  # substitution
                )
            row_min = min(row_min, curr_row[j])

        # Early termination: if the entire row exceeds the threshold, give up
        if row_min > threshold:
            return threshold + 1

        prev_row, curr_row = curr_row, prev_row

    return prev_row[n]


def _banded_edit_distance_simple(a: bytes, b: bytes, max_dist: int) -> int:
    """Compute edit distance using banded DP (simplified version).

    Only computes cells within max_dist of the main diagonal.
    Time complexity: O(max_dist * min(m, n)) instead of O(m * n).

    Args:
        a: First byte sequence
        b: Second byte sequence
        max_dist: Maximum distance threshold

    Returns:
        Edit distance, or a value > max_dist if it exceeds the threshold
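
    Examples:
        Hand-traced cases that stay within the band:

        >>> _banded_edit_distance_simple(b"abcdefgh", b"abcdefgh", 1)
        0
        >>> _banded_edit_distance_simple(b"abcdefgh", b"abcdXfgh", 1)
        1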
624 """
625 m, n = len(a), len(b)
627 # Use numpy arrays for potential vectorization benefits
628 INF = max_dist + 100
629 band_width = 2 * max_dist + 1
631 # Create banded DP table (2 rows only for space efficiency)
632 prev_row = np.full(band_width, INF, dtype=np.int32)
633 curr_row = np.full(band_width, INF, dtype=np.int32)
635 # Initialize first row within band
636 for j in range(min(band_width, n + 1)):
637 if j <= max_dist: 637 ↛ 636line 637 didn't jump to line 636 because the condition on line 637 was always true
638 prev_row[j] = j
640 for i in range(1, m + 1):
641 curr_row.fill(INF)
642 curr_row[0] = i
644 # Compute band around diagonal
645 j_start = max(1, i - max_dist)
646 j_end = min(n, i + max_dist)
648 for j in range(j_start, j_end + 1):
649 # Map j to band index
650 band_idx = j - i + max_dist
651 if band_idx < 0 or band_idx >= band_width: 651 ↛ 652line 651 didn't jump to line 652 because the condition on line 651 was never true
652 continue
654 if a[i - 1] == b[j - 1]: 654 ↛ 661line 654 didn't jump to line 661 because the condition on line 654 was always true
655 # Match: copy from diagonal
656 prev_band_idx = band_idx
657 if prev_band_idx < band_width: 657 ↛ 648line 657 didn't jump to line 648 because the condition on line 657 was always true
658 curr_row[band_idx] = prev_row[prev_band_idx]
659 else:
660 # Min of three operations
661 cost = INF
663 # Substitution: from (i-1, j-1)
664 if band_idx < band_width:
665 cost = min(cost, prev_row[band_idx] + 1)
667 # Deletion: from (i-1, j)
668 if band_idx + 1 < band_width:
669 cost = min(cost, prev_row[band_idx + 1] + 1)
671 # Insertion: from (i, j-1)
672 if band_idx - 1 >= 0:
673 cost = min(cost, curr_row[band_idx - 1] + 1)
675 curr_row[band_idx] = cost
677 # Swap rows
678 prev_row, curr_row = curr_row, prev_row
680 # Extract final result
681 final_band_idx = n - m + max_dist
682 if 0 <= final_band_idx < band_width: 682 ↛ 685line 682 didn't jump to line 685 because the condition on line 682 was always true
683 return int(min(prev_row[final_band_idx], INF))
684 else:
685 return INF


class RepeatingSequenceFinder:
    """Object-oriented wrapper for repeating sequence detection.

    Provides a class-based interface for finding repeating patterns,
    wrapping the functional API above.

    Example:
        >>> finder = RepeatingSequenceFinder(min_length=2, max_length=8)
        >>> sequences = finder.find_sequences(b"ABCABCABC")
    """

    def __init__(
        self,
        min_length: int = 2,
        max_length: int = 32,
        min_count: int = 2,
        min_frequency: float = 0.001,
    ):
        """Initialize repeating sequence finder.

        Args:
            min_length: Minimum pattern length to detect.
            max_length: Maximum pattern length to detect.
            min_count: Minimum occurrence count.
            min_frequency: Minimum occurrence frequency (for filtering results).
        """
        self.min_length = min_length
        self.max_length = max_length
        self.min_count = min_count
        self.min_frequency = min_frequency

    def find_sequences(self, data: bytes | NDArray[np.uint8]) -> list[RepeatingSequence]:
        """Find repeating sequences in data.

        Args:
            data: Input data to analyze.

        Returns:
            List of detected repeating sequences.

        Example:
            >>> finder = RepeatingSequenceFinder(min_length=2, max_length=4)
            >>> sequences = finder.find_sequences(b"\\xAA\\x55" * 100)
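            >>> all(s.count >= finder.min_count for s in sequences)
            True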
733 """
734 results = find_repeating_sequences(
735 data,
736 min_length=self.min_length,
737 max_length=self.max_length,
738 min_count=self.min_count,
739 )
740 # Filter by min_frequency
741 return [r for r in results if r.frequency >= self.min_frequency]

    def find_ngrams(
        self, data: bytes | NDArray[np.uint8], n: int = 2, top_k: int = 20
    ) -> list[NgramResult]:
        """Find frequent n-grams in data.

        Args:
            data: Input data to analyze.
            n: N-gram size.
            top_k: Number of top n-grams to return.

        Returns:
            List of NgramResult with top n-grams.
        """
        return find_frequent_ngrams(data, n=n, top_k=top_k)

    def find_longest(self, data: bytes | NDArray[np.uint8]) -> RepeatingSequence | None:
        """Find longest repeating sequence.

        Args:
            data: Input data to analyze.

        Returns:
            Longest repeating sequence or None.
        """
        return find_longest_repeat(data)