Coverage for src / tracekit / analyzers / statistical / classification.py: 95%
232 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Statistical data type classification.
4This module provides tools for classifying binary data regions as text,
5binary, compressed, encrypted, or padding using multiple statistical tests
6and heuristics.
7"""
9from dataclasses import dataclass, field
10from typing import Any, Literal, Union
12import numpy as np
14from .entropy import shannon_entropy
# Type alias for input data
# Accepted input forms for every public function in this module.
DataType = Union[bytes, bytearray, "np.ndarray[Any, Any]"]

# Common compression signatures
# Magic-byte prefix -> compression format name. Matched against the start
# of the data (and scanned for mid-stream by detect_compressed_regions).
COMPRESSION_SIGNATURES = {
    b"\x1f\x8b": "gzip",
    b"BZ": "bzip2",
    b"\x50\x4b\x03\x04": "zip",  # local file header
    b"\x50\x4b\x05\x06": "zip",  # end-of-central-directory (empty archive)
    b"\x50\x4b\x07\x08": "zip",  # spanned-archive marker
    b"\xfd7zXZ\x00": "xz",
    b"\x28\xb5\x2f\xfd": "zstd",
    b"\x04\x22\x4d\x18": "lz4",  # LZ4 frame format
}

# Common executable/binary signatures
# Magic-byte prefix -> executable format name; checked before compression
# signatures in classify_data_type.
BINARY_SIGNATURES = {
    b"\x7fELF": "elf",  # ELF executable
    b"MZ": "pe",  # Windows PE/DOS executable
    b"\xca\xfe\xba\xbe": "macho_fat",  # Mach-O fat binary
    b"\xfe\xed\xfa\xce": "macho_32",  # Mach-O 32-bit
    b"\xfe\xed\xfa\xcf": "macho_64",  # Mach-O 64-bit
    b"\xcf\xfa\xed\xfe": "macho_64_le",  # Mach-O 64-bit little endian
    b"\xce\xfa\xed\xfe": "macho_32_le",  # Mach-O 32-bit little endian
}
@dataclass
class ClassificationResult:
    """Data type classification result.

    Attributes:
        primary_type: Primary classification category
        confidence: Confidence score for classification (0-1)
        entropy: Shannon entropy value
        printable_ratio: Fraction of printable ASCII characters
        null_ratio: Fraction of null bytes
        byte_variance: Variance of byte values
        details: Additional classification details and metadata
    """

    primary_type: Literal["text", "binary", "compressed", "encrypted", "padding", "mixed"]
    confidence: float
    entropy: float
    printable_ratio: float
    null_ratio: float
    byte_variance: float
    # default_factory avoids the shared-mutable-default pitfall for the dict
    details: dict[str, Any] = field(default_factory=dict)

    # Alias for test compatibility
    @property
    def data_type(self) -> str:
        """Alias for primary_type for test compatibility."""
        return self.primary_type
@dataclass
class RegionClassification:
    """Classification of a data region.

    Attributes:
        start: Start offset in bytes
        end: End offset in bytes (exclusive)
        length: Region length in bytes (expected to equal ``end - start``;
            not enforced by this dataclass)
        classification: Classification result for this region
    """

    start: int
    end: int
    length: int
    classification: ClassificationResult
def classify_data_type(data: DataType) -> ClassificationResult:
    """Classify binary data type using multiple heuristics.

    : Statistical Data Type Classification

    Uses a combination of entropy analysis, printable character ratio,
    byte distribution, and signature detection to classify data.

    Classification logic (first match wins):
    1. Null/padding (null_ratio > 0.9)
    2. Executable/binary signatures
    3. Compression signatures
    4. Text (printable_ratio > 0.75 and entropy < 6.5)
    5. Encrypted/random (entropy > 7.5 and high byte variance)
    6. Compressed entropy range (6.5 <= entropy <= 7.5)
    7. Default to binary/structured

    Args:
        data: Input data as bytes, bytearray, or numpy array

    Returns:
        ClassificationResult with type and confidence

    Raises:
        ValueError: If data is empty

    Example:
        >>> result = classify_data_type(b'Hello, World!')
        >>> result.primary_type
        'text'
    """
    if isinstance(data, np.ndarray):
        # uint8 arrays map 1:1 to bytes; other dtypes contribute their raw buffer
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if not data:
        raise ValueError("Cannot classify empty data")

    # Calculate statistics
    entropy_val = shannon_entropy(data)

    byte_array = np.frombuffer(data, dtype=np.uint8)

    # Printable ASCII: 0x20-0x7E plus tab, newline, carriage return.
    # Vectorized with numpy instead of a per-byte Python generator (C-speed O(n)).
    printable_mask = ((byte_array >= 32) & (byte_array <= 126)) | np.isin(
        byte_array, (9, 10, 13)
    )
    printable_ratio = int(np.count_nonzero(printable_mask)) / len(data)

    # Null byte ratio (vectorized count)
    null_ratio = int(np.count_nonzero(byte_array == 0)) / len(data)

    # Byte variance
    byte_variance = float(np.var(byte_array))

    details: dict[str, Any] = {}

    def _result(
        primary_type: Literal["text", "binary", "compressed", "encrypted", "padding", "mixed"],
        confidence: float,
        result_details: dict[str, Any],
    ) -> ClassificationResult:
        """Build a result carrying the statistics computed above."""
        return ClassificationResult(
            primary_type=primary_type,
            confidence=confidence,
            entropy=entropy_val,
            printable_ratio=printable_ratio,
            null_ratio=null_ratio,
            byte_variance=byte_variance,
            details=result_details,
        )

    # 1. Padding/null regions
    if null_ratio > 0.9:
        return _result("padding", min(1.0, null_ratio), {"reason": "high_null_ratio"})

    # 2. Executable/binary signatures (BEFORE compression and encrypted)
    for sig, bin_type in BINARY_SIGNATURES.items():
        if data[: len(sig)] == sig:
            details["binary_type"] = bin_type
            return _result("binary", 0.95, details)

    # 3. Compression signatures
    for sig, comp_type in COMPRESSION_SIGNATURES.items():
        if data[: len(sig)] == sig:
            details["compression_type"] = comp_type
            return _result("compressed", 0.95, details)

    # 4. Text data (high printable ratio) - checked BEFORE entropy-based classification
    if printable_ratio > 0.75 and entropy_val < 6.5:
        details["reason"] = "high_printable_ratio"
        return _result("text", min(1.0, printable_ratio * 0.95), details)

    # 5. Encrypted/random data (high entropy, no structure)
    if entropy_val > 7.5 and byte_variance > 5000:
        # High entropy with high variance suggests random/encrypted
        details["reason"] = "high_entropy_and_variance"
        return _result("encrypted", min(1.0, (entropy_val - 7.5) / 0.5 + 0.7), details)

    # 6. Compressed data (high entropy, some structure)
    if 6.5 <= entropy_val <= 7.5:
        details["reason"] = "compression_entropy_range"
        return _result("compressed", 0.7, details)

    # 7. Default to binary/structured
    details["reason"] = "default_binary"
    return _result("binary", 0.6, details)
def detect_text_regions(
    data: DataType, min_length: int = 8, min_printable: float = 0.8
) -> list[RegionClassification]:
    """Detect ASCII/UTF-8 text regions.

    : Statistical Data Type Classification

    Scans for contiguous regions with high printable character ratio using a
    sliding window of ``min_length`` bytes: a region opens when a full window
    meets ``min_printable`` and closes when a later window falls below it.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        min_length: Minimum region length in bytes (default: 8)
        min_printable: Minimum printable ratio to consider text (default: 0.8)

    Returns:
        List of detected text regions

    Example:
        >>> data = b'\\x00' * 100 + b'Hello World' + b'\\x00' * 100
        >>> regions = detect_text_regions(data)
        >>> len(regions) > 0
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    regions = []
    in_region = False
    region_start = 0
    # NOTE(review): _printable_in_window is never updated or read below —
    # looks like leftover scaffolding from an incremental-count version.
    _printable_in_window = 0
    window_size = min_length

    for i, byte in enumerate(data):
        # NOTE(review): _is_printable is computed but unused; the window scans
        # below recompute printability per byte instead.
        _is_printable = 32 <= byte <= 126 or byte in (9, 10, 13)

        if not in_region:
            # Look for start of text region
            if i >= window_size - 1:
                # Check window ending at i (inclusive)
                window = data[i - window_size + 1 : i + 1]
                printable_count = sum(1 for b in window if 32 <= b <= 126 or b in (9, 10, 13))
                if printable_count / window_size >= min_printable:
                    in_region = True
                    region_start = i - window_size + 1
        else:
            # In text region, look for end
            # Use a sliding window to detect when printable ratio drops
            if i >= region_start + window_size:
                window = data[i - window_size + 1 : i + 1]
                printable_count = sum(1 for b in window if 32 <= b <= 126 or b in (9, 10, 13))
                if printable_count / window_size < min_printable:
                    # End of region: recorded end excludes the window in which
                    # the ratio dropped, so trailing printable bytes inside
                    # that window are truncated from the region.
                    region_data = data[region_start : i - window_size + 1]
                    if len(region_data) >= min_length:
                        classification = classify_data_type(region_data)
                        regions.append(
                            RegionClassification(
                                start=region_start,
                                end=i - window_size + 1,
                                length=len(region_data),
                                classification=classification,
                            )
                        )
                    in_region = False

    # Handle region extending to end of data
    if in_region:
        region_data = data[region_start:]
        if len(region_data) >= min_length:
            classification = classify_data_type(region_data)
            regions.append(
                RegionClassification(
                    start=region_start,
                    end=len(data),
                    length=len(region_data),
                    classification=classification,
                )
            )

    return regions
def detect_encrypted_regions(
    data: DataType, min_length: int = 64, min_entropy: float = 7.5
) -> list[RegionClassification]:
    """Detect potentially encrypted regions (high entropy, no structure).

    : Statistical Data Type Classification

    Identifies regions with very high entropy and uniform byte distribution,
    characteristic of encrypted or cryptographically random data. Scans with
    a window of ``min_length`` bytes advancing a quarter-window per step.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        min_length: Minimum region length in bytes (default: 64)
        min_entropy: Minimum entropy threshold (default: 7.5)

    Returns:
        List of detected encrypted regions

    Example:
        >>> import os
        >>> random_data = os.urandom(100)
        >>> regions = detect_encrypted_regions(random_data)
        >>> len(regions) >= 0
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if len(data) < min_length:
        return []

    regions = []
    window_size = min_length
    # BUG FIX: guard against step == 0 (min_length < 4), which previously
    # made the scan loop below spin forever.
    step = max(1, window_size // 4)

    i = 0
    # BUG FIX: use <= so an input of exactly min_length bytes is still scanned;
    # the old strict `<` skipped the final full window and returned [] for
    # len(data) == min_length even though the length guard above passed.
    while i <= len(data) - window_size:
        window = data[i : i + window_size]
        entropy_val = shannon_entropy(window)

        if entropy_val >= min_entropy:
            # Found potential encrypted region; extend forward while each
            # subsequent full window stays above the entropy threshold.
            region_start = i
            region_end = i + window_size

            while region_end < len(data):
                next_window = data[region_end : region_end + window_size]
                if len(next_window) < window_size:
                    break
                if shannon_entropy(next_window) >= min_entropy:
                    region_end += step
                else:
                    break

            # Create region
            region_data = data[region_start:region_end]
            classification = classify_data_type(region_data)
            regions.append(
                RegionClassification(
                    start=region_start,
                    end=region_end,
                    length=len(region_data),
                    classification=classification,
                )
            )

            # Resume scanning just past the detected region
            i = region_end
        else:
            i += step

    return regions
def detect_compressed_regions(data: DataType, min_length: int = 64) -> list[RegionClassification]:
    """Detect compressed data regions (signatures + high entropy).

    : Statistical Data Type Classification

    Identifies compressed regions by scanning for known compression
    signatures anywhere in the data, then extending each hit forward while
    the entropy stays in the compressed range.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        min_length: Minimum region length in bytes (default: 64)

    Returns:
        List of detected compressed regions. Regions found via different
        signatures are not merged or de-overlapped, and the list is ordered
        by signature, not by offset.

    Example:
        >>> import gzip
        >>> compressed = gzip.compress(b'Hello World' * 100)
        >>> regions = detect_compressed_regions(compressed)
        >>> len(regions) > 0
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    regions = []

    # Scan for every occurrence of each compression signature
    for sig, comp_type in COMPRESSION_SIGNATURES.items():
        offset = 0
        while True:
            pos = data.find(sig, offset)
            if pos == -1:
                break

            # Try to determine compressed region size.
            # This is heuristic-based since we don't parse the format.
            region_start = pos
            region_end = min(pos + min_length, len(data))

            # Extend while each following full 256-byte window still looks
            # compressed (entropy >= 6.0)
            window_size = 256
            while region_end < len(data):
                window = data[region_end : region_end + window_size]
                if len(window) < window_size:
                    # Partial tail window: stop extending rather than classify it
                    break
                entropy_val = shannon_entropy(window)
                if entropy_val >= 6.0:  # Compressed threshold
                    region_end += window_size
                else:
                    break

            if region_end - region_start >= min_length:
                region_data = data[region_start:region_end]
                classification = classify_data_type(region_data)
                # Record which signature triggered this region
                classification.details["compression_signature"] = comp_type

                regions.append(
                    RegionClassification(
                        start=region_start,
                        end=region_end,
                        length=len(region_data),
                        classification=classification,
                    )
                )

            # Continue searching after this region (region_end > pos, so the
            # loop always makes progress)
            offset = region_end

    return regions
def _padding_region(start: int, end: int, padding_byte: int) -> RegionClassification:
    """Build a padding RegionClassification for a run of identical fill bytes."""
    classification = ClassificationResult(
        primary_type="padding",
        confidence=1.0,
        entropy=0.0,  # a constant run has zero Shannon entropy
        printable_ratio=0.0,
        null_ratio=1.0 if padding_byte == 0 else 0.0,
        byte_variance=0.0,
        details={"padding_byte": f"0x{padding_byte:02X}"},
    )
    return RegionClassification(
        start=start, end=end, length=end - start, classification=classification
    )


def detect_padding_regions(data: DataType, min_length: int = 4) -> list[RegionClassification]:
    """Detect padding/null regions.

    : Statistical Data Type Classification

    Identifies contiguous runs of a single fill byte (0x00 or 0xFF) of at
    least ``min_length`` bytes.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        min_length: Minimum region length in bytes (default: 4)

    Returns:
        List of detected padding regions

    Example:
        >>> data = b'DATA' + b'\\x00' * 100 + b'DATA'
        >>> regions = detect_padding_regions(data)
        >>> len(regions) > 0
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    regions: list[RegionClassification] = []
    in_padding = False
    padding_start = 0
    padding_byte = None

    for i, byte in enumerate(data):
        if not in_padding:
            # Only 0x00 and 0xFF are treated as padding fill bytes
            if byte == 0 or byte == 0xFF:
                in_padding = True
                padding_start = i
                padding_byte = byte
        elif byte != padding_byte:
            # Current run ends at i (exclusive)
            if i - padding_start >= min_length:
                regions.append(_padding_region(padding_start, i, padding_byte))
            # BUG FIX: the byte that terminated the run may itself begin a new
            # padding run (e.g. a 0x00 run immediately followed by a 0xFF run);
            # previously this byte was skipped, starting the new run one byte late.
            if byte == 0 or byte == 0xFF:
                padding_start = i
                padding_byte = byte
            else:
                in_padding = False

    # Handle padding extending to the end of the data
    if in_padding and len(data) - padding_start >= min_length:
        regions.append(_padding_region(padding_start, len(data), padding_byte))

    return regions
def segment_by_type(data: DataType, min_segment: int = 32) -> list[RegionClassification]:
    """Segment data into regions by type.

    : Statistical Data Type Classification

    Divides data into homogeneous regions using a sliding window approach
    (window = ``min_segment`` bytes, advancing half a window per step) and
    entropy-based segmentation.

    NOTE(review): when a type change occurs and the preceding segment is
    shorter than ``min_segment``, that segment is silently dropped, so the
    returned segments may not fully cover the input despite the docstring.

    Args:
        data: Input data as bytes, bytearray, or numpy array
        min_segment: Minimum segment size in bytes (default: 32)

    Returns:
        List of classified regions covering the entire input

    Example:
        >>> data = b'Hello' + b'\\x00' * 50 + bytes(range(256))
        >>> segments = segment_by_type(data)
        >>> len(segments) >= 1
        True
    """
    if isinstance(data, np.ndarray):
        data = data.tobytes() if data.dtype == np.uint8 else bytes(data.flatten())

    if len(data) < min_segment:
        # Single segment: too small to subdivide
        classification = classify_data_type(data)
        return [
            RegionClassification(
                start=0, end=len(data), length=len(data), classification=classification
            )
        ]

    segments = []
    window_size = min_segment
    step = window_size // 2

    current_type = None
    segment_start = 0

    i = 0
    while i < len(data):
        window_end = min(i + window_size, len(data))
        window = data[i:window_end]

        if len(window) < min_segment and i > 0:
            # Last small fragment, merge with previous segment
            break

        classification = classify_data_type(window)
        detected_type = classification.primary_type

        if current_type is None:
            # First window opens the first segment
            current_type = detected_type
            segment_start = i
        elif detected_type != current_type:
            # Type changed, finalize previous segment (re-classified as a whole)
            segment_data = data[segment_start:i]
            if len(segment_data) >= min_segment:
                seg_classification = classify_data_type(segment_data)
                segments.append(
                    RegionClassification(
                        start=segment_start,
                        end=i,
                        length=len(segment_data),
                        classification=seg_classification,
                    )
                )
            current_type = detected_type
            segment_start = i

        i += step

    # Finalize last segment (includes any merged tail fragment)
    segment_data = data[segment_start:]
    if len(segment_data) > 0:
        seg_classification = classify_data_type(segment_data)
        segments.append(
            RegionClassification(
                start=segment_start,
                end=len(data),
                length=len(segment_data),
                classification=seg_classification,
            )
        )

    return segments
class DataClassifier:
    """Object-oriented facade over the module-level classification API.

    Every method is a thin delegator to the corresponding module-level
    function, so results are identical to the functional API. The only
    state held is ``min_segment_size``, used by :meth:`segment`.

    Example:
        >>> classifier = DataClassifier()
        >>> classifier.classify(b'Hello, World!')
        'text'
    """

    def __init__(self, min_segment_size: int = 32):
        """Initialize the classifier.

        Args:
            min_segment_size: Minimum segment size used for segmentation.
        """
        self.min_segment_size = min_segment_size

    def classify(self, data: DataType) -> str:
        """Classify binary data and return only the primary type string.

        Args:
            data: Input data as bytes, bytearray, or numpy array.

        Returns:
            One of 'text', 'binary', 'compressed', 'encrypted',
            'padding', or 'mixed'.

        Example:
            >>> classifier = DataClassifier()
            >>> classifier.classify(b'Hello')
            'text'
        """
        return classify_data_type(data).primary_type

    def classify_detailed(self, data: DataType) -> ClassificationResult:
        """Classify binary data, returning the full result object.

        Args:
            data: Input data as bytes, bytearray, or numpy array.

        Returns:
            ClassificationResult with type, confidence, and metadata.

        Example:
            >>> classifier = DataClassifier()
            >>> result = classifier.classify_detailed(b'Hello')
            >>> result.data_type == 'text'
            True
        """
        return classify_data_type(data)

    def detect_text_regions(
        self, data: DataType, min_length: int = 8, min_printable: float = 0.8
    ) -> list[RegionClassification]:
        """Detect text regions; delegates to :func:`detect_text_regions`.

        Args:
            data: Input data.
            min_length: Minimum region length.
            min_printable: Minimum printable ratio.

        Returns:
            List of text region classifications.
        """
        return detect_text_regions(data, min_length=min_length, min_printable=min_printable)

    def detect_encrypted_regions(
        self, data: DataType, min_length: int = 64, min_entropy: float = 7.5
    ) -> list[RegionClassification]:
        """Detect encrypted regions; delegates to :func:`detect_encrypted_regions`.

        Args:
            data: Input data.
            min_length: Minimum region length.
            min_entropy: Minimum entropy threshold.

        Returns:
            List of encrypted region classifications.
        """
        return detect_encrypted_regions(data, min_length=min_length, min_entropy=min_entropy)

    def detect_compressed_regions(
        self, data: DataType, min_length: int = 64
    ) -> list[RegionClassification]:
        """Detect compressed regions; delegates to :func:`detect_compressed_regions`.

        Args:
            data: Input data.
            min_length: Minimum region length.

        Returns:
            List of compressed region classifications.
        """
        return detect_compressed_regions(data, min_length=min_length)

    def detect_padding_regions(
        self, data: DataType, min_length: int = 4
    ) -> list[RegionClassification]:
        """Detect padding regions; delegates to :func:`detect_padding_regions`.

        Args:
            data: Input data.
            min_length: Minimum region length.

        Returns:
            List of padding region classifications.
        """
        return detect_padding_regions(data, min_length=min_length)

    def segment(self, data: DataType) -> list[RegionClassification]:
        """Segment data by type using this instance's ``min_segment_size``.

        Args:
            data: Input data.

        Returns:
            List of classified segments.
        """
        return segment_by_type(data, self.min_segment_size)
# Public API of this module.
__all__ = [
    "ClassificationResult",
    "DataClassifier",
    "DataType",
    "RegionClassification",
    "classify_data_type",
    "detect_compressed_regions",
    "detect_encrypted_regions",
    "detect_padding_regions",
    "detect_text_regions",
    "segment_by_type",
]