Coverage for src/tracekit/inference/signal_intelligence.py: 78%

521 statements  

coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1 """Signal classification and measurement intelligence for TraceKit.

2

3 This module provides intelligent signal type detection, quality assessment,

4 and measurement suitability checking to help users understand why they might

5 get NaN results and which measurements are appropriate for their signals.

6 

7 

8 Example:

9 >>> import tracekit as tk 

10 >>> trace = tk.load('signal.wfm') 

11 >>> classification = tk.classify_signal(trace) 

12 >>> print(f"Signal type: {classification['type']}") 

13 >>> print(f"Characteristics: {classification['characteristics']}") 

14 >>> quality = tk.assess_signal_quality(trace) 

15 >>> print(f"SNR: {quality['snr']:.1f} dB") 

16 >>> suggestions = tk.suggest_measurements(trace) 

17 >>> print(f"Recommended measurements: {suggestions}") 

18 

19 References:

20 IEEE 181-2011: Standard for Transitions, Pulses, and Related Waveforms

21 IEEE 1057-2017: Standard for Digitizing Waveform Recorders

22 """

23 

24 from __future__ import annotations

25

26 from dataclasses import dataclass

27 from typing import TYPE_CHECKING, Any, cast

28

29 import numpy as np

30

31 if TYPE_CHECKING:

32 from numpy.typing import NDArray 

33 

34 from tracekit.core.types import WaveformTrace 

35 from tracekit.reporting.config import AnalysisDomain 

36 

37 

38 def classify_signal(

39 trace: WaveformTrace | NDArray[np.floating[Any]], 

40 sample_rate: float = 1.0, 

41 *, 

42 digital_threshold_ratio: float = 0.8, 

43 dc_threshold_percent: float = 90.0, 

44 periodicity_threshold: float = 0.7, 

45) -> dict[str, Any]: 

46 """Classify signal type and characteristics. 

47 

48 Automatically detects whether a signal is digital, analog, or mixed, 

49 identifies key characteristics like periodicity and noise, and estimates 

50 fundamental properties. 

51 

52 Args: 

53 trace: Input waveform trace or numpy array to classify. 

54 sample_rate: Sample rate in Hz (only used if trace is ndarray). 

55 digital_threshold_ratio: Ratio of samples at two levels to consider digital (0-1). 

56 dc_threshold_percent: Percentage of DC component to classify as DC signal. 

57 periodicity_threshold: Correlation threshold for periodic detection (0-1). 

58 

59 Returns: 

60 Dictionary containing: 

61 - signal_type: Signal type ("digital", "analog", "mixed", "dc", or "unknown")

62 - is_digital: Boolean indicating if signal is digital 

63 - is_periodic: Boolean indicating if signal is periodic 

64 - characteristics: List of characteristics like "periodic", "noisy", "pulsed" 

65 - dc_component: True if significant DC offset present 

66 - frequency_estimate: Estimated fundamental frequency in Hz (or None) 

67 - dominant_frequency: Same as frequency_estimate (for compatibility) 

68 - snr_db: Estimated SNR in dB (or None) 

69 - confidence: Classification confidence (0.0-1.0) 

70 - noise_level: Estimated noise level in signal units 

71 - levels: For digital signals, dict with "low" and "high" levels 

72 

73 Example: 

74 >>> trace = tk.load('square_wave.wfm') 

75 >>> info = tk.classify_signal(trace) 

76 >>> print(f"Type: {info['signal_type']}") 

77 Type: digital 

78 >>> print(f"Characteristics: {info['characteristics']}") 

79 Characteristics: ['periodic', 'clean'] 

80 >>> print(f"Frequency: {info['frequency_estimate']:.3e} Hz") 

81 Frequency: 1.000e+06 Hz 

82 

83 References: 

84 IEEE 181-2011: Digital waveform characterization 

85 """ 

86 # Handle both WaveformTrace and ndarray inputs 

87 if isinstance(trace, np.ndarray): 87 ↛ 88  [line 87 didn't jump to line 88 because the condition on line 87 was never true]

88 data = trace 

89 trace_sample_rate = sample_rate 

90 else: 

91 data = trace.data 

92 trace_sample_rate = trace.metadata.sample_rate 

93 

94 n = len(data) 

95 

96 if n < 10: 

97 return { 

98 "type": "unknown", 

99 "signal_type": "unknown", 

100 "is_digital": False, 

101 "is_periodic": False, 

102 "characteristics": ["insufficient_data"], 

103 "dc_component": False, 

104 "frequency_estimate": None, 

105 "dominant_frequency": None, 

106 "snr_db": None, 

107 "confidence": 0.0, 

108 "noise_level": 0.0, 

109 "levels": None, 

110 } 

111 

112 # Calculate basic statistics 

113 mean_val = float(np.mean(data)) 

114 std_val = float(np.std(data)) 

115 min_val = float(np.min(data)) 

116 max_val = float(np.max(data)) 

117 amplitude = max_val - min_val 

118 

119 # Initialize result 

120 characteristics = [] 

121 signal_type = "analog" 

122 confidence = 0.5 

123 

124 # 1. Check for DC signal (very low variation) 

125 # Use coefficient of variation (CV) for DC detection 

126 cv = std_val / (abs(mean_val) + amplitude / 2 + 1e-12) 

127 if amplitude < 1e-9 or cv < 0.005: # Less than 0.5% variation 

128 signal_type = "dc" 

129 characteristics.append("constant") 

130 confidence = 0.95 

131 return { 

132 "type": signal_type, 

133 "signal_type": signal_type, 

134 "is_digital": False, 

135 "is_periodic": False, 

136 "characteristics": characteristics, 

137 "dc_component": True, 

138 "frequency_estimate": None, 

139 "dominant_frequency": None, 

140 "snr_db": None, 

141 "confidence": confidence, 

142 "noise_level": std_val, 

143 "levels": None, 

144 } 

145 

146 # 2. Check for digital signal (bimodal distribution) 

147 is_digital, digital_levels, digital_confidence = _detect_digital_signal( 

148 data, digital_threshold_ratio 

149 ) 

150 

151 if is_digital: 

152 signal_type = "digital" 

153 confidence = digital_confidence 

154 characteristics.append("digital_levels") 

155 

156 # 3. Assess noise level 

157 noise_level = _estimate_noise_level(data) 

158 noise_ratio = noise_level / (amplitude + 1e-12) 

159 

160 if noise_ratio < 0.05: 

161 characteristics.append("clean") 

162 elif noise_ratio < 0.15: 

163 characteristics.append("low_noise") 

164 elif noise_ratio < 0.30: 

165 characteristics.append("moderate_noise") 

166 else: 

167 characteristics.append("noisy") 

168 

169 # 4. Check for periodicity 

170 is_periodic, period_estimate, periodicity_score = _detect_periodicity( 

171 data, trace_sample_rate, periodicity_threshold 

172 ) 

173 

174 # For digital signals, also try edge-based periodicity detection 

175 # This works better for signals with few periods 

176 if not is_periodic and is_digital: 

177 edge_periodic, edge_period, edge_confidence = _detect_edge_periodicity( 

178 data, trace_sample_rate, digital_levels 

179 ) 

180 if edge_periodic: 

181 is_periodic = edge_periodic 

182 period_estimate = edge_period 

183 periodicity_score = edge_confidence 

184 

185 # Also try FFT-based frequency detection 

186 # FFT is more reliable for undersampled signals where autocorrelation may detect harmonics 

187 if n >= 64: 

188 fft_periodic, fft_period, fft_confidence = _detect_periodicity_fft(data, trace_sample_rate) 

189 if fft_periodic: 

190 # If autocorrelation also found periodicity, compare results 

191 if is_periodic and period_estimate is not None: 

192 # If frequencies differ significantly (>20%), prefer the higher frequency 

193 # (lower frequencies are often harmonics or aliasing artifacts) 

194 auto_freq = 1.0 / period_estimate if period_estimate > 0 else 0 

195 fft_freq = 1.0 / fft_period if fft_period is not None and fft_period > 0 else 0 

196 freq_ratio = max(auto_freq, fft_freq) / (min(auto_freq, fft_freq) + 1e-12) 

197 

198 if freq_ratio > 1.2: # More than 20% difference 

199 # Prefer higher frequency (more likely to be correct) 

200 if fft_freq > auto_freq: 

201 period_estimate = fft_period 

202 periodicity_score = fft_confidence 

203 else: 

204 # Only FFT detected periodicity 

205 is_periodic = fft_periodic 

206 period_estimate = fft_period 

207 periodicity_score = fft_confidence 

208 

209 if is_periodic: 

210 characteristics.append("periodic") 

211 frequency_estimate = ( 

212 1.0 / period_estimate if period_estimate is not None and period_estimate > 0 else None 

213 ) 

214 confidence = max(confidence, periodicity_score) 

215 else: 

216 characteristics.append("aperiodic") 

217 frequency_estimate = None 

218 

219 # 5. Check for DC component 

220 dc_component = abs(mean_val) > (amplitude * dc_threshold_percent / 100.0) 

221 

222 # 6. Detect pulsed/transient characteristics 

223 edge_count = _count_edges(data, digital_levels if is_digital else None) 

224 samples_per_edge = n / max(edge_count, 1) 

225 

226 if edge_count > 2 and samples_per_edge > 100: 

227 characteristics.append("pulsed") 

228 elif edge_count < 3 and amplitude > std_val * 2: 

229 characteristics.append("transient") 

230 

231 # 7. Check for mixed signal (both digital transitions and analog variation) 

232 if is_digital and digital_levels is not None: 

233 # Check if there's significant variation within digital levels 

234 low_region = data[data < (digital_levels["low"] + digital_levels["high"]) / 2] 

235 high_region = data[data >= (digital_levels["low"] + digital_levels["high"]) / 2] 

236 

237 if len(low_region) > 0 and len(high_region) > 0: 237 ↛ 247  [line 237 didn't jump to line 247 because the condition on line 237 was always true]

238 low_std = np.std(low_region) 

239 high_std = np.std(high_region) 

240 level_separation = digital_levels["high"] - digital_levels["low"] 

241 

242 if low_std > level_separation * 0.1 or high_std > level_separation * 0.1: 242 ↛ 243  [line 242 didn't jump to line 243 because the condition on line 242 was never true]

243 signal_type = "mixed" 

244 characteristics.append("analog_variation") 

245 

246 # Calculate SNR estimate 

247 snr_db = None 

248 if amplitude > noise_level * 10: 

249 signal_power = amplitude**2 / 8 # Approximate for most waveforms 

250 noise_power = noise_level**2 

251 if noise_power > 1e-20: 

252 snr_db = 10 * np.log10(signal_power / noise_power) 

253 

254 return { 

255 "type": signal_type, 

256 "signal_type": signal_type, 

257 "is_digital": is_digital, 

258 "is_periodic": is_periodic, 

259 "characteristics": characteristics, 

260 "dc_component": dc_component, 

261 "frequency_estimate": frequency_estimate, 

262 "dominant_frequency": frequency_estimate, 

263 "snr_db": float(snr_db) if snr_db is not None else None, 

264 "confidence": float(confidence), 

265 "noise_level": float(noise_level), 

266 "levels": digital_levels if is_digital else None, 

267 } 

268 

269 
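# Illustrative sketch (not part of the original module): a minimal use of
# classify_signal() with a plain numpy array, which exercises the
# ndarray + sample_rate path. The 1 kHz / 3.3 V square wave and the 1 MHz
# rate are assumptions chosen only for this example, and the exact output
# depends on the heuristics above.
def _demo_classify_square_wave() -> None:
    sample_rate = 1e6  # 1 MHz sampling (illustrative)
    t = np.arange(10_000) / sample_rate
    square = np.where(np.sin(2 * np.pi * 1e3 * t) >= 0, 3.3, 0.0)  # 1 kHz, 0-3.3 V
    info = classify_signal(square, sample_rate=sample_rate)
    print(info["signal_type"], info["is_periodic"], info["frequency_estimate"])
    # Expected (not guaranteed): "digital", True, and an estimate near 1e3 Hz.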

270 def assess_signal_quality(

271 trace: WaveformTrace, 

272) -> dict[str, Any]: 

273 """Assess signal quality metrics. 

274 

275 Analyzes signal quality including SNR, noise level, clipping, saturation, 

276 and other quality indicators that affect measurement accuracy. 

277 

278 Args: 

279 trace: Input waveform trace to assess. 

280 

281 Returns: 

282 Dictionary containing: 

283 - snr: Signal-to-noise ratio in dB (or None if not applicable) 

284 - noise_level: RMS noise level in signal units 

285 - clipping: True if signal shows clipping 

286 - saturation: True if signal appears saturated 

287 - warnings: List of quality warning strings 

288 - dynamic_range: Signal dynamic range in dB 

289 - crest_factor: Peak-to-RMS ratio 

290 

291 Example: 

292 >>> trace = tk.load('noisy_sine.wfm') 

293 >>> quality = tk.assess_signal_quality(trace) 

294 >>> print(f"SNR: {quality['snr']:.1f} dB") 

295 SNR: 42.3 dB 

296 >>> if quality['warnings']: 

297 ... print(f"Warnings: {quality['warnings']}") 

298 

299 References: 

300 IEEE 1057-2017: ADC quality metrics 

301 """ 

302 data = trace.data 

303 n = len(data) 

304 warnings = [] 

305 

306 if n < 10: 

307 warnings.append("Insufficient data for quality assessment") 

308 return { 

309 "snr": None, 

310 "noise_level": 0.0, 

311 "clipping": False, 

312 "saturation": False, 

313 "warnings": warnings, 

314 "dynamic_range": None, 

315 "crest_factor": None, 

316 } 

317 

318 # Calculate statistics 

319 min_val = float(np.min(data)) 

320 max_val = float(np.max(data)) 

321 mean_val = float(np.mean(data)) 

322 rms_val = float(np.sqrt(np.mean(data**2))) 

323 amplitude = max_val - min_val 

324 

325 # 1. Detect clipping (samples stuck at extremes) 

326 # Real clipping shows as CONSECUTIVE samples at extremes, not just many samples near extremes 

327 clipping = False 

328 if amplitude > 1e-9: 

329 tolerance = amplitude * 0.01 # 1% tolerance 

330 

331 # Find consecutive runs at extremes 

332 at_min = data <= (min_val + tolerance) 

333 at_max = data >= (max_val - tolerance) 

334 

335 # Check for long consecutive runs (clipping) vs brief peaks (natural waveform) 

336 # For analog signals like sine waves, peaks naturally have ~5-10% of samples near extremes 

337 # Real clipping typically shows >15-20% consecutive samples 

338 # For digital signals, even short runs at extremes can indicate clipping 

339 min_run_length = max(int(n * 0.15), 100) # 15% of data or 100 samples minimum 

340 

341 # Find maximum consecutive run lengths 

342 max_min_run = 0 

343 max_max_run = 0 

344 

345 current_min_run = 0 

346 current_max_run = 0 

347 

348 for i in range(n): 

349 if at_min[i]: 

350 current_min_run += 1 

351 max_min_run = max(max_min_run, current_min_run) 

352 else: 

353 current_min_run = 0 

354 

355 if at_max[i]: 

356 current_max_run += 1 

357 max_max_run = max(max_max_run, current_max_run) 

358 else: 

359 current_max_run = 0 

360 

361 # Clipping detected if we have long consecutive runs at extremes 

362 if max_min_run >= min_run_length: 

363 clipping = True 

364 warnings.append( 

365 f"Signal clipping detected at minimum ({max_min_run} consecutive samples)" 

366 ) 

367 if max_max_run >= min_run_length: 

368 clipping = True 

369 warnings.append( 

370 f"Signal clipping detected at maximum ({max_max_run} consecutive samples)" 

371 ) 

372 

373 # 2. Detect saturation (signal stuck at one level) 

374 # For digital signals, 2 unique values is normal, not saturation 

375 saturation = False 

376 unique_values = len(np.unique(data)) 

377 classification = classify_signal(trace) 

378 

379 # Different thresholds for digital vs analog signals 

380 if classification["type"] == "digital": 

381 # Digital signals should have 2+ levels; saturation is when stuck at 1 level 

382 if unique_values < 2: 382 ↛ 383  [line 382 didn't jump to line 383 because the condition on line 382 was never true]

383 saturation = True 

384 warnings.append(f"Signal saturation detected (only {unique_values} unique value)") 

385 else: 

386 # Analog signals should have many unique values 

387 if unique_values < max(10, n // 1000): 

388 saturation = True 

389 warnings.append(f"Signal saturation detected (only {unique_values} unique values)") 

390 

391 # 3. Estimate noise level 

392 noise_level = _estimate_noise_level(data) 

393 

394 # 4. Calculate SNR 

395 snr = None 

396 if amplitude > noise_level * 10: # Only calculate if signal > noise 

397 # Remove DC and calculate signal power 

398 data_ac = data - mean_val 

399 signal_power = np.mean(data_ac**2) 

400 noise_power = noise_level**2 

401 

402 if noise_power > 1e-20: 

403 snr = 10 * np.log10(signal_power / noise_power) 

404 else: 

405 snr = float("inf") 

406 

407 # 5. Calculate dynamic range 

408 dynamic_range = None 

409 if min_val != 0 and max_val != 0: 

410 dynamic_range = 20 * np.log10(max_val / (abs(min_val) + 1e-20)) 

411 

412 # 6. Calculate crest factor (peak-to-RMS) 

413 crest_factor = None 

414 if rms_val > 1e-12: 414 ↛ 418  [line 414 didn't jump to line 418 because the condition on line 414 was always true]

415 crest_factor = max(abs(max_val), abs(min_val)) / rms_val 

416 

417 # 7. Check for quantization issues 

418 if n > 100: 

419 # Estimate quantization step 

420 sorted_data = np.sort(data) 

421 diffs = np.diff(sorted_data) 

422 diffs = diffs[diffs > 1e-15] # Remove near-zero differences 

423 

424 if len(diffs) > 10: 

425 min_step = np.min(diffs) 

426 if amplitude / min_step < 256: 

427 warnings.append( 

428 f"Low resolution detected ({int(amplitude / min_step)} levels), " 

429 "may affect measurement accuracy" 

430 ) 

431 

432 # 8. Check sample rate adequacy 

433 classification = classify_signal(trace) 

434 if classification["frequency_estimate"] is not None: 

435 # Check if sample rate is at least 10x the detected frequency 

436 nyquist_rate = 2 * classification["frequency_estimate"] 

437 if trace.metadata.sample_rate < nyquist_rate * 5: 

438 warnings.append( 

439 f"Sample rate ({trace.metadata.sample_rate:.3e} Hz) may be " 

440 f"insufficient for signal frequency ({classification['frequency_estimate']:.3e} Hz). " 

441 "Recommend at least 10x oversampling" 

442 ) 

443 

444 # Additional check: if samples per period is very low, we might be undersampling 

445 # This catches cases where frequency detection may be wrong due to aliasing 

446 samples_per_period = trace.metadata.sample_rate / classification["frequency_estimate"] 

447 if samples_per_period < 10 and "sample rate" not in "".join(warnings).lower(): 447 ↛ 448  [line 447 didn't jump to line 448 because the condition on line 447 was never true]

448 warnings.append( 

449 f"Very low oversampling detected ({samples_per_period:.1f} samples per period). " 

450 f"Signal may be undersampled or frequency detection may be inaccurate. " 

451 "Recommend at least 10 samples per period" 

452 ) 

453 

454 return { 

455 "snr": float(snr) if snr is not None else None, 

456 "noise_level": float(noise_level), 

457 "clipping": bool(clipping), 

458 "saturation": bool(saturation), 

459 "warnings": warnings, 

460 "dynamic_range": float(dynamic_range) if dynamic_range is not None else None, 

461 "crest_factor": float(crest_factor) if crest_factor is not None else None, 

462 } 

463 

464 

465 def check_measurement_suitability(

466 trace: WaveformTrace, 

467 measurement_name: str, 

468) -> dict[str, Any]: 

469 """Check if a measurement is suitable for this signal. 

470 

471 Analyzes signal characteristics to determine if a specific measurement 

472 will produce valid results, and provides warnings and suggestions. 

473 

474 Args: 

475 trace: Input waveform trace. 

476 measurement_name: Name of measurement to check (e.g., "frequency", "rise_time"). 

477 

478 Returns: 

479 Dictionary containing: 

480 - suitable: True if measurement is appropriate for this signal 

481 - confidence: Confidence in suitability assessment (0.0-1.0) 

482 - warnings: List of warning strings 

483 - suggestions: List of suggestion strings 

484 - expected_result: "valid", "nan", or "unreliable" 

485 

486 Example: 

487 >>> trace = tk.load('dc_signal.wfm') 

488 >>> check = tk.check_measurement_suitability(trace, "frequency") 

489 >>> if not check['suitable']: 

490 ... print(f"Warning: {check['warnings']}") 

491 Warning: ['Frequency measurement not suitable for DC signal'] 

492 

493 References: 

494 IEEE 181-2011: Measurement applicability 

495 """ 

496 classification = classify_signal(trace) 

497 quality = assess_signal_quality(trace) 

498 

499 warnings = [] 

500 suggestions = [] 

501 suitable = True 

502 confidence = 0.8 

503 expected_result = "valid" 

504 

505 signal_type = classification["type"] 

506 characteristics = classification["characteristics"] 

507 

508 # Define measurement requirements 

509 frequency_measurements = ["frequency", "period"] 

510 edge_measurements = ["rise_time", "fall_time"] 

511 amplitude_measurements = ["amplitude", "overshoot", "undershoot", "preshoot"] 

512 duty_measurements = ["duty_cycle", "pulse_width"] 

513 _statistical_measurements = ["mean", "rms"] 

514 spectral_measurements = ["thd", "snr", "sinad", "enob", "sfdr", "fft", "psd"] 

515 

516 # Check DC signals 

517 if signal_type == "dc": 

518 if measurement_name in frequency_measurements: 

519 suitable = False 

520 warnings.append(f"{measurement_name} measurement not suitable for DC signal") 

521 suggestions.append("Use 'mean' or 'rms' measurements for DC signals") 

522 expected_result = "nan" 

523 elif measurement_name in edge_measurements: 

524 suitable = False 

525 warnings.append(f"{measurement_name} requires signal transitions") 

526 suggestions.append("Signal appears to be DC with no edges") 

527 expected_result = "nan" 

528 elif measurement_name in duty_measurements: 528 ↛ 529  [line 528 didn't jump to line 529 because the condition on line 528 was never true]

529 suitable = False 

530 warnings.append(f"{measurement_name} requires periodic signal") 

531 expected_result = "nan" 

532 

533 # Check aperiodic signals 

534 if "aperiodic" in characteristics: 

535 if measurement_name in frequency_measurements + duty_measurements: 

536 suitable = False 

537 confidence = 0.6 

538 warnings.append(f"{measurement_name} requires periodic signal") 

539 suggestions.append("Signal does not appear periodic") 

540 expected_result = "nan" 

541 elif measurement_name in spectral_measurements: 

542 warnings.append("Spectral measurements on aperiodic signals may not show clear peaks") 

543 suggestions.append("Consider time-domain or statistical analysis") 

544 expected_result = "unreliable" 

545 

546 # Check digital vs analog 

547 if signal_type == "digital": 

548 if measurement_name in amplitude_measurements and measurement_name != "amplitude": 

549 warnings.append( 

550 f"{measurement_name} designed for analog signals with overshoot/ringing" 

551 ) 

552 suggestions.append("Digital signals may show zero overshoot/undershoot") 

553 expected_result = "unreliable" 

554 confidence = 0.5 

555 

556 # Check for sufficient transitions 

557 if measurement_name in edge_measurements + duty_measurements: 

558 data = trace.data 

559 edge_count = _count_edges(data, classification.get("levels")) 

560 if edge_count < 2: 

561 suitable = False 

562 warnings.append(f"{measurement_name} requires at least 2 signal edges") 

563 suggestions.append(f"Signal has only {edge_count} detected edge(s)") 

564 expected_result = "nan" 

565 

566 # Check signal quality impacts 

567 if quality["clipping"]: 

568 if measurement_name in edge_measurements + amplitude_measurements: 

569 warnings.append("Signal clipping detected, may affect measurement accuracy") 

570 # Don't override "nan" - if measurement is fundamentally unsuitable, keep it as "nan" 

571 if expected_result != "nan": 571 ↛ 573  [line 571 didn't jump to line 573 because the condition on line 571 was always true]

572 expected_result = "unreliable" 

573 confidence = min(confidence, 0.6) 

574 

575 if quality["saturation"]: 

576 warnings.append("Signal saturation detected, measurements may be unreliable") 

577 # Don't override "nan" - if measurement is fundamentally unsuitable, keep it as "nan" 

578 if expected_result != "nan": 

579 expected_result = "unreliable" 

580 confidence = min(confidence, 0.5) 

581 

582 if quality["snr"] is not None and quality["snr"] < 20: 582 ↛ 583  [line 582 didn't jump to line 583 because the condition on line 582 was never true]

583 if measurement_name in edge_measurements: 

584 warnings.append( 

585 f"Low SNR ({quality['snr']:.1f} dB) may affect edge timing measurements" 

586 ) 

587 suggestions.append("Consider filtering signal to improve SNR") 

588 confidence = min(confidence, 0.7) 

589 

590 # Check sample rate for timing measurements 

591 if measurement_name in edge_measurements + frequency_measurements: 

592 if classification["frequency_estimate"] is not None: 

593 nyquist_rate = 2 * classification["frequency_estimate"] 

594 if trace.metadata.sample_rate < nyquist_rate * 5: 

595 warnings.append("Sample rate may be too low for accurate timing measurements") 

596 suggestions.append( 

597 f"Recommend sample rate > {nyquist_rate * 10:.3e} Hz (10x signal frequency)" 

598 ) 

599 expected_result = "unreliable" 

600 confidence = min(confidence, 0.6) 

601 

602 # Check data length 

603 n = len(trace.data) 

604 if measurement_name in spectral_measurements: 

605 if n < 256: 

606 warnings.append(f"Signal length ({n} samples) may be too short for spectral analysis") 

607 suggestions.append("Recommend at least 1024 samples for FFT-based measurements") 

608 expected_result = "unreliable" 

609 confidence = min(confidence, 0.5) 

610 

611 if measurement_name in frequency_measurements: 

612 if classification["frequency_estimate"] is not None: 

613 min_samples = trace.metadata.sample_rate / classification["frequency_estimate"] 

614 # Require at least 0.5 periods for basic detection 

615 # Having 1+ complete periods is ideal, but FFT can work with less 

616 if n < min_samples * 0.5: 616 ↛ 617  [line 616 didn't jump to line 617 because the condition on line 616 was never true]

617 warnings.append( 

618 f"Signal length ({n} samples) captures < 0.5 periods, " 

619 "frequency measurement may fail" 

620 ) 

621 suggestions.append("Capture at least 2 periods for reliable frequency measurement") 

622 expected_result = "unreliable" 

623 confidence = min(confidence, 0.5) 

624 elif n < min_samples * 2: 

625 # Between 0.5 and 2 periods: usable but not ideal 

626 suggestions.append("Capture at least 10 periods for best accuracy") 

627 confidence = min(confidence, 0.75) 

628 

629 return { 

630 "suitable": suitable, 

631 "confidence": float(confidence), 

632 "warnings": warnings, 

633 "suggestions": suggestions, 

634 "expected_result": expected_result, 

635 } 

636 

637 

638 def suggest_measurements(

639 trace: WaveformTrace, 

640 *, 

641 max_suggestions: int = 10, 

642) -> list[dict[str, Any]]: 

643 """Suggest appropriate measurements for a signal. 

644 

645 Analyzes signal characteristics and recommends the most suitable 

646 measurements, ranked by relevance and reliability. 

647 

648 Args: 

649 trace: Input waveform trace. 

650 max_suggestions: Maximum number of suggestions to return. 

651 

652 Returns: 

653 List of dictionaries, each containing: 

654 - name: Measurement name 

655 - category: Measurement category (e.g., "timing", "amplitude", "spectral") 

656 - priority: Priority ranking (1=highest) 

657 - rationale: Why this measurement is recommended 

658 - confidence: Confidence in recommendation (0.0-1.0) 

659 

660 Example: 

661 >>> trace = tk.load('square_wave.wfm') 

662 >>> suggestions = tk.suggest_measurements(trace) 

663 >>> for s in suggestions[:3]: 

664 ... print(f"{s['name']}: {s['rationale']}") 

665 frequency: Periodic digital signal detected 

666 duty_cycle: Suitable for pulse analysis 

667 rise_time: Digital edges detected 

668 

669 References: 

670 Best practices for waveform analysis 

671 """ 

672 classification = classify_signal(trace) 

673 quality = assess_signal_quality(trace) 

674 

675 signal_type = classification["type"] 

676 characteristics = classification["characteristics"] 

677 

678 suggestions = [] 

679 

680 # Always suggest basic statistical measurements 

681 suggestions.append( 

682 { 

683 "name": "mean", 

684 "category": "statistical", 

685 "priority": 1, 

686 "rationale": "Basic DC level measurement, always applicable", 

687 "confidence": 1.0, 

688 } 

689 ) 

690 

691 suggestions.append( 

692 { 

693 "name": "rms", 

694 "category": "statistical", 

695 "priority": 2, 

696 "rationale": "RMS voltage measurement, useful for all signal types", 

697 "confidence": 1.0, 

698 } 

699 ) 

700 

701 # DC signals 

702 if signal_type == "dc": 

703 suggestions.append( 

704 { 

705 "name": "amplitude", 

706 "category": "amplitude", 

707 "priority": 3, 

708 "rationale": "Measure noise/variation level in DC signal", 

709 "confidence": 0.9, 

710 } 

711 ) 

712 # Don't suggest frequency, edges, etc. 

713 return sorted(suggestions, key=lambda x: cast("int", x["priority"]))[:max_suggestions] 

714 

715 # Amplitude measurements 

716 suggestions.append( 

717 { 

718 "name": "amplitude", 

719 "category": "amplitude", 

720 "priority": 3, 

721 "rationale": f"Peak-to-peak amplitude for {signal_type} signal", 

722 "confidence": 0.95, 

723 } 

724 ) 

725 

726 # Periodic signals 

727 if "periodic" in characteristics: 727 ↛ 749  [line 727 didn't jump to line 749 because the condition on line 727 was always true]

728 suggestions.append( 

729 { 

730 "name": "frequency", 

731 "category": "timing", 

732 "priority": 4, 

733 "rationale": "Periodic signal detected, frequency measurement applicable", 

734 "confidence": classification["confidence"], 

735 } 

736 ) 

737 

738 suggestions.append( 

739 { 

740 "name": "period", 

741 "category": "timing", 

742 "priority": 5, 

743 "rationale": "Period measurement for periodic signal", 

744 "confidence": classification["confidence"], 

745 } 

746 ) 

747 

748 # Digital signals with edges 

749 if signal_type in ("digital", "mixed"): 

750 edge_count = _count_edges(trace.data, classification.get("levels")) 

751 

752 if edge_count >= 2: 752 ↛ 773  [line 752 didn't jump to line 773 because the condition on line 752 was always true]

753 suggestions.append( 

754 { 

755 "name": "rise_time", 

756 "category": "timing", 

757 "priority": 6, 

758 "rationale": f"Digital edges detected ({edge_count} edges)", 

759 "confidence": 0.9 if quality["snr"] and quality["snr"] > 20 else 0.7, 

760 } 

761 ) 

762 

763 suggestions.append( 

764 { 

765 "name": "fall_time", 

766 "category": "timing", 

767 "priority": 7, 

768 "rationale": f"Digital edges detected ({edge_count} edges)", 

769 "confidence": 0.9 if quality["snr"] and quality["snr"] > 20 else 0.7, 

770 } 

771 ) 

772 

773 if "periodic" in characteristics and edge_count >= 2: 773 ↛ 796  [line 773 didn't jump to line 796 because the condition on line 773 was always true]

774 # Need at least 2 edges (1 complete cycle) for duty cycle 

775 suggestions.append( 

776 { 

777 "name": "duty_cycle", 

778 "category": "timing", 

779 "priority": 8, 

780 "rationale": "Periodic pulse train detected", 

781 "confidence": 0.85 if edge_count >= 4 else 0.75, 

782 } 

783 ) 

784 

785 suggestions.append( 

786 { 

787 "name": "pulse_width", 

788 "category": "timing", 

789 "priority": 9, 

790 "rationale": "Pulse measurements suitable for periodic digital signal", 

791 "confidence": 0.85 if edge_count >= 4 else 0.75, 

792 } 

793 ) 

794 

795 # Analog signals 

796 if signal_type in ("analog", "mixed"): 

797 if not quality["clipping"]: 797 ↛ 819  [line 797 didn't jump to line 819 because the condition on line 797 was always true]

798 suggestions.append( 

799 { 

800 "name": "overshoot", 

801 "category": "amplitude", 

802 "priority": 10, 

803 "rationale": "Analog signal, overshoot measurement applicable", 

804 "confidence": 0.8, 

805 } 

806 ) 

807 

808 suggestions.append( 

809 { 

810 "name": "undershoot", 

811 "category": "amplitude", 

812 "priority": 11, 

813 "rationale": "Analog signal, undershoot measurement applicable", 

814 "confidence": 0.8, 

815 } 

816 ) 

817 

818 # Spectral measurements for clean, periodic signals 

819 if "periodic" in characteristics and "clean" in characteristics: 819 ↛ 842  [line 819 didn't jump to line 842 because the condition on line 819 was always true]

820 if len(trace.data) >= 256: 820 ↛ 842  [line 820 didn't jump to line 842 because the condition on line 820 was always true]

821 suggestions.append( 

822 { 

823 "name": "thd", 

824 "category": "spectral", 

825 "priority": 12, 

826 "rationale": "Clean periodic signal suitable for harmonic analysis", 

827 "confidence": 0.85, 

828 } 

829 ) 

830 

831 suggestions.append( 

832 { 

833 "name": "snr", 

834 "category": "spectral", 

835 "priority": 13, 

836 "rationale": "Spectral SNR measurement for signal quality", 

837 "confidence": 0.8, 

838 } 

839 ) 

840 

841 # Sort by priority and limit 

842 suggestions = sorted(suggestions, key=lambda x: cast("int", x["priority"])) 

843 return suggestions[:max_suggestions] 

844 

845 

846 # =============================================================================

847 # Helper Functions

848 # =============================================================================

849 

850 

851 def _detect_digital_signal(

852 data: NDArray[np.floating[Any]], 

853 threshold_ratio: float, 

854) -> tuple[bool, dict[str, float] | None, float]: 

855 """Detect if signal is digital based on bimodal distribution. 

856 

857 Args: 

858 data: Signal data array. 

859 threshold_ratio: Ratio of samples at two levels to consider digital. 

860 

861 Returns: 

862 Tuple of (is_digital, levels_dict, confidence). 

863 """ 

864 # Use histogram to find peaks 

865 # Use more bins for better resolution on digital signals 

866 n_bins = min(100, len(np.unique(data))) 

867 hist, bin_edges = np.histogram(data, bins=n_bins) 

868 bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 

869 

870 # Find peaks (local maxima or significant bins) 

871 peaks = [] 

872 

873 # Special case: if only 2 bins (perfect digital signal), both are peaks 

874 if len(hist) == 2: 

875 for i in range(len(hist)): 

876 if hist[i] > len(data) * 0.01: 876 ↛ 875  [line 876 didn't jump to line 875 because the condition on line 876 was always true]

877 peaks.append((i, hist[i], bin_centers[i])) 

878 else: 

879 # Find local maxima in histogram 

880 for i in range(1, len(hist) - 1): 

881 if hist[i] > hist[i - 1] and hist[i] > hist[i + 1]: 

882 # Lower threshold for peak detection 

883 if hist[i] > len(data) * 0.01: # At least 1% of samples 

884 peaks.append((i, hist[i], bin_centers[i])) 

885 

886 # If we have exactly 2 dominant peaks, likely digital 

887 if len(peaks) >= 2: 

888 # Sort by count 

889 peaks = sorted(peaks, key=lambda x: x[1], reverse=True) 

890 

891 # Take top 2 peaks 

892 peak1, peak2 = peaks[0], peaks[1] 

893 

894 # Check if these two peaks account for most samples 

895 total_in_peaks = peak1[1] + peak2[1] 

896 ratio = total_in_peaks / len(data) 

897 

898 # Also check that peaks are well separated 

899 peak_separation = abs(peak1[2] - peak2[2]) 

900 data_range = np.ptp(data) 

901 

902 # Peaks should be separated by at least 30% of data range 

903 if ratio >= threshold_ratio and peak_separation > data_range * 0.3: 

904 low_level = min(peak1[2], peak2[2]) 

905 high_level = max(peak1[2], peak2[2]) 

906 

907 confidence = min(0.95, ratio) 

908 

909 return True, {"low": float(low_level), "high": float(high_level)}, confidence 

910 

911 return False, None, 0.0 

912 

913 
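# Illustrative sketch (not part of the original module): a clean two-level
# array should trip the bimodal-histogram test above. Note that the returned
# "low"/"high" values are histogram bin centers, so they only approximate the
# true plateau levels; the 0 V / 3.3 V plateaus are an assumption for the
# example.
def _demo_detect_digital() -> None:
    plateaus = np.tile(np.array([0.0, 0.0, 3.3, 3.3]), 256)  # alternating 0 V / 3.3 V
    is_digital, levels, conf = _detect_digital_signal(plateaus, threshold_ratio=0.8)
    print(is_digital, levels, round(conf, 2))
    # Expected (not guaranteed): True, two well-separated levels, confidence ~0.95.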

914 def _estimate_noise_level(data: NDArray[np.floating[Any]]) -> float:

915 """Estimate noise level using median absolute deviation. 

916 

917 Args: 

918 data: Signal data array. 

919 

920 Returns: 

921 Estimated RMS noise level. 

922 """ 

923 if len(data) < 10: 

924 return 0.0 

925 

926 # Use differencing to remove slow variations 

927 diffs = np.diff(data) 

928 

929 # MAD (Median Absolute Deviation) is robust to outliers 

930 median_diff = np.median(diffs) 

931 mad = np.median(np.abs(diffs - median_diff)) 

932 

933 # Convert MAD to RMS noise estimate 

934 # For Gaussian noise: sigma ≈ 1.4826 * MAD 

935 # Divide by sqrt(2) because diff amplifies noise by sqrt(2) 

936 noise_estimate = (1.4826 * mad) / np.sqrt(2) 

937 

938 return float(noise_estimate) 

939 

940 
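# Illustrative sketch (not part of the original module): for Gaussian noise,
# std(diff) = sigma * sqrt(2) and sigma ~= 1.4826 * MAD, so the estimator above
# should recover the injected sigma even with a slow sinusoid underneath. The
# sigma of 0.05 and the 10 Hz tone are assumptions for the example.
def _demo_noise_estimate() -> None:
    rng = np.random.default_rng(0)
    sigma = 0.05
    t = np.linspace(0.0, 1.0, 10_000)
    noisy = np.sin(2 * np.pi * 10.0 * t) + rng.normal(0.0, sigma, t.size)
    print(_estimate_noise_level(noisy))  # expected (not guaranteed): roughly 0.05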

941 def _detect_periodicity(

942 data: NDArray[np.floating[Any]], 

943 sample_rate: float, 

944 threshold: float, 

945) -> tuple[bool, float | None, float]: 

946 """Detect if signal is periodic using autocorrelation. 

947 

948 Args: 

949 data: Signal data array. 

950 sample_rate: Sampling rate in Hz. 

951 threshold: Correlation threshold for periodic detection. 

952 

953 Returns: 

954 Tuple of (is_periodic, period_seconds, confidence). 

955 """ 

956 n = len(data) 

957 

958 if n < 20: 

959 return False, None, 0.0 

960 

961 # Remove DC for autocorrelation 

962 data_ac = data - np.mean(data) 

963 

964 # Check if there's any variation 

965 if np.std(data_ac) < 1e-12: 965 ↛ 966  [line 965 didn't jump to line 966 because the condition on line 965 was never true]

966 return False, None, 0.0 

967 

968 # Compute autocorrelation for lags up to n-10 to detect signals with ~1 period 

969 # This allows finding periodicity even when we have just 1 period of data 

970 # Keep at least 10 samples of overlap for correlation 

971 max_lag = min(n - 10, 20000) # Limit for performance 

972 

973 autocorr = np.correlate(data_ac, data_ac, mode="full") 

974 autocorr = autocorr[n - 1 : n - 1 + max_lag] 

975 

976 # Normalize 

977 if abs(autocorr[0]) > 1e-12: 977 ↛ 980  [line 977 didn't jump to line 980 because the condition on line 977 was always true]

978 autocorr = autocorr / autocorr[0] 

979 else: 

980 return False, None, 0.0 

981 

982 # Find peaks in autocorrelation (exclude lag=0 and very small lags) 

983 # Start searching from lag > n/100 to avoid noise 

984 min_lag = max(3, n // 100) 

985 peaks = [] 

986 

987 for i in range(min_lag, len(autocorr) - 2): 

988 # Use stronger peak detection 

989 if ( 

990 autocorr[i] > autocorr[i - 1] 

991 and autocorr[i] > autocorr[i + 1] 

992 and autocorr[i] > autocorr[i - 2] 

993 and autocorr[i] > autocorr[i + 2] 

994 ): 

995 if autocorr[i] > threshold: 

996 peaks.append((i, autocorr[i])) 

997 

998 if peaks: 

999 # Take first significant peak as period 

1000 period_samples = peaks[0][0] 

1001 confidence = float(peaks[0][1]) 

1002 

1003 period_seconds = period_samples / sample_rate 

1004 

1005 return True, period_seconds, confidence 

1006 

1007 return False, None, 0.0 

1008 

1009 
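# Illustrative sketch (not part of the original module): with many cycles in
# the record, the autocorrelation path above finds the period directly. The
# 10 Hz tone and 1 kHz sample rate are assumptions for the example.
def _demo_detect_periodicity() -> None:
    sample_rate = 1_000.0
    t = np.arange(2_000) / sample_rate  # 2 s of data, 20 cycles
    sine = np.sin(2 * np.pi * 10.0 * t)  # 10 Hz -> 100-sample period
    found, period, score = _detect_periodicity(sine, sample_rate, threshold=0.7)
    print(found, period, round(score, 2))
    # Expected (not guaranteed): True with a period close to 0.1 s.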

1010 def _count_edges(

1011 data: NDArray[np.floating[Any]], 

1012 levels: dict[str, float] | None, 

1013) -> int: 

1014 """Count number of edges in signal. 

1015 

1016 Args: 

1017 data: Signal data array. 

1018 levels: Optional digital levels dict with "low" and "high" keys. 

1019 

1020 Returns: 

1021 Number of edges detected. 

1022 """ 

1023 if len(data) < 3: 

1024 return 0 

1025 

1026 if levels is not None: 

1027 # Use provided levels 

1028 threshold = (levels["low"] + levels["high"]) / 2 

1029 else: 

1030 # Use median as threshold 

1031 threshold = float(np.median(data)) 

1032 

1033 # Find crossings 

1034 above = data > threshold 

1035 crossings = np.diff(above.astype(int)) 

1036 

1037 # Count non-zero crossings (both rising and falling) 

1038 edge_count = np.sum(np.abs(crossings)) 

1039 

1040 return int(edge_count) 

1041 

1042 
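# Illustrative sketch (not part of the original module): without explicit
# levels, _count_edges falls back to the median as its threshold, so four equal
# plateaus of a 0/1 wave yield three crossings.
def _demo_count_edges() -> None:
    wave = np.repeat(np.array([0.0, 1.0, 0.0, 1.0]), 50)  # 200 samples, 3 transitions
    print(_count_edges(wave, levels=None))  # expected: 3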

1043 def _detect_periodicity_fft(

1044 data: NDArray[np.floating[Any]], 

1045 sample_rate: float, 

1046) -> tuple[bool, float | None, float]: 

1047 """Detect periodicity using FFT (frequency domain analysis). 

1048 

1049 This method works well for signals with few periods where autocorrelation 

1050 may fail. It finds the dominant frequency component in the signal. 

1051 

1052 Args: 

1053 data: Signal data array. 

1054 sample_rate: Sampling rate in Hz. 

1055 

1056 Returns: 

1057 Tuple of (is_periodic, period_seconds, confidence). 

1058 """ 

1059 n = len(data) 

1060 

1061 if n < 64: 

1062 return False, None, 0.0 

1063 

1064 # Remove DC component 

1065 data_ac = data - np.mean(data) 

1066 

1067 # Check if there's any variation 

1068 if np.std(data_ac) < 1e-12: 

1069 return False, None, 0.0 

1070 

1071 # Compute FFT 

1072 fft = np.fft.rfft(data_ac) 

1073 freqs = np.fft.rfftfreq(n, 1.0 / sample_rate) 

1074 

1075 # Compute power spectrum 

1076 power = np.abs(fft) ** 2 

1077 

1078 # Skip DC component (index 0) 

1079 if len(power) < 3: 1079 ↛ 1080  [line 1079 didn't jump to line 1080 because the condition on line 1079 was never true]

1080 return False, None, 0.0 

1081 

1082 power = power[1:] 

1083 freqs = freqs[1:] 

1084 

1085 # Find peak in power spectrum 

1086 peak_idx = np.argmax(power) 

1087 peak_power = power[peak_idx] 

1088 peak_freq = freqs[peak_idx] 

1089 

1090 # Check if peak is significant compared to total power 

1091 total_power = np.sum(power) 

1092 if total_power < 1e-20: 1092 ↛ 1093  [line 1092 didn't jump to line 1093 because the condition on line 1092 was never true]

1093 return False, None, 0.0 

1094 

1095 power_ratio = peak_power / total_power 

1096 

1097 # For periodic signals, the dominant frequency should have significant power 

1098 # Require at least 10% of total power in the peak 

1099 if power_ratio < 0.1: 

1100 return False, None, 0.0 

1101 

1102 # Check that frequency is reasonable (not too low or too high) 

1103 nyquist = sample_rate / 2 

1104 if peak_freq < sample_rate / n or peak_freq > nyquist * 0.9: 1104 ↛ 1105  [line 1104 didn't jump to line 1105 because the condition on line 1104 was never true]

1105 return False, None, 0.0 

1106 

1107 # Estimate period 

1108 period_seconds = 1.0 / peak_freq 

1109 

1110 # Confidence based on how dominant the peak is 

1111 # High power ratio -> high confidence 

1112 confidence = min(0.95, 0.5 + power_ratio) 

1113 

1114 return True, period_seconds, float(confidence) 

1115 

1116 
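# Illustrative sketch (not part of the original module): a short record holding
# exactly two cycles is a case where the autocorrelation peak stays below the
# default 0.7 threshold, yet the FFT path above still locks on. The 16 Hz tone
# and 1024 Hz rate are assumptions chosen so the tone falls on an FFT bin.
def _demo_detect_periodicity_fft() -> None:
    sample_rate = 1_024.0
    t = np.arange(128) / sample_rate  # 128 samples = exactly 2 cycles at 16 Hz
    sine = np.sin(2 * np.pi * 16.0 * t)
    found, period, conf = _detect_periodicity_fft(sine, sample_rate)
    print(found, period, round(conf, 2))
    # Expected (not guaranteed): True with a period near 1/16 s (0.0625 s).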

1117 def _detect_edge_periodicity(

1118 data: NDArray[np.floating[Any]], 

1119 sample_rate: float, 

1120 levels: dict[str, float] | None, 

1121) -> tuple[bool, float | None, float]: 

1122 """Detect periodicity in digital signals by analyzing edge spacing. 

1123 

1124 This method works well for signals with few periods where autocorrelation 

1125 may fail. It detects regular patterns in edge timing. 

1126 

1127 Args: 

1128 data: Signal data array. 

1129 sample_rate: Sampling rate in Hz. 

1130 levels: Digital levels dict with "low" and "high" keys. 

1131 

1132 Returns: 

1133 Tuple of (is_periodic, period_seconds, confidence). 

1134 """ 

1135 if len(data) < 10 or levels is None: 

1136 return False, None, 0.0 

1137 

1138 threshold = (levels["low"] + levels["high"]) / 2 

1139 

1140 # Find edge positions 

1141 above = data > threshold 

1142 crossings = np.diff(above.astype(int)) 

1143 edge_positions = np.where(crossings != 0)[0] 

1144 

1145 if len(edge_positions) < 2: 

1146 # Need at least 2 edges (1 complete cycle) for detection 

1147 return False, None, 0.0 

1148 

1149 # Calculate intervals between edges 

1150 intervals = np.diff(edge_positions) 

1151 

1152 if len(intervals) < 1: 1152 ↛ 1153  [line 1152 didn't jump to line 1153 because the condition on line 1152 was never true]

1153 return False, None, 0.0 

1154 

1155 # For a periodic signal, intervals should form a repeating pattern 

1156 # For a square wave: intervals alternate between high-time and low-time 

1157 # Check if intervals show regular pattern 

1158 

1159 # Calculate coefficient of variation of intervals 

1160 mean_interval = np.mean(intervals) 

1161 std_interval = np.std(intervals) 

1162 

1163 if mean_interval < 1: 1163 ↛ 1164  [line 1163 didn't jump to line 1164 because the condition on line 1163 was never true]

1164 return False, None, 0.0 

1165 

1166 cv = std_interval / mean_interval 

1167 

1168 # Special case: exactly 1 interval (2 edges, half period of square wave) 

1169 if len(intervals) == 1: 

1170 # This represents half a period for a square wave 

1171 period_samples = 2 * intervals[0] 

1172 period_seconds = period_samples / sample_rate 

1173 # Lower confidence since we only have half a period 

1174 return True, period_seconds, 0.7 

1175 

1176 # For highly periodic signals, CV should be low 

1177 if cv > 0.3: 

1178 # High variation - check if it's alternating pattern (square wave) 

1179 if len(intervals) >= 4: 1179 ↛ 1194  [line 1179 didn't jump to line 1194 because the condition on line 1179 was always true]

1180 # Check if odd and even intervals are each consistent 

1181 odd_intervals = intervals[::2] 

1182 even_intervals = intervals[1::2] 

1183 

1184 odd_cv = np.std(odd_intervals) / (np.mean(odd_intervals) + 1e-12) 

1185 even_cv = np.std(even_intervals) / (np.mean(even_intervals) + 1e-12) 

1186 

1187 if odd_cv < 0.2 and even_cv < 0.2: 1187 ↛ 1201  [line 1187 didn't jump to line 1201 because the condition on line 1187 was always true]

1188 # Alternating pattern detected (square wave) 

1189 # Period is sum of two consecutive intervals 

1190 period_samples = np.mean(odd_intervals) + np.mean(even_intervals) 

1191 period_seconds = period_samples / sample_rate 

1192 confidence = 1.0 - max(odd_cv, even_cv) 

1193 return True, period_seconds, float(confidence) 

1194 elif len(intervals) == 2: 

1195 # Only 2 intervals - assume alternating pattern for square wave 

1196 period_samples = intervals[0] + intervals[1] 

1197 period_seconds = period_samples / sample_rate 

1198 # Moderate confidence with only 2 intervals 

1199 return True, period_seconds, 0.75 

1200 

1201 return False, None, 0.0 

1202 

1203 # Regular intervals detected 

1204 # For square waves with 50% duty cycle, full period = 2 * interval 

1205 # For other waveforms, check if all intervals are similar (uniform spacing) 

1206 

1207 # Estimate period from intervals 

1208 # If all intervals are similar, period might be 2*interval (square wave) 

1209 # Check by seeing if we have roughly equal numbers of edges per inferred period 

1210 period_samples = 2 * mean_interval # Assume square wave initially 

1211 num_periods = len(data) / period_samples 

1212 

1213 # If we have at least 1 period, consider it periodic 

1214 if num_periods >= 0.5: # Allow detection with half a period 1214 ↛ 1219  [line 1214 didn't jump to line 1219 because the condition on line 1214 was always true]

1215 period_seconds = period_samples / sample_rate 

1216 confidence = 1.0 - min(cv / 0.3, 0.5) # Scale confidence by CV 

1217 return True, period_seconds, float(confidence) 

1218 

1219 return False, None, 0.0 

1220 

1221 
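# Illustrative sketch (not part of the original module): for a 50% duty-cycle
# square wave the edge intervals are uniform, so the function above treats
# twice the mean interval as the period. The three 200-sample periods below are
# an assumption for the example.
def _demo_detect_edge_periodicity() -> None:
    sample_rate = 1_000.0
    wave = np.tile(np.repeat(np.array([0.0, 1.0]), 100), 3)  # 600 samples, 3 periods
    found, period, conf = _detect_edge_periodicity(
        wave, sample_rate, levels={"low": 0.0, "high": 1.0}
    )
    print(found, period, round(conf, 2))
    # Expected (not guaranteed): True with a period near 0.2 s.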

1222 @dataclass

1223 class AnalysisRecommendation:

1224 """Recommendation for an analysis to run. 

1225 

1226 Attributes: 

1227 domain: Analysis domain to run. 

1228 priority: Priority ranking (1=highest). 

1229 confidence: Expected confidence if run (0.0-1.0). 

1230 reasoning: Human-readable explanation. 

1231 estimated_runtime_ms: Estimated runtime in milliseconds. 

1232 prerequisites_met: Whether all prerequisites are satisfied. 

1233 """ 

1234 

1235 domain: AnalysisDomain 

1236 priority: int # 1=highest priority 

1237 confidence: float # Expected confidence if run 

1238 reasoning: str 

1239 estimated_runtime_ms: int = 100 

1240 prerequisites_met: bool = True 

1241 

1242 

1243 def recommend_analyses(

1244 data: NDArray[np.floating[Any]], 

1245 sample_rate: float = 1.0, 

1246 *, 

1247 time_budget_seconds: float | None = None, 

1248 confidence_target: float = 0.7, 

1249 exclude_domains: list[AnalysisDomain] | None = None, 

1250) -> list[AnalysisRecommendation]: 

1251 """Recommend which analyses to run based on signal characteristics. 

1252 

1253 Uses signal classification, quality metrics, and heuristics to 

1254 recommend the most valuable analyses for a given signal. 

1255 

1256 Args: 

1257 data: Input signal data. 

1258 sample_rate: Sample rate in Hz. 

1259 time_budget_seconds: Optional time budget (prioritizes faster analyses). 

1260 confidence_target: Minimum expected confidence threshold. 

1261 exclude_domains: Domains to exclude from recommendations. 

1262 

1263 Returns: 

1264 List of AnalysisRecommendation sorted by priority. 

1265 

1266 Example: 

1267 >>> import numpy as np 

1268 >>> import tracekit as tk 

1269 >>> # Generate test signal 

1270 >>> t = np.linspace(0, 1, 10000) 

1271 >>> signal = np.sin(2 * np.pi * 100 * t) 

1272 >>> recommendations = tk.recommend_analyses(signal, sample_rate=10000) 

1273 >>> for rec in recommendations[:3]: 

1274 ... print(f"{rec.domain.value}: {rec.reasoning}") 

1275 waveform: Basic waveform measurements are always applicable 

1276 statistics: Statistical analysis provides foundational metrics 

1277 spectral: Spectral analysis reveals frequency content - signal appears periodic 

1278 """ 

1279 # Avoid circular import 

1280 from tracekit.reporting.config import AnalysisDomain 

1281 

1282 recommendations = [] 

1283 exclude = set(exclude_domains or []) 

1284 

1285 # Classify signal 

1286 classification = classify_signal(data, sample_rate) 

1287 _signal_type = classification.get("signal_type", "unknown") # Reserved for future use 

1288 is_digital = classification.get("is_digital", False) 

1289 is_periodic = classification.get("is_periodic", False) 

1290 _snr_db = classification.get("snr_db", 20) # Reserved for future use 

1291 dominant_freq = classification.get("dominant_frequency") 

1292 

1293 # Always recommend these foundational domains 

1294 if AnalysisDomain.WAVEFORM not in exclude: 

1295 recommendations.append( 

1296 AnalysisRecommendation( 

1297 domain=AnalysisDomain.WAVEFORM, 

1298 priority=1, 

1299 confidence=0.95, 

1300 reasoning="Basic waveform measurements are always applicable", 

1301 estimated_runtime_ms=50, 

1302 ) 

1303 ) 

1304 

1305 if AnalysisDomain.STATISTICS not in exclude: 

1306 recommendations.append( 

1307 AnalysisRecommendation( 

1308 domain=AnalysisDomain.STATISTICS, 

1309 priority=1, 

1310 confidence=0.95, 

1311 reasoning="Statistical analysis provides foundational metrics", 

1312 estimated_runtime_ms=30, 

1313 ) 

1314 ) 

1315 

1316 # Spectral analysis - good for most signals 

1317 if AnalysisDomain.SPECTRAL not in exclude: 

1318 spectral_conf = 0.85 if is_periodic else 0.70 

1319 recommendations.append( 

1320 AnalysisRecommendation( 

1321 domain=AnalysisDomain.SPECTRAL, 

1322 priority=2 if is_periodic else 3, 

1323 confidence=spectral_conf, 

1324 reasoning="Spectral analysis reveals frequency content" 

1325 + (" - signal appears periodic" if is_periodic else ""), 

1326 estimated_runtime_ms=100, 

1327 ) 

1328 ) 

1329 

1330 # Digital-specific analyses 

1331 if is_digital: 

1332 if AnalysisDomain.DIGITAL not in exclude: 

1333 recommendations.append( 

1334 AnalysisRecommendation( 

1335 domain=AnalysisDomain.DIGITAL, 

1336 priority=1, 

1337 confidence=0.90, 

1338 reasoning="Digital signal detected - edge and timing analysis recommended", 

1339 estimated_runtime_ms=80, 

1340 ) 

1341 ) 

1342 

1343 if AnalysisDomain.TIMING not in exclude: 

1344 recommendations.append( 

1345 AnalysisRecommendation( 

1346 domain=AnalysisDomain.TIMING, 

1347 priority=2, 

1348 confidence=0.85, 

1349 reasoning="Timing analysis valuable for digital signals", 

1350 estimated_runtime_ms=60, 

1351 ) 

1352 ) 

1353 

1354 if AnalysisDomain.PROTOCOLS not in exclude and dominant_freq: 

1355 # Check if frequency matches common baud rates 

1356 common_bauds = [9600, 19200, 38400, 57600, 115200] 

1357 if any(abs(dominant_freq * 2 - b) / b < 0.1 for b in common_bauds): 

1358 recommendations.append( 

1359 AnalysisRecommendation( 

1360 domain=AnalysisDomain.PROTOCOLS, 

1361 priority=3, 

1362 confidence=0.70, 

1363 reasoning=f"Frequency {dominant_freq:.0f} Hz suggests serial protocol", 

1364 estimated_runtime_ms=150, 

1365 ) 

1366 ) 

1367 

1368 # Periodic signal analyses 

1369 if is_periodic: 

1370 if AnalysisDomain.JITTER not in exclude and is_digital: 

1371 recommendations.append( 

1372 AnalysisRecommendation( 

1373 domain=AnalysisDomain.JITTER, 

1374 priority=3, 

1375 confidence=0.80, 

1376 reasoning="Periodic digital signal - jitter analysis applicable", 

1377 estimated_runtime_ms=120, 

1378 ) 

1379 ) 

1380 

1381 if AnalysisDomain.EYE not in exclude and is_digital: 

1382 recommendations.append( 

1383 AnalysisRecommendation( 

1384 domain=AnalysisDomain.EYE, 

1385 priority=3, 

1386 confidence=0.75, 

1387 reasoning="Eye diagram analysis for signal integrity assessment", 

1388 estimated_runtime_ms=200, 

1389 ) 

1390 ) 

1391 

1392 # Pattern analysis - good for complex signals 

1393 if AnalysisDomain.PATTERNS not in exclude and len(data) > 1000: 

1394 pattern_conf = 0.70 if is_periodic else 0.50 

1395 recommendations.append( 

1396 AnalysisRecommendation( 

1397 domain=AnalysisDomain.PATTERNS, 

1398 priority=4, 

1399 confidence=pattern_conf, 

1400 reasoning="Pattern analysis can reveal repeating structures", 

1401 estimated_runtime_ms=500, 

1402 ) 

1403 ) 

1404 

1405 # Entropy analysis - useful for random/encrypted data 

1406 if AnalysisDomain.ENTROPY not in exclude: 

1407 recommendations.append( 

1408 AnalysisRecommendation( 

1409 domain=AnalysisDomain.ENTROPY, 

1410 priority=5, 

1411 confidence=0.80, 

1412 reasoning="Entropy analysis characterizes randomness and complexity", 

1413 estimated_runtime_ms=100, 

1414 ) 

1415 ) 

1416 

1417 # Apply confidence threshold filter 

1418 recommendations = [r for r in recommendations if r.confidence >= confidence_target] 

1419 

1420 # Apply time budget filter if specified 

1421 if time_budget_seconds is not None: 

1422 budget_ms = time_budget_seconds * 1000 

1423 cumulative = 0 

1424 filtered = [] 

1425 for rec in sorted(recommendations, key=lambda x: (x.priority, -x.confidence)): 

1426 if cumulative + rec.estimated_runtime_ms <= budget_ms: 

1427 filtered.append(rec) 

1428 cumulative += rec.estimated_runtime_ms 

1429 recommendations = filtered 

1430 

1431 # Sort by priority, then by confidence 

1432 recommendations.sort(key=lambda x: (x.priority, -x.confidence)) 

1433 

1434 return recommendations 

1435 

1436 
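# Illustrative sketch (not part of the original module): shows the
# time_budget_seconds filter above trimming recommendations by cumulative
# estimated runtime. The 100 Hz sine and the 0.2 s budget are assumptions for
# the example.
def _demo_recommend_with_budget() -> None:
    t = np.linspace(0.0, 1.0, 10_000)
    sine = np.sin(2 * np.pi * 100.0 * t)
    recs = recommend_analyses(sine, sample_rate=10_000.0, time_budget_seconds=0.2)
    for rec in recs:
        print(rec.domain.value, rec.priority, rec.estimated_runtime_ms)
    # Expected (not guaranteed): the cheap waveform/statistics/spectral entries,
    # kept only while their summed estimated_runtime_ms fits in the 200 ms budget.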

1437 def get_optimal_domain_order(

1438 recommendations: list[AnalysisRecommendation], 

1439) -> list[AnalysisDomain]: 

1440 """Get optimal order for running analyses. 

1441 

1442 Considers dependencies and priorities to determine best order. 

1443 

1444 Args: 

1445 recommendations: List of analysis recommendations. 

1446 

1447 Returns: 

1448 Ordered list of domains to analyze. 

1449 

1450 Example: 

1451 >>> import numpy as np 

1452 >>> import tracekit as tk 

1453 >>> # Generate test signal 

1454 >>> t = np.linspace(0, 1, 10000) 

1455 >>> signal = np.sin(2 * np.pi * 100 * t) 

1456 >>> recommendations = tk.recommend_analyses(signal, sample_rate=10000) 

1457 >>> order = tk.get_optimal_domain_order(recommendations) 

1458 >>> print([d.value for d in order]) 

1459 ['waveform', 'statistics', 'spectral', 'patterns', 'entropy'] 

1460 """ 

1461 # Avoid circular import 

1462 from tracekit.reporting.config import AnalysisDomain 

1463 

1464 # Define dependencies 

1465 dependencies = { 

1466 AnalysisDomain.JITTER: [AnalysisDomain.TIMING], 

1467 AnalysisDomain.EYE: [AnalysisDomain.DIGITAL], 

1468 AnalysisDomain.PROTOCOLS: [AnalysisDomain.DIGITAL], 

1469 AnalysisDomain.INFERENCE: [AnalysisDomain.PATTERNS], 

1470 } 

1471 

1472 # Build order respecting dependencies 

1473 ordered = [] 

1474 remaining = {r.domain for r in recommendations} 

1475 

1476 while remaining: 

1477 # Find domains with satisfied dependencies 

1478 ready = [] 

1479 for domain in remaining: 

1480 deps = dependencies.get(domain, []) 

1481 if all(d not in remaining or d in ordered for d in deps): 

1482 ready.append(domain) 

1483 

1484 if not ready: 

1485 # No ready domains - just add remaining (circular deps) 

1486 ready = list(remaining) 

1487 

1488 # Add highest priority ready domain 

1489 for rec in sorted(recommendations, key=lambda x: (x.priority, -x.confidence)): 

1490 if rec.domain in ready: 

1491 ordered.append(rec.domain) 

1492 remaining.discard(rec.domain) 

1493 break 

1494 

1495 return ordered 

1496 

1497 

1498 __all__ = [

1499 "AnalysisRecommendation", 

1500 "assess_signal_quality", 

1501 "check_measurement_suitability", 

1502 "classify_signal", 

1503 "get_optimal_domain_order", 

1504 "recommend_analyses", 

1505 "suggest_measurements", 

1506 ]