Coverage for src / tracekit / analyzers / statistical / ngrams.py: 97%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""N-gram frequency analysis for protocol fingerprinting. 

2 

3 

4This module provides tools for analyzing n-gram (byte sequence) frequencies 

5in binary data, useful for pattern identification, data characterization, 

6and protocol fingerprinting. 

7""" 

8 

9from collections import Counter 

10from dataclasses import dataclass 

11from typing import TYPE_CHECKING, Any, Union 

12 

13import numpy as np 

14 

15if TYPE_CHECKING: 

16 from numpy.typing import NDArray 

17 

18# Type alias for input data 

19DataType = Union[bytes, bytearray, "NDArray[Any]"] 

20 

21 

@dataclass
class NgramProfile:
    """Frequency profile of the n-grams extracted from a byte stream.

    Attributes:
        n: Size of each n-gram in bytes.
        frequencies: Mapping from each observed n-gram to its raw count.
        total_ngrams: Count of all n-grams extracted (including repeats).
        unique_ngrams: Count of distinct n-grams observed.
        top_k: Most frequent n-grams as (ngram, count, relative frequency).
        entropy: Shannon entropy (bits) of the n-gram distribution.
    """

    n: int
    frequencies: dict[bytes, int]
    total_ngrams: int
    unique_ngrams: int
    top_k: list[tuple[bytes, int, float]]  # (ngram, count, frequency)
    entropy: float

41 

42 

@dataclass
class NgramComparison:
    """Result of comparing the n-gram profiles of two datasets.

    Attributes:
        similarity: Jaccard similarity coefficient (0-1, 1 = identical).
        cosine_similarity: Cosine similarity of frequency vectors (0-1).
        chi_square: Chi-square distance between the two distributions.
        common_ngrams: Number of n-grams present in both profiles.
        unique_to_a: Number of n-grams found only in the first profile.
        unique_to_b: Number of n-grams found only in the second profile.
    """

    similarity: float
    cosine_similarity: float
    chi_square: float
    common_ngrams: int
    unique_to_a: int
    unique_to_b: int

62 

63 

def ngram_frequency(
    data: DataType, n: int = 2, overlap: bool = True, *, top_k_limit: int = 100
) -> NgramProfile:
    """Compute n-gram frequencies.

    : N-gram Frequency Analysis

    Extracts all n-grams from the data and computes their frequency
    distribution, including Shannon entropy and a ranked top-k list.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        n: N-gram size in bytes (default: 2 for bigrams)
        overlap: If True, use overlapping n-grams; if False, non-overlapping (default: True)
        top_k_limit: Maximum number of entries in the returned ``top_k``
            list (default: 100, matching the previous hard-coded limit)

    Returns:
        NgramProfile containing frequency statistics

    Raises:
        ValueError: If n < 1

    Example:
        >>> profile = ngram_frequency(b'ABCABC', n=2)
        >>> profile.n
        2
        >>> profile.total_ngrams
        5
    """
    # Validate parameters before doing any data-conversion work.
    if n < 1:
        raise ValueError(f"N-gram size must be >= 1, got {n}")

    if isinstance(data, np.ndarray):
        # uint8 arrays convert losslessly via tobytes(); other dtypes are
        # flattened and coerced element-wise (each element must fit a byte).
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    # Data shorter than n yields no n-grams at all: return an empty profile.
    if len(data) < n:
        return NgramProfile(
            n=n,
            frequencies={},
            total_ngrams=0,
            unique_ngrams=0,
            top_k=[],
            entropy=0.0,
        )

    # Extract n-grams: overlapping windows advance by 1 byte, otherwise by n.
    step = 1 if overlap else n
    freq_counter = Counter(
        bytes(data[i : i + n]) for i in range(0, len(data) - n + 1, step)
    )
    total = sum(freq_counter.values())

    # Top-k n-grams sorted by count desc, then bytewise asc for determinism.
    top_k = [
        (ngram, count, count / total)
        for ngram, count in sorted(
            freq_counter.items(),
            key=lambda item: (-item[1], item[0]),
        )[:top_k_limit]
    ]

    # Shannon entropy of the n-gram distribution, computed vectorized.
    probs = np.fromiter(freq_counter.values(), dtype=np.float64) / total
    entropy_val = float(-np.sum(probs * np.log2(probs)))

    return NgramProfile(
        n=n,
        frequencies=dict(freq_counter),
        total_ngrams=total,
        unique_ngrams=len(freq_counter),
        top_k=top_k,
        entropy=entropy_val,
    )

141 

142 

def ngram_entropy(data: DataType, n: int = 2) -> float:
    """Calculate entropy over n-gram distribution.

    : N-gram Frequency Analysis

    Measures the predictability of byte sequences by computing the Shannon
    entropy of the distribution of overlapping n-grams.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        n: N-gram size in bytes (default: 2)

    Returns:
        N-gram entropy in bits (0.0 if data is too short)

    Example:
        >>> entropy = ngram_entropy(b'AAAA', n=2)
        >>> entropy
        0.0
    """
    # Delegate to the full profiler and surface only its entropy figure.
    return ngram_frequency(data, n=n, overlap=True).entropy

165 

166 

def compare_ngram_profiles(data_a: DataType, data_b: DataType, n: int = 2) -> NgramComparison:
    """Compare n-gram profiles between two datasets.

    : N-gram Frequency Analysis

    Computes several similarity metrics (Jaccard, cosine, chi-square)
    between the n-gram distributions of two byte streams for protocol
    fingerprinting and classification.

    Args:
        data_a: First dataset
        data_b: Second dataset
        n: N-gram size in bytes (default: 2)

    Returns:
        NgramComparison with similarity metrics

    Example:
        >>> comp = compare_ngram_profiles(b'ABCABC', b'ABCABC', n=2)
        >>> comp.similarity
        1.0
    """
    profile_a = ngram_frequency(data_a, n=n, overlap=True)
    profile_b = ngram_frequency(data_b, n=n, overlap=True)
    freq_a = profile_a.frequencies
    freq_b = profile_b.frequencies

    ngrams_a = set(freq_a)
    ngrams_b = set(freq_b)
    union = ngrams_a | ngrams_b
    shared = ngrams_a & ngrams_b

    # Jaccard: pure set overlap, ignoring counts. Two empty profiles
    # are considered identical.
    jaccard = len(shared) / len(union) if union else 1.0

    # Cosine similarity over raw count vectors spanning the union.
    if not union:
        cosine_sim = 1.0
    else:
        vec_a = np.array([freq_a.get(ng, 0) for ng in union])
        vec_b = np.array([freq_b.get(ng, 0) for ng in union])
        norm_a = np.linalg.norm(vec_a)
        norm_b = np.linalg.norm(vec_b)
        if norm_a > 0 and norm_b > 0:
            cosine_sim = np.dot(vec_a, vec_b) / (norm_a * norm_b)
        else:
            # At least one zero vector: similar only if both are zero.
            cosine_sim = 1.0 if norm_a == norm_b else 0.0

    # Chi-square distance between the two normalized distributions.
    chi_square_val = 0.0
    if union:
        total_a = profile_a.total_ngrams
        total_b = profile_b.total_ngrams
        for ngram in union:
            p_a = freq_a.get(ngram, 0) / total_a if total_a > 0 else 0
            p_b = freq_b.get(ngram, 0) / total_b if total_b > 0 else 0
            expected = (p_a + p_b) / 2
            if expected > 0:
                chi_square_val += ((p_a - expected) ** 2 + (p_b - expected) ** 2) / expected

    return NgramComparison(
        similarity=float(jaccard),
        cosine_similarity=float(cosine_sim),
        chi_square=float(chi_square_val),
        common_ngrams=len(shared),
        unique_to_a=len(freq_a) - len(shared),
        unique_to_b=len(freq_b) - len(shared),
    )

241 

242 

def find_unusual_ngrams(
    data: DataType, baseline: NgramProfile | None = None, n: int = 2, z_threshold: float = 3.0
) -> list[tuple[bytes, float]]:
    """Find unusually frequent or rare n-grams.

    : N-gram Frequency Analysis

    Identifies n-grams with frequencies that deviate significantly from
    expected (baseline) distributions using z-score analysis.

    Args:
        data: Input data to analyze
        baseline: Baseline n-gram profile for comparison (None = use uniform)
        n: N-gram size in bytes (default: 2)
        z_threshold: Z-score threshold for unusual classification (default: 3.0)

    Returns:
        List of (ngram, z_score) tuples for unusual n-grams, sorted by |z_score|

    Raises:
        ValueError: If baseline n-gram size doesn't match requested size.

    Example:
        >>> unusual = find_unusual_ngrams(b'AAABBBCCC', n=1)
        >>> len(unusual) >= 0
        True
    """
    # Profile the current data. ngram_frequency() converts numpy arrays
    # internally, so no pre-conversion is needed here.
    profile = ngram_frequency(data, n=n, overlap=True)

    if profile.total_ngrams == 0:
        return []

    # Expected count per n-gram under a uniform distribution over all
    # 256**n possible n-grams; also the fallback expectation for n-grams
    # absent from a supplied baseline.
    expected_count = profile.total_ngrams / (256**n)

    if baseline is None:
        # Empty mapping means every n-gram uses the uniform expectation.
        baseline_freqs: dict[bytes, float] = {}
    else:
        if baseline.n != n:
            raise ValueError(f"Baseline n-gram size ({baseline.n}) != requested size ({n})")
        # Rescale baseline counts to this data's total so counts compare
        # directly. NOTE(review): a baseline with total_ngrams == 0 would
        # divide by zero here, as in the original — callers are assumed to
        # pass non-empty baselines.
        baseline_freqs = {
            ng: count / baseline.total_ngrams * profile.total_ngrams
            for ng, count in baseline.frequencies.items()
        }

    # Score every observed n-gram against its expected count.
    unusual = []
    for ngram, observed in profile.frequencies.items():
        expected = baseline_freqs.get(ngram, expected_count)

        if expected > 0:
            # Poisson approximation for count data:
            # z = (observed - expected) / sqrt(expected)
            z_score = (observed - expected) / np.sqrt(expected)

            if abs(z_score) >= z_threshold:
                unusual.append((ngram, float(z_score)))

    # Largest deviations (in either direction) first.
    unusual.sort(key=lambda item: abs(item[1]), reverse=True)

    return unusual

311 

312 

def ngram_heatmap(data: "DataType", n: int = 2) -> "NDArray[np.float64]":
    """Generate n-gram co-occurrence heatmap.

    : N-gram Frequency Analysis

    Creates a heatmap matrix showing n-gram frequencies. For bigrams (n=2),
    this produces a 256x256 matrix where entry [i,j] is the count of bigram
    (byte_i, byte_j).

    Args:
        data: Input data as bytes, bytearray, or numpy array
        n: N-gram size in bytes (must be 2 for heatmap visualization)

    Returns:
        Numpy array of shape (256, 256) for bigrams, normalized to [0, 1]

    Raises:
        ValueError: If n != 2 (only bigrams supported for heatmap)

    Example:
        >>> heatmap = ngram_heatmap(b'ABAB', n=2)
        >>> heatmap.shape
        (256, 256)
    """
    if n != 2:
        raise ValueError(f"Heatmap only supported for bigrams (n=2), got n={n}")

    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    # Initialize 256x256 matrix.
    heatmap = np.zeros((256, 256), dtype=np.float64)

    # Vectorized bigram counting: view the data as uint8 and accumulate
    # each (byte[i], byte[i+1]) pair. np.add.at is used because it handles
    # repeated index pairs correctly, unlike fancy-indexed "+=".
    arr = np.frombuffer(bytes(data), dtype=np.uint8)
    if arr.size >= 2:
        np.add.at(heatmap, (arr[:-1], arr[1:]), 1.0)

    # Normalize to [0, 1]; an all-zero map (fewer than 2 bytes) stays zero.
    max_val = heatmap.max()
    if max_val > 0:
        heatmap = heatmap / max_val

    return heatmap

358 

359 

class NGramAnalyzer:
    """Object-oriented wrapper around the functional n-gram API.

    Bundles an n-gram size and overlap setting so repeated analyses do
    not need to re-specify them, delegating to the module-level functions
    for consistency with test expectations.

    Example:
        >>> analyzer = NGramAnalyzer(n=2)
        >>> frequencies = analyzer.analyze(b'ABCABC')
        >>> frequencies[b'AB']  # Direct dict access
        2
    """

    def __init__(self, n: int = 2, overlap: bool = True):
        """Set up the analyzer.

        Args:
            n: N-gram size in bytes.
            overlap: Whether to use overlapping n-grams.
        """
        self.n = n
        self.overlap = overlap
        # Cached profile from the most recent analyze()/analyze_profile().
        self._last_profile: NgramProfile | None = None

    def analyze(self, data: DataType) -> dict[bytes, int]:
        """Return a mapping of n-grams to counts for the given data.

        The full profile is cached internally. An empty dict is returned
        when the data is shorter than ``n``.

        Args:
            data: Input data as bytes, bytearray, or numpy array.

        Returns:
            Dictionary mapping n-grams (bytes) to their counts (int).

        Example:
            >>> analyzer = NGramAnalyzer(n=2)
            >>> analyzer.analyze(b'ABCABC')[b'AB']
            2
        """
        profile = ngram_frequency(data, n=self.n, overlap=self.overlap)
        self._last_profile = profile
        return profile.frequencies

    def analyze_profile(self, data: DataType) -> NgramProfile:
        """Return the full NgramProfile for the given data (and cache it).

        Args:
            data: Input data as bytes, bytearray, or numpy array.

        Returns:
            NgramProfile with full frequency statistics.

        Example:
            >>> analyzer = NGramAnalyzer(n=2)
            >>> analyzer.analyze_profile(b'ABCDEF').n == 2
            True
        """
        profile = ngram_frequency(data, n=self.n, overlap=self.overlap)
        self._last_profile = profile
        return profile

    def get_distribution(self, frequencies: dict[bytes, int]) -> dict[bytes, float]:
        """Normalize raw counts into relative frequencies.

        Args:
            frequencies: Dictionary mapping n-grams to counts.

        Returns:
            Dictionary mapping n-grams to normalized frequencies (0-1);
            empty when the counts sum to zero.

        Example:
            >>> analyzer = NGramAnalyzer(n=2)
            >>> dist = analyzer.get_distribution(analyzer.analyze(b'ABAB'))
            >>> sum(dist.values())
            1.0
        """
        total = sum(frequencies.values())
        if not total:
            return {}
        return {ngram: count / total for ngram, count in frequencies.items()}

    def entropy(self, data: DataType) -> float:
        """Return the Shannon entropy (bits) of the n-gram distribution.

        Args:
            data: Input data.

        Returns:
            Entropy in bits.
        """
        return ngram_entropy(data, n=self.n)

    def compare(self, data_a: DataType, data_b: DataType) -> NgramComparison:
        """Compare the n-gram profiles of two datasets.

        Args:
            data_a: First dataset.
            data_b: Second dataset.

        Returns:
            NgramComparison with similarity metrics.
        """
        return compare_ngram_profiles(data_a, data_b, n=self.n)

    def find_unusual(
        self, data: DataType, baseline: NgramProfile | None = None, z_threshold: float = 3.0
    ) -> list[tuple[bytes, float]]:
        """Find n-grams whose frequency deviates from the baseline.

        Args:
            data: Input data.
            baseline: Baseline profile for comparison.
            z_threshold: Z-score threshold.

        Returns:
            List of (ngram, z_score) tuples.
        """
        return find_unusual_ngrams(data, baseline=baseline, n=self.n, z_threshold=z_threshold)

    def heatmap(self, data: DataType) -> "NDArray[np.float64]":
        """Return the 256x256 bigram co-occurrence heatmap for the data.

        Args:
            data: Input data.

        Returns:
            256x256 heatmap array.
        """
        return ngram_heatmap(data, n=self.n)

494 

495 

# Public API of this module; kept alphabetically sorted.
__all__ = [
    "DataType",
    "NGramAnalyzer",
    "NgramComparison",
    "NgramProfile",
    "compare_ngram_profiles",
    "find_unusual_ngrams",
    "ngram_entropy",
    "ngram_frequency",
    "ngram_heatmap",
]