Coverage for src / tracekit / inference / protocol.py: 96%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Protocol type auto-detection from signal characteristics. 

2 

3This module analyzes edge timing, symbol rate, and idle levels to 

4automatically detect serial protocol types. 

5 

6 

7Example: 

8 >>> import tracekit as tk 

9 >>> trace = tk.load('serial_data.wfm') 

10 >>> result = tk.detect_protocol(trace) 

11 >>> print(f"Protocol: {result['protocol']}") 

12 >>> print(f"Confidence: {result['confidence']:.1%}") 

13 

14References: 

15 UART: TIA-232-F 

16 I2C: NXP UM10204 

17 SPI: Motorola SPI Block Guide 

18 CAN: ISO 11898 

19""" 

20 

21from __future__ import annotations 

22 

23from typing import TYPE_CHECKING, Any 

24 

25import numpy as np 

26 

27from tracekit.core.exceptions import AnalysisError 

28 

29if TYPE_CHECKING: 

30 from tracekit.core.types import WaveformTrace 

31 

32 

33def detect_protocol( 

34 trace: WaveformTrace, 

35 *, 

36 min_confidence: float = 0.6, 

37 return_candidates: bool = False, 

38) -> dict[str, Any]: 

39 """Auto-detect serial protocol type. 

40 

41 Analyzes signal characteristics to identify protocol: 

42 - Edge timing and regularity 

43 - Symbol rate detection 

44 - Idle level analysis 

45 - Transition patterns 

46 

47 Detects: UART, SPI, I2C, CAN, 1-Wire, Manchester 

48 

49 Args: 

50 trace: Signal to analyze. 

51 min_confidence: Minimum confidence threshold (0-1). 

52 return_candidates: If True, return all candidate protocols. 

53 

54 Returns: 

55 Dictionary containing: 

56 - protocol: Detected protocol name 

57 - confidence: Detection confidence (0-1) 

58 - config: Suggested decoder configuration dict 

59 - characteristics: Dict with detected signal characteristics 

60 - candidates: List of all candidates (if return_candidates=True) 

61 

62 Raises: 

63 AnalysisError: If no protocol can be detected with sufficient confidence. 

64 

65 Example: 

66 >>> trace = tk.load('unknown_serial.wfm') 

67 >>> result = tk.detect_protocol(trace, return_candidates=True) 

68 >>> print(f"Detected: {result['protocol']}") 

69 >>> print(f"Baud rate: {result['config'].get('baud_rate', 'N/A')}") 

70 >>> for candidate in result['candidates']: 

71 ... print(f" {candidate['protocol']}: {candidate['confidence']:.1%}") 

72 

73 References: 

74 sigrok Protocol Decoder heuristics 

75 UART: Asynchronous, idle high, start bit low 

76 I2C: Clock + data, open-drain, pull-up 

77 """ 

78 # Analyze signal characteristics 

79 characteristics = _analyze_signal_characteristics(trace) 

80 

81 # Score each protocol type 

82 candidates = [] 

83 

84 # UART detection 

85 uart_score = _score_uart(characteristics) 

86 if uart_score > 0: 

87 candidates.append( 

88 { 

89 "protocol": "UART", 

90 "confidence": uart_score, 

91 "config": { 

92 "baud_rate": characteristics.get("symbol_rate", 115200), 

93 "data_bits": 8, 

94 "parity": "none", 

95 "stop_bits": 1, 

96 }, 

97 } 

98 ) 

99 

100 # SPI detection 

101 spi_score = _score_spi(characteristics) 

102 if spi_score > 0: 

103 candidates.append( 

104 { 

105 "protocol": "SPI", 

106 "confidence": spi_score, 

107 "config": { 

108 "clock_polarity": 0, 

109 "clock_phase": 0, 

110 "bit_order": "MSB", 

111 }, 

112 } 

113 ) 

114 

115 # I2C detection 

116 i2c_score = _score_i2c(characteristics) 

117 if i2c_score > 0: 

118 candidates.append( 

119 { 

120 "protocol": "I2C", 

121 "confidence": i2c_score, 

122 "config": { 

123 "clock_rate": characteristics.get("symbol_rate", 100000), 

124 "address_bits": 7, 

125 }, 

126 } 

127 ) 

128 

129 # CAN detection 

130 can_score = _score_can(characteristics) 

131 if can_score > 0: 

132 candidates.append( 

133 { 

134 "protocol": "CAN", 

135 "confidence": can_score, 

136 "config": { 

137 "baud_rate": characteristics.get("symbol_rate", 500000), 

138 "sample_point": 0.75, 

139 }, 

140 } 

141 ) 

142 

143 # Sort by confidence 

144 candidates.sort(key=lambda x: x["confidence"], reverse=True) # type: ignore[arg-type, return-value] 

145 

146 if not candidates: 146 ↛ 147line 146 didn't jump to line 147 because the condition on line 146 was never true

147 raise AnalysisError( 

148 "Could not detect protocol type. Signal may be analog or unsupported protocol." 

149 ) 

150 

151 # Primary detection 

152 primary = candidates[0] 

153 

154 if primary["confidence"] < min_confidence: # type: ignore[operator] 

155 raise AnalysisError( 

156 f"Protocol detection confidence too low: {primary['confidence']:.1%} " 

157 f"(minimum: {min_confidence:.1%}). Try specifying protocol manually." 

158 ) 

159 

160 # Build result 

161 result = { 

162 "protocol": primary["protocol"], 

163 "confidence": primary["confidence"], 

164 "config": primary["config"], 

165 "characteristics": characteristics, 

166 } 

167 

168 if return_candidates: 

169 result["candidates"] = candidates 

170 

171 return result 

172 

173 

174def _analyze_signal_characteristics(trace: WaveformTrace) -> dict[str, Any]: 

175 """Analyze signal to extract protocol-relevant characteristics. 

176 

177 Args: 

178 trace: Signal to analyze. 

179 

180 Returns: 

181 Dictionary with characteristics. 

182 """ 

183 data = trace.data 

184 sample_rate = trace.metadata.sample_rate 

185 

186 # Handle empty data 

187 if len(data) == 0: 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true

188 return { 

189 "regularity": 0, 

190 "symbol_rate": 0, 

191 "idle_level": "low", 

192 "duty_cycle": 0, 

193 "transition_density": 0, 

194 "edge_count": 0, 

195 } 

196 

197 # Detect edges 

198 threshold = (np.max(data) + np.min(data)) / 2 

199 digital = data > threshold 

200 edges = np.diff(digital.astype(int)) 

201 edge_indices = np.where(edges != 0)[0] 

202 

203 # Edge statistics 

204 if len(edge_indices) > 1: 

205 edge_times = edge_indices / sample_rate 

206 edge_intervals = np.diff(edge_times) 

207 

208 # Detect if edges are regular (clock-like) or irregular 

209 if len(edge_intervals) > 10: 

210 interval_std = np.std(edge_intervals) 

211 interval_mean = np.mean(edge_intervals) 

212 regularity = 1.0 - min(1.0, interval_std / (interval_mean + 1e-12)) 

213 else: 

214 regularity = 0.5 

215 

216 # Estimate symbol rate from edge intervals 

217 # For clock-based: symbol_rate = 1 / (2 * edge_interval) 

218 # For async: symbol_rate = 1 / min_interval 

219 median_interval = np.median(edge_intervals) 

220 symbol_rate = 1.0 / median_interval if median_interval > 0 else 0 

221 elif len(edge_indices) == 1: 

222 # With exactly 1 edge, regularity is indeterminate 

223 regularity = 0.5 

224 symbol_rate = 0 

225 else: 

226 # No edges: completely DC signal, no regularity 

227 regularity = 0 

228 symbol_rate = 0 

229 

230 # Idle level (high or low) 

231 idle_level = "high" if np.mean(data) > threshold else "low" 

232 

233 # Duty cycle 

234 duty_cycle = np.sum(digital) / len(digital) 

235 

236 # Transition density (edges per second) 

237 duration = len(data) / sample_rate 

238 transition_density = len(edge_indices) / duration if duration > 0 else 0 

239 

240 return { 

241 "regularity": regularity, 

242 "symbol_rate": symbol_rate, 

243 "idle_level": idle_level, 

244 "duty_cycle": duty_cycle, 

245 "transition_density": transition_density, 

246 "edge_count": len(edge_indices), 

247 } 

248 

249 

250def _score_uart(characteristics: dict[str, Any]) -> float: 

251 """Score likelihood of UART protocol. 

252 

253 UART characteristics: 

254 - Irregular edges (async) 

255 - Idle high 

256 - Low transition density 

257 

258 Args: 

259 characteristics: Signal characteristics. 

260 

261 Returns: 

262 Score from 0 to 1. 

263 """ 

264 score = 0.0 

265 

266 # UART is asynchronous - low regularity 

267 regularity = characteristics["regularity"] 

268 if regularity < 0.3: 

269 score += 0.4 

270 elif regularity < 0.5: 

271 score += 0.2 

272 

273 # UART idles high 

274 if characteristics["idle_level"] == "high": 

275 score += 0.3 

276 

277 # UART has moderate transition density 

278 density = characteristics["transition_density"] 

279 if 1000 < density < 1e6: # Typical UART range 

280 score += 0.3 

281 

282 # Cap at 0.99 to reflect inherent uncertainty 

283 return min(0.99, score) 

284 

285 

286def _score_spi(characteristics: dict[str, Any]) -> float: 

287 """Score likelihood of SPI protocol. 

288 

289 SPI characteristics: 

290 - Regular clock edges 

291 - ~50% duty cycle 

292 - High transition density 

293 

294 Args: 

295 characteristics: Signal characteristics. 

296 

297 Returns: 

298 Score from 0 to 1. 

299 """ 

300 score = 0.0 

301 

302 # SPI has regular clock - high regularity 

303 regularity = characteristics["regularity"] 

304 if regularity > 0.8: 

305 score += 0.5 

306 elif regularity > 0.6: 306 ↛ 307line 306 didn't jump to line 307 because the condition on line 306 was never true

307 score += 0.3 

308 

309 # SPI clock typically ~50% duty cycle 

310 duty_cycle = characteristics["duty_cycle"] 

311 duty_error = abs(duty_cycle - 0.5) 

312 if duty_error < 0.1: 

313 score += 0.3 

314 

315 # SPI has high transition density 

316 density = characteristics["transition_density"] 

317 if density > 1e5: # High speed 

318 score += 0.2 

319 

320 # Cap at 0.99 to reflect inherent uncertainty 

321 return min(0.99, score) 

322 

323 

324def _score_i2c(characteristics: dict[str, Any]) -> float: 

325 """Score likelihood of I2C protocol. 

326 

327 I2C characteristics: 

328 - Clock-like regularity 

329 - Idle high (pull-up) 

330 - Moderate transition density 

331 

332 Args: 

333 characteristics: Signal characteristics. 

334 

335 Returns: 

336 Score from 0 to 1. 

337 """ 

338 score = 0.0 

339 

340 # I2C clock has regularity 

341 regularity = characteristics["regularity"] 

342 if regularity > 0.6: 

343 score += 0.4 

344 

345 # I2C idles high 

346 if characteristics["idle_level"] == "high": 

347 score += 0.3 

348 

349 # I2C has lower transition density than SPI 

350 density = characteristics["transition_density"] 

351 if 1e3 < density < 1e6: 

352 score += 0.3 

353 

354 # Cap at 0.99 to reflect inherent uncertainty 

355 return min(0.99, score) 

356 

357 

358def _score_can(characteristics: dict[str, Any]) -> float: 

359 """Score likelihood of CAN protocol. 

360 

361 CAN characteristics: 

362 - Irregular edges (NRZ encoding with bit stuffing) 

363 - Idle high (recessive) 

364 - Moderate to high transition density 

365 

366 Args: 

367 characteristics: Signal characteristics. 

368 

369 Returns: 

370 Score from 0 to 1. 

371 """ 

372 score = 0.0 

373 

374 # CAN has some irregularity due to bit stuffing 

375 regularity = characteristics["regularity"] 

376 if 0.3 < regularity < 0.7: 

377 score += 0.4 

378 

379 # CAN idles high (recessive state) 

380 if characteristics["idle_level"] == "high": 

381 score += 0.3 

382 

383 # CAN has specific baud rates (typically 125k, 250k, 500k, 1M) 

384 symbol_rate = characteristics["symbol_rate"] 

385 common_rates = [125000, 250000, 500000, 1000000] 

386 for rate in common_rates: 

387 if abs(symbol_rate - rate) / rate < 0.1: # Within 10% 

388 score += 0.3 

389 break 

390 

391 # Cap at 0.99 to reflect inherent uncertainty 

392 return min(0.99, score) 

393 

394 

395__all__ = ["detect_protocol"]