Coverage report for src/tracekit/exploratory/parse.py: 86% of 109 statements covered (coverage.py v7.13.1, created 2026-01-11 23:04 +0000).

1"""Error-tolerant protocol parsing with timestamp correction. 

2 

3 

4This module provides robust protocol decoding that continues after errors 

5and timestamp correction for jittery captures. 

6""" 

7 

8from dataclasses import dataclass 

9from enum import Enum 

10from typing import Any, Literal 

11 

12import numpy as np 

13from numpy.typing import NDArray 

14from scipy import signal 

15 

16 

class ErrorTolerance(Enum):
    """How aggressively protocol decoding recovers from frame errors.

    Attributes:
        STRICT: Abort on the first error (backward compatible).
        TOLERANT: Skip the bad frame, resynchronize, and continue (default).
        PERMISSIVE: Decode best-effort and report every error.
    """

    STRICT = "strict"
    TOLERANT = "tolerant"
    PERMISSIVE = "permissive"

29 

30 

31@dataclass 

32class DecodedFrame: 

33 """Decoded protocol frame with error annotation. 

34 

35 Attributes: 

36 data: Decoded data bytes 

37 timestamp: Frame timestamp in seconds 

38 valid: Whether frame is valid or has errors 

39 error_type: Type of error if invalid (e.g., 'framing', 'parity') 

40 position: Byte position in original trace 

41 """ 

42 

43 data: bytes 

44 timestamp: float 

45 valid: bool 

46 error_type: str | None 

47 position: int 

48 

49 

@dataclass
class TimestampCorrection:
    """Outcome of a timestamp jitter-correction pass.

    Attributes:
        corrected_timestamps: Array of corrected timestamps (seconds).
        original_jitter_rms: RMS jitter measured before correction.
        corrected_jitter_rms: RMS jitter measured after correction.
        reduction_ratio: Jitter reduction factor (before / after).
        samples_corrected: Count of samples whose value was adjusted.
        max_correction: Largest correction applied to any single sample.
    """

    corrected_timestamps: NDArray[np.float64]
    original_jitter_rms: float
    corrected_jitter_rms: float
    reduction_ratio: float
    samples_corrected: int
    max_correction: float

69 

70 

def correct_timestamp_jitter(
    timestamps: NDArray[np.float64],
    expected_rate: float,
    *,
    method: Literal["lowpass", "pll"] = "lowpass",
    max_correction_factor: float = 2.0,
) -> TimestampCorrection:
    """Correct timestamp jitter using low-pass filtering or a PLL model.

    Compensates for clock jitter in logic analyzer captures (e.g., USB
    transmission jitter) while preserving phase.

    Correction constraints (DAQ-003):
        - Max correction per sample: +/-(max_correction_factor * expected_period)
        - Filter cutoff: expected_rate / 10 (removes 10x jitter frequency)
        - Target reduction: >= 5x for typical USB jitter

    Args:
        timestamps: Original jittery timestamps in seconds.
        expected_rate: Expected nominal sample rate in Hz.
        method: Correction method ('lowpass' or 'pll').
        max_correction_factor: Max correction as a multiple of the period.

    Returns:
        TimestampCorrection with corrected timestamps and metrics.

    Raises:
        ValueError: If timestamps array is empty.
        ValueError: If expected_rate <= 0.
        ValueError: If max_correction_factor <= 0.

    Examples:
        >>> # Correct jittery timestamps from a USB logic analyzer
        >>> import numpy as np
        >>> timestamps = np.linspace(0, 1e-3, 1000)
        >>> jitter = np.random.normal(0, 1e-7, 1000)  # 100 ns jitter
        >>> result = correct_timestamp_jitter(timestamps + jitter, expected_rate=1e6)
        >>> print(f"Jitter reduced by {result.reduction_ratio:.1f}x")

    References:
        DAQ-003: Timestamp Jitter Compensation and Clock Correction
    """
    if len(timestamps) == 0:
        raise ValueError("Timestamps array cannot be empty")

    if expected_rate <= 0:
        raise ValueError("expected_rate must be positive")

    if max_correction_factor <= 0:
        raise ValueError("max_correction_factor must be positive")

    if len(timestamps) < 3:
        # Not enough samples to estimate (let alone filter) jitter.
        return TimestampCorrection(
            corrected_timestamps=timestamps.copy(),
            original_jitter_rms=0.0,
            corrected_jitter_rms=0.0,
            reduction_ratio=1.0,
            samples_corrected=0,
            max_correction=0.0,
        )

    expected_period = 1.0 / expected_rate
    max_correction = max_correction_factor * expected_period

    # Jitter = deviation of inter-sample intervals from the nominal period.
    diffs = np.diff(timestamps)
    original_jitter = diffs - expected_period
    original_jitter_rms = float(np.sqrt(np.mean(original_jitter**2)))

    # If jitter is negligible (below 1 ns), no correction is needed.  This
    # avoids "correcting" floating-point rounding error in perfect timestamps.
    if original_jitter_rms < 1e-9:
        return TimestampCorrection(
            corrected_timestamps=timestamps.copy(),
            original_jitter_rms=original_jitter_rms,
            corrected_jitter_rms=original_jitter_rms,
            reduction_ratio=1.0,
            samples_corrected=0,
            max_correction=0.0,
        )

    if method == "lowpass":
        # Low-pass filter approach: 2nd-order Butterworth with cutoff at
        # expected_rate / 10 (DAQ-003).
        cutoff_freq = expected_rate / 10.0
        nyquist = 0.5 * expected_rate

        # Ensure the normalized cutoff stays below Nyquist.
        if cutoff_freq >= nyquist:
            cutoff_freq = nyquist * 0.8

        sos = signal.butter(2, cutoff_freq / nyquist, btype="low", output="sos")

        # Remove the mean before filtering to reduce edge effects, then
        # restore it.  sosfiltfilt is zero-phase (forward-backward).
        t_mean = np.mean(timestamps)
        t_detrended = timestamps - t_mean
        filtered = signal.sosfiltfilt(sos, t_detrended)
        corrected = filtered + t_mean

    else:  # pll
        # Simple PLL model: predict each timestamp as the previous corrected
        # sample plus one nominal period, then pull the prediction halfway
        # toward the measurement, clamped to +/-max_correction.
        corrected = np.zeros_like(timestamps)
        corrected[0] = timestamps[0]

        for i in range(1, len(timestamps)):
            predicted = corrected[i - 1] + expected_period
            error = timestamps[i] - predicted
            correction = np.clip(error * 0.5, -max_correction, max_correction)
            corrected[i] = predicted + correction

    # Clamp the net per-sample correction to +/-max_correction (DAQ-003).
    corrections = corrected - timestamps
    exceeded = np.abs(corrections) > max_correction
    corrections[exceeded] = np.sign(corrections[exceeded]) * max_correction
    corrected = timestamps + corrections

    # Calculate residual jitter after correction.
    corrected_diffs = np.diff(corrected)
    corrected_jitter = corrected_diffs - expected_period
    corrected_jitter_rms = float(np.sqrt(np.mean(corrected_jitter**2)))

    # Metrics: how many samples moved, and by how much at most.
    samples_corrected = int(np.sum(np.abs(corrections) > 1e-12))
    max_correction_applied = float(np.max(np.abs(corrections)))

    # original_jitter_rms > 0 here (negligible jitter returned early above);
    # guard the denominator anyway to avoid division by zero.
    reduction_ratio = original_jitter_rms / max(corrected_jitter_rms, 1e-15)

    return TimestampCorrection(
        corrected_timestamps=corrected,
        original_jitter_rms=original_jitter_rms,
        corrected_jitter_rms=corrected_jitter_rms,
        reduction_ratio=reduction_ratio,
        samples_corrected=samples_corrected,
        max_correction=max_correction_applied,
    )

227 

228 

def decode_with_error_tolerance(
    data: NDArray[np.uint8],
    protocol: Literal["uart", "spi", "i2c", "can"],
    *,
    tolerance: ErrorTolerance = ErrorTolerance.TOLERANT,
    **protocol_params: Any,
) -> list[DecodedFrame]:
    """Decode a protocol stream, recovering from errors per the tolerance mode.

    Continues decoding after framing/parity/stop-bit errors instead of
    aborting, and resynchronizes on protocol-specific markers.

    Error tolerance modes (DAQ-004):
        - STRICT: Abort on the first error (backward compatible).
        - TOLERANT: Skip the bad frame, resynchronize, continue (default).
        - PERMISSIVE: Best-effort decode, report every error.

    Resynchronization strategies (DAQ-004):
        - UART: Search for the next valid start-bit + stop-bit pattern.
        - SPI: Re-align on the next CS edge.
        - I2C: Search for the next START condition.
        - CAN: Wait for a recessive bus, then the next SOF.

    Args:
        data: Raw protocol data bytes.
        protocol: Protocol type ('uart', 'spi', 'i2c', 'can').
        tolerance: Error tolerance mode.
        **protocol_params: Protocol-specific parameters (baud, parity, etc.).

    Returns:
        List of DecodedFrame objects with data and error annotations.

    Raises:
        ValueError: If the protocol is not supported.
        ValueError: If a required protocol parameter is missing.
        Exception: Re-raised in STRICT mode if decoding fails.

    Examples:
        >>> # Decode UART with error tolerance
        >>> data = np.array([0xFF, 0x55, 0xAA, 0x00], dtype=np.uint8)
        >>> frames = decode_with_error_tolerance(
        ...     data, 'uart', tolerance=ErrorTolerance.TOLERANT, baud=9600
        ... )
        >>> valid_frames = [f for f in frames if f.valid]

    References:
        DAQ-004: Error-Tolerant Protocol Decoding with Resynchronization
    """
    if protocol not in ("uart", "spi", "i2c", "can"):
        raise ValueError(f"Unsupported protocol: {protocol}")

    frames: list[DecodedFrame] = []

    # SPI (CS-edge resync), I2C (START resync), and CAN (SOF resync) are
    # placeholders here; full decoders live in tracekit.analyzers.protocols.
    if protocol in ("spi", "i2c", "can"):
        return frames

    # --- UART path (simplified demonstration of the error-handling pattern) ---
    if "baud" not in protocol_params:
        raise ValueError("UART requires 'baud' parameter")

    baud = protocol_params["baud"]
    total = len(data)
    cursor = 0

    while cursor < total:
        try:
            # Simplified framing check: require one byte beyond the cursor.
            if cursor + 1 >= total:
                break

            raw = data[cursor]

            # Stand-in validation: a real decoder would inspect start/stop
            # bits; here 0xFF models a framing error (no proper stop bit).
            frame_ok = raw != 0xFF

            frames.append(
                DecodedFrame(
                    data=bytes([raw]),
                    timestamp=float(cursor) / baud,
                    valid=frame_ok,
                    error_type=None if frame_ok else "framing",
                    position=cursor,
                )
            )

            if not frame_ok and tolerance == ErrorTolerance.STRICT:
                # Strict mode: stop at the first bad frame.
                break

            # TOLERANT skips the bad frame to resync on the next start bit
            # (a real decoder would search for the pattern); PERMISSIVE and
            # the valid-frame case simply advance.
            cursor += 1

        except Exception:
            if tolerance == ErrorTolerance.STRICT:
                raise
            # Best effort: record nothing, move past the failure point.
            cursor += 1

    return frames