Coverage report for src/tracekit/exploratory/unknown.py: 92% of 257 statements (coverage.py v7.13.1, created 2026-01-11 23:04 +0000).
"""Unknown signal analysis and reverse engineering.

This module provides tools for analyzing signals from unknown systems
and protocols, including binary field detection and pattern analysis.

- UNKNOWN-001: Binary Field Detection
- UNKNOWN-002: Protocol Auto-Detection with Fuzzy Matching
- UNKNOWN-003: Unknown Signal Characterization
- UNKNOWN-004: Pattern Frequency Analysis
- UNKNOWN-005: Reverse Engineering Workflow

Example:
    >>> from tracekit.exploratory.unknown import characterize_unknown_signal
    >>> result = characterize_unknown_signal(trace)
    >>> print(f"Signal type: {result.signal_type}")
    >>> print(f"Suggested protocols: {result.suggested_protocols}")
"""
19from __future__ import annotations
21from dataclasses import dataclass, field
22from typing import TYPE_CHECKING, Any, Literal
24import numpy as np
26if TYPE_CHECKING:
27 from numpy.typing import NDArray
29 from tracekit.core.types import WaveformTrace
@dataclass
class BinaryFieldResult:
    """Result of binary field detection.

    Attributes:
        fields: List of detected fields with positions.
        field_count: Total number of fields detected.
        bit_rate: Estimated bit rate in bps.
        encoding: Detected encoding type.
        confidence: Detection confidence.
    """

    fields: list[dict[str, Any]]
    field_count: int
    bit_rate: float | None
    encoding: str
    confidence: float


def _build_field(
    field_edges: list[int],
    digital: NDArray[np.int_],
    sample_rate: float,
) -> dict[str, Any]:
    """Build one field record from the edges belonging to it.

    Each bit is sampled at the midpoint between adjacent edges, so a
    field spanning k edges yields k - 1 bits.

    Args:
        field_edges: Edge sample indices belonging to one field.
        digital: Thresholded 0/1 signal.
        sample_rate: Sample rate in Hz (used for the timestamp).

    Returns:
        Field dict with start/end sample, bit count, bits, and timestamp.
    """
    bits = [
        int(digital[(field_edges[j] + field_edges[j + 1]) // 2])
        for j in range(len(field_edges) - 1)
    ]
    return {
        "start_sample": int(field_edges[0]),
        "end_sample": int(field_edges[-1]),
        "length": len(field_edges) - 1,
        "bits": bits,
        "timestamp": field_edges[0] / sample_rate,
    }


def detect_binary_fields(
    trace: WaveformTrace,
    *,
    min_field_bits: int = 4,
    max_gap_ratio: float = 2.0,
) -> BinaryFieldResult:
    """Detect binary fields in unknown signal per UNKNOWN-001.

    Analyzes signal for structured binary data patterns including
    start/stop markers, length fields, and data payloads.

    Args:
        trace: Signal trace to analyze.
        min_field_bits: Minimum bits to consider a field.
        max_gap_ratio: Maximum gap ratio for field boundaries.

    Returns:
        BinaryFieldResult with detected fields.

    Example:
        >>> result = detect_binary_fields(trace)
        >>> for field in result.fields:
        ...     print(f"Field at {field['start_sample']}: {field['length']} bits")

    References:
        UNKNOWN-001: Binary Field Detection
    """
    data = trace.data
    sample_rate = trace.metadata.sample_rate

    # Digitize at the midpoint of the robust (5th/95th percentile) range
    # so outlier samples do not skew the threshold.
    v_min = np.percentile(data, 5)
    v_max = np.percentile(data, 95)
    threshold = (v_min + v_max) / 2
    digital = (data > threshold).astype(int)

    # Indices where the digitized signal changes level.
    edges = np.where(np.diff(digital) != 0)[0]

    if len(edges) < 2:
        # Not enough transitions to infer any structure.
        return BinaryFieldResult(
            fields=[],
            field_count=0,
            bit_rate=None,
            encoding="unknown",
            confidence=0.0,
        )

    # Median edge spacing approximates one bit period.
    edge_gaps = np.diff(edges)
    bit_period = np.median(edge_gaps)

    # Group edges into fields: a gap longer than max_gap_ratio bit
    # periods ends the current field and starts a new one.
    fields: list[dict[str, Any]] = []
    current_field_edges: list[int] = [edges[0]]

    for i in range(1, len(edges)):
        gap = edges[i] - edges[i - 1]
        if gap > max_gap_ratio * bit_period:
            if len(current_field_edges) >= min_field_bits:
                fields.append(_build_field(current_field_edges, digital, sample_rate))
            current_field_edges = [edges[i]]
        else:
            current_field_edges.append(edges[i])

    # Flush the final (still-open) field.
    if len(current_field_edges) >= min_field_bits:
        fields.append(_build_field(current_field_edges, digital, sample_rate))

    bit_rate = sample_rate / bit_period if bit_period > 0 else None
    encoding = _detect_encoding(digital, edges, bit_period)

    # Confidence grows with the number of fields (saturates at 10),
    # plus a bonus when a bit rate could be estimated.
    confidence = min(1.0, len(fields) / 10.0) * 0.8
    if bit_rate is not None:
        confidence += 0.2

    return BinaryFieldResult(
        fields=fields,
        field_count=len(fields),
        bit_rate=bit_rate,
        encoding=encoding,
        confidence=confidence,
    )


def _detect_encoding(
    digital: NDArray[np.int_],
    edges: NDArray[np.int_],
    bit_period: float,
) -> str:
    """Detect signal encoding type from edge-spacing statistics.

    Args:
        digital: Digital signal (currently unused; kept for interface
            stability).
        edges: Edge positions.
        bit_period: Estimated bit period.

    Returns:
        Encoding type name: 'manchester', 'nrz', 'nrzi', or 'unknown'.
    """
    if len(edges) < 4:
        return "unknown"

    gaps = np.diff(edges)

    # Manchester: a transition in every bit cell gives near-uniform
    # edge spacing.
    if np.std(gaps) < bit_period * 0.3:
        return "manchester"

    # NRZ: edges fall on bit boundaries, so gaps are near-integer
    # multiples of the bit period.
    normalized_gaps = gaps / bit_period
    residuals = np.abs(normalized_gaps - np.round(normalized_gaps))
    if np.mean(residuals) < 0.2:
        return "nrz"

    # NRZI: most gaps span at least roughly one bit period.
    if np.mean(normalized_gaps > 0.8) > 0.7:
        return "nrzi"

    return "unknown"
225@dataclass
226class UnknownSignalCharacterization:
227 """Comprehensive characterization of unknown signal.
229 Attributes:
230 signal_type: 'digital', 'analog', or 'mixed'.
231 is_periodic: True if signal is periodic.
232 fundamental_frequency: Fundamental frequency if periodic.
233 dc_offset: DC offset voltage.
234 amplitude: Signal amplitude.
235 rise_time: Estimated rise time.
236 fall_time: Estimated fall time.
237 suggested_protocols: List of possible protocols.
238 noise_floor: Estimated noise floor.
239 snr_db: Signal-to-noise ratio in dB.
240 features: Dictionary of extracted features.
241 """
243 signal_type: Literal["digital", "analog", "mixed"]
244 is_periodic: bool
245 fundamental_frequency: float | None
246 dc_offset: float
247 amplitude: float
248 rise_time: float | None
249 fall_time: float | None
250 suggested_protocols: list[tuple[str, float]]
251 noise_floor: float
252 snr_db: float
253 features: dict[str, Any] = field(default_factory=dict)
256def characterize_unknown_signal(
257 trace: WaveformTrace,
258) -> UnknownSignalCharacterization:
259 """Comprehensive characterization of unknown signal per UNKNOWN-003.
261 Analyzes signal characteristics to determine type, periodicity,
262 and suggest possible protocols.
264 Args:
265 trace: Signal trace to characterize.
267 Returns:
268 UnknownSignalCharacterization with all extracted features.
270 Example:
271 >>> result = characterize_unknown_signal(trace)
272 >>> print(f"Signal type: {result.signal_type}")
273 >>> print(f"Periodic: {result.is_periodic}")
274 >>> for protocol, confidence in result.suggested_protocols:
275 ... print(f" {protocol}: {confidence:.1%}")
277 References:
278 UNKNOWN-003: Unknown Signal Characterization
279 """
280 data = trace.data
281 sample_rate = trace.metadata.sample_rate
283 # Handle edge case of very short traces
284 if len(data) < 2:
285 return UnknownSignalCharacterization(
286 signal_type="analog",
287 is_periodic=False,
288 fundamental_frequency=None,
289 dc_offset=float(data[0]) if len(data) > 0 else 0.0,
290 amplitude=0.0,
291 rise_time=None,
292 fall_time=None,
293 suggested_protocols=[],
294 noise_floor=0.0,
295 snr_db=float("inf"),
296 features={},
297 )
299 # Basic statistics
300 v_min = np.min(data)
301 v_max = np.max(data)
302 v_mean = np.mean(data)
303 v_std = np.std(data)
305 dc_offset = v_mean
306 amplitude = (v_max - v_min) / 2
308 # Determine signal type
309 # Digital signals have bimodal distribution
310 hist, bin_edges = np.histogram(data, bins=50)
311 centers = (bin_edges[:-1] + bin_edges[1:]) / 2
313 # Find peaks in histogram
314 peaks = []
315 for i in range(1, len(hist) - 1):
316 if hist[i] > hist[i - 1] and hist[i] > hist[i + 1] and hist[i] > 0.1 * np.max(hist):
317 peaks.append((centers[i], hist[i]))
319 if len(peaks) >= 4:
320 # Many peaks suggest analog signal (e.g., sine wave with noisy histogram)
321 signal_type: Literal["digital", "analog", "mixed"] = "analog"
322 elif len(peaks) == 2 or len(peaks) == 3:
323 # Two peaks suggest digital (bimodal), but check if they're well-separated
324 peak_positions = [p[0] for p in peaks]
325 # Normalize peak positions to 0-1 range
326 normalized_peaks = [(p - v_min) / (v_max - v_min) for p in peak_positions]
328 # If peaks are well-separated (one < 0.4, one > 0.6), likely digital
329 has_low_peak = any(p < 0.4 for p in normalized_peaks)
330 has_high_peak = any(p > 0.6 for p in normalized_peaks)
332 if has_low_peak and has_high_peak: 332 ↛ 336line 332 didn't jump to line 336 because the condition on line 332 was always true
333 signal_type = "digital"
334 else:
335 # Peaks not well separated, likely analog
336 signal_type = "analog"
337 elif len(peaks) == 1:
338 # Check for modulated signal
339 signal_type = "mixed" if v_std > 0.2 * amplitude else "analog"
340 else:
341 signal_type = "analog"
343 # Check periodicity via FFT
344 from scipy import signal as sp_signal
346 n = len(data)
347 # Need at least 4 samples for meaningful FFT analysis
348 if n >= 4:
349 f, psd = sp_signal.welch(data, fs=sample_rate, nperseg=min(4096, n))
351 # Find dominant frequency (excluding DC)
352 psd_no_dc = psd.copy()
353 psd_no_dc[0] = 0
355 if len(psd_no_dc) > 0 and np.any(psd_no_dc > 0):
356 peak_idx = np.argmax(psd_no_dc)
357 mean_psd = np.mean(psd_no_dc[psd_no_dc > 0]) if np.any(psd_no_dc > 0) else 0
358 fundamental_frequency = f[peak_idx] if psd_no_dc[peak_idx] > 10 * mean_psd else None
359 else:
360 fundamental_frequency = None
362 is_periodic = fundamental_frequency is not None
363 else:
364 fundamental_frequency = None
365 is_periodic = False
367 # Estimate noise floor
368 if n >= 4:
369 noise_floor = np.median(np.sort(psd)[: len(psd) // 4]) if len(psd) > 0 else 0.0
370 signal_power = np.max(psd) - noise_floor if len(psd) > 0 else 0.0
371 else:
372 noise_floor = 0.0
373 signal_power = 0.0
374 snr_db = 10 * np.log10(signal_power / noise_floor) if noise_floor > 0 else 0
376 # Estimate rise/fall times for digital signals
377 rise_time = None
378 fall_time = None
380 if signal_type == "digital":
381 threshold_low = v_min + 0.1 * (v_max - v_min)
382 threshold_high = v_min + 0.9 * (v_max - v_min)
384 # Find rising edges
385 rising_times = []
386 falling_times = []
388 for i in range(1, len(data) - 1):
389 if data[i - 1] < threshold_low and data[i + 1] > threshold_high: 389 ↛ 391line 389 didn't jump to line 391 because the condition on line 389 was never true
390 # Rising edge
391 rising_times.append(1 / sample_rate)
392 elif data[i - 1] > threshold_high and data[i + 1] < threshold_low: 392 ↛ 394line 392 didn't jump to line 394 because the condition on line 392 was never true
393 # Falling edge
394 falling_times.append(1 / sample_rate)
396 if rising_times: 396 ↛ 397line 396 didn't jump to line 397 because the condition on line 396 was never true
397 rise_time = float(np.median(rising_times))
398 if falling_times: 398 ↛ 399line 398 didn't jump to line 399 because the condition on line 398 was never true
399 fall_time = float(np.median(falling_times))
401 # Suggest protocols
402 suggested_protocols = _suggest_protocols(signal_type, fundamental_frequency, sample_rate, data)
404 # Collect features
405 features = {
406 "v_min": v_min,
407 "v_max": v_max,
408 "v_mean": v_mean,
409 "v_std": v_std,
410 "crest_factor": v_max / np.sqrt(np.mean(data**2)) if np.mean(data**2) > 0 else 0,
411 "n_peaks": len(peaks),
412 "peak_positions": [p[0] for p in peaks],
413 }
415 return UnknownSignalCharacterization(
416 signal_type=signal_type,
417 is_periodic=is_periodic,
418 fundamental_frequency=fundamental_frequency,
419 dc_offset=dc_offset,
420 amplitude=amplitude,
421 rise_time=rise_time,
422 fall_time=fall_time,
423 suggested_protocols=suggested_protocols,
424 noise_floor=noise_floor,
425 snr_db=snr_db,
426 features=features,
427 )
430def _suggest_protocols(
431 signal_type: str,
432 frequency: float | None,
433 sample_rate: float,
434 data: NDArray[np.float64],
435) -> list[tuple[str, float]]:
436 """Suggest possible protocols based on signal characteristics.
438 Args:
439 signal_type: Signal type (digital/analog/mixed).
440 frequency: Fundamental frequency.
441 sample_rate: Sample rate.
442 data: Signal data.
444 Returns:
445 List of (protocol_name, confidence) tuples.
446 """
447 suggestions = [] # type: ignore[var-annotated]
449 if signal_type != "digital":
450 return suggestions
452 # Estimate bit rate from signal
453 v_min = np.percentile(data, 5)
454 v_max = np.percentile(data, 95)
455 threshold = (v_min + v_max) / 2
456 digital = data > threshold
457 edges = np.where(np.diff(digital.astype(int)) != 0)[0]
459 if len(edges) < 2: 459 ↛ 460line 459 didn't jump to line 460 because the condition on line 459 was never true
460 return suggestions
462 median_gap = np.median(np.diff(edges))
463 estimated_bitrate = sample_rate / median_gap
465 # Check common baud rates for UART
466 uart_rates = [9600, 19200, 38400, 57600, 115200, 230400, 460800, 921600]
467 for rate in uart_rates:
468 ratio = estimated_bitrate / rate
469 if 0.9 <= ratio <= 1.1:
470 suggestions.append(("UART", 0.7 + 0.3 * (1 - abs(1 - ratio))))
471 break
473 # Check for I2C (two-wire, specific timing)
474 if 50e3 <= estimated_bitrate <= 400e3:
475 suggestions.append(("I2C", 0.5))
476 elif 400e3 < estimated_bitrate <= 3.4e6: 476 ↛ 477line 476 didn't jump to line 477 because the condition on line 476 was never true
477 suggestions.append(("I2C Fast Mode", 0.5))
479 # Check for SPI (higher speeds)
480 if estimated_bitrate >= 1e6: 480 ↛ 481line 480 didn't jump to line 481 because the condition on line 480 was never true
481 suggestions.append(("SPI", 0.4))
483 # Check for CAN
484 can_rates = [125e3, 250e3, 500e3, 1e6]
485 for rate in can_rates: # type: ignore[assignment]
486 if 0.9 <= estimated_bitrate / rate <= 1.1: 486 ↛ 487line 486 didn't jump to line 487 because the condition on line 486 was never true
487 suggestions.append(("CAN", 0.6))
488 break
490 # Sort by confidence
491 suggestions.sort(key=lambda x: x[1], reverse=True)
493 return suggestions
@dataclass
class PatternFrequencyResult:
    """Result of pattern frequency analysis.

    Attributes:
        patterns: Dictionary of pattern to count.
        most_common: List of (pattern, count) for most common patterns.
        entropy: Shannon entropy of pattern distribution.
        repetition_rate: Rate of pattern repetition.
    """

    patterns: dict[tuple[int, ...], int]
    most_common: list[tuple[tuple[int, ...], int]]
    entropy: float
    repetition_rate: float


def analyze_pattern_frequency(
    trace: WaveformTrace,
    *,
    pattern_length: int = 8,
    min_occurrences: int = 2,
) -> PatternFrequencyResult:
    """Analyze frequency of bit patterns per UNKNOWN-004.

    Identifies recurring patterns that may indicate protocol structure
    or data framing.

    Args:
        trace: Signal trace to analyze.
        pattern_length: Length of patterns to search for.
        min_occurrences: Minimum occurrences to report.

    Returns:
        PatternFrequencyResult with pattern statistics.

    Example:
        >>> result = analyze_pattern_frequency(trace, pattern_length=8)
        >>> for pattern, count in result.most_common[:5]:
        ...     print(f"Pattern {pattern}: {count} occurrences")

    References:
        UNKNOWN-004: Pattern Frequency Analysis
    """
    data = trace.data

    # Digitize at the midpoint of the robust 5th/95th percentile range.
    v_min = np.percentile(data, 5)
    v_max = np.percentile(data, 95)
    threshold = (v_min + v_max) / 2
    digital = (data > threshold).astype(int)

    # Bit boundaries come from signal transitions.
    edges = np.where(np.diff(digital) != 0)[0]

    if len(edges) < 2:
        return PatternFrequencyResult(
            patterns={},
            most_common=[],
            entropy=0.0,
            repetition_rate=0.0,
        )

    # Median edge spacing approximates the bit period; sample one bit at
    # each bit-cell center, starting half a period after the first edge.
    median_gap = np.median(np.diff(edges))
    bits: list[int] = []
    sample_pos = edges[0] + median_gap / 2

    while sample_pos < len(digital):
        # int(sample_pos) <= sample_pos < len(digital), so indexing is safe.
        bits.append(int(digital[int(sample_pos)]))
        sample_pos += median_gap

    # Count fixed-length sliding-window patterns.
    patterns: dict[tuple[int, ...], int] = {}
    for i in range(len(bits) - pattern_length + 1):
        pattern = tuple(bits[i : i + pattern_length])
        patterns[pattern] = patterns.get(pattern, 0) + 1

    # Drop rare patterns below the reporting threshold.
    patterns = {p: c for p, c in patterns.items() if c >= min_occurrences}

    # Up to 20 most frequent patterns, highest count first.
    most_common = sorted(patterns.items(), key=lambda kv: kv[1], reverse=True)[:20]

    # Shannon entropy of the (filtered) pattern distribution; the small
    # epsilon guards log2(0) for degenerate probabilities.
    total = sum(patterns.values())
    if total > 0:
        probs = np.array(list(patterns.values())) / total
        entropy = float(-np.sum(probs * np.log2(probs + 1e-10)))
        repetition_rate = 1 - len(patterns) / total
    else:
        entropy = 0.0
        repetition_rate = 0.0

    return PatternFrequencyResult(
        patterns=patterns,
        most_common=most_common,
        entropy=entropy,
        repetition_rate=repetition_rate,
    )
@dataclass
class ReverseEngineeringResult:
    """Aggregated output of the reverse engineering workflow.

    Attributes:
        signal_char: Signal characterization.
        binary_fields: Detected binary fields.
        pattern_analysis: Pattern frequency analysis.
        protocol_hypothesis: Most likely protocol.
        confidence: Overall confidence.
        recommendations: List of next steps.
    """

    signal_char: UnknownSignalCharacterization
    binary_fields: BinaryFieldResult
    pattern_analysis: PatternFrequencyResult
    protocol_hypothesis: str
    confidence: float
    recommendations: list[str]


def reverse_engineer_protocol(
    trace: WaveformTrace,
) -> ReverseEngineeringResult:
    """Comprehensive reverse engineering workflow per UNKNOWN-005.

    Combines all unknown signal analysis techniques to build
    a hypothesis about the protocol in use.

    Args:
        trace: Signal trace to reverse engineer.

    Returns:
        ReverseEngineeringResult with comprehensive analysis.

    Example:
        >>> result = reverse_engineer_protocol(trace)
        >>> print(f"Protocol hypothesis: {result.protocol_hypothesis}")
        >>> print(f"Confidence: {result.confidence:.1%}")
        >>> for rec in result.recommendations:
        ...     print(f"- {rec}")

    References:
        UNKNOWN-005: Reverse Engineering Workflow
    """
    # Run every analysis stage on the same trace.
    characterization = characterize_unknown_signal(trace)
    field_result = detect_binary_fields(trace)
    pattern_result = analyze_pattern_frequency(trace)

    # The top-ranked protocol suggestion (if any) becomes the hypothesis.
    if characterization.suggested_protocols:
        hypothesis, overall_confidence = characterization.suggested_protocols[0]
    else:
        hypothesis, overall_confidence = "Unknown", 0.0

    # Build actionable next steps from the combined findings.
    advice: list[str] = []

    if characterization.signal_type != "digital":
        advice.append("Signal appears analog - check if correct probe/channel")

    if field_result.field_count == 0:
        advice.append("No binary fields detected - try adjusting threshold")

    if field_result.encoding == "manchester":
        advice.append("Manchester encoding detected - common in Ethernet, 1-Wire")

    if pattern_result.repetition_rate > 0.5:
        advice.append(
            "High pattern repetition - likely periodic protocol (e.g., I2C polling)"
        )

    if characterization.snr_db < 10:
        advice.append("Low SNR - consider using averaging or filtering")

    if not characterization.suggested_protocols:
        advice.append("No protocol match - try capturing with different settings")

    if field_result.bit_rate is not None:
        advice.append(f"Estimated bit rate: {field_result.bit_rate:.0f} bps")

    return ReverseEngineeringResult(
        signal_char=characterization,
        binary_fields=field_result,
        pattern_analysis=pattern_result,
        protocol_hypothesis=hypothesis,
        confidence=overall_confidence,
        recommendations=advice,
    )
# Explicit public API: names exported by
# `from tracekit.exploratory.unknown import *`.
__all__ = [
    "BinaryFieldResult",
    "PatternFrequencyResult",
    "ReverseEngineeringResult",
    "UnknownSignalCharacterization",
    "analyze_pattern_frequency",
    "characterize_unknown_signal",
    "detect_binary_fields",
    "reverse_engineer_protocol",
]