# Coverage for src/tracekit/analyzers/statistical/ngrams.py: 97% (133 statements)
# Report generated by coverage.py v7.13.1 at 2026-01-11 23:04 +0000
"""N-gram frequency analysis for protocol fingerprinting.

This module provides tools for analyzing n-gram (byte sequence) frequencies
in binary data, useful for pattern identification, data characterization,
and protocol fingerprinting.
"""

from collections import Counter
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Union

import numpy as np

if TYPE_CHECKING:
    from numpy.typing import NDArray

# Type alias for accepted input data: raw bytes, a mutable byte buffer, or a
# numpy array (arrays are converted to bytes by the analysis functions).
DataType = Union[bytes, bytearray, "NDArray[Any]"]
@dataclass
class NgramProfile:
    """N-gram frequency profile.

    Attributes:
        n: N-gram size (number of bytes)
        frequencies: Dictionary mapping n-grams to their counts
        total_ngrams: Total number of n-grams extracted
        unique_ngrams: Number of unique n-grams found
        top_k: List of top n-grams with (ngram, count, frequency)
        entropy: Shannon entropy of n-gram distribution
    """

    n: int
    frequencies: dict[bytes, int]
    total_ngrams: int
    unique_ngrams: int
    top_k: list[tuple[bytes, int, float]]  # (ngram, count, relative frequency)
    entropy: float
@dataclass
class NgramComparison:
    """Comparison of two n-gram profiles.

    Attributes:
        similarity: Jaccard similarity coefficient (0-1, 1 = identical)
        cosine_similarity: Cosine similarity of frequency vectors (0-1)
        chi_square: Chi-square distance between distributions
        common_ngrams: Number of n-grams present in both profiles
        unique_to_a: Number of n-grams unique to first profile
        unique_to_b: Number of n-grams unique to second profile
    """

    similarity: float
    cosine_similarity: float
    chi_square: float
    common_ngrams: int
    unique_to_a: int
    unique_to_b: int
def ngram_frequency(data: DataType, n: int = 2, overlap: bool = True) -> NgramProfile:
    """Compute n-gram frequencies.

    Extracts all n-grams from the data and computes their frequency
    distribution, the top-100 most common n-grams, and the Shannon entropy
    of the distribution.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        n: N-gram size in bytes (default: 2 for bigrams)
        overlap: If True, use overlapping n-grams; if False, non-overlapping (default: True)

    Returns:
        NgramProfile containing frequency statistics

    Raises:
        ValueError: If n < 1

    Example:
        >>> profile = ngram_frequency(b'ABCABC', n=2)
        >>> profile.n
        2
        >>> profile.total_ngrams
        5
    """
    # Validate parameters before doing any data conversion work.
    if n < 1:
        raise ValueError(f"N-gram size must be >= 1, got {n}")

    if isinstance(data, np.ndarray):
        # uint8 arrays map directly to their raw bytes; other dtypes are
        # flattened and serialized through the buffer protocol.
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    # Data shorter than n yields no n-grams - return an empty profile.
    if len(data) < n:
        return NgramProfile(
            n=n,
            frequencies={},
            total_ngrams=0,
            unique_ngrams=0,
            top_k=[],
            entropy=0.0,
        )

    # Extract n-grams: step of 1 gives overlapping windows, step of n
    # gives disjoint chunks (any trailing partial chunk is dropped).
    step = 1 if overlap else n
    ngrams = [bytes(data[i : i + n]) for i in range(0, len(data) - n + 1, step)]

    # Count frequencies
    freq_counter = Counter(ngrams)
    total = len(ngrams)

    # Top n-grams, sorted by count descending then by bytes ascending for
    # deterministic output; capped at the top 100 entries.
    top_k = [
        (ngram, count, count / total)
        for ngram, count in sorted(
            freq_counter.items(),
            key=lambda item: (-item[1], item[0]),
        )[:100]
    ]

    # Shannon entropy (bits) of the n-gram distribution.
    entropy_val = 0.0
    for count in freq_counter.values():
        prob = count / total
        entropy_val -= prob * np.log2(prob)

    return NgramProfile(
        n=n,
        frequencies=dict(freq_counter),
        total_ngrams=total,
        unique_ngrams=len(freq_counter),
        top_k=top_k,
        entropy=float(entropy_val),
    )
def ngram_entropy(data: DataType, n: int = 2) -> float:
    """Calculate entropy over n-gram distribution.

    Computes Shannon entropy of the n-gram distribution, which measures
    the predictability of n-gram sequences.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        n: N-gram size in bytes (default: 2)

    Returns:
        N-gram entropy in bits (0.0 if data is too short)

    Example:
        >>> entropy = ngram_entropy(b'AAAA', n=2)
        >>> entropy
        0.0
    """
    # Delegate to the full profile computation; overlapping n-grams are
    # always used so entropy reflects every window of the data.
    profile = ngram_frequency(data, n=n, overlap=True)
    return profile.entropy
def compare_ngram_profiles(data_a: DataType, data_b: DataType, n: int = 2) -> NgramComparison:
    """Compare n-gram profiles between two datasets.

    Computes multiple similarity metrics between two n-gram distributions
    for protocol fingerprinting and classification.

    Args:
        data_a: First dataset
        data_b: Second dataset
        n: N-gram size in bytes (default: 2)

    Returns:
        NgramComparison with similarity metrics

    Example:
        >>> comp = compare_ngram_profiles(b'ABCABC', b'ABCABC', n=2)
        >>> comp.similarity
        1.0
    """
    # Generate overlapping n-gram profiles for both inputs.
    profile_a = ngram_frequency(data_a, n=n, overlap=True)
    profile_b = ngram_frequency(data_b, n=n, overlap=True)

    freq_a = profile_a.frequencies
    freq_b = profile_b.frequencies

    # Union and intersection of observed n-grams.
    all_ngrams = set(freq_a.keys()) | set(freq_b.keys())
    common = set(freq_a.keys()) & set(freq_b.keys())

    # Jaccard similarity (set-based); two empty profiles count as identical.
    jaccard = len(common) / len(all_ngrams) if all_ngrams else 1.0

    # Cosine similarity of the raw count vectors. Iterating the same set
    # twice within one run yields a consistent ordering, so the vectors align.
    if all_ngrams:
        vec_a = np.array([freq_a.get(ng, 0) for ng in all_ngrams])
        vec_b = np.array([freq_b.get(ng, 0) for ng in all_ngrams])

        norm_a = np.linalg.norm(vec_a)
        norm_b = np.linalg.norm(vec_b)

        if norm_a > 0 and norm_b > 0:
            cosine_sim = np.dot(vec_a, vec_b) / (norm_a * norm_b)
        else:
            # At least one all-zero vector; identical zero vectors are
            # treated as perfectly similar, otherwise dissimilar.
            cosine_sim = 1.0 if norm_a == norm_b else 0.0
    else:
        cosine_sim = 1.0

    # Chi-square distance between the two normalized distributions, with
    # the per-n-gram expectation taken as the mean of the two frequencies.
    chi_square_val = 0.0
    if all_ngrams:
        total_a = profile_a.total_ngrams
        total_b = profile_b.total_ngrams

        for ngram in all_ngrams:
            freq_a_norm = freq_a.get(ngram, 0) / total_a if total_a > 0 else 0
            freq_b_norm = freq_b.get(ngram, 0) / total_b if total_b > 0 else 0
            expected = (freq_a_norm + freq_b_norm) / 2

            if expected > 0:
                chi_square_val += (
                    (freq_a_norm - expected) ** 2 + (freq_b_norm - expected) ** 2
                ) / expected

    return NgramComparison(
        similarity=float(jaccard),
        cosine_similarity=float(cosine_sim),
        chi_square=float(chi_square_val),
        common_ngrams=len(common),
        unique_to_a=len(freq_a) - len(common),
        unique_to_b=len(freq_b) - len(common),
    )
def find_unusual_ngrams(
    data: DataType, baseline: NgramProfile | None = None, n: int = 2, z_threshold: float = 3.0
) -> list[tuple[bytes, float]]:
    """Find unusually frequent or rare n-grams.

    Identifies n-grams with frequencies that deviate significantly from
    expected (baseline) distributions using z-score analysis.

    Args:
        data: Input data to analyze
        baseline: Baseline n-gram profile for comparison (None = use uniform)
        n: N-gram size in bytes (default: 2)
        z_threshold: Z-score threshold for unusual classification (default: 3.0)

    Returns:
        List of (ngram, z_score) tuples for unusual n-grams, sorted by |z_score|

    Raises:
        ValueError: If baseline n-gram size doesn't match requested size.

    Example:
        >>> unusual = find_unusual_ngrams(b'AAABBBCCC', n=1)
        >>> len(unusual) >= 0
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    # Generate profile for current data
    profile = ngram_frequency(data, n=n, overlap=True)

    if profile.total_ngrams == 0:
        return []

    # Expected count per n-gram under a uniform distribution over all
    # 256**n possible n-grams; also used for n-grams absent from baseline.
    expected_count = profile.total_ngrams / (256**n)

    if baseline is None:
        # Empty mapping: every n-gram falls back to the uniform expectation.
        baseline_freqs: dict[bytes, float] = {}
    else:
        if baseline.n != n:
            raise ValueError(f"Baseline n-gram size ({baseline.n}) != requested size ({n})")
        # Scale baseline counts to this profile's total so counts compare
        # directly to the observed counts.
        baseline_freqs = {
            ng: count / baseline.total_ngrams * profile.total_ngrams
            for ng, count in baseline.frequencies.items()
        }

    # Z-score via Poisson approximation: z = (observed - expected) / sqrt(expected).
    unusual = []
    for ngram, observed in profile.frequencies.items():
        expected = baseline_freqs.get(ngram, expected_count)
        if expected > 0:
            z_score = (observed - expected) / np.sqrt(expected)
            if abs(z_score) >= z_threshold:
                unusual.append((ngram, float(z_score)))

    # Most anomalous first.
    unusual.sort(key=lambda item: abs(item[1]), reverse=True)

    return unusual
def ngram_heatmap(data: "DataType", n: int = 2) -> "NDArray[np.float64]":
    """Generate n-gram co-occurrence heatmap.

    Creates a heatmap matrix showing n-gram frequencies. For bigrams (n=2),
    this produces a 256x256 matrix where entry [i,j] is the count of bigram
    (byte_i, byte_j).

    Args:
        data: Input data as bytes, bytearray, or numpy array
        n: N-gram size in bytes (must be 2 for heatmap visualization)

    Returns:
        Numpy array of shape (256, 256) for bigrams, normalized to [0, 1]

    Raises:
        ValueError: If n != 2 (only bigrams supported for heatmap)

    Example:
        >>> heatmap = ngram_heatmap(b'ABAB', n=2)
        >>> heatmap.shape
        (256, 256)
    """
    if n != 2:
        raise ValueError(f"Heatmap only supported for bigrams (n=2), got n={n}")

    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    # One cell per possible byte pair.
    heatmap = np.zeros((256, 256), dtype=np.float64)

    # Count overlapping bigrams; indexing bytes yields ints directly.
    for i in range(len(data) - 1):
        heatmap[data[i], data[i + 1]] += 1

    # Normalize to [0, 1] by the most frequent bigram (no-op for empty data).
    max_val = heatmap.max()
    if max_val > 0:
        heatmap = heatmap / max_val

    return heatmap
class NGramAnalyzer:
    """Object-oriented wrapper for n-gram analysis.

    Provides a class-based interface for n-gram analysis operations,
    wrapping the functional API for consistency with test expectations.

    Example:
        >>> analyzer = NGramAnalyzer(n=2)
        >>> frequencies = analyzer.analyze(b'ABCABC')
        >>> frequencies[b'AB']  # Direct dict access
        2
    """

    def __init__(self, n: int = 2, overlap: bool = True):
        """Initialize n-gram analyzer.

        Args:
            n: N-gram size in bytes.
            overlap: Whether to use overlapping n-grams.
        """
        self.n = n
        self.overlap = overlap
        # Most recent profile, cached by analyze()/analyze_profile().
        self._last_profile: NgramProfile | None = None

    def analyze(self, data: DataType) -> dict[bytes, int]:
        """Analyze n-gram frequencies in data.

        Returns a dictionary mapping n-grams to counts for direct access.
        Returns empty dict if data is shorter than n.

        Args:
            data: Input data as bytes, bytearray, or numpy array.

        Returns:
            Dictionary mapping n-grams (bytes) to their counts (int).

        Example:
            >>> analyzer = NGramAnalyzer(n=2)
            >>> frequencies = analyzer.analyze(b'ABCABC')
            >>> frequencies[b'AB']
            2
        """
        self._last_profile = ngram_frequency(data, n=self.n, overlap=self.overlap)
        return self._last_profile.frequencies

    def analyze_profile(self, data: DataType) -> NgramProfile:
        """Analyze n-gram frequencies and return full profile.

        Args:
            data: Input data as bytes, bytearray, or numpy array.

        Returns:
            NgramProfile with full frequency statistics.

        Example:
            >>> analyzer = NGramAnalyzer(n=2)
            >>> profile = analyzer.analyze_profile(b'ABCDEF')
            >>> profile.n == 2
            True
        """
        self._last_profile = ngram_frequency(data, n=self.n, overlap=self.overlap)
        return self._last_profile

    def get_distribution(self, frequencies: dict[bytes, int]) -> dict[bytes, float]:
        """Convert frequency counts to normalized distribution.

        Args:
            frequencies: Dictionary mapping n-grams to counts.

        Returns:
            Dictionary mapping n-grams to normalized frequencies (0-1).

        Example:
            >>> analyzer = NGramAnalyzer(n=2)
            >>> freqs = analyzer.analyze(b'ABAB')
            >>> dist = analyzer.get_distribution(freqs)
            >>> sum(dist.values())  # Should sum to 1.0
            1.0
        """
        total = sum(frequencies.values())
        if total == 0:
            return {}
        return {ngram: count / total for ngram, count in frequencies.items()}

    def entropy(self, data: DataType) -> float:
        """Calculate n-gram entropy.

        Args:
            data: Input data.

        Returns:
            Entropy in bits.
        """
        return ngram_entropy(data, n=self.n)

    def compare(self, data_a: DataType, data_b: DataType) -> NgramComparison:
        """Compare n-gram profiles of two datasets.

        Args:
            data_a: First dataset.
            data_b: Second dataset.

        Returns:
            NgramComparison with similarity metrics.
        """
        return compare_ngram_profiles(data_a, data_b, n=self.n)

    def find_unusual(
        self, data: DataType, baseline: NgramProfile | None = None, z_threshold: float = 3.0
    ) -> list[tuple[bytes, float]]:
        """Find unusual n-grams in data.

        Args:
            data: Input data.
            baseline: Baseline profile for comparison.
            z_threshold: Z-score threshold.

        Returns:
            List of (ngram, z_score) tuples.
        """
        return find_unusual_ngrams(data, baseline=baseline, n=self.n, z_threshold=z_threshold)

    def heatmap(self, data: DataType) -> "NDArray[np.float64]":
        """Generate bigram heatmap.

        Args:
            data: Input data.

        Returns:
            256x256 heatmap array.
        """
        return ngram_heatmap(data, n=self.n)
# Public API, alphabetized: type alias first by case, then classes, then functions.
__all__ = [
    "DataType",
    "NGramAnalyzer",
    "NgramComparison",
    "NgramProfile",
    "compare_ngram_profiles",
    "find_unusual_ngrams",
    "ngram_entropy",
    "ngram_frequency",
    "ngram_heatmap",
]