Coverage for src/tracekit/analyzers/statistical/entropy.py: 88% (360 statements)


1"""Shannon entropy analysis for data classification and boundary detection. 

2 

3 - RE-ENT-002: Byte Frequency Distribution 

4 

5This module provides tools for computing Shannon entropy at both byte and bit 

6levels, analyzing entropy profiles over sliding windows, detecting entropy 

7transitions for field boundary identification, and classifying data types 

8based on entropy characteristics. 

9""" 

10 

from collections import Counter
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal, Union

import numpy as np

if TYPE_CHECKING:
    from numpy.typing import NDArray

# Type alias for input data
DataType = Union[bytes, bytearray, "NDArray[np.uint8]"]

@dataclass
class EntropyResult:
    """Entropy analysis result.

    Attributes:
        entropy: Shannon entropy value (0-8 bits for byte-level)
        classification: Data type classification based on entropy
        confidence: Confidence score for classification (0-1)
    """

    entropy: float
    classification: Literal["structured", "text", "compressed", "random", "constant"]
    confidence: float


@dataclass
class EntropyTransition:
    """Detected entropy transition (potential field boundary).

    Attributes:
        offset: Byte offset where transition occurs
        entropy_before: Entropy value before transition
        entropy_after: Entropy value after transition
        delta: Change in entropy (entropy_after - entropy_before)
        transition_type: Direction of entropy change
    """

    offset: int
    entropy_before: float
    entropy_after: float
    delta: float
    transition_type: str  # 'low_to_high' or 'high_to_low'

    @property
    def entropy_change(self) -> float:
        """Magnitude of the entropy change (absolute value of ``delta``)."""
        return abs(self.delta)

@dataclass
class ByteFrequencyResult:
    """Result of byte frequency distribution analysis.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Attributes:
        counts: Byte value counts (256-element array).
        frequencies: Normalized frequencies (256-element array).
        entropy: Shannon entropy of the distribution.
        unique_bytes: Number of unique byte values.
        most_common: List of (byte_value, count) for most common bytes.
        least_common: List of (byte_value, count) for least common bytes.
        uniformity_score: How uniform the distribution is (0-1).
        zero_byte_ratio: Proportion of zero bytes.
        printable_ratio: Proportion of printable ASCII.
    """

    counts: "NDArray[np.int64]"
    frequencies: "NDArray[np.float64]"
    entropy: float
    unique_bytes: int
    most_common: list[tuple[int, int]]
    least_common: list[tuple[int, int]]
    uniformity_score: float
    zero_byte_ratio: float
    printable_ratio: float


@dataclass
class FrequencyAnomalyResult:
    """Result of frequency anomaly detection.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Attributes:
        anomalous_bytes: Byte values with unusual frequencies.
        z_scores: Z-score for each byte value.
        is_anomalous: Boolean mask for anomalous bytes.
        expected_frequency: Expected frequency for a uniform distribution.
    """

    anomalous_bytes: list[int]
    z_scores: "NDArray[np.float64]"
    is_anomalous: "NDArray[np.bool_]"
    expected_frequency: float


@dataclass
class CompressionIndicator:
    """Indicators suggesting compression or encryption.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Attributes:
        is_compressed: Likely compressed data.
        is_encrypted: Likely encrypted data.
        compression_ratio_estimate: Estimated compression ratio.
        confidence: Confidence in classification (0-1).
        indicators: List of detected indicators.
    """

    is_compressed: bool
    is_encrypted: bool
    compression_ratio_estimate: float
    confidence: float
    indicators: list[str] = field(default_factory=list)

def shannon_entropy(data: DataType) -> float:
    """Calculate Shannon entropy in bits (0-8 for bytes).

    Implements Shannon Entropy Analysis.

    Shannon entropy measures the average information content per byte.
    For byte data, maximum entropy is 8 bits (uniform distribution).

    Args:
        data: Input data as bytes, bytearray, or numpy array

    Returns:
        Entropy value in bits (0.0 to 8.0)

    Raises:
        ValueError: If data is empty

    Example:
        >>> shannon_entropy(b'\\x00' * 100)  # All zeros
        0.0
        >>> shannon_entropy(bytes(range(256)))  # Uniform
        8.0
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        raise ValueError("Cannot calculate entropy of empty data")

    # Count byte frequencies
    counts = Counter(data)
    length = len(data)

    # Calculate Shannon entropy: H = -sum(p_i * log2(p_i))
    entropy = 0.0
    for count in counts.values():
        if count > 0:
            prob = count / length
            entropy -= prob * np.log2(prob)

    return float(entropy)
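
# Worked example (illustrative, not part of the original module): for data in
# which one byte value occurs 3/4 of the time and another 1/4 of the time,
#     H = -(0.75 * log2(0.75) + 0.25 * log2(0.25)) ≈ 0.811 bits,
# so shannon_entropy(b"\x00\x00\x00\x01" * 25) returns approximately 0.811.
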

def bit_entropy(data: DataType) -> float:
    """Calculate bit-level entropy (0-1).

    Implements Shannon Entropy Analysis.

    Computes entropy of the bit distribution (0s vs 1s) across all bytes.

    Args:
        data: Input data as bytes, bytearray, or numpy array

    Returns:
        Bit-level entropy (0.0 to 1.0)

    Raises:
        ValueError: If data is empty

    Example:
        >>> bit_entropy(b'\\x00' * 100)  # All bits are 0
        0.0
        >>> bit_entropy(b'\\xAA' * 100)  # Equal 0s and 1s
        1.0
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        raise ValueError("Cannot calculate entropy of empty data")

    # Count total bits
    total_bits = len(data) * 8

    # Count set bits
    ones = sum(bin(byte).count("1") for byte in data)
    zeros = total_bits - ones

    if ones == 0 or zeros == 0:
        return 0.0

    # Calculate binary entropy: H = -(p1 * log2(p1) + p0 * log2(p0))
    p_one = ones / total_bits
    p_zero = zeros / total_bits

    entropy = -(p_one * np.log2(p_one) + p_zero * np.log2(p_zero))

    return float(entropy)
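
# Note the contrast with byte-level entropy (an illustrative check, not part
# of the original module): b"\xF0" has four set and four clear bits per byte,
# so bit_entropy(b"\xF0" * 100) == 1.0, while shannon_entropy(b"\xF0" * 100)
# == 0.0 because every byte value is identical.
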

def sliding_entropy(
    data: DataType, window: int = 256, step: int = 64, window_size: int | None = None
) -> "NDArray[np.float64]":
    """Calculate sliding window entropy profile.

    Implements Shannon Entropy Analysis.

    Computes entropy over a sliding window to create an entropy profile
    of the data, useful for visualization and boundary detection.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        window: Window size in bytes (default: 256)
        step: Step size for window movement (default: 64)
        window_size: Alias for the window parameter (for compatibility)

    Returns:
        Array of entropy values at each window position

    Raises:
        ValueError: If window size is larger than data or step is invalid
    """
    # Support the window_size alias
    if window_size is not None:
        window = window_size

    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if len(data) < window:
        raise ValueError(f"Window size ({window}) larger than data ({len(data)})")

    if step <= 0:
        raise ValueError(f"Step size must be positive, got {step}")

    # Calculate the number of windows
    num_windows = (len(data) - window) // step + 1
    entropies = np.zeros(num_windows)

    for i in range(num_windows):
        start = i * step
        end = start + window
        window_data = data[start:end]
        # Use an inline calculation to avoid ValueError for non-empty windows
        counts = Counter(window_data)
        length = len(window_data)
        entropy_val = 0.0
        for count in counts.values():
            if count > 0:
                prob = count / length
                entropy_val -= prob * np.log2(prob)
        entropies[i] = entropy_val

    return entropies
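
# Window-count arithmetic (illustrative): for 1024 bytes with window=256 and
# step=64, num_windows = (1024 - 256) // 64 + 1 = 13, and the final window
# covers bytes 768-1023, so the profile spans the whole input.
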

def detect_entropy_transitions(
    data: DataType,
    window: int = 256,
    threshold: float = 1.0,
    min_gap: int = 64,
    step: int | None = None,
) -> list[EntropyTransition]:
    """Detect significant entropy transitions (field boundaries).

    Implements Shannon Entropy Analysis.

    Identifies locations where entropy changes significantly, which often
    correspond to transitions between different data types or field boundaries.

    The algorithm uses a dual-approach strategy:

    1. For each potential boundary point, compute entropy of the regions
       BEFORE and AFTER (non-overlapping) to detect sharp transitions.
    2. Use a sliding window for gradual transition detection.

    This approach properly handles sharp boundaries, such as low-to-high
    entropy transitions, without blending across the boundary.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        window: Window size for entropy calculation (default: 256)
        threshold: Minimum entropy change to consider a transition (default: 1.0 bits)
        min_gap: Minimum gap between transitions to avoid duplicates (default: 64 bytes)
        step: Step size for the sliding window (optional, defaults to window // 4)

    Returns:
        List of detected entropy transitions, sorted by offset

    Example:
        >>> data = b'\\x00' * 1000 + b'\\xFF\\xEE\\xDD' * 333  # Low to high entropy
        >>> transitions = detect_entropy_transitions(data)
        >>> len(transitions) > 0
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    data_len = len(data)

    if data_len < 16:
        return []

    # Use the boundary-scanning approach first; it works for both small and
    # large data by comparing non-overlapping regions before and after each
    # potential boundary.
    transitions = _detect_transitions_boundary_scan(bytes(data), window, threshold, min_gap)

    # If we found transitions via the boundary scan, return them
    if transitions:
        return transitions

    # Fall back to the sliding window approach for gradual transitions
    if data_len < window:
        return []

    if step is None:
        step = max(1, window // 4)

    effective_min_gap = min(min_gap, max(step * 2, data_len // 10))

    try:
        entropies = sliding_entropy(data, window=window, step=step)
    except ValueError:
        return []

    if len(entropies) < 2:
        return []

    last_offset = -effective_min_gap - 1

    # Find significant entropy changes between adjacent windows
    for i in range(1, len(entropies)):
        delta = entropies[i] - entropies[i - 1]

        if abs(delta) >= threshold:
            offset = i * step

            # Enforce a minimum gap between transitions
            if offset - last_offset >= effective_min_gap:
                transition_type = "low_to_high" if delta > 0 else "high_to_low"

                transitions.append(
                    EntropyTransition(
                        offset=offset,
                        entropy_before=float(entropies[i - 1]),
                        entropy_after=float(entropies[i]),
                        delta=float(delta),
                        transition_type=transition_type,
                    )
                )
                last_offset = offset

    return transitions
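
# Usage sketch (illustrative): locating the boundary between a zero-filled
# header and a high-entropy payload.
#
#     payload = bytes(range(256)) * 4
#     blob = b"\x00" * 1024 + payload
#     for t in detect_entropy_transitions(blob, window=128, threshold=2.0):
#         print(t.offset, t.transition_type, round(t.entropy_change, 2))
#
# The reported offset lands at or near 1024, quantized to the step used by
# the boundary scan rather than guaranteed to be the exact byte boundary.
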

def _detect_transitions_boundary_scan(
    data: bytes,
    window: int,
    threshold: float,
    min_gap: int,
) -> list[EntropyTransition]:
    """Detect entropy transitions using boundary scanning.

    For each potential boundary point, compare the entropy of the region
    BEFORE the boundary to the region AFTER (non-overlapping regions).
    This properly detects sharp transitions without blending.

    Args:
        data: Input data as bytes
        window: Window size for region comparison
        threshold: Minimum entropy change to consider a transition
        min_gap: Minimum gap between transitions

    Returns:
        List of detected transitions
    """
    data_len = len(data)

    # Region size for comparison - use the window or an adaptive size
    region_size = min(window, data_len // 3)
    if region_size < 8:
        region_size = max(8, data_len // 4)

    if region_size < 4:
        return []

    transitions = []
    last_offset = -min_gap - 1

    # Track the best transition found
    best_transition = None
    best_delta = 0.0

    # Scan potential boundary points.
    # We need at least region_size bytes on each side.
    scan_start = region_size
    scan_end = data_len - region_size

    if scan_start >= scan_end:
        # Data too small for this region size; reduce it
        region_size = max(4, data_len // 4)
        scan_start = region_size
        scan_end = data_len - region_size

        if scan_start >= scan_end:
            return []

    # Use a step size to avoid scanning every byte
    scan_step = max(1, region_size // 4)

    for offset in range(scan_start, scan_end + 1, scan_step):
        # Compute entropy of the region BEFORE this point
        region_before = data[offset - region_size : offset]
        # Compute entropy of the region AFTER this point
        region_after = data[offset : offset + region_size]

        if len(region_before) < 4 or len(region_after) < 4:
            continue

        try:
            entropy_before = shannon_entropy(region_before)
            entropy_after = shannon_entropy(region_after)
        except ValueError:
            continue

        delta = entropy_after - entropy_before

        # Track the strongest transition that exceeds the threshold
        if abs(delta) >= threshold:
            # Check the min_gap constraint
            if offset - last_offset >= min_gap:
                if abs(delta) > abs(best_delta):
                    best_delta = delta
                    best_transition = EntropyTransition(
                        offset=offset,
                        entropy_before=entropy_before,
                        entropy_after=entropy_after,
                        delta=delta,
                        transition_type="low_to_high" if delta > 0 else "high_to_low",
                    )

    if best_transition is not None:
        transitions.append(best_transition)
        last_offset = best_transition.offset

        # Continue scanning for more transitions after this one by recursing
        # on the remainder (for data with multiple transitions)
        remaining_transitions = _detect_transitions_boundary_scan(
            data[best_transition.offset :],
            window,
            threshold,
            min_gap,
        )
        for t in remaining_transitions:
            # Adjust offsets to be relative to the original data
            adjusted_t = EntropyTransition(
                offset=t.offset + best_transition.offset,
                entropy_before=t.entropy_before,
                entropy_after=t.entropy_after,
                delta=t.delta,
                transition_type=t.transition_type,
            )
            if adjusted_t.offset - last_offset >= min_gap:
                transitions.append(adjusted_t)
                last_offset = adjusted_t.offset

    return transitions
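
# Scan geometry (illustrative): for a 3000-byte buffer with window=256,
# region_size = min(256, 3000 // 3) = 256 and scan_step = 256 // 4 = 64, so
# candidate boundaries are tested every 64 bytes from offset 256 up to at
# most 2744, with 256 non-overlapping bytes on each side of each candidate.
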

def classify_by_entropy(data: DataType) -> EntropyResult:
    """Classify data type by entropy characteristics.

    Implements Shannon Entropy Analysis.

    Classification criteria:
    - constant: entropy < 0.5 (highly repetitive)
    - text: entropy 0.5-6.0 AND high printable ratio (>= 0.9)
    - random: entropy >= 7.5 (encrypted or random data)
    - compressed: entropy 6.0-7.5 (compressed data)
    - structured: anything else (structured binary data)

    Args:
        data: Input data as bytes, bytearray, or numpy array

    Returns:
        EntropyResult with classification and confidence

    Raises:
        ValueError: If data is empty

    Example:
        >>> result = classify_by_entropy(b'\\x00' * 100)
        >>> result.classification
        'constant'
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        raise ValueError("Cannot classify empty data")

    # Calculate entropy
    entropy_val = shannon_entropy(data)

    # Calculate the printable ratio for text detection.
    # Include standard printable ASCII (32-126) plus tab, newline, carriage return.
    printable_count = sum(1 for b in data if 32 <= b <= 126 or b in (9, 10, 13))
    printable_ratio = printable_count / len(data)

    # Classify based on entropy and characteristics.
    # Order matters: check specific cases first, then fall through to the general case.
    classification: Literal["structured", "text", "compressed", "random", "constant"]

    # 1. Constant/repetitive data - very low entropy
    if entropy_val < 0.5:
        classification = "constant"
        confidence = 1.0 - (entropy_val / 0.5) * 0.2  # High confidence

    # 2. Random/encrypted data - very high entropy (near maximum)
    elif entropy_val >= 7.5:
        classification = "random"
        confidence = min(1.0, (entropy_val - 7.5) / 0.5 + 0.8)

    # 3. Compressed data - high entropy but not maximum
    elif entropy_val >= 6.0:
        classification = "compressed"
        confidence = min(1.0, (entropy_val - 6.0) / 1.5 + 0.6)

    # 4. Text data - high printable ratio (checked BEFORE structured).
    # Text entropy ranges from ~2.5 to ~5.5 depending on language/content;
    # a high printable threshold (0.9) distinguishes it from structured binary.
    elif printable_ratio >= 0.9 and entropy_val >= 0.5:
        classification = "text"
        confidence = min(1.0, printable_ratio)

    # 5. Structured binary - everything else
    else:
        classification = "structured"
        confidence = 0.7  # Medium confidence for the default case

    return EntropyResult(
        entropy=float(entropy_val), classification=classification, confidence=float(confidence)
    )
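
# Worked example (illustrative): natural English ASCII text typically lands
# around 4-5 bits/byte with a printable ratio near 1.0, so it falls through
# cases 1-3 and matches case 4:
#
#     classify_by_entropy(b"the quick brown fox" * 20).classification  # 'text'
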

def entropy_profile(data: DataType, window: int = 256) -> "NDArray[np.float64]":
    """Generate an entropy profile for visualization.

    Implements Shannon Entropy Analysis.

    Creates a smoothed entropy profile suitable for plotting and visual analysis.
    Uses overlapping windows with a step size of window // 4 for smoother results.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        window: Window size in bytes (default: 256)

    Returns:
        Array of entropy values across the data

    Example:
        >>> data = bytes(range(256)) * 10
        >>> profile = entropy_profile(data)
        >>> len(profile) > 0
        True
    """
    step = max(1, window // 4)  # Overlapping windows for a smooth profile
    return sliding_entropy(data, window=window, step=step)
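
# Plotting sketch (illustrative; assumes matplotlib is installed, which this
# module does not require, and `blob` stands for any bytes object):
#
#     import matplotlib.pyplot as plt
#     profile = entropy_profile(blob, window=256)
#     plt.plot(np.arange(len(profile)) * (256 // 4), profile)
#     plt.xlabel("offset (bytes)")
#     plt.ylabel("entropy (bits)")
#     plt.show()
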

def entropy_histogram(data: DataType) -> tuple["NDArray[np.intp]", "NDArray[np.float64]"]:
    """Generate a byte frequency histogram.

    Implements Shannon Entropy Analysis.

    Creates a histogram of byte values (0-255) showing their frequencies.
    Useful for visualizing data distribution and entropy characteristics.

    Args:
        data: Input data as bytes, bytearray, or numpy array

    Returns:
        Tuple of (bin_edges, frequencies) where:
        - bin_edges: Array of 256 byte values (0-255)
        - frequencies: Array of normalized frequencies (0-1)

    Example:
        >>> bins, freqs = entropy_histogram(b'\\x00' * 50 + b'\\xFF' * 50)
        >>> len(bins)
        256
        >>> float(sum(freqs))
        1.0
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        return np.arange(256), np.zeros(256)

    # Count byte frequencies
    counts = np.zeros(256, dtype=np.int64)
    for byte in data:
        counts[byte] += 1

    # Normalize to frequencies
    frequencies = counts / len(data)

    # Bin edges are the byte values themselves
    bin_edges = np.arange(256)

    return bin_edges, frequencies

# =============================================================================
# RE-ENT-002: Byte Frequency Distribution
# =============================================================================

def byte_frequency_distribution(data: DataType, n_most_common: int = 10) -> ByteFrequencyResult:
    """Analyze the byte frequency distribution of data.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Computes detailed byte frequency statistics including counts, frequencies,
    most/least common bytes, a uniformity score, and characteristic ratios.

    Args:
        data: Input data as bytes, bytearray, or numpy array.
        n_most_common: Number of most/least common bytes to report.

    Returns:
        ByteFrequencyResult with comprehensive distribution analysis.

    Example:
        >>> data = b'\\x00\\x00\\x01\\x02\\x03'
        >>> result = byte_frequency_distribution(data)
        >>> result.unique_bytes
        4
        >>> result.most_common[0]
        (0, 2)
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        return ByteFrequencyResult(
            counts=np.zeros(256, dtype=np.int64),
            frequencies=np.zeros(256, dtype=np.float64),
            entropy=0.0,
            unique_bytes=0,
            most_common=[],
            least_common=[],
            uniformity_score=0.0,
            zero_byte_ratio=0.0,
            printable_ratio=0.0,
        )

    # Count bytes
    counts = np.zeros(256, dtype=np.int64)
    for byte in data:
        counts[byte] += 1

    # Normalize frequencies
    length = len(data)
    frequencies = counts / length

    # Calculate entropy (inline calculation to avoid ValueError)
    byte_counts = Counter(data)
    entropy_val = 0.0
    for count in byte_counts.values():
        if count > 0:
            prob = count / length
            entropy_val -= prob * np.log2(prob)

    # Count unique bytes
    unique_bytes = np.count_nonzero(counts)

    # Find the most and least common bytes
    nonzero_indices = np.where(counts > 0)[0]
    sorted_indices = nonzero_indices[np.argsort(-counts[nonzero_indices])]

    most_common = [(int(i), int(counts[i])) for i in sorted_indices[:n_most_common]]
    least_common = [(int(i), int(counts[i])) for i in sorted_indices[-n_most_common:][::-1]]

    # Calculate the uniformity score (1 = perfectly uniform, 0 = single byte)
    expected_freq = 1.0 / 256
    if unique_bytes > 0:
        # Chi-squared-like uniformity measure
        observed_freqs = frequencies[frequencies > 0]
        deviation = np.sum((observed_freqs - expected_freq) ** 2)
        max_deviation = (1.0 - expected_freq) ** 2 + 255 * expected_freq**2
        uniformity_score = 1.0 - min(1.0, deviation / max_deviation)
    else:
        uniformity_score = 0.0

    # Calculate characteristic ratios
    zero_byte_ratio = counts[0] / length if length > 0 else 0.0

    # Printable ASCII range
    printable_count = sum(counts[i] for i in range(32, 127))
    printable_count += counts[9] + counts[10] + counts[13]  # Tab, LF, CR
    printable_ratio = printable_count / length if length > 0 else 0.0

    return ByteFrequencyResult(
        counts=counts,
        frequencies=frequencies,
        entropy=entropy_val,
        unique_bytes=unique_bytes,
        most_common=most_common,
        least_common=least_common,
        uniformity_score=uniformity_score,
        zero_byte_ratio=zero_byte_ratio,
        printable_ratio=printable_ratio,
    )
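
# Uniformity-score sanity check (illustrative): for perfectly uniform data
# every observed frequency equals 1/256, so the deviation is 0 and the score
# is exactly 1.0; for a single repeated byte the deviation is
# (1 - 1/256)^2 ≈ 0.992, close to max_deviation, giving a score near 0.
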

def detect_frequency_anomalies(data: DataType, z_threshold: float = 3.0) -> FrequencyAnomalyResult:
    """Detect bytes with anomalous frequencies.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Identifies byte values that occur with unusual frequency compared to the
    expected uniform distribution, using z-score analysis.

    Args:
        data: Input data as bytes, bytearray, or numpy array.
        z_threshold: Z-score threshold for anomaly detection.

    Returns:
        FrequencyAnomalyResult with the anomalous bytes.

    Example:
        >>> data = b'A' * 100 + bytes(range(256))
        >>> result = detect_frequency_anomalies(data)
        >>> 65 in result.anomalous_bytes  # 'A' is anomalous
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    length = len(data) if data else 0

    if length == 0:
        return FrequencyAnomalyResult(
            anomalous_bytes=[],
            z_scores=np.zeros(256),
            is_anomalous=np.zeros(256, dtype=bool),
            expected_frequency=0.0,
        )

    # Count bytes
    counts = np.zeros(256, dtype=np.int64)
    for byte in data:
        counts[byte] += 1

    # Expected frequency under a uniform distribution
    expected_count = length / 256
    expected_freq = 1.0 / 256

    # Calculate z-scores using the binomial approximation:
    # std = sqrt(n * p * (1 - p))
    std = np.sqrt(length * expected_freq * (1 - expected_freq))
    if std == 0:
        std = 1.0  # Avoid division by zero

    z_scores = (counts - expected_count) / std

    # Identify anomalies
    is_anomalous = np.abs(z_scores) > z_threshold
    anomalous_bytes = list(np.where(is_anomalous)[0])

    return FrequencyAnomalyResult(
        anomalous_bytes=[int(b) for b in anomalous_bytes],
        z_scores=z_scores,
        is_anomalous=is_anomalous,
        expected_frequency=expected_freq,
    )
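
# Worked z-score arithmetic (illustrative), using the docstring example above:
# length = 356, so expected_count = 356 / 256 ≈ 1.39 and
# std = sqrt(356 * (1/256) * (255/256)) ≈ 1.18. Byte 65 ('A') occurs 101
# times, giving z ≈ (101 - 1.39) / 1.18 ≈ 84, far beyond the default
# threshold of 3.0.
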

def compare_byte_distributions(
    data_a: DataType, data_b: DataType
) -> tuple[float, float, "NDArray[np.float64]"]:
    """Compare byte frequency distributions between two data samples.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Computes the chi-squared distance, Kullback-Leibler divergence, and
    per-byte frequency differences.

    Args:
        data_a: First data sample.
        data_b: Second data sample.

    Returns:
        Tuple of (chi_squared_distance, kl_divergence, frequency_diffs).

    Example:
        >>> data_a = bytes(range(256)) * 10
        >>> data_b = bytes(range(256)) * 10
        >>> chi_sq, kl_div, diffs = compare_byte_distributions(data_a, data_b)
        >>> chi_sq < 0.01  # Very similar
        True
    """
    # Get the frequency distributions
    result_a = byte_frequency_distribution(data_a)
    result_b = byte_frequency_distribution(data_b)

    freq_a = result_a.frequencies
    freq_b = result_b.frequencies

    # Compute the chi-squared distance.
    # Add a small epsilon to avoid division by zero.
    eps = 1e-10
    chi_squared = np.sum((freq_a - freq_b) ** 2 / (freq_a + freq_b + eps))

    # Compute the KL divergence (symmetrized)
    freq_a_safe = np.clip(freq_a, eps, 1.0)
    freq_b_safe = np.clip(freq_b, eps, 1.0)

    kl_ab = np.sum(freq_a_safe * np.log(freq_a_safe / freq_b_safe))
    kl_ba = np.sum(freq_b_safe * np.log(freq_b_safe / freq_a_safe))
    kl_divergence = (kl_ab + kl_ba) / 2

    # Per-byte frequency differences
    frequency_diffs = freq_a - freq_b

    return float(chi_squared), float(kl_divergence), frequency_diffs
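
# Note (illustrative): the divergence returned here is the average of the two
# directed KL divergences, D(a||b) and D(b||a), computed with np.log, so it
# is measured in nats rather than bits; divide by ln(2) to convert to bits.
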

def sliding_byte_frequency(
    data: DataType, window: int = 256, step: int = 64, byte_value: int | None = None
) -> "NDArray[np.float64]":
    """Compute a sliding window byte frequency profile.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Tracks how byte frequency varies across the data, which is useful for
    detecting regions with different characteristics.

    Args:
        data: Input data.
        window: Window size in bytes.
        step: Step size for the sliding window.
        byte_value: Specific byte to track (None for all).

    Returns:
        Array of frequencies at each window position.
        If byte_value is None, returns an array of shape (n_windows, 256).

    Example:
        >>> data = b'\\x00' * 1000 + b'\\xFF' * 1000
        >>> profile = sliding_byte_frequency(data, byte_value=0)
        >>> profile[0] > profile[-1]  # More zeros at the start
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if len(data) < window:
        if byte_value is not None:
            return np.array([])
        return np.zeros((0, 256))

    num_windows = (len(data) - window) // step + 1

    if byte_value is not None:
        # Track a single byte value
        profile = np.zeros(num_windows)
        for i in range(num_windows):
            start = i * step
            window_data = data[start : start + window]
            profile[i] = window_data.count(byte_value) / window
        return profile
    else:
        # Track all byte values
        profile = np.zeros((num_windows, 256))
        for i in range(num_windows):
            start = i * step
            window_data = data[start : start + window]
            for byte in window_data:
                profile[i, byte] += 1
            profile[i] /= window
        return profile

def detect_compression_indicators(data: DataType) -> CompressionIndicator:
    """Detect indicators of compression or encryption.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Analyzes the byte frequency distribution to identify characteristics
    typical of compressed or encrypted data.

    Args:
        data: Input data to analyze.

    Returns:
        CompressionIndicator with the detection results.

    Example:
        >>> import os
        >>> random_data = os.urandom(1000)
        >>> result = detect_compression_indicators(random_data)
        >>> result.is_encrypted
        True
    """
    freq_result = byte_frequency_distribution(data)
    _entropy_result = classify_by_entropy(data)

    indicators = []
    is_compressed = False
    is_encrypted = False
    confidence = 0.0
    compression_ratio_estimate = 1.0

    entropy = freq_result.entropy

    # Very high entropy (>= 7.5) suggests encryption
    if entropy >= 7.5:
        is_encrypted = True
        confidence = min(1.0, (entropy - 7.5) / 0.5 + 0.7)
        indicators.append(f"Very high entropy: {entropy:.2f} bits")

    # Moderately high entropy (6.0-7.5) suggests compression
    elif entropy >= 6.0:
        is_compressed = True
        confidence = min(1.0, (entropy - 6.0) / 1.5 + 0.5)
        compression_ratio_estimate = 1.0 - (entropy - 6.0) / 2.0
        indicators.append(f"High entropy: {entropy:.2f} bits")

    # Check uniformity
    if freq_result.uniformity_score > 0.8:
        if not is_encrypted:
            is_encrypted = True
            confidence = max(confidence, 0.6)
        indicators.append(f"Uniform byte distribution: {freq_result.uniformity_score:.2f}")

    # Few unique bytes with non-trivial entropy suggests compression
    if freq_result.unique_bytes < 128 and entropy > 5.0:
        if not is_compressed:
            is_compressed = True
            confidence = max(confidence, 0.5)
        indicators.append(f"Limited byte vocabulary: {freq_result.unique_bytes}")

    # A low printable ratio suggests binary/compressed data
    if freq_result.printable_ratio < 0.1 and entropy > 5.0:
        indicators.append(f"Low printable ratio: {freq_result.printable_ratio:.2%}")

    return CompressionIndicator(
        is_compressed=is_compressed,
        is_encrypted=is_encrypted,
        compression_ratio_estimate=compression_ratio_estimate,
        confidence=confidence,
        indicators=indicators,
    )
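
# Caveat (illustrative, not from the original module): these checks are
# heuristics. Output from strong modern compressors routinely exceeds 7.5
# bits/byte and will be reported as "encrypted" here; treat the flags as
# hints rather than proof.
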

class EntropyAnalyzer:
    """Object-oriented wrapper for entropy analysis functionality.

    Provides a class-based interface for entropy operations, wrapping the
    functional API for consistency with test expectations.

    Example:
        >>> analyzer = EntropyAnalyzer()
        >>> entropy = analyzer.calculate_entropy(data)
    """

    def __init__(
        self,
        entropy_type: Literal["byte", "bit"] = "byte",
        window_size: int = 256,
    ):
        """Initialize the entropy analyzer.

        Args:
            entropy_type: Type of entropy calculation ('byte' or 'bit').
            window_size: Default window size for sliding operations.
        """
        self.entropy_type = entropy_type
        self.window_size = window_size

    def calculate_entropy(self, data: DataType) -> float:
        """Calculate the Shannon entropy of data.

        Args:
            data: Input data to analyze.

        Returns:
            Shannon entropy value.

        Example:
            >>> analyzer = EntropyAnalyzer()
            >>> entropy = analyzer.calculate_entropy(b"Hello World")
        """
        if self.entropy_type == "byte":
            return shannon_entropy(data)
        else:
            return bit_entropy(data)

    def analyze(self, data: DataType) -> EntropyResult:
        """Analyze data and classify it by entropy.

        Args:
            data: Input data to analyze.

        Returns:
            EntropyResult with the classification.
        """
        return classify_by_entropy(data)

    def detect_transitions(
        self,
        data: DataType,
        threshold: float = 0.5,
        window: int | None = None,
        step: int | None = None,
    ) -> list[EntropyTransition]:
        """Detect entropy transitions in data.

        Args:
            data: Input data to analyze.
            threshold: Minimum entropy change to detect.
            window: Window size for sliding entropy (defaults to self.window_size).
            step: Step size between windows.

        Returns:
            List of detected transitions.
        """
        if window is None:
            window = self.window_size
        return detect_entropy_transitions(data, window=window, threshold=threshold, step=step)

    def analyze_blocks(self, data: DataType, block_size: int = 256) -> list[float]:
        """Analyze the entropy of fixed-size blocks.

        Args:
            data: Input data to analyze.
            block_size: Size of each block in bytes.

        Returns:
            List of entropy values, one per block.

        Example:
            >>> analyzer = EntropyAnalyzer()
            >>> entropies = analyzer.analyze_blocks(data, block_size=256)
        """
        if isinstance(data, np.ndarray):
            data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

        if not data:
            return []

        entropies = []
        for i in range(0, len(data), block_size):
            block = data[i : i + block_size]
            if len(block) >= block_size // 2:  # Only analyze blocks at least half size
                # Inline calculation to avoid ValueError on edge cases
                counts = Counter(block)
                length = len(block)
                entropy_val = 0.0
                for count in counts.values():
                    if count > 0:
                        prob = count / length
                        entropy_val -= prob * np.log2(prob)
                entropies.append(entropy_val)

        return entropies
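
# Usage sketch (illustrative): the wrapper simply delegates to the functional
# API above.
#
#     analyzer = EntropyAnalyzer(entropy_type="byte", window_size=128)
#     analyzer.calculate_entropy(b"Hello World")               # ~2.85 bits
#     analyzer.analyze(b"Hello World").classification          # 'text'
#     analyzer.analyze_blocks(b"\x00" * 512, block_size=256)   # [0.0, 0.0]
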

__all__ = [
    # RE-ENT-002: Byte Frequency Distribution
    "ByteFrequencyResult",
    "CompressionIndicator",
    "EntropyAnalyzer",
    "EntropyResult",
    "EntropyTransition",
    "FrequencyAnomalyResult",
    "bit_entropy",
    "byte_frequency_distribution",
    "classify_by_entropy",
    "compare_byte_distributions",
    "detect_compression_indicators",
    "detect_entropy_transitions",
    "detect_frequency_anomalies",
    "entropy_histogram",
    "entropy_profile",
    "shannon_entropy",
    "sliding_byte_frequency",
    "sliding_entropy",
]