Coverage for src / tracekit / exploratory / fuzzy.py: 94%

167 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Fuzzy matching for timing and pattern analysis. 

2 

3This module provides fuzzy matching capabilities for tolerating 

4timing variations and pattern deviations in real-world signals. 

5 

6 

7Example: 

8 >>> from tracekit.exploratory.fuzzy import fuzzy_timing_match 

9 >>> result = fuzzy_timing_match(edges, expected_period=1e-6, tolerance=0.1) 

10 >>> print(f"Match confidence: {result.confidence:.1%}") 

11""" 

12 

13from __future__ import annotations 

14 

15from dataclasses import dataclass 

16from typing import TYPE_CHECKING, Any 

17 

18import numpy as np 

19 

20from tracekit.core.types import WaveformTrace 

21 

22if TYPE_CHECKING: 

23 from numpy.typing import NDArray 

24 

25 

@dataclass
class FuzzyTimingResult:
    """Outcome of comparing measured edge timing against a reference period.

    Attributes:
        match: Whether the detected period lies within the requested tolerance.
        confidence: Confidence in the match, between 0.0 and 1.0.
        period: Period detected from the edge intervals.
        deviation: Deviation of the detected period from the expected period.
        jitter_rms: RMS timing jitter.
        outlier_count: How many timing outliers were found.
        outlier_indices: Indices of the outlier edges.
    """

    # Verdict and its strength.
    match: bool
    confidence: float

    # Measured timing characteristics.
    period: float
    deviation: float
    jitter_rms: float

    # Outlier bookkeeping.
    outlier_count: int
    outlier_indices: list[int]

47 

48 

def fuzzy_timing_match(
    trace_or_edges: WaveformTrace | NDArray[np.float64],
    *,
    expected_period: float | None = None,
    tolerance: float = 0.1,
    sample_rate: float | None = None,
) -> FuzzyTimingResult:
    """Match timing with fuzzy tolerance.

    Allows timing variations while still detecting protocol patterns.
    Useful for signals with jitter or clock drift.

    Args:
        trace_or_edges: WaveformTrace or array-like of edge times.
        expected_period: Expected period in seconds. When omitted, the
            detected (median) period is used as the reference.
        tolerance: Tolerance as fraction (0.1 = 10%).
        sample_rate: Sample rate (required if trace provided and the trace
            metadata does not carry one).

    Returns:
        FuzzyTimingResult with match information. All numeric fields are
        plain Python ``float``/``int`` values. A non-positive reference
        period can never match: the result then has ``match=False``,
        ``confidence=0.0`` and an infinite ``deviation``.

    Raises:
        ValueError: If sample_rate is invalid when WaveformTrace provided.

    Example:
        >>> result = fuzzy_timing_match(trace, expected_period=1e-6, tolerance=0.1)
        >>> print(f"Period match: {result.match}")
        >>> print(f"Actual period: {result.period:.3e} s")

    References:
        FUZZY-001: Fuzzy Timing Tolerance
    """
    if isinstance(trace_or_edges, WaveformTrace):
        data = trace_or_edges.data
        sample_rate = sample_rate or trace_or_edges.metadata.sample_rate

        if sample_rate is None or sample_rate <= 0:
            raise ValueError("Valid sample_rate required for WaveformTrace")

        # Slice at the midpoint of the 5th/95th percentile levels.
        v_min = np.percentile(data, 5)
        v_max = np.percentile(data, 95)
        threshold = (v_min + v_max) / 2
        digital = data > threshold

        # Every transition (rising or falling) counts as an edge.
        edge_samples = np.flatnonzero(np.diff(digital.astype(int)))
        edges = edge_samples / sample_rate
    else:
        # Accept any array-like of edge times, not only ndarrays.
        edges = np.asarray(trace_or_edges, dtype=np.float64)

    if len(edges) < 2:
        return FuzzyTimingResult(
            match=False,
            confidence=0.0,
            period=0.0,
            deviation=1.0,
            jitter_rms=0.0,
            outlier_count=0,
            outlier_indices=[],
        )

    intervals = np.diff(edges)

    # Median is robust against a few missing or spurious edges.
    detected_period = float(np.median(intervals))

    reference_period = detected_period if expected_period is None else float(expected_period)

    # A non-positive reference would divide by zero or produce a negative
    # deviation that passes the tolerance check; treat it as "cannot match".
    if reference_period > 0:
        deviation = abs(detected_period - reference_period) / reference_period
    else:
        deviation = float("inf")

    match = bool(deviation <= tolerance)

    # Spread of the intervals; equal to std(intervals / period - 1) * period
    # for a positive period, but also well defined when the period is zero.
    jitter_rms = float(np.std(intervals))

    # Intervals further than 3x the tolerance band from the reference are outliers.
    outlier_threshold = reference_period * tolerance * 3
    outlier_mask = np.abs(intervals - reference_period) > outlier_threshold
    outlier_indices = np.flatnonzero(outlier_mask).tolist()
    outlier_count = len(outlier_indices)

    # Higher confidence for lower deviation and fewer outliers.
    if tolerance > 0:
        confidence = max(0.0, 1.0 - deviation / tolerance)
    else:
        # No tolerance band to scale against: exact match or nothing.
        confidence = 1.0 if match else 0.0
    confidence *= max(0.0, 1.0 - outlier_count / len(intervals))
    confidence = float(min(1.0, confidence))

    return FuzzyTimingResult(
        match=match,
        confidence=confidence,
        period=detected_period,
        deviation=float(deviation),
        jitter_rms=jitter_rms,
        outlier_count=outlier_count,
        outlier_indices=outlier_indices,
    )

154 

155 

@dataclass
class FuzzyPatternResult:
    """Outcome of an error-tolerant bit-pattern search.

    Attributes:
        matches: Match locations with their scores, one dict per match.
        best_match_score: Score of the best match.
        total_matches: Total number of matches found.
        pattern_variations: Common pattern variations found, as
            ``(bit_tuple, occurrence_count)`` pairs.
    """

    # Individual hits and the summary derived from them.
    matches: list[dict[str, Any]]
    best_match_score: float
    total_matches: int

    # Variant bit sequences paired with how often each occurred.
    pattern_variations: list[tuple[tuple[int, ...], int]]

171 

172 

def fuzzy_pattern_match(
    trace: WaveformTrace,
    pattern: list[int] | tuple[int, ...],
    *,
    max_errors: int = 1,
    error_weight: float = 0.5,
) -> FuzzyPatternResult:
    """Match pattern with allowed bit errors.

    Finds pattern occurrences allowing for bit errors, useful for
    noisy signals or partial matches.

    The trace is thresholded at the midpoint of its 5th/95th percentile
    levels, the bit period is estimated as the smallest gap between edges,
    and bits are sampled once per period starting half a period after the
    first edge. This works best with signals that have frequent transitions;
    long runs of identical bits may leave too few edges for accurate recovery.

    Args:
        trace: Signal trace to search.
        pattern: Bit pattern to find (list of 0s and 1s).
        max_errors: Maximum allowed bit errors.
        error_weight: Weight reduction per error.

    Returns:
        FuzzyPatternResult with match locations, best score first. Each match
        dict has the keys ``position`` (bit index), ``sample_position``,
        ``errors``, ``score`` and ``actual_pattern`` (tuple of Python ints).
        At most the ten most frequent inexact variations are reported.

    Example:
        >>> result = fuzzy_pattern_match(trace, [0, 1, 0, 1, 0, 1], max_errors=1)
        >>> print(f"Found {result.total_matches} matches")
        >>> for match in result.matches[:5]:
        ...     print(f"  Position {match['position']}: score {match['score']:.2f}")

    References:
        FUZZY-002: Fuzzy Pattern Matching
    """
    pattern = tuple(pattern)
    pattern_len = len(pattern)

    # Nothing to search for.
    if pattern_len == 0:
        return FuzzyPatternResult(
            matches=[],
            best_match_score=0.0,
            total_matches=0,
            pattern_variations=[],
        )

    # Convert trace to digital.
    data = trace.data
    v_min = np.percentile(data, 5)
    v_max = np.percentile(data, 95)
    threshold = (v_min + v_max) / 2
    digital = (data > threshold).astype(int)

    edges = np.flatnonzero(np.diff(digital))

    if len(edges) < 2:
        return FuzzyPatternResult(
            matches=[],
            best_match_score=0.0,
            total_matches=0,
            pattern_variations=[],
        )

    # The smallest gap between edges is typically one bit period.
    estimated_bit_period = float(np.min(np.diff(edges)))

    # Sample once per bit period, starting at the centre of the first bit.
    first_center = edges[0] + estimated_bit_period / 2
    sample_idx = np.arange(first_center, len(digital), estimated_bit_period).astype(np.intp)
    bits = digital[sample_idx]

    matches: list[dict[str, Any]] = []
    variations: dict[tuple[int, ...], int] = {}

    if len(bits) >= pattern_len:
        # Hamming distance of every window to the pattern in one vectorized pass.
        windows = np.lib.stride_tricks.sliding_window_view(bits, pattern_len)
        error_counts = np.count_nonzero(windows != np.asarray(pattern), axis=1)

        for start in np.flatnonzero(error_counts <= max_errors).tolist():
            errors = int(error_counts[start])
            # .tolist() yields plain Python ints rather than NumPy scalars.
            window = tuple(windows[start].tolist())
            matches.append(
                {
                    "position": start,
                    "sample_position": int(edges[0] + start * estimated_bit_period),
                    "errors": errors,
                    "score": 1.0 - errors * error_weight,
                    "actual_pattern": window,
                }
            )

            # Track accepted windows that are not an exact match.
            if errors:
                variations[window] = variations.get(window, 0) + 1

    # Best score first; the sort is stable, so ties keep position order.
    matches.sort(key=lambda m: m["score"], reverse=True)

    best_score = matches[0]["score"] if matches else 0.0

    # Most frequent variations first.
    variation_list = sorted(variations.items(), key=lambda kv: kv[1], reverse=True)

    return FuzzyPatternResult(
        matches=matches,
        best_match_score=best_score,
        total_matches=len(matches),
        pattern_variations=variation_list[:10],
    )

295 

296 

@dataclass
class FuzzyProtocolResult:
    """Outcome of tolerance-based protocol identification.

    Attributes:
        detected_protocol: Name of the most likely protocol.
        confidence: Detection confidence.
        alternatives: Other candidate protocols, as ``(name, score)`` pairs.
        timing_score: Score based on timing match.
        pattern_score: Score based on pattern match.
        recommendations: Suggestions for improving detection.
    """

    # Winning candidate and runner-ups.
    detected_protocol: str
    confidence: float
    alternatives: list[tuple[str, float]]

    # Component scores of the winning candidate.
    timing_score: float
    pattern_score: float

    # Human-readable follow-up advice.
    recommendations: list[str]

316 

317 

# Per-protocol characteristics consulted by the fuzzy protocol detector.
# Every entry lists "typical_rates"; the remaining keys vary by protocol.
PROTOCOL_SIGNATURES = {
    "UART": {
        "start_bit": 0,
        "stop_bits": 1,
        # Frame lengths include the start/stop bits.
        "frame_size": [8, 9, 10, 11],
        "typical_rates": [9600, 19200, 38400, 57600, 115200],
    },
    "I2C": {
        # START: SDA falls while SCL is high.
        "start_pattern": [1, 0],
        # STOP: SDA rises while SCL is high.
        "stop_pattern": [0, 1],
        "ack_bit": 0,
        "typical_rates": [100e3, 400e3, 1e6, 3.4e6],
    },
    "SPI": {
        # CPOL options.
        "idle_clock": [0, 1],
        # CPHA options.
        "clock_phase": [0, 1],
        "frame_size": [8, 16],
        "typical_rates": [1e6, 5e6, 10e6, 20e6, 40e6],
    },
    "CAN": {
        "start_of_frame": 0,
        "frame_patterns": ["standard", "extended"],
        "typical_rates": [125e3, 250e3, 500e3, 1e6],
    },
}

344 

345 

def fuzzy_protocol_detect(
    trace: WaveformTrace,
    *,
    candidates: list[str] | None = None,
    timing_tolerance: float = 0.15,
    pattern_tolerance: int = 2,
) -> FuzzyProtocolResult:
    """Detect protocol with fuzzy matching.

    Uses timing tolerance and pattern flexibility to identify
    protocols even with non-ideal signals.

    Each candidate gets a timing score (how close the estimated bit rate is
    to one of its typical rates) and a pattern score (start-pattern agreement
    and/or a plausible frame length); the total is their equal-weight mean.

    Args:
        trace: Signal trace to analyze.
        candidates: List of protocols to consider (None = all). Names not
            present in ``PROTOCOL_SIGNATURES`` are skipped.
        timing_tolerance: Timing tolerance as fraction.
        pattern_tolerance: Maximum pattern bit errors.

    Returns:
        FuzzyProtocolResult with detection results. ``detected_protocol`` is
        ``"Unknown"`` when the trace has fewer than four edges or no known
        candidate was scored. Scores are plain Python floats.

    Example:
        >>> result = fuzzy_protocol_detect(trace)
        >>> print(f"Detected: {result.detected_protocol}")
        >>> print(f"Confidence: {result.confidence:.1%}")

    References:
        FUZZY-003: Fuzzy Protocol Detection
    """
    data = trace.data
    sample_rate = trace.metadata.sample_rate

    if candidates is None:
        candidates = list(PROTOCOL_SIGNATURES)

    # Digitise at the midpoint of the 5th/95th percentile levels.
    v_min = np.percentile(data, 5)
    v_max = np.percentile(data, 95)
    threshold = (v_min + v_max) / 2
    digital = data > threshold

    edges = np.flatnonzero(np.diff(digital.astype(int)))

    if len(edges) < 4:
        return FuzzyProtocolResult(
            detected_protocol="Unknown",
            confidence=0.0,
            alternatives=[],
            timing_score=0.0,
            pattern_score=0.0,
            recommendations=["Insufficient edges for protocol detection"],
        )

    # Estimate bit rate from the median edge spacing.
    intervals = np.diff(edges)
    median_interval = np.median(intervals)
    estimated_bitrate = sample_rate / median_interval

    # The measurements below do not depend on the candidate, so they are
    # taken once here instead of once per protocol.

    # Up to four bits sampled at bit centres following the first edge.
    centres = edges[0] + median_interval * (np.arange(4) + 0.5)
    centres = centres[centres < len(digital)]
    leading_bits: list[int] = digital[centres.astype(np.intp)].astype(int).tolist()

    # Frame length in bit periods, estimated from the long inter-edge gaps.
    long_gaps = intervals[intervals > median_interval * 2]
    frame_bits = np.median(long_gaps) / median_interval if len(long_gaps) > 0 else None

    scores: dict[str, dict[str, float]] = {}

    for protocol in candidates:
        if protocol not in PROTOCOL_SIGNATURES:
            continue

        # Signature values are heterogeneous; type them loosely.
        sig: dict[str, Any] = PROTOCOL_SIGNATURES[protocol]
        timing_score = 0.0
        pattern_score = 0.0

        # Timing: best agreement with any of the protocol's typical rates.
        rates: Any = sig.get("typical_rates", ())
        if hasattr(rates, "__iter__"):
            for rate in rates:
                if not isinstance(rate, int | float):
                    continue
                ratio = estimated_bitrate / rate
                if (1 - timing_tolerance) <= ratio <= (1 + timing_tolerance):
                    timing_score = max(timing_score, 1 - abs(1 - ratio) / timing_tolerance)

        # Pattern: compare the leading bits with the expected start pattern.
        if "start_pattern" in sig:
            expected = sig["start_pattern"]
            if len(leading_bits) >= len(expected):
                errors = sum(
                    1 for got, want in zip(leading_bits, expected, strict=False) if got != want
                )
                if errors <= pattern_tolerance:
                    pattern_score = 1 - errors * 0.3

        # Frame size: a plausible frame length earns a floor pattern score.
        if "frame_size" in sig and frame_bits is not None:
            if any(abs(frame_bits - valid_size) < 1.5 for valid_size in sig["frame_size"]):
                pattern_score = max(pattern_score, 0.7)

        # float() keeps NumPy scalars out of the public result.
        scores[protocol] = {
            "timing": float(timing_score),
            "pattern": float(pattern_score),
            "total": float(timing_score * 0.5 + pattern_score * 0.5),
        }

    if not scores:
        return FuzzyProtocolResult(
            detected_protocol="Unknown",
            confidence=0.0,
            alternatives=[],
            timing_score=0.0,
            pattern_score=0.0,
            recommendations=["No matching protocols found"],
        )

    # Highest total first; the sort is stable, so ties keep candidate order.
    ranked = sorted(scores.items(), key=lambda item: item[1]["total"], reverse=True)
    best_protocol, best_scores = ranked[0]

    confidence = best_scores["total"]
    alternatives = [(name, s["total"]) for name, s in ranked[1:4] if s["total"] > 0.2]

    recommendations: list[str] = []

    if confidence < 0.5:
        recommendations.append("Low confidence - verify with protocol-specific decoder")

    if best_scores["timing"] > best_scores["pattern"]:
        recommendations.append("Timing matched better than patterns - check signal quality")

    if best_scores["pattern"] > best_scores["timing"]:
        recommendations.append("Patterns matched but timing off - check clock accuracy")

    if not alternatives:
        recommendations.append("No alternative protocols detected")

    return FuzzyProtocolResult(
        detected_protocol=best_protocol,
        confidence=confidence,
        alternatives=alternatives,
        timing_score=best_scores["timing"],
        pattern_score=best_scores["pattern"],
        recommendations=recommendations,
    )

503 

504 

# Public API of this module: the signature table, the three result
# dataclasses, and the three fuzzy-matching entry points.
__all__ = [
    "PROTOCOL_SIGNATURES",
    "FuzzyPatternResult",
    "FuzzyProtocolResult",
    "FuzzyTimingResult",
    "fuzzy_pattern_match",
    "fuzzy_protocol_detect",
    "fuzzy_timing_match",
]