Coverage for src / tracekit / core / cross_domain.py: 20%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Cross-domain correlation for analysis results. 

2 

3Enables results from different analysis domains to inform and validate 

4each other, improving overall confidence and detecting inconsistencies. 

5 

6 

7Example: 

8 >>> from tracekit.core.cross_domain import correlate_results 

9 >>> from tracekit.reporting.config import AnalysisDomain 

10 >>> results = { 

11 ... AnalysisDomain.SPECTRAL: {'dominant_frequency': 1000.0}, 

12 ... AnalysisDomain.TIMING: {'period': 0.001} 

13 ... } 

14 >>> correlation = correlate_results(results) 

15 >>> print(f"Coherence: {correlation.overall_coherence:.2f}") 

16 >>> print(f"Agreements: {correlation.agreements_detected}") 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22from dataclasses import dataclass, field 

23from typing import Any 

24 

25import numpy as np 

26 

27from tracekit.reporting.config import AnalysisDomain 

28 

29logger = logging.getLogger(__name__) 

30 

31 

@dataclass
class CrossDomainInsight:
    """A single finding produced by comparing the results of analysis domains.

    Attributes:
        insight_type: Category label; this module emits "agreement",
            "conflict" and "implication".
        source_domains: Domains whose results produced this insight.
        description: Human-readable summary of the finding.
        confidence_impact: Signed confidence contribution in [-1.0, 1.0].
        details: Optional extra data attached to the insight.

    Raises:
        ValueError: If ``confidence_impact`` lies outside [-1.0, 1.0].
    """

    insight_type: str
    source_domains: list[AnalysisDomain]
    description: str
    confidence_impact: float
    details: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Reject a confidence impact outside the closed range [-1.0, 1.0]."""
        impact = self.confidence_impact
        # A chained comparison is False for NaN, so NaN is rejected as well.
        if -1.0 <= impact <= 1.0:
            return
        raise ValueError(f"Confidence impact must be in [-1.0, 1.0], got {impact}")

56 

57 

@dataclass
class CorrelationResult:
    """Aggregated outcome of a cross-domain correlation run.

    Attributes:
        insights: Insights collected from all correlated domain pairs.
        confidence_adjustments: Confidence adjustment per domain key.
        conflicts_detected: Count of insights of type "conflict".
        agreements_detected: Count of insights of type "agreement".
    """

    insights: list[CrossDomainInsight] = field(default_factory=list)
    confidence_adjustments: dict[str, float] = field(default_factory=dict)
    conflicts_detected: int = 0
    agreements_detected: int = 0

    @property
    def overall_coherence(self) -> float:
        """Return the share of agreements among agreements plus conflicts.

        Returns:
            ``agreements / (agreements + conflicts)``, or the neutral value
            0.5 when there are no insights or no agreements/conflicts.
        """
        if self.insights:
            decided = self.agreements_detected + self.conflicts_detected
            if decided:
                return self.agreements_detected / decided
        # Nothing to weigh: neither coherent nor incoherent.
        return 0.5

87 

88 

# Define which domains are semantically related.
#
# Each key maps to the domains that CrossDomainCorrelator.correlate() will try
# to pair it with (it looks the key up via DOMAIN_AFFINITY.get(domain, [])).
# A pair is examined when both domains have results and either one lists the
# other, so a relation does not need to be declared in both directions; for
# example SIGNAL_INTEGRITY appears only as a value (under EYE) and has no
# entry of its own. List order is the order in which related domains are
# visited.
DOMAIN_AFFINITY: dict[AnalysisDomain, list[AnalysisDomain]] = {
    AnalysisDomain.DIGITAL: [AnalysisDomain.TIMING, AnalysisDomain.PROTOCOLS],
    AnalysisDomain.TIMING: [AnalysisDomain.DIGITAL, AnalysisDomain.JITTER, AnalysisDomain.SPECTRAL],
    AnalysisDomain.SPECTRAL: [
        AnalysisDomain.JITTER,
        AnalysisDomain.STATISTICS,
        AnalysisDomain.TIMING,
    ],
    AnalysisDomain.WAVEFORM: [AnalysisDomain.STATISTICS],
    AnalysisDomain.STATISTICS: [AnalysisDomain.SPECTRAL, AnalysisDomain.WAVEFORM],
    AnalysisDomain.JITTER: [AnalysisDomain.TIMING, AnalysisDomain.EYE, AnalysisDomain.SPECTRAL],
    AnalysisDomain.EYE: [AnalysisDomain.JITTER, AnalysisDomain.SIGNAL_INTEGRITY],
    AnalysisDomain.PATTERNS: [AnalysisDomain.PROTOCOLS, AnalysisDomain.INFERENCE],
    AnalysisDomain.INFERENCE: [AnalysisDomain.PATTERNS, AnalysisDomain.PROTOCOLS],
    AnalysisDomain.PROTOCOLS: [AnalysisDomain.DIGITAL, AnalysisDomain.PATTERNS],
}

106 

107 

class CrossDomainCorrelator:
    """Correlate results across analysis domains.

    Domains that have results are paired according to ``DOMAIN_AFFINITY``;
    each pair is run through the correlation rules registered for exactly
    that pair, and every rule may contribute one ``CrossDomainInsight``.

    Attributes:
        tolerance: Fractional tolerance used to decide whether the spectral
            frequency and the timing period agree (0.1 means within 10%).
    """

    def __init__(self, tolerance: float = 0.1):
        """Initialize correlator.

        Args:
            tolerance: Tolerance for value comparisons (fraction).
        """
        self.tolerance = tolerance
        self._correlation_rules = self._build_correlation_rules()

    def correlate(
        self,
        results: dict[AnalysisDomain, dict[str, Any]],
    ) -> CorrelationResult:
        """Find correlations between domain results.

        Args:
            results: Dictionary mapping domains to their results. Domains
                whose results are empty/falsy are ignored.

        Returns:
            CorrelationResult with the collected insights, the agreement and
            conflict counts, and the per-domain confidence adjustments.
        """
        correlation_result = CorrelationResult()

        # Only correlate domains that have results. The list keeps the
        # caller's ordering; the set gives O(1) membership tests in the loop.
        active_domains = [domain for domain, domain_results in results.items() if domain_results]
        active_lookup = set(active_domains)

        # Unordered pairs already handled, so (A, B) and (B, A) run only once.
        checked_pairs: set[frozenset[AnalysisDomain]] = set()

        for domain in active_domains:
            for related_domain in DOMAIN_AFFINITY.get(domain, []):
                if related_domain not in active_lookup:
                    continue
                pair = frozenset((domain, related_domain))
                if pair in checked_pairs:
                    continue
                checked_pairs.add(pair)
                correlation_result.insights.extend(
                    self._correlate_pair(
                        domain, results[domain], related_domain, results[related_domain]
                    )
                )

        # Count agreements and conflicts; other insight types are not counted.
        for insight in correlation_result.insights:
            if insight.insight_type == "agreement":
                correlation_result.agreements_detected += 1
            elif insight.insight_type == "conflict":
                correlation_result.conflicts_detected += 1

        correlation_result.confidence_adjustments = self._calculate_adjustments(
            correlation_result.insights
        )
        return correlation_result

    def _correlate_pair(
        self,
        domain1: AnalysisDomain,
        results1: dict[str, Any],
        domain2: AnalysisDomain,
        results2: dict[str, Any],
    ) -> list[CrossDomainInsight]:
        """Run every rule registered for this exact pair of domains.

        Args:
            domain1: First domain of the pair.
            results1: Results belonging to ``domain1``.
            domain2: Second domain of the pair.
            results2: Results belonging to ``domain2``.

        Returns:
            Insights produced by the matching rules (possibly empty).
        """
        insights: list[CrossDomainInsight] = []
        # Sets are unordered, so a single comparison covers both orderings.
        pair = {domain1, domain2}

        for rule in self._correlation_rules:
            if rule["domains"] != pair:
                continue
            try:
                insight = rule["check"](results1, results2, domain1, domain2)
            except Exception as e:
                # Best effort: one failing rule must not abort the whole run.
                logger.debug("Correlation rule failed: %s", e)
                continue
            if insight:
                insights.append(insight)

        return insights

    def _build_correlation_rules(self) -> list[dict[str, Any]]:
        """Build correlation rules for domain pairs.

        Returns:
            A list of rules; each rule holds the set of two ``"domains"`` it
            applies to and the bound ``"check"`` method to call for them.
        """
        return [
            {
                "domains": {AnalysisDomain.SPECTRAL, AnalysisDomain.TIMING},
                "check": self._check_frequency_timing_agreement,
            },
            {
                "domains": {AnalysisDomain.DIGITAL, AnalysisDomain.TIMING},
                "check": self._check_digital_timing_consistency,
            },
            {
                "domains": {AnalysisDomain.JITTER, AnalysisDomain.EYE},
                "check": self._check_jitter_eye_correlation,
            },
            {
                "domains": {AnalysisDomain.WAVEFORM, AnalysisDomain.STATISTICS},
                "check": self._check_waveform_stats_consistency,
            },
        ]

    def _classify_period_ratio(self, ratio: float) -> str | None:
        """Classify the ratio of measured period to spectrally expected period.

        Args:
            ratio: ``timing_period / (1 / spectral_freq)``.

        Returns:
            "agreement" when the ratio is within ``self.tolerance`` of 1.0,
            "conflict" when it is below 0.5 or above 2.0, otherwise ``None``.
        """
        # Written as 1 -/+ tolerance so the default 0.1 gives the 0.9/1.1 band.
        if 1.0 - self.tolerance < ratio < 1.0 + self.tolerance:
            return "agreement"
        if ratio < 0.5 or ratio > 2.0:
            return "conflict"
        return None

    def _check_frequency_timing_agreement(
        self,
        results1: dict[str, Any],
        results2: dict[str, Any],
        domain1: AnalysisDomain,
        domain2: AnalysisDomain,
    ) -> CrossDomainInsight | None:
        """Check if spectral frequency matches timing period.

        Args:
            results1: Results of ``domain1``.
            results2: Results of ``domain2``.
            domain1: Domain the first results belong to.
            domain2: Domain the second results belong to.

        Returns:
            An "agreement" or "conflict" insight, or ``None`` when either
            value is missing, not positive, or the ratio is inconclusive.
        """
        spectral_freq = self._extract_value(
            results1 if domain1 == AnalysisDomain.SPECTRAL else results2,
            ["dominant_frequency", "peak_frequency", "fundamental"],
        )
        timing_period = self._extract_value(
            results2 if domain2 == AnalysisDomain.TIMING else results1,
            ["period", "avg_period", "mean_period"],
        )

        if spectral_freq is None or timing_period is None:
            return None
        if spectral_freq <= 0 or timing_period <= 0:
            return None

        expected_period = 1.0 / spectral_freq
        ratio = timing_period / expected_period
        verdict = self._classify_period_ratio(ratio)
        if verdict is None:
            return None

        details = {
            "spectral_freq": spectral_freq,
            "timing_period": timing_period,
            "ratio": ratio,
        }
        if verdict == "agreement":
            return CrossDomainInsight(
                insight_type="agreement",
                source_domains=[AnalysisDomain.SPECTRAL, AnalysisDomain.TIMING],
                description=(
                    f"Spectral frequency ({spectral_freq:.1f} Hz) matches "
                    f"timing period ({timing_period:.3e} s)"
                ),
                confidence_impact=0.15,
                details=details,
            )
        return CrossDomainInsight(
            insight_type="conflict",
            source_domains=[AnalysisDomain.SPECTRAL, AnalysisDomain.TIMING],
            description=(
                f"Spectral frequency ({spectral_freq:.1f} Hz) conflicts with "
                f"timing period ({timing_period:.3e} s)"
            ),
            confidence_impact=-0.2,
            details=details,
        )

    def _check_digital_timing_consistency(
        self,
        results1: dict[str, Any],
        results2: dict[str, Any],
        domain1: AnalysisDomain,
        domain2: AnalysisDomain,
    ) -> CrossDomainInsight | None:
        """Check if digital edge count matches timing analysis.

        Returns:
            An "agreement" insight when both edge counts are present and
            non-zero and differ by at most 2; otherwise ``None``.
        """
        digital_results = results1 if domain1 == AnalysisDomain.DIGITAL else results2
        timing_results = results2 if domain2 == AnalysisDomain.TIMING else results1

        edge_count = self._extract_value(
            digital_results, ["edge_count", "num_edges", "transitions"]
        )
        timing_edges = self._extract_value(timing_results, ["edge_count", "transitions_detected"])

        # Truthiness check: a missing value or a count of zero yields no insight.
        if edge_count and timing_edges and abs(edge_count - timing_edges) <= 2:
            return CrossDomainInsight(
                insight_type="agreement",
                source_domains=[AnalysisDomain.DIGITAL, AnalysisDomain.TIMING],
                description=(f"Edge counts agree: Digital={edge_count}, Timing={timing_edges}"),
                confidence_impact=0.1,
            )

        return None

    def _check_jitter_eye_correlation(
        self,
        results1: dict[str, Any],
        results2: dict[str, Any],
        domain1: AnalysisDomain,
        domain2: AnalysisDomain,
    ) -> CrossDomainInsight | None:
        """Check jitter vs eye diagram correlation.

        Returns:
            An "implication" insight whenever both a jitter value and an eye
            width are available (no numeric comparison is made); otherwise
            ``None``.
        """
        jitter_results = results1 if domain1 == AnalysisDomain.JITTER else results2
        eye_results = results2 if domain2 == AnalysisDomain.EYE else results1

        total_jitter = self._extract_value(jitter_results, ["total_jitter", "tj", "jitter_pp"])
        eye_width = self._extract_value(eye_results, ["eye_width", "horizontal_opening"])

        if total_jitter is None or eye_width is None:
            return None

        # High jitter should correlate with narrow eye
        return CrossDomainInsight(
            insight_type="implication",
            source_domains=[AnalysisDomain.JITTER, AnalysisDomain.EYE],
            description=(f"Jitter ({total_jitter:.2e}) affects eye width ({eye_width:.2e})"),
            confidence_impact=0.05,
            details={"jitter": total_jitter, "eye_width": eye_width},
        )

    def _check_waveform_stats_consistency(
        self,
        results1: dict[str, Any],
        results2: dict[str, Any],
        domain1: AnalysisDomain,
        domain2: AnalysisDomain,
    ) -> CrossDomainInsight | None:
        """Check waveform measurements vs statistical analysis.

        Returns:
            An "agreement" insight when amplitude / (2.83 * std) lies
            strictly between 0.8 and 1.2; otherwise ``None``.
        """
        waveform_results = results1 if domain1 == AnalysisDomain.WAVEFORM else results2
        stats_results = results2 if domain2 == AnalysisDomain.STATISTICS else results1

        wf_amplitude = self._extract_value(waveform_results, ["amplitude", "vpp", "peak_to_peak"])
        stats_std = self._extract_value(stats_results, ["std", "standard_deviation", "stdev"])

        # Truthiness check: a missing or zero value yields no insight.
        if wf_amplitude and stats_std:
            # For periodic signals, amplitude ~ 2.83 * std (for sine wave)
            expected_ratio = wf_amplitude / (2.83 * stats_std) if stats_std > 0 else 0

            if 0.8 < expected_ratio < 1.2:
                return CrossDomainInsight(
                    insight_type="agreement",
                    source_domains=[AnalysisDomain.WAVEFORM, AnalysisDomain.STATISTICS],
                    description="Waveform amplitude consistent with statistical std dev",
                    confidence_impact=0.1,
                )

        return None

    @staticmethod
    def _to_float(val: Any) -> float | None:
        """Convert a real-valued scalar to ``float``; reject NaN and non-numbers.

        Python ``int``/``float`` (and therefore ``bool``, an ``int`` subclass)
        and NumPy integer/floating scalars are accepted.
        """
        if not isinstance(val, (int, float, np.integer, np.floating)):
            return None
        # Convert first so np.isnan always receives a plain float.
        number = float(val)
        return None if np.isnan(number) else number

    def _extract_value(
        self,
        results: dict[str, Any],
        keys: list[str],
    ) -> float | None:
        """Extract a value from results using multiple possible keys.

        Keys are tried in order. For each key the top level of ``results`` is
        checked first, then every value of ``results`` that is itself a dict
        (one level of nesting only).

        Args:
            results: Result mapping of a single domain.
            keys: Candidate key names, in priority order.

        Returns:
            The first numeric, non-NaN value found as ``float``, else ``None``.
        """
        for key in keys:
            # Try direct key
            if key in results:
                number = self._to_float(results[key])
                if number is not None:
                    return number

            # Try nested keys
            for result_val in results.values():
                if isinstance(result_val, dict) and key in result_val:
                    number = self._to_float(result_val[key])
                    if number is not None:
                        return number

        return None

    def _calculate_adjustments(
        self,
        insights: list[CrossDomainInsight],
    ) -> dict[str, float]:
        """Calculate confidence adjustments based on insights.

        Args:
            insights: Insights whose ``confidence_impact`` is summed for each
                of their source domains.

        Returns:
            Mapping from ``domain.value`` to the summed impact, clamped to
            [-0.3, +0.3].
        """
        adjustments: dict[str, float] = {}

        for insight in insights:
            for domain in insight.source_domains:
                domain_key = domain.value
                adjustments[domain_key] = adjustments.get(domain_key, 0.0) + insight.confidence_impact

        # Clamp adjustments to [-0.3, +0.3]
        return {k: max(-0.3, min(0.3, v)) for k, v in adjustments.items()}

388 

389 

def correlate_results(
    results: dict[AnalysisDomain, dict[str, Any]],
    tolerance: float = 0.1,
) -> CorrelationResult:
    """Correlate domain results with a one-off ``CrossDomainCorrelator``.

    Args:
        results: Dictionary mapping domains to their results.
        tolerance: Tolerance handed to the correlator's constructor.

    Returns:
        The ``CorrelationResult`` returned by ``CrossDomainCorrelator.correlate``.

    Example:
        >>> from tracekit.reporting.config import AnalysisDomain
        >>> results = {
        ...     AnalysisDomain.SPECTRAL: {'dominant_frequency': 1000.0},
        ...     AnalysisDomain.TIMING: {'period': 0.001}
        ... }
        >>> correlation = correlate_results(results)
        >>> print(f"Insights: {len(correlation.insights)}")
    """
    return CrossDomainCorrelator(tolerance).correlate(results)

414 

415 

# Names exported by ``from tracekit.core.cross_domain import *``.
__all__ = [
    "DOMAIN_AFFINITY",
    "CorrelationResult",
    "CrossDomainCorrelator",
    "CrossDomainInsight",
    "correlate_results",
]

422]