Coverage for src/tracekit/analyzers/statistical/entropy.py: 88% (360 statements)

1"""Shannon entropy analysis for data classification and boundary detection.
3 - RE-ENT-002: Byte Frequency Distribution
5This module provides tools for computing Shannon entropy at both byte and bit
6levels, analyzing entropy profiles over sliding windows, detecting entropy
7transitions for field boundary identification, and classifying data types
8based on entropy characteristics.
9"""
from collections import Counter
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal, Union

import numpy as np

if TYPE_CHECKING:
    from numpy.typing import NDArray

# Type alias for input data
DataType = Union[bytes, bytearray, "NDArray[np.uint8]"]

@dataclass
class EntropyResult:
    """Entropy analysis result.

    Attributes:
        entropy: Shannon entropy value (0-8 bits for byte-level)
        classification: Data type classification based on entropy
        confidence: Confidence score for classification (0-1)
    """

    entropy: float
    classification: Literal["structured", "text", "compressed", "random", "constant"]
    confidence: float

@dataclass
class EntropyTransition:
    """Detected entropy transition (potential field boundary).

    Attributes:
        offset: Byte offset where transition occurs
        entropy_before: Entropy value before transition
        entropy_after: Entropy value after transition
        delta: Change in entropy (entropy_after - entropy_before)
        transition_type: Direction of entropy change
    """

    offset: int
    entropy_before: float
    entropy_after: float
    delta: float
    transition_type: str  # 'low_to_high', 'high_to_low'

    @property
    def entropy_change(self) -> float:
        """Magnitude of the transition, abs(delta); kept for compatibility with test expectations."""
        return abs(self.delta)

@dataclass
class ByteFrequencyResult:
    """Result of byte frequency distribution analysis.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Attributes:
        counts: Byte value counts (256-element array).
        frequencies: Normalized frequencies (256-element array).
        entropy: Shannon entropy of distribution.
        unique_bytes: Number of unique byte values.
        most_common: List of (byte_value, count) for most common bytes.
        least_common: List of (byte_value, count) for least common bytes.
        uniformity_score: How uniform the distribution is (0-1).
        zero_byte_ratio: Proportion of zero bytes.
        printable_ratio: Proportion of printable ASCII.
    """

    counts: "NDArray[np.int64]"
    frequencies: "NDArray[np.float64]"
    entropy: float
    unique_bytes: int
    most_common: list[tuple[int, int]]
    least_common: list[tuple[int, int]]
    uniformity_score: float
    zero_byte_ratio: float
    printable_ratio: float

@dataclass
class FrequencyAnomalyResult:
    """Result of frequency anomaly detection.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Attributes:
        anomalous_bytes: Byte values with unusual frequencies.
        z_scores: Z-score for each byte value.
        is_anomalous: Boolean mask for anomalous bytes.
        expected_frequency: Expected frequency for uniform distribution.
    """

    anomalous_bytes: list[int]
    z_scores: "NDArray[np.float64]"
    is_anomalous: "NDArray[np.bool_]"
    expected_frequency: float

@dataclass
class CompressionIndicator:
    """Indicators suggesting compression or encryption.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Attributes:
        is_compressed: Likely compressed data.
        is_encrypted: Likely encrypted data.
        compression_ratio_estimate: Estimated compression ratio.
        confidence: Confidence in classification (0-1).
        indicators: List of detected indicators.
    """

    is_compressed: bool
    is_encrypted: bool
    compression_ratio_estimate: float
    confidence: float
    indicators: list[str] = field(default_factory=list)

def shannon_entropy(data: DataType) -> float:
    """Calculate Shannon entropy in bits (0-8 for bytes).

    Implements: Shannon Entropy Analysis.

    Shannon entropy measures the average information content per byte.
    For byte data, maximum entropy is 8 bits (uniform distribution).

    Args:
        data: Input data as bytes, bytearray, or numpy array

    Returns:
        Entropy value in bits (0.0 to 8.0)

    Raises:
        ValueError: If data is empty

    Example:
        >>> shannon_entropy(b'\\x00' * 100)  # All zeros
        0.0
        >>> shannon_entropy(bytes(range(256)))  # Uniform
        8.0
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        raise ValueError("Cannot calculate entropy of empty data")

    # Count byte frequencies
    counts = Counter(data)
    length = len(data)

    # Calculate Shannon entropy: H = -sum(p * log2(p))
    entropy = 0.0
    for count in counts.values():
        if count > 0:
            prob = count / length
            entropy -= prob * np.log2(prob)

    return float(entropy)

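
# The loop above is the textbook formula H = -sum_i(p_i * log2(p_i)).
# A minimal vectorized sketch of the same computation follows; it is
# illustrative only, and _vectorized_entropy is not part of this module's API.


def _vectorized_entropy(data: bytes) -> float:
    """Hypothetical helper: Shannon entropy via numpy, for comparison."""
    counts = np.bincount(np.frombuffer(data, dtype=np.uint8), minlength=256)
    probs = counts[counts > 0] / len(data)  # drop zero counts; 0*log(0) := 0
    return float(-np.sum(probs * np.log2(probs)))


# Assumed behavior: _vectorized_entropy(bytes(range(256))) == 8.0, matching
# shannon_entropy(bytes(range(256))).
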
def bit_entropy(data: DataType) -> float:
    """Calculate bit-level entropy (0-1).

    Implements: Shannon Entropy Analysis.

    Computes entropy of the bit distribution (0s vs 1s) across all bytes.

    Args:
        data: Input data as bytes, bytearray, or numpy array

    Returns:
        Bit-level entropy (0.0 to 1.0)

    Raises:
        ValueError: If data is empty

    Example:
        >>> bit_entropy(b'\\x00' * 100)  # All bits are 0
        0.0
        >>> bit_entropy(b'\\xAA' * 100)  # Equal 0s and 1s
        1.0
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        raise ValueError("Cannot calculate entropy of empty data")

    # Count total bits
    total_bits = len(data) * 8

    # Count set bits
    ones = sum(bin(byte).count("1") for byte in data)
    zeros = total_bits - ones

    if ones == 0 or zeros == 0:
        return 0.0

    # Calculate bit entropy
    p_one = ones / total_bits
    p_zero = zeros / total_bits

    entropy = -(p_one * np.log2(p_one) + p_zero * np.log2(p_zero))

    return float(entropy)

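
# A small illustration (not part of the module API): bit entropy responds only
# to the 0/1 balance, not to byte-level structure, so a constant byte with half
# its bits set already scores 1.0 while its byte-level entropy is 0.0.


def _demo_bit_entropy() -> None:
    """Hypothetical demo contrasting bit- and byte-level entropy."""
    constant_bytes = b"\x0f" * 64          # 00001111 repeated
    print(bit_entropy(constant_bytes))      # 1.0: bits are perfectly balanced
    print(shannon_entropy(constant_bytes))  # 0.0: only one byte value occurs
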
def sliding_entropy(
    data: DataType, window: int = 256, step: int = 64, window_size: int | None = None
) -> "NDArray[np.float64]":
    """Calculate sliding window entropy profile.

    Implements: Shannon Entropy Analysis.

    Computes entropy over a sliding window to create an entropy profile
    of the data, useful for visualization and boundary detection.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        window: Window size in bytes (default: 256)
        step: Step size for window movement (default: 64)
        window_size: Alias for window parameter (for compatibility)

    Returns:
        Array of entropy values at each window position

    Raises:
        ValueError: If window size is larger than data or step is invalid
    """
    # Support window_size alias
    if window_size is not None:
        window = window_size

    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if len(data) < window:
        raise ValueError(f"Window size ({window}) larger than data ({len(data)})")

    if step <= 0:
        raise ValueError(f"Step size must be positive, got {step}")

    # Calculate number of windows
    num_windows = (len(data) - window) // step + 1
    entropies = np.zeros(num_windows)

    for i in range(num_windows):
        start = i * step
        end = start + window
        window_data = data[start:end]
        # Inline the entropy calculation; windows here are never empty,
        # so shannon_entropy's empty-data ValueError cannot occur.
        counts = Counter(window_data)
        length = len(window_data)
        entropy_val = 0.0
        for count in counts.values():
            if count > 0:
                prob = count / length
                entropy_val -= prob * np.log2(prob)
        entropies[i] = entropy_val

    return entropies

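
# Usage sketch (assumed synthetic payload, not part of the API): the profile
# is high over random-looking bytes and drops to 0.0 over the constant tail.


def _demo_sliding_entropy() -> None:
    """Hypothetical demo of an entropy profile over mixed data."""
    import os

    data = os.urandom(2048) + b"\x00" * 2048
    profile = sliding_entropy(data, window=256, step=64)
    # Early windows cover random bytes (high entropy); late windows cover
    # zeros (entropy 0.0).
    print(profile[0], profile[-1])
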
def detect_entropy_transitions(
    data: DataType,
    window: int = 256,
    threshold: float = 1.0,
    min_gap: int = 64,
    step: int | None = None,
) -> list[EntropyTransition]:
    """Detect significant entropy transitions (field boundaries).

    Implements: Shannon Entropy Analysis.

    Identifies locations where entropy changes significantly, which often
    correspond to transitions between different data types or field boundaries.

    The algorithm uses a dual-approach strategy:
    1. For each potential boundary point, compute entropy of regions BEFORE
       and AFTER (non-overlapping) to detect sharp transitions.
    2. Use a sliding window for gradual transition detection.

    This approach properly handles sharp boundaries, such as low-to-high
    entropy transitions, without blending across the boundary.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        window: Window size for entropy calculation (default: 256)
        threshold: Minimum entropy change to consider a transition (default: 1.0 bits)
        min_gap: Minimum gap between transitions to avoid duplicates (default: 64 bytes)
        step: Step size for sliding window (optional, defaults to window // 4)

    Returns:
        List of detected entropy transitions, sorted by offset

    Example:
        >>> data = b'\\x00' * 1000 + b'\\xFF\\xEE\\xDD' * 333  # Low to high entropy
        >>> transitions = detect_entropy_transitions(data)
        >>> len(transitions) > 0
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    data_len = len(data)

    if data_len < 16:
        return []

    # Use the boundary scanning approach first - it works for both small and
    # large data by comparing non-overlapping regions before and after each
    # potential boundary.
    transitions = _detect_transitions_boundary_scan(bytes(data), window, threshold, min_gap)

    # If we found transitions via boundary scan, return them
    if transitions:
        return transitions

    # Fall back to the sliding window approach for gradual transitions
    if data_len < window:
        return []

    if step is None:
        step = max(1, window // 4)

    effective_min_gap = min(min_gap, max(step * 2, data_len // 10))

    try:
        entropies = sliding_entropy(data, window=window, step=step)
    except ValueError:
        return []

    if len(entropies) < 2:
        return []

    last_offset = -effective_min_gap - 1

    # Find significant entropy changes between adjacent windows
    for i in range(1, len(entropies)):
        delta = entropies[i] - entropies[i - 1]

        if abs(delta) >= threshold:
            offset = i * step

            # Enforce minimum gap between transitions
            if offset - last_offset >= effective_min_gap:
                transition_type = "low_to_high" if delta > 0 else "high_to_low"

                transitions.append(
                    EntropyTransition(
                        offset=offset,
                        entropy_before=float(entropies[i - 1]),
                        entropy_after=float(entropies[i]),
                        delta=float(delta),
                        transition_type=transition_type,
                    )
                )
                last_offset = offset

    return transitions

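
# Minimal usage sketch (assumed payload): a zero run followed by random bytes
# should yield a single low_to_high transition near the 1024-byte boundary.


def _demo_detect_transitions() -> None:
    """Hypothetical demo of boundary detection on synthetic data."""
    import os

    data = b"\x00" * 1024 + os.urandom(1024)
    for t in detect_entropy_transitions(data, window=256, threshold=1.0):
        print(t.offset, t.transition_type, round(t.delta, 2))
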
def _detect_transitions_boundary_scan(
    data: bytes,
    window: int,
    threshold: float,
    min_gap: int,
) -> list[EntropyTransition]:
    """Detect entropy transitions using boundary scanning.

    For each potential boundary point, compare entropy of the region
    BEFORE the boundary to the region AFTER (non-overlapping regions).
    This properly detects sharp transitions without blending.

    Args:
        data: Input data as bytes
        window: Window size for region comparison
        threshold: Minimum entropy change to consider a transition
        min_gap: Minimum gap between transitions

    Returns:
        List of detected transitions
    """
    data_len = len(data)

    # Region size for comparison - use window or an adaptive size
    region_size = min(window, data_len // 3)
    if region_size < 8:
        region_size = max(8, data_len // 4)

    if region_size < 4:
        return []

    transitions: list[EntropyTransition] = []
    last_offset = -min_gap - 1

    # Track the best transition found
    best_transition: EntropyTransition | None = None
    best_delta = 0.0

    # Scan potential boundary points
    # We need at least region_size bytes on each side
    scan_start = region_size
    scan_end = data_len - region_size

    if scan_start >= scan_end:
        # Data too small for this region size, reduce it
        region_size = max(4, data_len // 4)
        scan_start = region_size
        scan_end = data_len - region_size

    if scan_start >= scan_end:
        return []

    # Use a step size to avoid scanning every byte
    scan_step = max(1, region_size // 4)

    for offset in range(scan_start, scan_end + 1, scan_step):
        # Compute entropy of the region BEFORE this point
        region_before = data[offset - region_size : offset]
        # Compute entropy of the region AFTER this point
        region_after = data[offset : offset + region_size]

        if len(region_before) < 4 or len(region_after) < 4:
            continue

        try:
            entropy_before = shannon_entropy(region_before)
            entropy_after = shannon_entropy(region_after)
        except ValueError:
            continue

        delta = entropy_after - entropy_before

        # Track the strongest transition that exceeds the threshold
        if abs(delta) >= threshold:
            # Check the min_gap constraint
            if offset - last_offset >= min_gap:
                if abs(delta) > abs(best_delta):
                    best_delta = delta
                    best_transition = EntropyTransition(
                        offset=offset,
                        entropy_before=entropy_before,
                        entropy_after=entropy_after,
                        delta=delta,
                        transition_type="low_to_high" if delta > 0 else "high_to_low",
                    )

    if best_transition is not None:
        transitions.append(best_transition)
        last_offset = best_transition.offset

        # Continue scanning for more transitions after this one
        # (for data with multiple transitions)
        remaining_transitions = _detect_transitions_boundary_scan(
            data[best_transition.offset :],
            window,
            threshold,
            min_gap,
        )
        for t in remaining_transitions:
            # Adjust offset to be relative to the original data
            adjusted_t = EntropyTransition(
                offset=t.offset + best_transition.offset,
                entropy_before=t.entropy_before,
                entropy_after=t.entropy_after,
                delta=t.delta,
                transition_type=t.transition_type,
            )
            if adjusted_t.offset - last_offset >= min_gap:
                transitions.append(adjusted_t)
                last_offset = adjusted_t.offset

    return transitions

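
# Sketch of the core idea (assumed synthetic data): at a true boundary, the
# non-overlapping before/after regions differ sharply in entropy, whereas a
# window centered on the boundary would blend both sides and dilute the delta.


def _demo_boundary_scan_idea() -> None:
    """Hypothetical demo: before/after entropy around a sharp boundary."""
    import os

    data = b"\x00" * 512 + os.urandom(512)
    boundary = 512
    before = shannon_entropy(data[boundary - 256 : boundary])  # ~0.0 (zeros)
    after = shannon_entropy(data[boundary : boundary + 256])   # high (random)
    print(before, after)
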
def classify_by_entropy(data: DataType) -> EntropyResult:
    """Classify data type by entropy characteristics.

    Implements: Shannon Entropy Analysis.

    Classification criteria:
    - constant: entropy < 0.5 (highly repetitive)
    - text: entropy 0.5-6.0 AND high printable ratio (>= 0.9)
    - random: entropy >= 7.5 (encrypted or random data)
    - compressed: entropy 6.0-7.5 (compressed data)
    - structured: other (structured binary data)

    Args:
        data: Input data as bytes, bytearray, or numpy array

    Returns:
        EntropyResult with classification and confidence

    Raises:
        ValueError: If data is empty

    Example:
        >>> result = classify_by_entropy(b'\\x00' * 100)
        >>> result.classification
        'constant'
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        raise ValueError("Cannot classify empty data")

    # Calculate entropy
    entropy_val = shannon_entropy(data)

    # Calculate printable ratio for text detection
    # Include standard printable ASCII (32-126) plus tab, newline, carriage return
    printable_count = sum(1 for b in data if 32 <= b <= 126 or b in (9, 10, 13))
    printable_ratio = printable_count / len(data)

    # Classify based on entropy and characteristics
    # Order matters: check specific cases first, then fall through to general

    # 1. Constant/repetitive data - very low entropy
    classification: Literal["structured", "text", "compressed", "random", "constant"]
    if entropy_val < 0.5:
        classification = "constant"
        confidence = 1.0 - (entropy_val / 0.5) * 0.2  # High confidence

    # 2. Random/encrypted data - very high entropy (near maximum)
    elif entropy_val >= 7.5:
        classification = "random"
        confidence = min(1.0, (entropy_val - 7.5) / 0.5 + 0.8)

    # 3. Compressed data - high entropy but not maximum
    elif entropy_val >= 6.0:
        classification = "compressed"
        confidence = min(1.0, (entropy_val - 6.0) / 1.5 + 0.6)

    # 4. Text data - high printable ratio (checked BEFORE structured)
    # Text can have entropy from ~2.5 to ~5.5 depending on language/content
    # We use a high printable threshold (0.9) to distinguish from structured binary
    elif printable_ratio >= 0.9 and entropy_val >= 0.5:
        classification = "text"
        confidence = min(1.0, printable_ratio)

    # 5. Structured binary - everything else
    else:
        classification = "structured"
        confidence = 0.7  # Medium confidence for default case

    return EntropyResult(
        entropy=float(entropy_val), classification=classification, confidence=float(confidence)
    )

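
# Usage sketch (assumed inputs): a few payloads and the labels the thresholds
# above assign to them.


def _demo_classify() -> None:
    """Hypothetical demo of entropy-based classification."""
    import os

    payloads = (
        b"\x00" * 100,                                       # -> constant
        b"The quick brown fox jumps over the lazy dog.",     # -> text
        os.urandom(4096),                                    # -> random
    )
    for payload in payloads:
        result = classify_by_entropy(payload)
        print(result.classification, round(result.entropy, 2), round(result.confidence, 2))
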
def entropy_profile(data: DataType, window: int = 256) -> "NDArray[np.float64]":
    """Generate entropy profile for visualization.

    Implements: Shannon Entropy Analysis.

    Creates a smoothed entropy profile suitable for plotting and visual analysis.
    Uses overlapping windows with a step size of window // 4 for smoother results.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        window: Window size in bytes (default: 256)

    Returns:
        Array of entropy values across the data

    Example:
        >>> data = bytes(range(256)) * 10
        >>> profile = entropy_profile(data)
        >>> len(profile) > 0
        True
    """
    step = max(1, window // 4)  # Overlapping windows for a smooth profile
    return sliding_entropy(data, window=window, step=step)

def entropy_histogram(data: DataType) -> tuple["NDArray[np.intp]", "NDArray[np.float64]"]:
    """Generate byte frequency histogram.

    Implements: Shannon Entropy Analysis.

    Creates a histogram of byte values (0-255) showing their frequencies.
    Useful for visualizing data distribution and entropy characteristics.

    Args:
        data: Input data as bytes, bytearray, or numpy array

    Returns:
        Tuple of (bin_edges, frequencies) where:
        - bin_edges: Array of 256 byte values (0-255)
        - frequencies: Array of normalized frequencies (0-1)

    Example:
        >>> bins, freqs = entropy_histogram(b'\\x00' * 50 + b'\\xFF' * 50)
        >>> len(bins)
        256
        >>> float(sum(freqs))
        1.0
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        return np.arange(256), np.zeros(256)

    # Count byte frequencies
    counts = np.zeros(256, dtype=np.int64)
    for byte in data:
        counts[byte] += 1

    # Normalize to frequencies
    frequencies = counts / len(data)

    # Bin edges are byte values
    bin_edges = np.arange(256)

    return bin_edges, frequencies

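
# Usage sketch (assumed payload): the histogram makes dominant byte values
# obvious without plotting; a plotting library could consume the same arrays.


def _demo_histogram() -> None:
    """Hypothetical demo: locate the dominant byte values in a blob."""
    bins, freqs = entropy_histogram(b"ABAB" * 100 + b"\x00" * 50)
    top = np.argsort(-freqs)[:3]  # indices of the three most frequent bytes
    print([(int(b), float(freqs[b])) for b in top])
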
# =============================================================================
# RE-ENT-002: Byte Frequency Distribution
# =============================================================================

def byte_frequency_distribution(data: DataType, n_most_common: int = 10) -> ByteFrequencyResult:
    """Analyze byte frequency distribution in data.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Computes detailed byte frequency statistics including counts, frequencies,
    most/least common bytes, uniformity score, and characteristic ratios.

    Args:
        data: Input data as bytes, bytearray, or numpy array.
        n_most_common: Number of most/least common bytes to report.

    Returns:
        ByteFrequencyResult with comprehensive distribution analysis.

    Example:
        >>> data = b'\\x00\\x00\\x01\\x02\\x03'
        >>> result = byte_frequency_distribution(data)
        >>> result.unique_bytes
        4
        >>> result.most_common[0]
        (0, 2)
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        return ByteFrequencyResult(
            counts=np.zeros(256, dtype=np.int64),
            frequencies=np.zeros(256, dtype=np.float64),
            entropy=0.0,
            unique_bytes=0,
            most_common=[],
            least_common=[],
            uniformity_score=0.0,
            zero_byte_ratio=0.0,
            printable_ratio=0.0,
        )

    # Count bytes
    counts = np.zeros(256, dtype=np.int64)
    for byte in data:
        counts[byte] += 1

    # Normalize frequencies
    length = len(data)
    frequencies = counts / length

    # Calculate entropy (inline; empty data was already handled above)
    byte_counts = Counter(data)
    entropy_val = 0.0
    for count in byte_counts.values():
        if count > 0:
            prob = count / length
            entropy_val -= prob * np.log2(prob)

    # Count unique bytes
    unique_bytes = np.count_nonzero(counts)

    # Find most and least common bytes
    nonzero_indices = np.where(counts > 0)[0]
    sorted_indices = nonzero_indices[np.argsort(-counts[nonzero_indices])]

    most_common = [(int(i), int(counts[i])) for i in sorted_indices[:n_most_common]]
    least_common = [(int(i), int(counts[i])) for i in sorted_indices[-n_most_common:][::-1]]

    # Calculate uniformity score (1 = perfectly uniform, 0 = single byte)
    expected_freq = 1.0 / 256
    if unique_bytes > 0:
        # Chi-squared-like uniformity measure
        observed_freqs = frequencies[frequencies > 0]
        deviation = np.sum((observed_freqs - expected_freq) ** 2)
        max_deviation = (1.0 - expected_freq) ** 2 + 255 * expected_freq**2
        uniformity_score = 1.0 - min(1.0, deviation / max_deviation)
    else:
        uniformity_score = 0.0

    # Calculate characteristic ratios
    zero_byte_ratio = counts[0] / length if length > 0 else 0.0

    # Printable ASCII range
    printable_count = sum(counts[i] for i in range(32, 127))
    printable_count += counts[9] + counts[10] + counts[13]  # Tab, LF, CR
    printable_ratio = printable_count / length if length > 0 else 0.0

    return ByteFrequencyResult(
        counts=counts,
        frequencies=frequencies,
        entropy=entropy_val,
        unique_bytes=unique_bytes,
        most_common=most_common,
        least_common=least_common,
        uniformity_score=uniformity_score,
        zero_byte_ratio=zero_byte_ratio,
        printable_ratio=printable_ratio,
    )

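
# Usage sketch (assumed payload): frequency statistics for an ASCII-heavy
# buffer; protocol text yields a high printable ratio and few unique bytes.


def _demo_byte_frequency() -> None:
    """Hypothetical demo of byte frequency analysis."""
    result = byte_frequency_distribution(b"GET /index.html HTTP/1.1\r\n" * 20)
    print(result.unique_bytes, round(result.printable_ratio, 2), result.most_common[:3])
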
def detect_frequency_anomalies(data: DataType, z_threshold: float = 3.0) -> FrequencyAnomalyResult:
    """Detect bytes with anomalous frequencies.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Identifies byte values that occur with unusual frequency compared to the
    expected distribution, using z-score analysis.

    Args:
        data: Input data as bytes, bytearray, or numpy array.
        z_threshold: Z-score threshold for anomaly detection.

    Returns:
        FrequencyAnomalyResult with anomalous bytes.

    Example:
        >>> data = b'A' * 100 + bytes(range(256))
        >>> result = detect_frequency_anomalies(data)
        >>> 65 in result.anomalous_bytes  # 'A' is anomalous
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    length = len(data) if data else 0

    if length == 0:
        return FrequencyAnomalyResult(
            anomalous_bytes=[],
            z_scores=np.zeros(256),
            is_anomalous=np.zeros(256, dtype=bool),
            expected_frequency=0.0,
        )

    # Count bytes
    counts = np.zeros(256, dtype=np.int64)
    for byte in data:
        counts[byte] += 1

    # Expected frequency under a uniform distribution
    expected_count = length / 256
    expected_freq = 1.0 / 256

    # Calculate z-scores
    # Using the binomial approximation: std = sqrt(n * p * (1 - p))
    std = np.sqrt(length * expected_freq * (1 - expected_freq))
    if std == 0:
        std = 1.0  # Avoid division by zero

    z_scores = (counts - expected_count) / std

    # Identify anomalies
    is_anomalous = np.abs(z_scores) > z_threshold
    anomalous_bytes = list(np.where(is_anomalous)[0])

    return FrequencyAnomalyResult(
        anomalous_bytes=[int(b) for b in anomalous_bytes],
        z_scores=z_scores,
        is_anomalous=is_anomalous,
        expected_frequency=expected_freq,
    )

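
# Usage sketch: padding bytes show up as strong positive outliers under the
# uniform-distribution null model used above.


def _demo_frequency_anomalies() -> None:
    """Hypothetical demo: find over-represented padding bytes."""
    import os

    data = os.urandom(4096) + b"\x00" * 512
    result = detect_frequency_anomalies(data, z_threshold=3.0)
    print(0 in result.anomalous_bytes)  # True: zero padding dominates
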
def compare_byte_distributions(
    data_a: DataType, data_b: DataType
) -> tuple[float, float, "NDArray[np.float64]"]:
    """Compare byte frequency distributions between two data samples.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Computes chi-squared distance, symmetrized Kullback-Leibler divergence,
    and per-byte frequency differences.

    Args:
        data_a: First data sample.
        data_b: Second data sample.

    Returns:
        Tuple of (chi_squared_distance, kl_divergence, frequency_diffs).

    Example:
        >>> data_a = bytes(range(256)) * 10
        >>> data_b = bytes(range(256)) * 10
        >>> chi_sq, kl_div, diffs = compare_byte_distributions(data_a, data_b)
        >>> chi_sq < 0.01  # Very similar
        True
    """
    # Get frequency distributions
    result_a = byte_frequency_distribution(data_a)
    result_b = byte_frequency_distribution(data_b)

    freq_a = result_a.frequencies
    freq_b = result_b.frequencies

    # Compute chi-squared distance
    # Add a small epsilon to avoid division by zero
    eps = 1e-10
    chi_squared = np.sum((freq_a - freq_b) ** 2 / (freq_a + freq_b + eps))

    # Compute KL divergence (symmetrized)
    freq_a_safe = np.clip(freq_a, eps, 1.0)
    freq_b_safe = np.clip(freq_b, eps, 1.0)

    kl_ab = np.sum(freq_a_safe * np.log(freq_a_safe / freq_b_safe))
    kl_ba = np.sum(freq_b_safe * np.log(freq_b_safe / freq_a_safe))
    kl_divergence = (kl_ab + kl_ba) / 2

    # Per-byte frequency differences
    frequency_diffs = freq_a - freq_b

    return float(chi_squared), float(kl_divergence), frequency_diffs

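
# Usage sketch: both distances separate same-type samples from cross-type
# samples; the comparison below is illustrative, not a calibrated threshold.


def _demo_compare_distributions() -> None:
    """Hypothetical demo: text vs. random byte distributions."""
    import os

    text = b"the quick brown fox " * 200
    chi_same, kl_same, _ = compare_byte_distributions(text, text)
    chi_diff, kl_diff, _ = compare_byte_distributions(text, os.urandom(4000))
    print(chi_same < chi_diff, kl_same < kl_diff)  # True True
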
def sliding_byte_frequency(
    data: DataType, window: int = 256, step: int = 64, byte_value: int | None = None
) -> "NDArray[np.float64]":
    """Compute sliding window byte frequency profile.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Tracks how byte frequency varies across the data, useful for
    detecting regions with different characteristics.

    Args:
        data: Input data.
        window: Window size in bytes.
        step: Step size for sliding window.
        byte_value: Specific byte to track (None for all).

    Returns:
        Array of frequencies at each window position.
        If byte_value is None, returns an array of shape (n_windows, 256).

    Example:
        >>> data = b'\\x00' * 1000 + b'\\xFF' * 1000
        >>> profile = sliding_byte_frequency(data, byte_value=0)
        >>> profile[0] > profile[-1]  # More zeros at start
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if len(data) < window:
        if byte_value is not None:
            return np.array([])
        return np.zeros((0, 256))

    num_windows = (len(data) - window) // step + 1

    if byte_value is not None:
        # Track a single byte value
        profile = np.zeros(num_windows)
        for i in range(num_windows):
            start = i * step
            window_data = data[start : start + window]
            profile[i] = window_data.count(byte_value) / window
        return profile
    else:
        # Track all byte values
        profile = np.zeros((num_windows, 256))
        for i in range(num_windows):
            start = i * step
            window_data = data[start : start + window]
            for byte in window_data:
                profile[i, byte] += 1
            profile[i] /= window
        return profile

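
# Usage sketch (assumed blob layout): tracking a single byte's frequency
# localizes regions, e.g. a zero-heavy header ahead of a random payload.


def _demo_sliding_byte_frequency() -> None:
    """Hypothetical demo: zero-byte density across a blob."""
    import os

    blob = b"\x00" * 512 + os.urandom(1536)
    zero_profile = sliding_byte_frequency(blob, window=128, step=64, byte_value=0)
    print(round(float(zero_profile[0]), 2))  # 1.0 inside the zero header
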
def detect_compression_indicators(data: DataType) -> CompressionIndicator:
    """Detect indicators of compression or encryption.

    Implements RE-ENT-002: Byte Frequency Distribution.

    Analyzes the byte frequency distribution to identify characteristics
    typical of compressed or encrypted data.

    Args:
        data: Input data to analyze.

    Returns:
        CompressionIndicator with detection results.

    Example:
        >>> import os
        >>> random_data = os.urandom(1000)
        >>> result = detect_compression_indicators(random_data)
        >>> result.is_encrypted
        True
    """
    freq_result = byte_frequency_distribution(data)
    _entropy_result = classify_by_entropy(data)

    indicators: list[str] = []
    is_compressed = False
    is_encrypted = False
    confidence = 0.0
    compression_ratio_estimate = 1.0

    entropy = freq_result.entropy

    # Very high entropy (>= 7.5) suggests encryption
    if entropy >= 7.5:
        is_encrypted = True
        confidence = min(1.0, (entropy - 7.5) / 0.5 + 0.7)
        indicators.append(f"Very high entropy: {entropy:.2f} bits")

    # Moderately high entropy (6.0-7.5) suggests compression
    elif entropy >= 6.0:
        is_compressed = True
        confidence = min(1.0, (entropy - 6.0) / 1.5 + 0.5)
        compression_ratio_estimate = 1.0 - (entropy - 6.0) / 2.0
        indicators.append(f"High entropy: {entropy:.2f} bits")

    # Check uniformity
    if freq_result.uniformity_score > 0.8:
        if not is_encrypted:
            is_encrypted = True
            confidence = max(confidence, 0.6)
        indicators.append(f"Uniform byte distribution: {freq_result.uniformity_score:.2f}")

    # Few unique bytes at high entropy suggests compression
    if freq_result.unique_bytes < 128 and entropy > 5.0:
        if not is_compressed:
            is_compressed = True
            confidence = max(confidence, 0.5)
        indicators.append(f"Limited byte vocabulary: {freq_result.unique_bytes}")

    # A low printable ratio suggests binary/compressed data
    if freq_result.printable_ratio < 0.1 and entropy > 5.0:
        indicators.append(f"Low printable ratio: {freq_result.printable_ratio:.2%}")

    return CompressionIndicator(
        is_compressed=is_compressed,
        is_encrypted=is_encrypted,
        compression_ratio_estimate=compression_ratio_estimate,
        confidence=confidence,
        indicators=indicators,
    )

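
# Usage sketch (assumed input): zlib output over compressible text typically
# lands in the high-entropy band, so it tends to trip the compressed branch,
# though short outputs with near-uniform bytes may classify as encrypted
# instead - the indicators list shows which heuristics fired.


def _demo_compression_indicators() -> None:
    """Hypothetical demo on zlib-compressed text."""
    import zlib

    compressed = zlib.compress(b"the quick brown fox jumps over the lazy dog " * 200)
    result = detect_compression_indicators(compressed)
    print(result.is_compressed or result.is_encrypted, result.indicators)
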
class EntropyAnalyzer:
    """Object-oriented wrapper for entropy analysis functionality.

    Provides a class-based interface for entropy operations,
    wrapping the functional API for consistency with test expectations.

    Example:
        >>> analyzer = EntropyAnalyzer()
        >>> entropy = analyzer.calculate_entropy(data)
    """

    def __init__(
        self,
        entropy_type: Literal["byte", "bit"] = "byte",
        window_size: int = 256,
    ):
        """Initialize entropy analyzer.

        Args:
            entropy_type: Type of entropy calculation ('byte' or 'bit').
            window_size: Default window size for sliding operations.
        """
        self.entropy_type = entropy_type
        self.window_size = window_size

    def calculate_entropy(self, data: DataType) -> float:
        """Calculate Shannon entropy of data.

        Args:
            data: Input data to analyze.

        Returns:
            Shannon entropy value.

        Example:
            >>> analyzer = EntropyAnalyzer()
            >>> entropy = analyzer.calculate_entropy(b"Hello World")
        """
        if self.entropy_type == "byte":
            return shannon_entropy(data)
        else:
            return bit_entropy(data)

    def analyze(self, data: DataType) -> EntropyResult:
        """Analyze data and classify by entropy.

        Args:
            data: Input data to analyze.

        Returns:
            EntropyResult with classification.
        """
        return classify_by_entropy(data)

    def detect_transitions(
        self,
        data: DataType,
        threshold: float = 0.5,
        window: int | None = None,
        step: int | None = None,
    ) -> list[EntropyTransition]:
        """Detect entropy transitions in data.

        Args:
            data: Input data to analyze.
            threshold: Minimum entropy change to detect.
            window: Window size for sliding entropy (defaults to self.window_size).
            step: Step size between windows.

        Returns:
            List of detected transitions.
        """
        if window is None:
            window = self.window_size
        return detect_entropy_transitions(data, window=window, threshold=threshold, step=step)

    def analyze_blocks(self, data: DataType, block_size: int = 256) -> list[float]:
        """Analyze entropy of fixed-size blocks.

        Args:
            data: Input data to analyze.
            block_size: Size of each block in bytes.

        Returns:
            List of entropy values for each block.

        Example:
            >>> analyzer = EntropyAnalyzer()
            >>> entropies = analyzer.analyze_blocks(data, block_size=256)
        """
        if isinstance(data, np.ndarray):
            data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

        if not data:
            return []

        entropies: list[float] = []
        for i in range(0, len(data), block_size):
            block = data[i : i + block_size]
            if len(block) >= block_size // 2:  # Only analyze blocks at least half size
                # Inline the entropy calculation; qualifying blocks are never
                # empty, so shannon_entropy's empty-data ValueError cannot occur.
                counts = Counter(block)
                length = len(block)
                entropy_val = 0.0
                for count in counts.values():
                    if count > 0:
                        prob = count / length
                        entropy_val -= prob * np.log2(prob)
                entropies.append(float(entropy_val))

        return entropies

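
# Usage sketch for the class-based interface (mirrors the functional API;
# inputs are assumed synthetic data).


def _demo_analyzer() -> None:
    """Hypothetical demo of the EntropyAnalyzer wrapper."""
    analyzer = EntropyAnalyzer(entropy_type="byte", window_size=128)
    data = b"\x00" * 512 + bytes(range(256)) * 2
    print(round(analyzer.calculate_entropy(data), 2))
    print(analyzer.analyze(data).classification)
    print(len(analyzer.analyze_blocks(data, block_size=128)))  # 8 blocks
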
__all__ = [
    # RE-ENT-002: Byte Frequency Distribution
    "ByteFrequencyResult",
    "CompressionIndicator",
    "EntropyAnalyzer",
    "EntropyResult",
    "EntropyTransition",
    "FrequencyAnomalyResult",
    "bit_entropy",
    "byte_frequency_distribution",
    "classify_by_entropy",
    "compare_byte_distributions",
    "detect_compression_indicators",
    "detect_entropy_transitions",
    "detect_frequency_anomalies",
    "entropy_histogram",
    "entropy_profile",
    "shannon_entropy",
    "sliding_byte_frequency",
    "sliding_entropy",
]