Coverage for src / tracekit / discovery / comparison.py: 96%

172 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Intelligent trace comparison for auto-discovery. 

2 

3This module provides automatic trace comparison with alignment, difference 

4detection, and plain-language explanations. 

5 

6 

7Example: 

8 >>> from tracekit.discovery import compare_traces 

9 >>> diff = compare_traces(trace1, trace2) 

10 >>> for d in diff.differences: 

11 ... print(f"{d.category}: {d.description}") 

12 

13References: 

14 TraceKit Auto-Discovery Specification 

15""" 

16 

17from __future__ import annotations 

18 

19from dataclasses import dataclass, field 

20from typing import TYPE_CHECKING, Literal 

21 

22import numpy as np 

23from scipy import signal as sp_signal 

24 

25if TYPE_CHECKING: 

26 from numpy.typing import NDArray 

27 

28 from tracekit.core.types import WaveformTrace 

29 

30 

@dataclass
class Difference:
    """Individual difference between traces.

    Instances are produced by the difference-detection helpers in this
    module and collected into a TraceDiff.

    Attributes:
        category: Difference category (timing, amplitude, pattern, transitions).
        timestamp_us: Timestamp in microseconds (0.0 when the difference
            applies to the whole trace rather than a single instant).
        description: Plain language explanation.
        severity: Severity level (CRITICAL, WARNING, INFO).
        impact_score: Impact score (0.0-1.0, higher = more severe).
        expected_value: Expected value from reference.
        actual_value: Actual value from measured trace.
        delta_value: Absolute difference.
        delta_percent: Percentage difference.
        confidence: Confidence in this difference detection (0.0-1.0).
    """

    category: str
    timestamp_us: float
    description: str
    severity: str
    impact_score: float
    expected_value: float | None = None
    actual_value: float | None = None
    delta_value: float | None = None
    delta_percent: float | None = None
    confidence: float = 1.0

58 

59 

@dataclass
class TraceDiff:
    """Result of intelligent trace comparison.

    Returned by compare_traces().

    Attributes:
        summary: High-level summary of comparison.
        alignment_method: Method used to align traces (e.g. "time-based").
        similarity_score: Overall similarity (0.0-1.0).
        differences: List of detected differences, sorted by impact.
        visual_path: Path to generated visual comparison (if created).
        stats: Statistical comparison metrics (keys: correlation,
            rms_error, max_deviation, max_deviation_time,
            avg_timing_offset).
    """

    summary: str
    alignment_method: str
    similarity_score: float
    differences: list[Difference] = field(default_factory=list)
    visual_path: str | None = None
    stats: dict[str, float] | None = None

79 

80 

81def _align_time_based( 

82 trace1: WaveformTrace, 

83 trace2: WaveformTrace, 

84) -> tuple[NDArray[np.float64], NDArray[np.float64], int]: 

85 """Align traces based on time (sync to t=0). 

86 

87 Args: 

88 trace1: First trace. 

89 trace2: Second trace. 

90 

91 Returns: 

92 Tuple of (data1, data2, offset_samples). 

93 """ 

94 # Simply align to start (t=0) 

95 min_len = min(len(trace1.data), len(trace2.data)) 

96 data1 = trace1.data[:min_len].astype(np.float64) 

97 data2 = trace2.data[:min_len].astype(np.float64) 

98 

99 return data1, data2, 0 

100 

101 

102def _align_trigger_based( 

103 trace1: WaveformTrace, 

104 trace2: WaveformTrace, 

105 threshold_pct: float = 50.0, 

106) -> tuple[NDArray[np.float64], NDArray[np.float64], int]: 

107 """Align traces based on trigger point (first edge). 

108 

109 Args: 

110 trace1: First trace. 

111 trace2: Second trace. 

112 threshold_pct: Threshold percentage for edge detection. 

113 

114 Returns: 

115 Tuple of (data1, data2, offset_samples). 

116 """ 

117 data1 = trace1.data.astype(np.float64) 

118 data2 = trace2.data.astype(np.float64) 

119 

120 # Find first significant edge in each trace 

121 range1 = np.ptp(data1) 

122 range2 = np.ptp(data2) 

123 

124 threshold1 = np.min(data1) + range1 * threshold_pct / 100.0 

125 threshold2 = np.min(data2) + range2 * threshold_pct / 100.0 

126 

127 # Find first crossing 

128 idx1 = np.where(data1 > threshold1)[0] 

129 idx2 = np.where(data2 > threshold2)[0] 

130 

131 offset1 = idx1[0] if len(idx1) > 0 else 0 

132 offset2 = idx2[0] if len(idx2) > 0 else 0 

133 

134 # Align to earliest trigger 

135 if offset1 <= offset2: 

136 offset_samples = offset2 - offset1 

137 data1_aligned = data1[offset1:] 

138 data2_aligned = data2[offset2:] 

139 else: 

140 offset_samples = offset1 - offset2 

141 data1_aligned = data1[offset1:] 

142 data2_aligned = data2[offset2:] 

143 

144 # Truncate to same length 

145 min_len = min(len(data1_aligned), len(data2_aligned)) 

146 return data1_aligned[:min_len], data2_aligned[:min_len], offset_samples 

147 

148 

149def _align_pattern_based( 

150 trace1: WaveformTrace, 

151 trace2: WaveformTrace, 

152) -> tuple[NDArray[np.float64], NDArray[np.float64], int]: 

153 """Align traces using cross-correlation. 

154 

155 Args: 

156 trace1: First trace. 

157 trace2: Second trace. 

158 

159 Returns: 

160 Tuple of (data1, data2, offset_samples). 

161 """ 

162 data1 = trace1.data.astype(np.float64) 

163 data2 = trace2.data.astype(np.float64) 

164 

165 # Normalize for correlation 

166 data1_norm = (data1 - np.mean(data1)) / (np.std(data1) + 1e-10) 

167 data2_norm = (data2 - np.mean(data2)) / (np.std(data2) + 1e-10) 

168 

169 # Cross-correlation 

170 correlation = sp_signal.correlate(data1_norm, data2_norm, mode="full") 

171 

172 # Find peak 

173 peak_idx = np.argmax(np.abs(correlation)) 

174 offset_samples = peak_idx - (len(data2) - 1) 

175 

176 # Align based on offset 

177 if offset_samples >= 0: 

178 data1_aligned = data1[offset_samples:] 

179 data2_aligned = data2 

180 else: 

181 data1_aligned = data1 

182 data2_aligned = data2[-offset_samples:] 

183 

184 # Truncate to same length 

185 min_len = min(len(data1_aligned), len(data2_aligned)) 

186 return data1_aligned[:min_len], data2_aligned[:min_len], int(offset_samples) 

187 

188 

189def _detect_timing_differences( 

190 data1: NDArray[np.float64], 

191 data2: NDArray[np.float64], 

192 sample_rate: float, 

193) -> list[Difference]: 

194 """Detect timing differences between aligned traces. 

195 

196 Args: 

197 data1: First trace data. 

198 data2: Second trace data. 

199 sample_rate: Sample rate in Hz. 

200 

201 Returns: 

202 List of timing differences. 

203 """ 

204 differences = [] 

205 

206 # Look for timing shifts in edges 

207 # Compute derivatives to find edges 

208 diff1 = np.diff(data1) 

209 diff2 = np.diff(data2) 

210 

211 # Find significant edges (> 10% of range per sample) 

212 range1 = np.ptp(data1) 

213 range2 = np.ptp(data2) 

214 

215 edge_threshold1 = range1 * 0.1 

216 edge_threshold2 = range2 * 0.1 

217 

218 edges1 = np.where(np.abs(diff1) > edge_threshold1)[0] 

219 edges2 = np.where(np.abs(diff2) > edge_threshold2)[0] 

220 

221 # Compare edge counts 

222 if abs(len(edges1) - len(edges2)) > 2: 

223 delta_edges = abs(len(edges1) - len(edges2)) 

224 timestamp_us = 0.0 

225 

226 differences.append( 

227 Difference( 

228 category="timing", 

229 timestamp_us=timestamp_us, 

230 description=f"Trace 1 has {len(edges1)} transitions while Trace 2 has {len(edges2)} transitions (difference: {delta_edges})", 

231 severity="WARNING" if delta_edges > 5 else "INFO", 

232 impact_score=min(1.0, delta_edges / 10.0), 

233 confidence=0.90, 

234 ) 

235 ) 

236 

237 return differences 

238 

239 

240def _detect_amplitude_differences( 

241 data1: NDArray[np.float64], 

242 data2: NDArray[np.float64], 

243 sample_rate: float, 

244) -> list[Difference]: 

245 """Detect amplitude differences between aligned traces. 

246 

247 Args: 

248 data1: First trace data. 

249 data2: Second trace data. 

250 sample_rate: Sample rate in Hz. 

251 

252 Returns: 

253 List of amplitude differences. 

254 """ 

255 differences = [] # type: ignore[var-annotated] 

256 

257 # Compute amplitude difference 

258 amp_diff = np.abs(data1 - data2) 

259 ref_range = np.ptp(data2) 

260 

261 if ref_range == 0: 

262 return differences 

263 

264 # Find points with significant amplitude difference 

265 threshold = ref_range * 0.05 # 5% of swing 

266 

267 significant_diffs = np.where(amp_diff > threshold)[0] 

268 

269 if len(significant_diffs) > len(data1) * 0.1: # More than 10% of samples 

270 max_diff_idx = np.argmax(amp_diff) 

271 max_diff = amp_diff[max_diff_idx] 

272 timestamp_us = max_diff_idx / sample_rate * 1e6 

273 

274 delta_percent = (max_diff / ref_range) * 100.0 

275 

276 severity = "CRITICAL" if delta_percent > 20 else "WARNING" if delta_percent > 5 else "INFO" 

277 

278 differences.append( 

279 Difference( 

280 category="amplitude", 

281 timestamp_us=float(timestamp_us), 

282 description=f"Voltage differs by {max_diff:.3f}V ({delta_percent:.1f}% of signal swing)", 

283 severity=severity, 

284 impact_score=min(1.0, delta_percent / 20.0), 

285 expected_value=float(data2[max_diff_idx]), 

286 actual_value=float(data1[max_diff_idx]), 

287 delta_value=float(max_diff), 

288 delta_percent=delta_percent, 

289 confidence=0.95, 

290 ) 

291 ) 

292 

293 return differences 

294 

295 

296def _detect_pattern_differences( 

297 data1: NDArray[np.float64], 

298 data2: NDArray[np.float64], 

299 sample_rate: float, 

300) -> list[Difference]: 

301 """Detect pattern differences between aligned traces. 

302 

303 Args: 

304 data1: First trace data. 

305 data2: Second trace data. 

306 sample_rate: Sample rate in Hz. 

307 

308 Returns: 

309 List of pattern differences. 

310 """ 

311 differences = [] # type: ignore[var-annotated] 

312 

313 # Compute correlation 

314 if len(data1) < 2: 

315 return differences 

316 

317 data1_norm = (data1 - np.mean(data1)) / (np.std(data1) + 1e-10) 

318 data2_norm = (data2 - np.mean(data2)) / (np.std(data2) + 1e-10) 

319 

320 correlation = np.corrcoef(data1_norm, data2_norm)[0, 1] 

321 

322 if correlation < 0.95: 

323 severity = "CRITICAL" if correlation < 0.8 else "WARNING" if correlation < 0.95 else "INFO" 

324 

325 differences.append( 

326 Difference( 

327 category="pattern", 

328 timestamp_us=0.0, 

329 description=f"Signal patterns differ (correlation: {correlation:.2f}, expected: >0.95)", 

330 severity=severity, 

331 impact_score=1.0 - correlation, 

332 confidence=0.88, 

333 ) 

334 ) 

335 

336 return differences 

337 

338 

def compare_traces(
    trace1: WaveformTrace,
    trace2: WaveformTrace,
    *,
    alignment: Literal["time", "trigger", "pattern", "auto"] = "auto",
    difference_types: list[str] | None = None,
    severity_threshold: str | None = None,
) -> TraceDiff:
    """Compare traces with intelligent alignment and difference detection.

    Automatically aligns traces and identifies timing, amplitude, pattern,
    and transition differences with plain-language explanations.

    Args:
        trace1: First trace (typically measured/actual).
        trace2: Second trace (typically reference/expected).
        alignment: Alignment method:
            - "time": Sync to t=0
            - "trigger": Sync to first edge (>=50% swing)
            - "pattern": Cross-correlation alignment
            - "auto": Try all methods, use best
        difference_types: Types to detect (default: all).
        severity_threshold: Only return differences at or above this
            level ("INFO", "WARNING", or "CRITICAL").

    Returns:
        TraceDiff with alignment method, differences, and summary.

    Example:
        >>> diff = compare_traces(measured, golden)
        >>> for d in diff.differences[:5]:
        ...     print(f"{d.severity}: {d.description}")

    References:
        DISC-004: Intelligent Trace Comparison
    """
    difference_types = difference_types or [
        "timing",
        "amplitude",
        "pattern",
        "transitions",
    ]

    # Dispatch table replaces the original if/elif chains; "pattern" is
    # the fallback for unrecognized values, matching the original else.
    aligners = {
        "time": _align_time_based,
        "trigger": _align_trigger_based,
        "pattern": _align_pattern_based,
    }

    if alignment == "auto":
        # Try all methods and keep the one with the best correlation.
        best_corr = -1.0
        best_method = "time"
        best_aligned = None

        for method, aligner in aligners.items():
            d1, d2, off = aligner(trace1, trace2)

            if len(d1) > 1:
                d1_norm = (d1 - np.mean(d1)) / (np.std(d1) + 1e-10)
                d2_norm = (d2 - np.mean(d2)) / (np.std(d2) + 1e-10)
                corr = np.corrcoef(d1_norm, d2_norm)[0, 1]

                if corr > best_corr:
                    best_corr = corr
                    best_method = method
                    best_aligned = (d1, d2, off)

        # FIX: if no method yielded >= 2 overlapping samples,
        # best_aligned stayed None in the original and unpacking it
        # raised TypeError; fall back to time-based alignment.
        if best_aligned is None:
            best_aligned = _align_time_based(trace1, trace2)

        data1, data2, offset = best_aligned
        alignment_method = f"{best_method}-based"
    else:
        data1, data2, offset = aligners.get(alignment, _align_pattern_based)(
            trace1, trace2
        )
        alignment_method = f"{alignment}-based"

    sample_rate = trace1.metadata.sample_rate

    # Run each requested detector over the aligned data.
    detectors = {
        "timing": _detect_timing_differences,
        "amplitude": _detect_amplitude_differences,
        "pattern": _detect_pattern_differences,
    }
    all_differences: list[Difference] = []
    for category, detector in detectors.items():
        if category in difference_types:
            all_differences.extend(detector(data1, data2, sample_rate))

    # Most impactful first.
    all_differences.sort(key=lambda d: d.impact_score, reverse=True)

    # Filter by severity threshold, if requested.
    if severity_threshold:
        severity_order = {"INFO": 0, "WARNING": 1, "CRITICAL": 2}
        threshold_level = severity_order.get(severity_threshold, 0)
        all_differences = [
            d
            for d in all_differences
            if severity_order.get(d.severity, 0) >= threshold_level
        ]

    # Similarity: Pearson correlation mapped from [-1, 1] to [0, 1].
    if len(data1) > 1:
        data1_norm = (data1 - np.mean(data1)) / (np.std(data1) + 1e-10)
        data2_norm = (data2 - np.mean(data2)) / (np.std(data2) + 1e-10)
        correlation = np.corrcoef(data1_norm, data2_norm)[0, 1]
        similarity_score = float((correlation + 1) / 2)
    else:
        correlation = 1.0
        similarity_score = 1.0 if len(data1) == 0 or data1[0] == data2[0] else 0.0

    # Build the plain-language summary.
    if similarity_score > 0.95:
        summary = "Signals are very similar"
    elif similarity_score > 0.85:
        summary = "Signals are similar with minor differences"
    elif similarity_score > 0.70:
        summary = "Signals show moderate differences"
    else:
        summary = "Signals are significantly different"

    critical_count = sum(1 for d in all_differences if d.severity == "CRITICAL")
    warning_count = sum(1 for d in all_differences if d.severity == "WARNING")

    if critical_count > 0:
        summary += f" ({critical_count} critical issue(s))"
    elif warning_count > 0:
        summary += f" ({warning_count} warning(s))"

    # FIX: guard against empty aligned data — np.max/np.argmax raise on
    # empty arrays in the original stats computation.
    if len(data1) > 0:
        abs_error = np.abs(data1 - data2)
        stats = {
            "correlation": float(correlation),
            "rms_error": float(np.sqrt(np.mean(abs_error**2))),
            "max_deviation": float(np.max(abs_error)),
            "max_deviation_time": float(np.argmax(abs_error) / sample_rate),
            "avg_timing_offset": float(offset / sample_rate * 1e9),  # ns
        }
    else:
        stats = {
            "correlation": 1.0,
            "rms_error": 0.0,
            "max_deviation": 0.0,
            "max_deviation_time": 0.0,
            "avg_timing_offset": float(offset / sample_rate * 1e9),  # ns
        }

    return TraceDiff(
        summary=summary,
        alignment_method=alignment_method,
        similarity_score=similarity_score,
        differences=all_differences,
        stats=stats,
    )

491 

492 

# Public API of this module.
__all__ = [
    "Difference",
    "TraceDiff",
    "compare_traces",
]