Coverage for src / tracekit / core / cross_domain.py: 20%
131 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Cross-domain correlation for analysis results.
3Enables results from different analysis domains to inform and validate
4each other, improving overall confidence and detecting inconsistencies.
7Example:
8 >>> from tracekit.core.cross_domain import correlate_results
9 >>> from tracekit.reporting.config import AnalysisDomain
10 >>> results = {
11 ... AnalysisDomain.SPECTRAL: {'dominant_frequency': 1000.0},
12 ... AnalysisDomain.TIMING: {'period': 0.001}
13 ... }
14 >>> correlation = correlate_results(results)
15 >>> print(f"Coherence: {correlation.overall_coherence:.2f}")
16 >>> print(f"Agreements: {correlation.agreements_detected}")
17"""
19from __future__ import annotations
21import logging
22from dataclasses import dataclass, field
23from typing import Any
25import numpy as np
27from tracekit.reporting.config import AnalysisDomain
29logger = logging.getLogger(__name__)
@dataclass
class CrossDomainInsight:
    """A single finding produced by comparing results from analysis domains.

    Attributes:
        insight_type: Kind of finding: "agreement", "conflict" or "implication".
        source_domains: Analysis domains the finding was derived from.
        description: Human-readable summary of the finding.
        confidence_impact: Signed confidence contribution, from -1.0 to +1.0.
        details: Extra values specific to this finding (empty by default).

    Raises:
        ValueError: If ``confidence_impact`` is outside [-1.0, 1.0].
    """

    insight_type: str  # "agreement", "conflict", "implication"
    source_domains: list[AnalysisDomain]
    description: str
    confidence_impact: float  # signed, -1 to +1
    details: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Reject a confidence impact outside the closed range [-1.0, 1.0]."""
        # The chained comparison is False for NaN, so NaN is rejected as well.
        within_bounds = -1.0 <= self.confidence_impact <= 1.0
        if within_bounds:
            return
        raise ValueError(
            f"Confidence impact must be in [-1.0, 1.0], got {self.confidence_impact}"
        )
@dataclass
class CorrelationResult:
    """Outcome of one cross-domain correlation pass.

    Attributes:
        insights: Cross-domain insights that were discovered.
        confidence_adjustments: Confidence adjustment keyed by domain.
        conflicts_detected: How many conflicts were found.
        agreements_detected: How many agreements were found.
    """

    insights: list[CrossDomainInsight] = field(default_factory=list)
    confidence_adjustments: dict[str, float] = field(default_factory=dict)
    conflicts_detected: int = 0
    agreements_detected: int = 0

    @property
    def overall_coherence(self) -> float:
        """Return the share of agreements among agreements plus conflicts.

        Returns:
            A score between 0 and 1. The neutral value 0.5 is returned when
            there are no insights, or when no agreement or conflict has been
            counted.
        """
        decided = self.agreements_detected + self.conflicts_detected
        if not self.insights or decided == 0:
            return 0.5
        return self.agreements_detected / decided
# Affinity table: for every domain, the domains considered semantically
# related to it. The correlator only compares domain pairs listed here.
DOMAIN_AFFINITY: dict[AnalysisDomain, list[AnalysisDomain]] = {
    AnalysisDomain.DIGITAL: [
        AnalysisDomain.TIMING,
        AnalysisDomain.PROTOCOLS,
    ],
    AnalysisDomain.TIMING: [
        AnalysisDomain.DIGITAL,
        AnalysisDomain.JITTER,
        AnalysisDomain.SPECTRAL,
    ],
    AnalysisDomain.SPECTRAL: [
        AnalysisDomain.JITTER,
        AnalysisDomain.STATISTICS,
        AnalysisDomain.TIMING,
    ],
    AnalysisDomain.WAVEFORM: [
        AnalysisDomain.STATISTICS,
    ],
    AnalysisDomain.STATISTICS: [
        AnalysisDomain.SPECTRAL,
        AnalysisDomain.WAVEFORM,
    ],
    AnalysisDomain.JITTER: [
        AnalysisDomain.TIMING,
        AnalysisDomain.EYE,
        AnalysisDomain.SPECTRAL,
    ],
    AnalysisDomain.EYE: [
        AnalysisDomain.JITTER,
        AnalysisDomain.SIGNAL_INTEGRITY,
    ],
    AnalysisDomain.PATTERNS: [
        AnalysisDomain.PROTOCOLS,
        AnalysisDomain.INFERENCE,
    ],
    AnalysisDomain.INFERENCE: [
        AnalysisDomain.PATTERNS,
        AnalysisDomain.PROTOCOLS,
    ],
    AnalysisDomain.PROTOCOLS: [
        AnalysisDomain.DIGITAL,
        AnalysisDomain.PATTERNS,
    ],
}
class CrossDomainCorrelator:
    """Correlate results across analysis domains.

    Pairs of domains listed in ``DOMAIN_AFFINITY`` that both have results are
    run through a small set of rule checks. Each check may yield one
    ``CrossDomainInsight`` (agreement, conflict or implication); the insights
    are then folded into per-domain confidence adjustments.
    """

    def __init__(self, tolerance: float = 0.1):
        """Initialize correlator.

        Args:
            tolerance: Fractional tolerance for value comparisons. It sets the
                half-width of the agreement window used when a timing period
                is compared with ``1 / spectral frequency`` (0.1 means the
                ratio must lie strictly between 0.9 and 1.1).
        """
        self.tolerance = tolerance
        self._correlation_rules = self._build_correlation_rules()

    def correlate(
        self,
        results: dict[AnalysisDomain, dict[str, Any]],
    ) -> CorrelationResult:
        """Find correlations between domain results.

        Args:
            results: Dictionary mapping domains to their results. Domains whose
                result mapping is empty are ignored.

        Returns:
            CorrelationResult with insights, agreement/conflict counts and
            per-domain confidence adjustments.
        """
        correlation_result = CorrelationResult()

        # Only correlate domains that have results. The list keeps the
        # caller's ordering for iteration; the set gives O(1) membership tests.
        active_domains = [d for d, r in results.items() if r]
        active_lookup = set(active_domains)

        # An unordered pair key makes (A, B) and (B, A) the same entry, so each
        # related pair is evaluated once.
        checked_pairs: set[frozenset[AnalysisDomain]] = set()

        for domain in active_domains:
            for related_domain in DOMAIN_AFFINITY.get(domain, []):
                if related_domain not in active_lookup:
                    continue
                pair_key = frozenset((domain, related_domain))
                if pair_key in checked_pairs:
                    continue
                checked_pairs.add(pair_key)
                correlation_result.insights.extend(
                    self._correlate_pair(
                        domain, results[domain], related_domain, results[related_domain]
                    )
                )

        # Count agreements and conflicts ("implication" insights count as neither).
        for insight in correlation_result.insights:
            if insight.insight_type == "agreement":
                correlation_result.agreements_detected += 1
            elif insight.insight_type == "conflict":
                correlation_result.conflicts_detected += 1

        correlation_result.confidence_adjustments = self._calculate_adjustments(
            correlation_result.insights
        )

        return correlation_result

    def _correlate_pair(
        self,
        domain1: AnalysisDomain,
        results1: dict[str, Any],
        domain2: AnalysisDomain,
        results2: dict[str, Any],
    ) -> list[CrossDomainInsight]:
        """Run every rule registered for the given pair of domains.

        Args:
            domain1: First domain of the pair.
            results1: Results belonging to ``domain1``.
            domain2: Second domain of the pair.
            results2: Results belonging to ``domain2``.

        Returns:
            Insights produced by the matching rules (possibly empty).
        """
        insights: list[CrossDomainInsight] = []
        # Sets are unordered, so a single comparison covers both orderings.
        pair = {domain1, domain2}

        for rule in self._correlation_rules:
            if rule["domains"] != pair:
                continue
            try:
                insight = rule["check"](results1, results2, domain1, domain2)
            except Exception as e:
                # Deliberate best effort: one failing rule must not abort the
                # whole correlation pass, so log it and move on.
                logger.debug("Correlation rule failed: %s", e)
                continue
            if insight is not None:
                insights.append(insight)

        return insights

    def _build_correlation_rules(self) -> list[dict[str, Any]]:
        """Build correlation rules for domain pairs.

        Returns:
            One mapping per rule with the set of ``"domains"`` it applies to
            and the bound ``"check"`` method that evaluates it.
        """
        return [
            {
                "domains": {AnalysisDomain.SPECTRAL, AnalysisDomain.TIMING},
                "check": self._check_frequency_timing_agreement,
            },
            {
                "domains": {AnalysisDomain.DIGITAL, AnalysisDomain.TIMING},
                "check": self._check_digital_timing_consistency,
            },
            {
                "domains": {AnalysisDomain.JITTER, AnalysisDomain.EYE},
                "check": self._check_jitter_eye_correlation,
            },
            {
                "domains": {AnalysisDomain.WAVEFORM, AnalysisDomain.STATISTICS},
                "check": self._check_waveform_stats_consistency,
            },
        ]

    def _check_frequency_timing_agreement(
        self,
        results1: dict[str, Any],
        results2: dict[str, Any],
        domain1: AnalysisDomain,
        domain2: AnalysisDomain,
    ) -> CrossDomainInsight | None:
        """Check if spectral frequency matches timing period.

        Returns:
            An "agreement" insight when the measured period is within
            ``self.tolerance`` of ``1 / frequency``, a "conflict" insight when
            the two differ by more than a factor of two, otherwise ``None``
            (also when either value is missing or not positive).
        """
        spectral_freq = self._extract_value(
            results1 if domain1 == AnalysisDomain.SPECTRAL else results2,
            ["dominant_frequency", "peak_frequency", "fundamental"],
        )
        timing_period = self._extract_value(
            results2 if domain2 == AnalysisDomain.TIMING else results1,
            ["period", "avg_period", "mean_period"],
        )

        if spectral_freq is None or timing_period is None:
            return None
        if spectral_freq <= 0 or timing_period <= 0:
            return None

        expected_period = 1.0 / spectral_freq
        ratio = timing_period / expected_period
        details = {
            "spectral_freq": spectral_freq,
            "timing_period": timing_period,
            "ratio": ratio,
        }

        # The agreement window follows the configured tolerance; the default
        # of 0.1 gives the 0.9 < ratio < 1.1 window.
        if 1.0 - self.tolerance < ratio < 1.0 + self.tolerance:
            return CrossDomainInsight(
                insight_type="agreement",
                source_domains=[AnalysisDomain.SPECTRAL, AnalysisDomain.TIMING],
                description=(
                    f"Spectral frequency ({spectral_freq:.1f} Hz) matches "
                    f"timing period ({timing_period:.3e} s)"
                ),
                confidence_impact=0.15,
                details=details,
            )
        if ratio < 0.5 or ratio > 2.0:
            return CrossDomainInsight(
                insight_type="conflict",
                source_domains=[AnalysisDomain.SPECTRAL, AnalysisDomain.TIMING],
                description=(
                    f"Spectral frequency ({spectral_freq:.1f} Hz) conflicts with "
                    f"timing period ({timing_period:.3e} s)"
                ),
                confidence_impact=-0.2,
                details=details,
            )

        # Between the agreement window and the conflict bounds: inconclusive.
        return None

    def _check_digital_timing_consistency(
        self,
        results1: dict[str, Any],
        results2: dict[str, Any],
        domain1: AnalysisDomain,
        domain2: AnalysisDomain,
    ) -> CrossDomainInsight | None:
        """Check if digital edge count matches timing analysis.

        Returns:
            An "agreement" insight when both edge counts are present and differ
            by at most 2, otherwise ``None``.
        """
        digital_results = results1 if domain1 == AnalysisDomain.DIGITAL else results2
        timing_results = results2 if domain2 == AnalysisDomain.TIMING else results1

        edge_count = self._extract_value(
            digital_results, ["edge_count", "num_edges", "transitions"]
        )
        timing_edges = self._extract_value(timing_results, ["edge_count", "transitions_detected"])

        # Compare against None rather than truthiness: a count of 0 is a real
        # measurement, not a missing one.
        if edge_count is None or timing_edges is None:
            return None

        if abs(edge_count - timing_edges) <= 2:
            return CrossDomainInsight(
                insight_type="agreement",
                source_domains=[AnalysisDomain.DIGITAL, AnalysisDomain.TIMING],
                description=(f"Edge counts agree: Digital={edge_count}, Timing={timing_edges}"),
                confidence_impact=0.1,
            )

        return None

    def _check_jitter_eye_correlation(
        self,
        results1: dict[str, Any],
        results2: dict[str, Any],
        domain1: AnalysisDomain,
        domain2: AnalysisDomain,
    ) -> CrossDomainInsight | None:
        """Check jitter vs eye diagram correlation.

        Returns:
            An "implication" insight whenever both a jitter value and an eye
            width are available, otherwise ``None``. The values themselves are
            reported but not compared.
        """
        jitter_results = results1 if domain1 == AnalysisDomain.JITTER else results2
        eye_results = results2 if domain2 == AnalysisDomain.EYE else results1

        total_jitter = self._extract_value(jitter_results, ["total_jitter", "tj", "jitter_pp"])
        eye_width = self._extract_value(eye_results, ["eye_width", "horizontal_opening"])

        if total_jitter is None or eye_width is None:
            return None

        return CrossDomainInsight(
            insight_type="implication",
            source_domains=[AnalysisDomain.JITTER, AnalysisDomain.EYE],
            description=(f"Jitter ({total_jitter:.2e}) affects eye width ({eye_width:.2e})"),
            confidence_impact=0.05,
            details={"jitter": total_jitter, "eye_width": eye_width},
        )

    def _check_waveform_stats_consistency(
        self,
        results1: dict[str, Any],
        results2: dict[str, Any],
        domain1: AnalysisDomain,
        domain2: AnalysisDomain,
    ) -> CrossDomainInsight | None:
        """Check waveform measurements vs statistical analysis.

        Returns:
            An "agreement" insight when the amplitude is within 20% of
            ``2.83 * std`` (the sine-wave relationship), otherwise ``None``.
        """
        waveform_results = results1 if domain1 == AnalysisDomain.WAVEFORM else results2
        stats_results = results2 if domain2 == AnalysisDomain.STATISTICS else results1

        wf_amplitude = self._extract_value(waveform_results, ["amplitude", "vpp", "peak_to_peak"])
        stats_std = self._extract_value(stats_results, ["std", "standard_deviation", "stdev"])

        # A missing value or a non-positive std cannot produce a usable ratio.
        if wf_amplitude is None or stats_std is None or stats_std <= 0:
            return None

        # For periodic signals, amplitude ~ 2.83 * std (for sine wave)
        expected_ratio = wf_amplitude / (2.83 * stats_std)

        if 0.8 < expected_ratio < 1.2:
            return CrossDomainInsight(
                insight_type="agreement",
                source_domains=[AnalysisDomain.WAVEFORM, AnalysisDomain.STATISTICS],
                description="Waveform amplitude consistent with statistical std dev",
                confidence_impact=0.1,
            )

        return None

    @staticmethod
    def _coerce_number(val: Any) -> float | None:
        """Return ``val`` as a float, or ``None`` if it is not a usable number.

        Python and NumPy integer/floating scalars are accepted. Booleans are
        rejected even though ``bool`` subclasses ``int``, and NaN is rejected.
        """
        if isinstance(val, bool):
            return None
        if not isinstance(val, (int, float, np.integer, np.floating)):
            return None
        number = float(val)
        if np.isnan(number):
            return None
        return number

    def _extract_value(
        self,
        results: dict[str, Any],
        keys: list[str],
    ) -> float | None:
        """Extract a value from results using multiple possible keys.

        Keys are tried in order. For each key the top level of ``results`` is
        checked first, then every nested dict one level down.

        Args:
            results: Result mapping of a single domain.
            keys: Candidate key names, in priority order.

        Returns:
            The first usable numeric value as a float, or ``None`` if no key
            yields one.
        """
        for key in keys:
            # Try direct key
            if key in results:
                number = self._coerce_number(results[key])
                if number is not None:
                    return number

            # Try nested keys
            for result_val in results.values():
                if isinstance(result_val, dict) and key in result_val:
                    number = self._coerce_number(result_val[key])
                    if number is not None:
                        return number

        return None

    def _calculate_adjustments(
        self,
        insights: list[CrossDomainInsight],
    ) -> dict[str, float]:
        """Calculate confidence adjustments based on insights.

        Args:
            insights: Insights whose ``confidence_impact`` is summed for every
                domain listed in their ``source_domains``.

        Returns:
            Mapping from ``domain.value`` to the summed impact, clamped to
            [-0.3, +0.3].
        """
        adjustments: dict[str, float] = {}

        for insight in insights:
            for domain in insight.source_domains:
                domain_key = domain.value
                adjustments[domain_key] = (
                    adjustments.get(domain_key, 0.0) + insight.confidence_impact
                )

        # Clamp so that no domain's confidence can swing by more than 0.3.
        return {k: max(-0.3, min(0.3, v)) for k, v in adjustments.items()}
def correlate_results(
    results: dict[AnalysisDomain, dict[str, Any]],
    tolerance: float = 0.1,
) -> CorrelationResult:
    """Correlate domain results with a one-off ``CrossDomainCorrelator``.

    Args:
        results: Mapping from each analysis domain to its result dictionary.
        tolerance: Tolerance for value comparisons, passed to the correlator.

    Returns:
        The CorrelationResult produced by the correlator.

    Example:
        >>> from tracekit.reporting.config import AnalysisDomain
        >>> results = {
        ...     AnalysisDomain.SPECTRAL: {'dominant_frequency': 1000.0},
        ...     AnalysisDomain.TIMING: {'period': 0.001}
        ... }
        >>> correlation = correlate_results(results)
        >>> print(f"Insights: {len(correlation.insights)}")
    """
    return CrossDomainCorrelator(tolerance).correlate(results)
# Public API exported by ``from tracekit.core.cross_domain import *``.
__all__ = [
    "DOMAIN_AFFINITY",
    "CorrelationResult",
    "CrossDomainCorrelator",
    "CrossDomainInsight",
    "correlate_results",
]