Coverage for src/tracekit/discovery/comparison.py: 96%
172 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Intelligent trace comparison for auto-discovery.

This module provides automatic trace comparison with alignment, difference
detection, and plain-language explanations.

Example:
    >>> from tracekit.discovery import compare_traces
    >>> diff = compare_traces(trace1, trace2)
    >>> for d in diff.differences:
    ...     print(f"{d.category}: {d.description}")

References:
    TraceKit Auto-Discovery Specification
"""
17from __future__ import annotations
19from dataclasses import dataclass, field
20from typing import TYPE_CHECKING, Literal
22import numpy as np
23from scipy import signal as sp_signal
25if TYPE_CHECKING:
26 from numpy.typing import NDArray
28 from tracekit.core.types import WaveformTrace
@dataclass
class Difference:
    """A single detected discrepancy between two compared traces.

    Attributes:
        category: Kind of difference (timing, amplitude, pattern, transitions).
        timestamp_us: Location of the discrepancy, in microseconds.
        description: Human-readable explanation of the finding.
        severity: One of CRITICAL, WARNING, or INFO.
        impact_score: Weight in [0.0, 1.0]; larger means more severe.
        expected_value: Value observed in the reference trace, if applicable.
        actual_value: Value observed in the measured trace, if applicable.
        delta_value: Absolute difference between actual and expected.
        delta_percent: Relative difference, expressed as a percentage.
        confidence: Detector's confidence in this finding (defaults to 1.0).
    """

    # Required fields: every detector must supply these.
    category: str
    timestamp_us: float
    description: str
    severity: str
    impact_score: float
    # Optional quantitative detail; not all detectors can provide it.
    expected_value: float | None = None
    actual_value: float | None = None
    delta_value: float | None = None
    delta_percent: float | None = None
    confidence: float = 1.0
@dataclass
class TraceDiff:
    """Outcome of an intelligent comparison between two traces.

    Attributes:
        summary: One-line plain-language verdict on the comparison.
        alignment_method: Name of the alignment strategy that was applied.
        similarity_score: Overall similarity in [0.0, 1.0].
        differences: Detected differences, ordered by decreasing impact.
        visual_path: Path to a generated visual comparison, when one exists.
        stats: Statistical comparison metrics (correlation, RMS error, ...).
    """

    summary: str
    alignment_method: str
    similarity_score: float
    differences: list[Difference] = field(default_factory=list)
    visual_path: str | None = None
    stats: dict | None = None  # type: ignore[type-arg]
81def _align_time_based(
82 trace1: WaveformTrace,
83 trace2: WaveformTrace,
84) -> tuple[NDArray[np.float64], NDArray[np.float64], int]:
85 """Align traces based on time (sync to t=0).
87 Args:
88 trace1: First trace.
89 trace2: Second trace.
91 Returns:
92 Tuple of (data1, data2, offset_samples).
93 """
94 # Simply align to start (t=0)
95 min_len = min(len(trace1.data), len(trace2.data))
96 data1 = trace1.data[:min_len].astype(np.float64)
97 data2 = trace2.data[:min_len].astype(np.float64)
99 return data1, data2, 0
102def _align_trigger_based(
103 trace1: WaveformTrace,
104 trace2: WaveformTrace,
105 threshold_pct: float = 50.0,
106) -> tuple[NDArray[np.float64], NDArray[np.float64], int]:
107 """Align traces based on trigger point (first edge).
109 Args:
110 trace1: First trace.
111 trace2: Second trace.
112 threshold_pct: Threshold percentage for edge detection.
114 Returns:
115 Tuple of (data1, data2, offset_samples).
116 """
117 data1 = trace1.data.astype(np.float64)
118 data2 = trace2.data.astype(np.float64)
120 # Find first significant edge in each trace
121 range1 = np.ptp(data1)
122 range2 = np.ptp(data2)
124 threshold1 = np.min(data1) + range1 * threshold_pct / 100.0
125 threshold2 = np.min(data2) + range2 * threshold_pct / 100.0
127 # Find first crossing
128 idx1 = np.where(data1 > threshold1)[0]
129 idx2 = np.where(data2 > threshold2)[0]
131 offset1 = idx1[0] if len(idx1) > 0 else 0
132 offset2 = idx2[0] if len(idx2) > 0 else 0
134 # Align to earliest trigger
135 if offset1 <= offset2:
136 offset_samples = offset2 - offset1
137 data1_aligned = data1[offset1:]
138 data2_aligned = data2[offset2:]
139 else:
140 offset_samples = offset1 - offset2
141 data1_aligned = data1[offset1:]
142 data2_aligned = data2[offset2:]
144 # Truncate to same length
145 min_len = min(len(data1_aligned), len(data2_aligned))
146 return data1_aligned[:min_len], data2_aligned[:min_len], offset_samples
149def _align_pattern_based(
150 trace1: WaveformTrace,
151 trace2: WaveformTrace,
152) -> tuple[NDArray[np.float64], NDArray[np.float64], int]:
153 """Align traces using cross-correlation.
155 Args:
156 trace1: First trace.
157 trace2: Second trace.
159 Returns:
160 Tuple of (data1, data2, offset_samples).
161 """
162 data1 = trace1.data.astype(np.float64)
163 data2 = trace2.data.astype(np.float64)
165 # Normalize for correlation
166 data1_norm = (data1 - np.mean(data1)) / (np.std(data1) + 1e-10)
167 data2_norm = (data2 - np.mean(data2)) / (np.std(data2) + 1e-10)
169 # Cross-correlation
170 correlation = sp_signal.correlate(data1_norm, data2_norm, mode="full")
172 # Find peak
173 peak_idx = np.argmax(np.abs(correlation))
174 offset_samples = peak_idx - (len(data2) - 1)
176 # Align based on offset
177 if offset_samples >= 0:
178 data1_aligned = data1[offset_samples:]
179 data2_aligned = data2
180 else:
181 data1_aligned = data1
182 data2_aligned = data2[-offset_samples:]
184 # Truncate to same length
185 min_len = min(len(data1_aligned), len(data2_aligned))
186 return data1_aligned[:min_len], data2_aligned[:min_len], int(offset_samples)
def _detect_timing_differences(
    data1: NDArray[np.float64],
    data2: NDArray[np.float64],
    sample_rate: float,
) -> list[Difference]:
    """Flag timing discrepancies by comparing transition (edge) counts.

    An edge is any sample-to-sample change exceeding 10% of the trace's own
    peak-to-peak range. A report is produced only when the two traces
    disagree by more than two transitions.

    Args:
        data1: First trace data.
        data2: Second trace data.
        sample_rate: Sample rate in Hz (currently unused; kept for parity
            with the other detectors' signatures).

    Returns:
        List containing at most one timing Difference.
    """

    def count_edges(data: NDArray[np.float64]) -> int:
        # A transition is a per-sample jump larger than 10% of the swing.
        step = np.abs(np.diff(data))
        return int(np.count_nonzero(step > np.ptp(data) * 0.1))

    n_edges1 = count_edges(data1)
    n_edges2 = count_edges(data2)
    gap = abs(n_edges1 - n_edges2)

    # Small disagreements are expected noise; only report a real mismatch.
    if gap <= 2:
        return []

    return [
        Difference(
            category="timing",
            timestamp_us=0.0,
            description=(
                f"Trace 1 has {n_edges1} transitions while Trace 2 has "
                f"{n_edges2} transitions (difference: {gap})"
            ),
            severity="WARNING" if gap > 5 else "INFO",
            impact_score=min(1.0, gap / 10.0),
            confidence=0.90,
        )
    ]
def _detect_amplitude_differences(
    data1: NDArray[np.float64],
    data2: NDArray[np.float64],
    sample_rate: float,
) -> list[Difference]:
    """Flag voltage-level discrepancies between aligned traces.

    A report is produced when more than 10% of samples deviate by over 5%
    of the reference trace's peak-to-peak swing; the report describes the
    single worst-deviating sample.

    Args:
        data1: First (measured) trace data.
        data2: Second (reference) trace data.
        sample_rate: Sample rate in Hz, used to timestamp the worst sample.

    Returns:
        List containing at most one amplitude Difference.
    """
    deviation = np.abs(data1 - data2)
    swing = np.ptp(data2)

    # A flat reference has no meaningful swing to compare against.
    if swing == 0:
        return []

    # Only escalate when deviations are widespread rather than isolated
    # glitches: more than 10% of samples off by more than 5% of the swing.
    n_offenders = int(np.count_nonzero(deviation > swing * 0.05))
    if n_offenders <= len(data1) * 0.1:
        return []

    worst_idx = int(np.argmax(deviation))
    worst = deviation[worst_idx]
    pct = (worst / swing) * 100.0

    if pct > 20:
        severity = "CRITICAL"
    elif pct > 5:
        severity = "WARNING"
    else:
        severity = "INFO"

    return [
        Difference(
            category="amplitude",
            timestamp_us=float(worst_idx / sample_rate * 1e6),
            description=f"Voltage differs by {worst:.3f}V ({pct:.1f}% of signal swing)",
            severity=severity,
            impact_score=min(1.0, pct / 20.0),
            expected_value=float(data2[worst_idx]),
            actual_value=float(data1[worst_idx]),
            delta_value=float(worst),
            delta_percent=pct,
            confidence=0.95,
        )
    ]
def _detect_pattern_differences(
    data1: NDArray[np.float64],
    data2: NDArray[np.float64],
    sample_rate: float,
) -> list[Difference]:
    """Detect overall waveform-shape differences via Pearson correlation.

    A pattern difference is reported whenever the correlation between the
    two (normalized) traces falls below 0.95.

    Args:
        data1: First trace data.
        data2: Second trace data.
        sample_rate: Sample rate in Hz (unused; kept for parity with the
            other detectors' signatures).

    Returns:
        List containing at most one pattern Difference.
    """
    differences: list[Difference] = []

    # Correlation is undefined for fewer than two samples.
    if len(data1) < 2:
        return differences

    # Normalize to zero mean / unit variance; epsilon guards flat signals.
    data1_norm = (data1 - np.mean(data1)) / (np.std(data1) + 1e-10)
    data2_norm = (data2 - np.mean(data2)) / (np.std(data2) + 1e-10)

    correlation = np.corrcoef(data1_norm, data2_norm)[0, 1]

    if correlation < 0.95:
        # FIX: the original severity chain ended with an unreachable "INFO"
        # arm — inside this branch correlation is always < 0.95, so only
        # the CRITICAL/WARNING split can ever apply.
        severity = "CRITICAL" if correlation < 0.8 else "WARNING"

        differences.append(
            Difference(
                category="pattern",
                timestamp_us=0.0,
                description=f"Signal patterns differ (correlation: {correlation:.2f}, expected: >0.95)",
                severity=severity,
                impact_score=1.0 - correlation,
                confidence=0.88,
            )
        )

    return differences
def compare_traces(
    trace1: WaveformTrace,
    trace2: WaveformTrace,
    *,
    alignment: Literal["time", "trigger", "pattern", "auto"] = "auto",
    difference_types: list[str] | None = None,
    severity_threshold: str | None = None,
) -> TraceDiff:
    """Compare traces with intelligent alignment and difference detection.

    Automatically aligns traces and identifies timing, amplitude, pattern,
    and transition differences with plain-language explanations.

    Args:
        trace1: First trace (typically measured/actual).
        trace2: Second trace (typically reference/expected).
        alignment: Alignment method:
            - "time": Sync to t=0
            - "trigger": Sync to first edge (≥50% swing)
            - "pattern": Cross-correlation alignment
            - "auto": Try all methods, use best
        difference_types: Types to detect (default: all).
        severity_threshold: Only return differences at or above this level.

    Returns:
        TraceDiff with alignment method, differences, and summary.

    Example:
        >>> diff = compare_traces(measured, golden)
        >>> for d in diff.differences[:5]:
        ...     print(f"{d.severity}: {d.description}")

    References:
        DISC-004: Intelligent Trace Comparison
    """
    difference_types = difference_types or [
        "timing",
        "amplitude",
        "pattern",
        "transitions",
    ]

    # Dispatch table replaces the original if/elif chains; insertion order
    # matches the original "auto" search order (time, trigger, pattern).
    aligners = {
        "time": _align_time_based,
        "trigger": _align_trigger_based,
        "pattern": _align_pattern_based,
    }

    if alignment == "auto":
        # Try every method and keep whichever yields the highest correlation.
        best_corr = -1.0
        best_method = "time"
        best_aligned = None

        for method, align in aligners.items():
            d1, d2, off = align(trace1, trace2)

            if len(d1) > 1:
                d1_norm = (d1 - np.mean(d1)) / (np.std(d1) + 1e-10)
                d2_norm = (d2 - np.mean(d2)) / (np.std(d2) + 1e-10)
                corr = np.corrcoef(d1_norm, d2_norm)[0, 1]

                if corr > best_corr:
                    best_corr = corr
                    best_method = method
                    best_aligned = (d1, d2, off)

        # FIX: if no method produced more than one aligned sample (or every
        # correlation was NaN — note NaN > best_corr is always False), the
        # original code left best_aligned as None and crashed on unpacking.
        # Fall back to plain time-based alignment in that case.
        if best_aligned is None:
            best_aligned = _align_time_based(trace1, trace2)
            best_method = "time"

        data1, data2, offset = best_aligned
        alignment_method = f"{best_method}-based"
    else:
        # Use the requested method; unknown names fall through to pattern,
        # matching the original if/elif/else behavior.
        align = aligners.get(alignment, _align_pattern_based)
        data1, data2, offset = align(trace1, trace2)
        alignment_method = f"{alignment}-based"

    sample_rate = trace1.metadata.sample_rate

    # Run the requested detectors over the aligned data.
    all_differences: list[Difference] = []

    if "timing" in difference_types:
        all_differences.extend(_detect_timing_differences(data1, data2, sample_rate))

    if "amplitude" in difference_types:
        all_differences.extend(_detect_amplitude_differences(data1, data2, sample_rate))

    if "pattern" in difference_types:
        all_differences.extend(_detect_pattern_differences(data1, data2, sample_rate))

    # Most impactful findings first.
    all_differences.sort(key=lambda d: d.impact_score, reverse=True)

    # Optionally drop findings below the requested severity level.
    if severity_threshold:
        severity_order = {"INFO": 0, "WARNING": 1, "CRITICAL": 2}
        threshold_level = severity_order.get(severity_threshold, 0)

        all_differences = [
            d for d in all_differences if severity_order.get(d.severity, 0) >= threshold_level
        ]

    # Similarity: Pearson correlation mapped from [-1, 1] onto [0, 1].
    if len(data1) > 1:
        data1_norm = (data1 - np.mean(data1)) / (np.std(data1) + 1e-10)
        data2_norm = (data2 - np.mean(data2)) / (np.std(data2) + 1e-10)
        correlation = np.corrcoef(data1_norm, data2_norm)[0, 1]
        similarity_score = float((correlation + 1) / 2)
    else:
        # Degenerate traces: empty traces are trivially identical; a single
        # sample matches iff the values agree.
        similarity_score = 1.0 if len(data1) == 0 or data1[0] == data2[0] else 0.0

    # Build the plain-language summary.
    if similarity_score > 0.95:
        summary = "Signals are very similar"
    elif similarity_score > 0.85:
        summary = "Signals are similar with minor differences"
    elif similarity_score > 0.70:
        summary = "Signals show moderate differences"
    else:
        summary = "Signals are significantly different"

    critical_count = sum(1 for d in all_differences if d.severity == "CRITICAL")
    warning_count = sum(1 for d in all_differences if d.severity == "WARNING")

    if critical_count > 0:
        summary += f" ({critical_count} critical issue(s))"
    elif warning_count > 0:
        summary += f" ({warning_count} warning(s))"

    # Statistics. FIX: np.max/np.argmax raise ValueError on zero-length
    # input, so guard the empty-trace case the similarity path already
    # handles above.
    abs_error = np.abs(data1 - data2)
    has_data = len(data1) > 0
    stats = {
        "correlation": float(correlation) if len(data1) > 1 else 1.0,
        "rms_error": float(np.sqrt(np.mean(abs_error**2))) if has_data else 0.0,
        "max_deviation": float(np.max(abs_error)) if has_data else 0.0,
        "max_deviation_time": float(np.argmax(abs_error) / sample_rate) if has_data else 0.0,
        "avg_timing_offset": float(offset / sample_rate * 1e9),  # ns
    }

    return TraceDiff(
        summary=summary,
        alignment_method=alignment_method,
        similarity_score=similarity_score,
        differences=all_differences,
        stats=stats,
    )
# Public API of this module; keep sorted with classes before functions.
__all__ = [
    "Difference",
    "TraceDiff",
    "compare_traces",
]