Coverage for src / tracekit / inference / protocol.py: 96%
112 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Protocol type auto-detection from signal characteristics.
3This module analyzes edge timing, symbol rate, and idle levels to
4automatically detect serial protocol types.
7Example:
8 >>> import tracekit as tk
9 >>> trace = tk.load('serial_data.wfm')
10 >>> result = tk.detect_protocol(trace)
11 >>> print(f"Protocol: {result['protocol']}")
12 >>> print(f"Confidence: {result['confidence']:.1%}")
14References:
15 UART: TIA-232-F
16 I2C: NXP UM10204
17 SPI: Motorola SPI Block Guide
18 CAN: ISO 11898
19"""
21from __future__ import annotations
23from typing import TYPE_CHECKING, Any
25import numpy as np
27from tracekit.core.exceptions import AnalysisError
29if TYPE_CHECKING:
30 from tracekit.core.types import WaveformTrace
33def detect_protocol(
34 trace: WaveformTrace,
35 *,
36 min_confidence: float = 0.6,
37 return_candidates: bool = False,
38) -> dict[str, Any]:
39 """Auto-detect serial protocol type.
41 Analyzes signal characteristics to identify protocol:
42 - Edge timing and regularity
43 - Symbol rate detection
44 - Idle level analysis
45 - Transition patterns
47 Detects: UART, SPI, I2C, CAN, 1-Wire, Manchester
49 Args:
50 trace: Signal to analyze.
51 min_confidence: Minimum confidence threshold (0-1).
52 return_candidates: If True, return all candidate protocols.
54 Returns:
55 Dictionary containing:
56 - protocol: Detected protocol name
57 - confidence: Detection confidence (0-1)
58 - config: Suggested decoder configuration dict
59 - characteristics: Dict with detected signal characteristics
60 - candidates: List of all candidates (if return_candidates=True)
62 Raises:
63 AnalysisError: If no protocol can be detected with sufficient confidence.
65 Example:
66 >>> trace = tk.load('unknown_serial.wfm')
67 >>> result = tk.detect_protocol(trace, return_candidates=True)
68 >>> print(f"Detected: {result['protocol']}")
69 >>> print(f"Baud rate: {result['config'].get('baud_rate', 'N/A')}")
70 >>> for candidate in result['candidates']:
71 ... print(f" {candidate['protocol']}: {candidate['confidence']:.1%}")
73 References:
74 sigrok Protocol Decoder heuristics
75 UART: Asynchronous, idle high, start bit low
76 I2C: Clock + data, open-drain, pull-up
77 """
78 # Analyze signal characteristics
79 characteristics = _analyze_signal_characteristics(trace)
81 # Score each protocol type
82 candidates = []
84 # UART detection
85 uart_score = _score_uart(characteristics)
86 if uart_score > 0:
87 candidates.append(
88 {
89 "protocol": "UART",
90 "confidence": uart_score,
91 "config": {
92 "baud_rate": characteristics.get("symbol_rate", 115200),
93 "data_bits": 8,
94 "parity": "none",
95 "stop_bits": 1,
96 },
97 }
98 )
100 # SPI detection
101 spi_score = _score_spi(characteristics)
102 if spi_score > 0:
103 candidates.append(
104 {
105 "protocol": "SPI",
106 "confidence": spi_score,
107 "config": {
108 "clock_polarity": 0,
109 "clock_phase": 0,
110 "bit_order": "MSB",
111 },
112 }
113 )
115 # I2C detection
116 i2c_score = _score_i2c(characteristics)
117 if i2c_score > 0:
118 candidates.append(
119 {
120 "protocol": "I2C",
121 "confidence": i2c_score,
122 "config": {
123 "clock_rate": characteristics.get("symbol_rate", 100000),
124 "address_bits": 7,
125 },
126 }
127 )
129 # CAN detection
130 can_score = _score_can(characteristics)
131 if can_score > 0:
132 candidates.append(
133 {
134 "protocol": "CAN",
135 "confidence": can_score,
136 "config": {
137 "baud_rate": characteristics.get("symbol_rate", 500000),
138 "sample_point": 0.75,
139 },
140 }
141 )
143 # Sort by confidence
144 candidates.sort(key=lambda x: x["confidence"], reverse=True) # type: ignore[arg-type, return-value]
146 if not candidates: 146 ↛ 147line 146 didn't jump to line 147 because the condition on line 146 was never true
147 raise AnalysisError(
148 "Could not detect protocol type. Signal may be analog or unsupported protocol."
149 )
151 # Primary detection
152 primary = candidates[0]
154 if primary["confidence"] < min_confidence: # type: ignore[operator]
155 raise AnalysisError(
156 f"Protocol detection confidence too low: {primary['confidence']:.1%} "
157 f"(minimum: {min_confidence:.1%}). Try specifying protocol manually."
158 )
160 # Build result
161 result = {
162 "protocol": primary["protocol"],
163 "confidence": primary["confidence"],
164 "config": primary["config"],
165 "characteristics": characteristics,
166 }
168 if return_candidates:
169 result["candidates"] = candidates
171 return result
174def _analyze_signal_characteristics(trace: WaveformTrace) -> dict[str, Any]:
175 """Analyze signal to extract protocol-relevant characteristics.
177 Args:
178 trace: Signal to analyze.
180 Returns:
181 Dictionary with characteristics.
182 """
183 data = trace.data
184 sample_rate = trace.metadata.sample_rate
186 # Handle empty data
187 if len(data) == 0: 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true
188 return {
189 "regularity": 0,
190 "symbol_rate": 0,
191 "idle_level": "low",
192 "duty_cycle": 0,
193 "transition_density": 0,
194 "edge_count": 0,
195 }
197 # Detect edges
198 threshold = (np.max(data) + np.min(data)) / 2
199 digital = data > threshold
200 edges = np.diff(digital.astype(int))
201 edge_indices = np.where(edges != 0)[0]
203 # Edge statistics
204 if len(edge_indices) > 1:
205 edge_times = edge_indices / sample_rate
206 edge_intervals = np.diff(edge_times)
208 # Detect if edges are regular (clock-like) or irregular
209 if len(edge_intervals) > 10:
210 interval_std = np.std(edge_intervals)
211 interval_mean = np.mean(edge_intervals)
212 regularity = 1.0 - min(1.0, interval_std / (interval_mean + 1e-12))
213 else:
214 regularity = 0.5
216 # Estimate symbol rate from edge intervals
217 # For clock-based: symbol_rate = 1 / (2 * edge_interval)
218 # For async: symbol_rate = 1 / min_interval
219 median_interval = np.median(edge_intervals)
220 symbol_rate = 1.0 / median_interval if median_interval > 0 else 0
221 elif len(edge_indices) == 1:
222 # With exactly 1 edge, regularity is indeterminate
223 regularity = 0.5
224 symbol_rate = 0
225 else:
226 # No edges: completely DC signal, no regularity
227 regularity = 0
228 symbol_rate = 0
230 # Idle level (high or low)
231 idle_level = "high" if np.mean(data) > threshold else "low"
233 # Duty cycle
234 duty_cycle = np.sum(digital) / len(digital)
236 # Transition density (edges per second)
237 duration = len(data) / sample_rate
238 transition_density = len(edge_indices) / duration if duration > 0 else 0
240 return {
241 "regularity": regularity,
242 "symbol_rate": symbol_rate,
243 "idle_level": idle_level,
244 "duty_cycle": duty_cycle,
245 "transition_density": transition_density,
246 "edge_count": len(edge_indices),
247 }
250def _score_uart(characteristics: dict[str, Any]) -> float:
251 """Score likelihood of UART protocol.
253 UART characteristics:
254 - Irregular edges (async)
255 - Idle high
256 - Low transition density
258 Args:
259 characteristics: Signal characteristics.
261 Returns:
262 Score from 0 to 1.
263 """
264 score = 0.0
266 # UART is asynchronous - low regularity
267 regularity = characteristics["regularity"]
268 if regularity < 0.3:
269 score += 0.4
270 elif regularity < 0.5:
271 score += 0.2
273 # UART idles high
274 if characteristics["idle_level"] == "high":
275 score += 0.3
277 # UART has moderate transition density
278 density = characteristics["transition_density"]
279 if 1000 < density < 1e6: # Typical UART range
280 score += 0.3
282 # Cap at 0.99 to reflect inherent uncertainty
283 return min(0.99, score)
286def _score_spi(characteristics: dict[str, Any]) -> float:
287 """Score likelihood of SPI protocol.
289 SPI characteristics:
290 - Regular clock edges
291 - ~50% duty cycle
292 - High transition density
294 Args:
295 characteristics: Signal characteristics.
297 Returns:
298 Score from 0 to 1.
299 """
300 score = 0.0
302 # SPI has regular clock - high regularity
303 regularity = characteristics["regularity"]
304 if regularity > 0.8:
305 score += 0.5
306 elif regularity > 0.6: 306 ↛ 307line 306 didn't jump to line 307 because the condition on line 306 was never true
307 score += 0.3
309 # SPI clock typically ~50% duty cycle
310 duty_cycle = characteristics["duty_cycle"]
311 duty_error = abs(duty_cycle - 0.5)
312 if duty_error < 0.1:
313 score += 0.3
315 # SPI has high transition density
316 density = characteristics["transition_density"]
317 if density > 1e5: # High speed
318 score += 0.2
320 # Cap at 0.99 to reflect inherent uncertainty
321 return min(0.99, score)
324def _score_i2c(characteristics: dict[str, Any]) -> float:
325 """Score likelihood of I2C protocol.
327 I2C characteristics:
328 - Clock-like regularity
329 - Idle high (pull-up)
330 - Moderate transition density
332 Args:
333 characteristics: Signal characteristics.
335 Returns:
336 Score from 0 to 1.
337 """
338 score = 0.0
340 # I2C clock has regularity
341 regularity = characteristics["regularity"]
342 if regularity > 0.6:
343 score += 0.4
345 # I2C idles high
346 if characteristics["idle_level"] == "high":
347 score += 0.3
349 # I2C has lower transition density than SPI
350 density = characteristics["transition_density"]
351 if 1e3 < density < 1e6:
352 score += 0.3
354 # Cap at 0.99 to reflect inherent uncertainty
355 return min(0.99, score)
358def _score_can(characteristics: dict[str, Any]) -> float:
359 """Score likelihood of CAN protocol.
361 CAN characteristics:
362 - Irregular edges (NRZ encoding with bit stuffing)
363 - Idle high (recessive)
364 - Moderate to high transition density
366 Args:
367 characteristics: Signal characteristics.
369 Returns:
370 Score from 0 to 1.
371 """
372 score = 0.0
374 # CAN has some irregularity due to bit stuffing
375 regularity = characteristics["regularity"]
376 if 0.3 < regularity < 0.7:
377 score += 0.4
379 # CAN idles high (recessive state)
380 if characteristics["idle_level"] == "high":
381 score += 0.3
383 # CAN has specific baud rates (typically 125k, 250k, 500k, 1M)
384 symbol_rate = characteristics["symbol_rate"]
385 common_rates = [125000, 250000, 500000, 1000000]
386 for rate in common_rates:
387 if abs(symbol_rate - rate) / rate < 0.1: # Within 10%
388 score += 0.3
389 break
391 # Cap at 0.99 to reflect inherent uncertainty
392 return min(0.99, score)
395__all__ = ["detect_protocol"]