Coverage for src / tracekit / exploratory / parse.py: 86%
109 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Error-tolerant protocol parsing with timestamp correction.
4This module provides robust protocol decoding that continues after errors
5and timestamp correction for jittery captures.
6"""
8from dataclasses import dataclass
9from enum import Enum
10from typing import Any, Literal
12import numpy as np
13from numpy.typing import NDArray
14from scipy import signal
class ErrorTolerance(Enum):
    """Policy selecting how a decoder reacts when it meets a bad frame.

    Members (the value is the lowercase string form of the name):
        STRICT: Stop at the first error (backward-compatible behaviour).
        TOLERANT: Skip the bad frame, resynchronise and keep going (default).
        PERMISSIVE: Decode on a best-effort basis and report every error.
    """

    # Abort as soon as an error is seen.
    STRICT = "strict"
    # Drop the offending frame, resync, continue.
    TOLERANT = "tolerant"
    # Keep everything, flagging the errors.
    PERMISSIVE = "permissive"
31@dataclass
32class DecodedFrame:
33 """Decoded protocol frame with error annotation.
35 Attributes:
36 data: Decoded data bytes
37 timestamp: Frame timestamp in seconds
38 valid: Whether frame is valid or has errors
39 error_type: Type of error if invalid (e.g., 'framing', 'parity')
40 position: Byte position in original trace
41 """
43 data: bytes
44 timestamp: float
45 valid: bool
46 error_type: str | None
47 position: int
@dataclass
class TimestampCorrection:
    """Outcome of a timestamp jitter-correction pass.

    Attributes:
        corrected_timestamps: The timestamps after correction.
        original_jitter_rms: RMS jitter measured before correction.
        corrected_jitter_rms: RMS jitter measured after correction.
        reduction_ratio: Jitter reduction factor (before divided by after).
        samples_corrected: How many samples were actually adjusted.
        max_correction: Largest correction applied to any single sample.
    """

    # Field order is part of the positional constructor interface.
    corrected_timestamps: NDArray[np.float64]
    original_jitter_rms: float
    corrected_jitter_rms: float
    reduction_ratio: float
    samples_corrected: int
    max_correction: float
def correct_timestamp_jitter(
    timestamps: NDArray[np.float64],
    expected_rate: float,
    *,
    method: Literal["lowpass", "pll"] = "lowpass",
    max_correction_factor: float = 2.0,
) -> TimestampCorrection:
    """Correct timestamp jitter using a low-pass filter or a simple PLL model.

    Compensates for clock jitter in logic analyzer captures (e.g., USB
    transmission jitter) while preserving phase.

    Correction constraints (DAQ-003):
        - Max correction per sample: +/- max_correction_factor * expected_period
        - Filter cutoff: expected_rate / 10
        - Target reduction: >= 5x for typical USB jitter

    Inputs with fewer than 3 samples, or whose RMS jitter is below 1 ns, are
    returned unchanged (as a copy) with ``reduction_ratio`` of 1.0.

    Args:
        timestamps: Original jittery timestamps in seconds. Converted to a
            float64 array before processing.
        expected_rate: Expected nominal sample rate in Hz.
        method: Correction method. 'lowpass' applies a zero-phase 2nd-order
            Butterworth filter; any other value selects the PLL-style tracker.
        max_correction_factor: Max correction as a multiple of the expected
            sample period.

    Returns:
        TimestampCorrection with corrected timestamps and metrics.

    Raises:
        ValueError: If timestamps array is empty.
        ValueError: If expected_rate <= 0.
        ValueError: If max_correction_factor <= 0.

    Examples:
        >>> # Correct jittery timestamps from USB logic analyzer
        >>> import numpy as np
        >>> timestamps = np.linspace(0, 1e-3, 1000)
        >>> jitter = np.random.normal(0, 1e-7, 1000)  # 100ns jitter
        >>> jittery = timestamps + jitter
        >>> result = correct_timestamp_jitter(jittery, expected_rate=1e6)
        >>> print(f"Jitter reduced by {result.reduction_ratio:.1f}x")  # doctest: +SKIP

    References:
        DAQ-003: Timestamp Jitter Compensation and Clock Correction
    """
    # Work on a float64 array so list / integer inputs are not truncated by
    # the in-place assignments in the PLL branch.
    timestamps = np.asarray(timestamps, dtype=np.float64)
    n_samples = len(timestamps)

    if n_samples == 0:
        raise ValueError("Timestamps array cannot be empty")

    if expected_rate <= 0:
        raise ValueError("expected_rate must be positive")

    if max_correction_factor <= 0:
        raise ValueError("max_correction_factor must be positive")

    if n_samples < 3:
        # Not enough data to filter
        return TimestampCorrection(
            corrected_timestamps=timestamps.copy(),
            original_jitter_rms=0.0,
            corrected_jitter_rms=0.0,
            reduction_ratio=1.0,
            samples_corrected=0,
            max_correction=0.0,
        )

    expected_period = 1.0 / expected_rate
    max_correction = max_correction_factor * expected_period

    # Jitter is the deviation of each sample interval from the nominal period.
    original_jitter = np.diff(timestamps) - expected_period
    original_jitter_rms = float(np.sqrt(np.mean(original_jitter**2)))

    # If jitter is negligible (below 1 ns), no correction is needed. This
    # avoids "correcting" floating-point rounding in otherwise perfect data.
    if original_jitter_rms < 1e-9:
        return TimestampCorrection(
            corrected_timestamps=timestamps.copy(),
            original_jitter_rms=original_jitter_rms,
            corrected_jitter_rms=original_jitter_rms,
            reduction_ratio=1.0,
            samples_corrected=0,
            max_correction=0.0,
        )

    if method == "lowpass":
        # 2nd-order Butterworth with cutoff at expected_rate / 10. Relative to
        # Nyquist (expected_rate / 2) that is always 0.2, so no range check
        # on the normalised cutoff is required.
        cutoff_freq = expected_rate / 10.0
        nyquist = 0.5 * expected_rate
        sos = signal.butter(2, cutoff_freq / nyquist, btype="low", output="sos")

        # Centre on the mean before filtering; it is added back afterwards.
        t_mean = np.mean(timestamps)

        # sosfiltfilt requires len(x) > padlen and raises ValueError otherwise.
        # Its default for this single-section filter is 9, which would reject
        # inputs of 3..9 samples; cap the padding so short captures still
        # filter. Inputs of 10+ samples keep the default padding.
        default_padlen = 3 * (2 * sos.shape[0] + 1)
        padlen = min(default_padlen, n_samples - 1)

        filtered = signal.sosfiltfilt(sos, timestamps - t_mean, padlen=padlen)
        corrected = filtered + t_mean

    else:  # pll
        # Simple first-order tracking loop: predict each timestamp from the
        # previous corrected one and move half-way towards the measurement.
        corrected = np.zeros_like(timestamps)
        corrected[0] = timestamps[0]

        for i in range(1, n_samples):
            predicted = corrected[i - 1] + expected_period
            error = timestamps[i] - predicted
            # Loop gain 0.5, limited to the allowed per-step correction.
            corrected[i] = predicted + np.clip(
                error * 0.5, -max_correction, max_correction
            )

    # Whatever the method, never move a sample further than max_correction
    # away from its measured timestamp.
    corrections = np.clip(corrected - timestamps, -max_correction, max_correction)
    corrected = timestamps + corrections

    # Jitter remaining after correction.
    corrected_jitter = np.diff(corrected) - expected_period
    corrected_jitter_rms = float(np.sqrt(np.mean(corrected_jitter**2)))

    # A sample counts as corrected when it moved by more than 1 ps.
    samples_corrected = int(np.sum(np.abs(corrections) > 1e-12))
    max_correction_applied = float(np.max(np.abs(corrections)))

    # original_jitter_rms is >= 1e-9 here (early return handles negligible
    # jitter); the floor on the denominator guards against division by zero.
    reduction_ratio = original_jitter_rms / max(corrected_jitter_rms, 1e-15)

    return TimestampCorrection(
        corrected_timestamps=corrected,
        original_jitter_rms=original_jitter_rms,
        corrected_jitter_rms=corrected_jitter_rms,
        reduction_ratio=reduction_ratio,
        samples_corrected=samples_corrected,
        max_correction=max_correction_applied,
    )
def decode_with_error_tolerance(
    data: NDArray[np.uint8],
    protocol: Literal["uart", "spi", "i2c", "can"],
    *,
    tolerance: ErrorTolerance = ErrorTolerance.TOLERANT,
    **protocol_params: Any,
) -> list[DecodedFrame]:
    """Decode protocol with error tolerance and resynchronization.

    Continues decoding after framing/parity/stop-bit errors instead of
    aborting.

    This is a simplified implementation showing the error handling pattern:
    only 'uart' produces frames (one frame per input byte, with a byte value
    of 0xFF standing in for a framing error). 'spi', 'i2c' and 'can' are
    accepted but currently return an empty list.

    Error tolerance modes (DAQ-004):
        - STRICT: Abort on first error (backward compatible). The erroneous
          frame is still included as the last element of the result.
        - TOLERANT: Skip error frame, resync, continue (default)
        - PERMISSIVE: Best-effort decode, report all errors

    In this simplified decoder TOLERANT and PERMISSIVE behave identically:
    the invalid frame is recorded and decoding advances by one byte.

    Resynchronization strategies (DAQ-004), for the full decoders:
        - UART: Search for next valid start bit + stop bit pattern
        - SPI: Re-align on next CS edge
        - I2C: Search for next START condition
        - CAN: Wait for recessive bus + next SOF

    Args:
        data: Raw protocol data bytes.
        protocol: Protocol type ('uart', 'spi', 'i2c', 'can').
        tolerance: Error tolerance mode.
        **protocol_params: Protocol-specific parameters. UART requires
            'baud', which is used to derive each frame's timestamp as
            ``position / baud``.

    Returns:
        List of DecodedFrame objects with data and error annotations.

    Raises:
        ValueError: If protocol not supported.
        ValueError: If required protocol_params missing.
        Exception: Re-raised in STRICT mode if decoding a byte fails; in the
            other modes the byte is skipped.

    Examples:
        >>> # Decode UART with error tolerance
        >>> data = np.array([0xFF, 0x55, 0xAA, 0x00], dtype=np.uint8)
        >>> frames = decode_with_error_tolerance(
        ...     data, 'uart', tolerance=ErrorTolerance.TOLERANT, baud=9600
        ... )
        >>> valid_frames = [f for f in frames if f.valid]

    References:
        DAQ-004: Error-Tolerant Protocol Decoding with Resynchronization
    """
    if protocol not in ("uart", "spi", "i2c", "can"):
        raise ValueError(f"Unsupported protocol: {protocol}")

    frames: list[DecodedFrame] = []

    # Full protocol decoders are in tracekit.analyzers.protocols. SPI
    # (re-align on CS edge), I2C (search for START) and CAN (wait for SOF)
    # are placeholders here and yield no frames.
    if protocol != "uart":
        return frames

    if "baud" not in protocol_params:
        raise ValueError("UART requires 'baud' parameter")
    baud = protocol_params["baud"]

    strict = tolerance is ErrorTolerance.STRICT

    # Every byte is visited, including the last one: a frame is a single byte
    # here, so no look-ahead past the current position is needed.
    for pos in range(len(data)):
        try:
            # Simplified - a real UART decoder would analyze bit timing.
            raw = data[pos]
            frame_data = bytes([raw])
            timestamp = float(pos) / baud
            # Example error condition: treat 0xFF as a framing error
            # (no proper stop bit).
            is_valid = not (raw == 0xFF)
        except Exception:
            if strict:
                raise
            # Non-strict modes skip the undecodable byte and carry on.
            continue

        frames.append(
            DecodedFrame(
                data=frame_data,
                timestamp=timestamp,
                valid=is_valid,
                error_type=None if is_valid else "framing",
                position=pos,
            )
        )

        # Strict mode: abort on the first error, after recording it.
        # TOLERANT / PERMISSIVE simply move on to the next byte; a real
        # implementation would search for the next start-bit pattern.
        if strict and not is_valid:
            break

    return frames