Coverage for src / tracekit / analyzers / patterns / discovery.py: 97%
154 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Automatic signature and delimiter discovery.

This module implements algorithms for automatically discovering candidate
signatures, headers, and delimiters in binary data through statistical analysis.

Author: TraceKit Development Team
"""
10from __future__ import annotations
12import math
13from collections import Counter, defaultdict
14from dataclasses import dataclass
15from typing import TYPE_CHECKING
17import numpy as np
19if TYPE_CHECKING:
20 from numpy.typing import NDArray
@dataclass
class CandidateSignature:
    """A candidate signature/header pattern.

    Attributes:
        pattern: The signature byte pattern
        length: Length of signature in bytes
        occurrences: Number of occurrences in data
        positions: Start positions of each occurrence
        interval_mean: Mean interval between occurrences (samples)
        interval_std: Standard deviation of intervals (consistency measure)
        entropy: Pattern entropy (low = more structured)
        score: Overall distinctiveness score (0-1, higher = better)
    """

    pattern: bytes
    length: int
    occurrences: int
    positions: list[int]
    interval_mean: float
    interval_std: float
    entropy: float
    score: float

    def __post_init__(self) -> None:
        """Reject inconsistent field combinations at construction time."""
        # Checks are evaluated in declaration order; the first failing one
        # determines which message is raised.
        checks = (
            (self.length <= 0, "length must be positive"),
            (self.occurrences < 0, "occurrences must be non-negative"),
            (len(self.pattern) != self.length, "pattern length must match length field"),
            (self.score < 0 or self.score > 1, "score must be in range [0, 1]"),
        )
        for failed, message in checks:
            if failed:
                raise ValueError(message)
class SignatureDiscovery:
    """Automatic signature and header discovery.

    This class analyzes binary data to automatically identify candidate
    signatures, headers, and delimiters based on statistical patterns.
    """

    def __init__(self, min_length: int = 4, max_length: int = 16, min_occurrences: int = 2):
        """Initialize signature discovery.

        Args:
            min_length: Minimum signature length in bytes
            max_length: Maximum signature length in bytes
            min_occurrences: Minimum number of times a pattern must occur to be considered

        Raises:
            ValueError: If min_length, max_length, or min_occurrences are invalid
        """
        if min_length < 1:
            raise ValueError("min_length must be at least 1")
        if max_length < min_length:
            raise ValueError("max_length must be >= min_length")
        if min_occurrences < 1:
            raise ValueError("min_occurrences must be at least 1")

        self.min_length = min_length
        self.max_length = max_length
        self.min_occurrences = min_occurrences

    def discover_signatures(
        self, data: bytes | NDArray[np.uint8] | list[bytes]
    ) -> list[CandidateSignature]:
        """Discover candidate signatures in data.

        Finds byte patterns that appear regularly throughout the data,
        suggesting they may be headers, sync markers, or delimiters.

        Args:
            data: Input binary data or list of messages

        Returns:
            List of CandidateSignature sorted by score (best first)

        Examples:
            >>> data = b"\\xAA\\x55DATA" * 100
            >>> discovery = SignatureDiscovery(min_length=2, max_length=8)
            >>> sigs = discovery.discover_signatures(data)
            >>> assert any(s.pattern == b"\\xAA\\x55" for s in sigs)
        """
        # A list of messages is concatenated for analysis.
        # NOTE(review): concatenation can create spurious patterns that span
        # message boundaries — confirm this is acceptable for callers.
        if isinstance(data, list):
            data_bytes = b"".join(_to_bytes(msg) for msg in data)
        else:
            data_bytes = _to_bytes(data)
        n = len(data_bytes)

        if n < self.min_length:
            return []

        candidates: list[CandidateSignature] = []

        # Process one window length at a time so that only the substrings of
        # the current length are held in memory; the previous implementation
        # accumulated every substring of every length in a single dict before
        # filtering, inflating peak memory on large inputs.  The output is
        # unchanged: patterns are still visited in length-ascending,
        # first-occurrence order, and sort() below is stable.
        for length in range(self.min_length, min(self.max_length + 1, n + 1)):
            positions_by_pattern: defaultdict[bytes, list[int]] = defaultdict(list)
            for i in range(n - length + 1):
                positions_by_pattern[data_bytes[i : i + length]].append(i)

            for pattern, positions in positions_by_pattern.items():
                # Drop patterns that do not repeat often enough.
                if len(positions) < self.min_occurrences:
                    continue

                interval_mean, interval_std = self._interval_stats(positions)
                entropy = _calculate_entropy(pattern)

                # Distinctiveness score combining frequency, regularity,
                # entropy, and length (see _calculate_score).
                score = self._calculate_score(
                    pattern=pattern,
                    occurrences=len(positions),
                    interval_mean=interval_mean,
                    interval_std=interval_std,
                    entropy=entropy,
                    data_length=n,
                )

                candidates.append(
                    CandidateSignature(
                        pattern=pattern,
                        length=len(pattern),
                        occurrences=len(positions),
                        positions=sorted(positions),
                        interval_mean=interval_mean,
                        interval_std=interval_std,
                        entropy=entropy,
                        score=score,
                    )
                )

        # Sort by score (descending); best candidates first.
        candidates.sort(key=lambda x: x.score, reverse=True)
        return candidates

    def find_header_candidates(
        self, data: bytes | NDArray[np.uint8], max_candidates: int = 20
    ) -> list[CandidateSignature]:
        """Find patterns likely to be message headers.

        Headers typically:
        - Have low entropy (structured, not random)
        - Appear at regular intervals
        - Are relatively short (2-16 bytes)
        - May contain magic bytes or sync markers

        Args:
            data: Input binary data
            max_candidates: Maximum number of candidates to return

        Returns:
            List of CandidateSignature sorted by likelihood (best first)
        """
        # Start from the general signature scan, then re-score for
        # header-specific characteristics.
        candidates = self.discover_signatures(data)

        header_candidates = []
        for sig in candidates:
            # Headers should have low entropy (structured).
            if sig.entropy > 6.0:
                continue

            # Headers should be reasonably frequent.
            if sig.occurrences < 3:
                continue

            # Prefer regular intervals (low std deviation relative to mean);
            # max(..., 1.0) guards against division by zero.
            regularity = 1.0 / (1.0 + sig.interval_std / max(sig.interval_mean, 1.0))

            # Blend base score with header-specific features (weights sum to
            # 1.0, so the result stays in [0, 1]).
            header_score = sig.score * 0.6 + (1.0 - sig.entropy / 8.0) * 0.2 + regularity * 0.2

            header_candidates.append(
                CandidateSignature(
                    pattern=sig.pattern,
                    length=sig.length,
                    occurrences=sig.occurrences,
                    # Copy so the new candidate does not alias the input's list.
                    positions=list(sig.positions),
                    interval_mean=sig.interval_mean,
                    interval_std=sig.interval_std,
                    entropy=sig.entropy,
                    score=header_score,
                )
            )

        # Sort by header score, best first.
        header_candidates.sort(key=lambda x: x.score, reverse=True)
        return header_candidates[:max_candidates]

    def find_delimiter_candidates(
        self, data: bytes | NDArray[np.uint8]
    ) -> list[CandidateSignature]:
        """Find patterns likely to be message delimiters.

        Delimiters typically:
        - Are short (1-4 bytes)
        - Have very low entropy (often single byte like \\n, \\0, etc.)
        - Appear frequently
        - May have variable intervals

        Args:
            data: Input binary data

        Returns:
            List of CandidateSignature sorted by likelihood (best first)
        """
        data_bytes = _to_bytes(data)
        n = len(data_bytes)

        if n < 2:
            return []

        # Focus on short patterns (typical delimiters), regardless of the
        # configured min_length.
        delimiter_candidates = []
        max_delim_length = min(4, self.max_length)

        for length in range(1, max_delim_length + 1):
            pattern_positions = defaultdict(list)
            for i in range(n - length + 1):
                pattern_positions[data_bytes[i : i + length]].append(i)

            for pattern, positions in pattern_positions.items():
                if len(positions) < 5:  # Delimiters should be frequent
                    continue

                interval_mean, interval_std = self._interval_stats(positions)
                entropy = _calculate_entropy(pattern)

                # Delimiters should have very low entropy.
                if entropy > 3.0:
                    continue

                # High frequency + low entropy + short length = good delimiter.
                frequency_score = min(len(positions) / (n / 100.0), 1.0)
                entropy_score = 1.0 - entropy / 8.0
                length_score = 1.0 - (length - 1) / max_delim_length

                delimiter_score = frequency_score * 0.5 + entropy_score * 0.3 + length_score * 0.2

                delimiter_candidates.append(
                    CandidateSignature(
                        pattern=pattern,
                        length=length,
                        occurrences=len(positions),
                        positions=sorted(positions),
                        interval_mean=interval_mean,
                        interval_std=interval_std,
                        entropy=entropy,
                        score=delimiter_score,
                    )
                )

        # Sort by delimiter score, best first.
        delimiter_candidates.sort(key=lambda x: x.score, reverse=True)
        return delimiter_candidates[:20]  # Top 20 delimiter candidates

    def rank_signatures(self, candidates: list[CandidateSignature]) -> list[CandidateSignature]:
        """Rank signatures by distinctiveness.

        Re-ranks candidates considering:
        - Frequency vs. expected random occurrence
        - Regularity of appearance
        - Entropy characteristics
        - Pattern uniqueness

        Args:
            candidates: List of candidate signatures

        Returns:
            Re-ranked list of CandidateSignature (input is not modified)
        """
        if not candidates:
            return []

        ranked = []
        for sig in candidates:
            # Regularity: 1.0 for perfectly periodic occurrences, approaching
            # 0 as intervals become irregular.
            if sig.interval_mean > 0:
                regularity = 1.0 / (1.0 + sig.interval_std / sig.interval_mean)
            else:
                regularity = 0.0

            # Entropy score (prefer low entropy for signatures).
            entropy_score = max(0.0, 1.0 - sig.entropy / 8.0)

            # Frequency score (normalized, saturating at 100 occurrences).
            frequency_score = min(sig.occurrences / 100.0, 1.0)

            # Combined score; weights sum to 1.0.
            new_score = regularity * 0.4 + entropy_score * 0.3 + frequency_score * 0.3

            ranked.append(
                CandidateSignature(
                    pattern=sig.pattern,
                    length=sig.length,
                    occurrences=sig.occurrences,
                    # Copy so the re-ranked candidate does not alias the input's list.
                    positions=list(sig.positions),
                    interval_mean=sig.interval_mean,
                    interval_std=sig.interval_std,
                    entropy=sig.entropy,
                    score=new_score,
                )
            )

        # Sort by new score, best first.
        ranked.sort(key=lambda x: x.score, reverse=True)
        return ranked

    @staticmethod
    def _interval_stats(positions: list[int]) -> tuple[float, float]:
        """Return (mean, std) of the gaps between consecutive positions.

        Returns (0.0, 0.0) when there are fewer than two positions.
        """
        intervals = np.diff(positions)
        if len(intervals) == 0:
            return 0.0, 0.0
        return float(np.mean(intervals)), float(np.std(intervals))

    def _calculate_score(
        self,
        pattern: bytes,
        occurrences: int,
        interval_mean: float,
        interval_std: float,
        entropy: float,
        data_length: int,
    ) -> float:
        """Calculate distinctiveness score for a pattern.

        Args:
            pattern: The byte pattern
            occurrences: Number of occurrences
            interval_mean: Mean interval between occurrences
            interval_std: Standard deviation of intervals
            entropy: Pattern entropy
            data_length: Total data length

        Returns:
            Score in range [0, 1], higher is more distinctive
        """
        # Frequency score (normalized, saturating at 50 occurrences).
        frequency_score = min(occurrences / 50.0, 1.0)

        # Regularity score (prefer consistent intervals).
        if interval_mean > 0:
            regularity_score = 1.0 / (1.0 + interval_std / interval_mean)
        else:
            regularity_score = 0.0

        # Entropy score (prefer structured patterns, not random).
        entropy_score = max(0.0, 1.0 - entropy / 8.0)

        # Length score (prefer medium-length patterns around 4 bytes).
        optimal_length = 4.0
        length_score = 1.0 - abs(len(pattern) - optimal_length) / 8.0
        length_score = max(0.0, length_score)

        # Combine scores; weights sum to 1.0, then clamp defensively.
        score = (
            frequency_score * 0.3
            + regularity_score * 0.4
            + entropy_score * 0.2
            + length_score * 0.1
        )

        return min(1.0, max(0.0, score))
# Convenience functions
def discover_signatures(
    data: bytes | NDArray[np.uint8] | list[bytes],
    min_length: int = 4,
    max_length: int = 16,
    min_occurrences: int = 2,
) -> list[CandidateSignature]:
    """Convenience wrapper around SignatureDiscovery.discover_signatures.

    Args:
        data: Input binary data or list of messages
        min_length: Minimum signature length
        max_length: Maximum signature length
        min_occurrences: Minimum number of times a pattern must occur

    Returns:
        List of CandidateSignature sorted by score

    Examples:
        >>> data = b"\\xFF\\xFF" + b"DATA" * 50
        >>> signatures = discover_signatures(data, min_length=2)
    """
    return SignatureDiscovery(min_length, max_length, min_occurrences).discover_signatures(data)
def find_header_candidates(data: bytes | NDArray[np.uint8]) -> list[CandidateSignature]:
    """Convenience wrapper for header-candidate discovery.

    Uses a SignatureDiscovery configured for typical header sizes (2-16 bytes).

    Args:
        data: Input binary data

    Returns:
        List of header candidates

    Examples:
        >>> data = b"HDR" + b"payload" * 20
        >>> headers = find_header_candidates(data)
    """
    return SignatureDiscovery(min_length=2, max_length=16).find_header_candidates(data)
def find_delimiter_candidates(data: bytes | NDArray[np.uint8]) -> list[CandidateSignature]:
    """Convenience wrapper for delimiter-candidate discovery.

    Uses a SignatureDiscovery configured for typical delimiter sizes (1-4 bytes).

    Args:
        data: Input binary data

    Returns:
        List of delimiter candidates

    Examples:
        >>> data = b"field1,field2,field3"
        >>> delimiters = find_delimiter_candidates(data)
        >>> assert any(d.pattern == b"," for d in delimiters)
    """
    return SignatureDiscovery(min_length=1, max_length=4).find_delimiter_candidates(data)
# Helper functions
486def _to_bytes(data: bytes | NDArray[np.uint8] | memoryview | bytearray) -> bytes:
487 """Convert input data to bytes.
489 Args:
490 data: Input data (bytes, bytearray, memoryview, or numpy array)
492 Returns:
493 Bytes representation
495 Raises:
496 TypeError: If data type is not supported
497 """
498 if isinstance(data, bytes):
499 return data
500 elif isinstance(data, bytearray | memoryview):
501 return bytes(data)
502 elif isinstance(data, np.ndarray):
503 return data.astype(np.uint8).tobytes() # type: ignore[no-any-return]
504 else:
505 raise TypeError(f"Unsupported data type: {type(data)}")
508def _calculate_entropy(data: bytes) -> float:
509 """Calculate Shannon entropy of byte sequence.
511 : Entropy calculation
513 Args:
514 data: Byte sequence
516 Returns:
517 Entropy in bits (0-8 for byte data)
519 Examples:
520 >>> _calculate_entropy(b"\\x00" * 100) # All same byte
521 0.0
522 >>> entropy = _calculate_entropy(bytes(range(256))) # Uniform distribution
523 >>> assert entropy > 7.9 # Close to 8.0
524 """
525 if len(data) == 0:
526 return 0.0
528 # Count byte frequencies
529 byte_counts = Counter(data)
530 n = len(data)
532 # Calculate entropy
533 entropy = 0.0
534 for count in byte_counts.values():
535 if count > 0: 535 ↛ 534line 535 didn't jump to line 534 because the condition on line 535 was always true
536 prob = count / n
537 entropy -= prob * math.log2(prob)
539 return entropy