1"""Signal classification and measurement intelligence for TraceKit.
3This module provides intelligent signal type detection, quality assessment,
4and measurement suitability checking to help users understand why they might
5get NaN results and which measurements are appropriate for their signals.
8Example:
9 >>> import tracekit as tk
10 >>> trace = tk.load('signal.wfm')
11 >>> classification = tk.classify_signal(trace)
12 >>> print(f"Signal type: {classification['type']}")
13 >>> print(f"Characteristics: {classification['characteristics']}")
14 >>> quality = tk.assess_signal_quality(trace)
15 >>> print(f"SNR: {quality['snr']:.1f} dB")
16 >>> suggestions = tk.suggest_measurements(trace)
17 >>> print(f"Recommended measurements: {suggestions}")
19References:
    IEEE 181-2011: Standard for Transitions, Pulses, and Related Waveforms
    IEEE 1057-2017: Standard for Digitizing Waveform Recorders
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, cast

import numpy as np

if TYPE_CHECKING:
    from numpy.typing import NDArray

    from tracekit.core.types import WaveformTrace
    from tracekit.reporting.config import AnalysisDomain


def classify_signal(
    trace: WaveformTrace | NDArray[np.floating[Any]],
    sample_rate: float = 1.0,
    *,
    digital_threshold_ratio: float = 0.8,
    dc_threshold_percent: float = 90.0,
    periodicity_threshold: float = 0.7,
) -> dict[str, Any]:
    """Classify signal type and characteristics.

    Automatically detects whether a signal is digital, analog, or mixed,
    identifies key characteristics such as periodicity and noise, and
    estimates fundamental properties.

    Args:
        trace: Input waveform trace or numpy array to classify.
        sample_rate: Sample rate in Hz (only used if trace is an ndarray).
        digital_threshold_ratio: Fraction of samples concentrated at two
            levels required to consider the signal digital (0-1).
        dc_threshold_percent: Mean-to-amplitude ratio, in percent, above
            which a significant DC component is flagged.
        periodicity_threshold: Correlation threshold for periodicity
            detection (0-1).

    Returns:
        Dictionary containing:
            - signal_type: Signal type ("digital", "analog", "mixed", "dc")
            - type: Alias of signal_type (kept for compatibility)
            - is_digital: Boolean indicating if signal is digital
            - is_periodic: Boolean indicating if signal is periodic
            - characteristics: List of characteristics like "periodic", "noisy", "pulsed"
            - dc_component: True if significant DC offset present
            - frequency_estimate: Estimated fundamental frequency in Hz (or None)
            - dominant_frequency: Same as frequency_estimate (for compatibility)
            - snr_db: Estimated SNR in dB (or None)
            - confidence: Classification confidence (0.0-1.0)
            - noise_level: Estimated noise level in signal units
            - levels: For digital signals, dict with "low" and "high" levels

    Example:
        >>> trace = tk.load('square_wave.wfm')
        >>> info = tk.classify_signal(trace)
        >>> print(f"Type: {info['signal_type']}")
        Type: digital
        >>> print(f"Characteristics: {info['characteristics']}")
        Characteristics: ['periodic', 'clean']
        >>> print(f"Frequency: {info['frequency_estimate']:.3e} Hz")
        Frequency: 1.000e+06 Hz

    References:
        IEEE 181-2011: Digital waveform characterization
    """
    # Handle both WaveformTrace and ndarray inputs
    if isinstance(trace, np.ndarray):
        data = trace
        trace_sample_rate = sample_rate
    else:
        data = trace.data
        trace_sample_rate = trace.metadata.sample_rate

    n = len(data)

    if n < 10:
        return {
            "type": "unknown",
            "signal_type": "unknown",
            "is_digital": False,
            "is_periodic": False,
            "characteristics": ["insufficient_data"],
            "dc_component": False,
            "frequency_estimate": None,
            "dominant_frequency": None,
            "snr_db": None,
            "confidence": 0.0,
            "noise_level": 0.0,
            "levels": None,
        }

    # Calculate basic statistics
    mean_val = float(np.mean(data))
    std_val = float(np.std(data))
    min_val = float(np.min(data))
    max_val = float(np.max(data))
    amplitude = max_val - min_val

    # Initialize result
    characteristics = []
    signal_type = "analog"
    confidence = 0.5

    # 1. Check for DC signal (very low variation)
    # Use coefficient of variation (CV) for DC detection
    cv = std_val / (abs(mean_val) + amplitude / 2 + 1e-12)
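    # Illustrative values for the CV heuristic (hypothetical numbers): a 1 V
    # rail with ~1 mV of ripple gives cv ≈ 0.001 / 1.0 < 0.005 and is treated
    # as DC below, while a 1 Vpp sine centered at 0 V gives cv ≈ 0.354 / 0.5 ≈ 0.71.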
    if amplitude < 1e-9 or cv < 0.005:  # Less than 0.5% variation
        signal_type = "dc"
        characteristics.append("constant")
        confidence = 0.95
        return {
            "type": signal_type,
            "signal_type": signal_type,
            "is_digital": False,
            "is_periodic": False,
            "characteristics": characteristics,
            "dc_component": True,
            "frequency_estimate": None,
            "dominant_frequency": None,
            "snr_db": None,
            "confidence": confidence,
            "noise_level": std_val,
            "levels": None,
        }

    # 2. Check for digital signal (bimodal distribution)
    is_digital, digital_levels, digital_confidence = _detect_digital_signal(
        data, digital_threshold_ratio
    )

    if is_digital:
        signal_type = "digital"
        confidence = digital_confidence
        characteristics.append("digital_levels")

    # 3. Assess noise level
    noise_level = _estimate_noise_level(data)
    noise_ratio = noise_level / (amplitude + 1e-12)

    if noise_ratio < 0.05:
        characteristics.append("clean")
    elif noise_ratio < 0.15:
        characteristics.append("low_noise")
    elif noise_ratio < 0.30:
        characteristics.append("moderate_noise")
    else:
        characteristics.append("noisy")

    # 4. Check for periodicity
    is_periodic, period_estimate, periodicity_score = _detect_periodicity(
        data, trace_sample_rate, periodicity_threshold
    )

    # For digital signals, also try edge-based periodicity detection
    # This works better for signals with few periods
    if not is_periodic and is_digital:
        edge_periodic, edge_period, edge_confidence = _detect_edge_periodicity(
            data, trace_sample_rate, digital_levels
        )
        if edge_periodic:
            is_periodic = edge_periodic
            period_estimate = edge_period
            periodicity_score = edge_confidence

    # Also try FFT-based frequency detection
    # FFT is more reliable for undersampled signals where autocorrelation may detect harmonics
    if n >= 64:
        fft_periodic, fft_period, fft_confidence = _detect_periodicity_fft(data, trace_sample_rate)
        if fft_periodic:
            # If autocorrelation also found periodicity, compare results
            if is_periodic and period_estimate is not None:
                # If frequencies differ significantly (>20%), prefer the higher one:
                # a lower autocorrelation frequency is often a period multiple
                # (subharmonic) or an aliasing artifact
                auto_freq = 1.0 / period_estimate if period_estimate > 0 else 0
                fft_freq = 1.0 / fft_period if fft_period is not None and fft_period > 0 else 0
                freq_ratio = max(auto_freq, fft_freq) / (min(auto_freq, fft_freq) + 1e-12)

                if freq_ratio > 1.2:  # More than 20% difference
                    # Prefer higher frequency (more likely to be correct)
                    if fft_freq > auto_freq:
                        period_estimate = fft_period
                        periodicity_score = fft_confidence
            else:
                # Only FFT detected periodicity
                is_periodic = fft_periodic
                period_estimate = fft_period
                periodicity_score = fft_confidence

    if is_periodic:
        characteristics.append("periodic")
        frequency_estimate = (
            1.0 / period_estimate if period_estimate is not None and period_estimate > 0 else None
        )
        confidence = max(confidence, periodicity_score)
    else:
        characteristics.append("aperiodic")
        frequency_estimate = None

    # 5. Check for DC component
    dc_component = abs(mean_val) > (amplitude * dc_threshold_percent / 100.0)

    # 6. Detect pulsed/transient characteristics
    edge_count = _count_edges(data, digital_levels if is_digital else None)
    samples_per_edge = n / max(edge_count, 1)
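    # Hypothetical example: 10 edges across 100,000 samples gives
    # samples_per_edge = 10,000, so the signal is tagged "pulsed" below,
    # while a single large step in an otherwise flat record is "transient".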
    if edge_count > 2 and samples_per_edge > 100:
        characteristics.append("pulsed")
    elif edge_count < 3 and amplitude > std_val * 2:
        characteristics.append("transient")

    # 7. Check for mixed signal (both digital transitions and analog variation)
    if is_digital and digital_levels is not None:
        # Check if there's significant variation within digital levels
        low_region = data[data < (digital_levels["low"] + digital_levels["high"]) / 2]
        high_region = data[data >= (digital_levels["low"] + digital_levels["high"]) / 2]

        if len(low_region) > 0 and len(high_region) > 0:
            low_std = np.std(low_region)
            high_std = np.std(high_region)
            level_separation = digital_levels["high"] - digital_levels["low"]

            if low_std > level_separation * 0.1 or high_std > level_separation * 0.1:
                signal_type = "mixed"
                characteristics.append("analog_variation")

    # Calculate SNR estimate
    snr_db = None
    if amplitude > noise_level * 10:
        signal_power = amplitude**2 / 8  # Approximate for most waveforms
        noise_power = noise_level**2
        if noise_power > 1e-20:
            snr_db = 10 * np.log10(signal_power / noise_power)
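    # Why amplitude**2 / 8: for a sine wave with peak-to-peak amplitude A,
    # RMS = A / (2 * sqrt(2)), so the mean power is A**2 / 8. Square or
    # triangle waves deviate from this by a few dB, which is acceptable for
    # a rough SNR estimate.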
    return {
        "type": signal_type,
        "signal_type": signal_type,
        "is_digital": is_digital,
        "is_periodic": is_periodic,
        "characteristics": characteristics,
        "dc_component": dc_component,
        "frequency_estimate": frequency_estimate,
        "dominant_frequency": frequency_estimate,
        "snr_db": float(snr_db) if snr_db is not None else None,
        "confidence": float(confidence),
        "noise_level": float(noise_level),
        "levels": digital_levels if is_digital else None,
    }


def assess_signal_quality(
    trace: WaveformTrace,
) -> dict[str, Any]:
    """Assess signal quality metrics.

    Analyzes signal quality including SNR, noise level, clipping, saturation,
    and other quality indicators that affect measurement accuracy.

    Args:
        trace: Input waveform trace to assess.

    Returns:
        Dictionary containing:
            - snr: Signal-to-noise ratio in dB (or None if not applicable)
            - noise_level: RMS noise level in signal units
            - clipping: True if signal shows clipping
            - saturation: True if signal appears saturated
            - warnings: List of quality warning strings
            - dynamic_range: Signal dynamic range in dB
            - crest_factor: Peak-to-RMS ratio

    Example:
        >>> trace = tk.load('noisy_sine.wfm')
        >>> quality = tk.assess_signal_quality(trace)
        >>> print(f"SNR: {quality['snr']:.1f} dB")
        SNR: 42.3 dB
        >>> if quality['warnings']:
        ...     print(f"Warnings: {quality['warnings']}")

    References:
        IEEE 1057-2017: ADC quality metrics
    """
    data = trace.data
    n = len(data)
    warnings = []

    if n < 10:
        warnings.append("Insufficient data for quality assessment")
        return {
            "snr": None,
            "noise_level": 0.0,
            "clipping": False,
            "saturation": False,
            "warnings": warnings,
            "dynamic_range": None,
            "crest_factor": None,
        }

    # Calculate statistics
    min_val = float(np.min(data))
    max_val = float(np.max(data))
    mean_val = float(np.mean(data))
    rms_val = float(np.sqrt(np.mean(data**2)))
    amplitude = max_val - min_val

    # 1. Detect clipping (samples stuck at extremes)
    # Real clipping shows as CONSECUTIVE samples at extremes, not just many samples near extremes
    clipping = False
    if amplitude > 1e-9:
        tolerance = amplitude * 0.01  # 1% tolerance

        # Find consecutive runs at extremes
        at_min = data <= (min_val + tolerance)
        at_max = data >= (max_val - tolerance)

        # Check for long consecutive runs (clipping) vs brief peaks (natural waveform)
        # For analog signals like sine waves, peaks naturally have ~5-10% of samples near extremes
        # Real clipping typically shows >15-20% consecutive samples
        # For digital signals, even short runs at extremes can indicate clipping
        min_run_length = max(int(n * 0.15), 100)  # 15% of data or 100 samples minimum

        # Find maximum consecutive run lengths
        max_min_run = 0
        max_max_run = 0

        current_min_run = 0
        current_max_run = 0

        for i in range(n):
            if at_min[i]:
                current_min_run += 1
                max_min_run = max(max_min_run, current_min_run)
            else:
                current_min_run = 0

            if at_max[i]:
                current_max_run += 1
                max_max_run = max(max_max_run, current_max_run)
            else:
                current_max_run = 0

        # Clipping detected if we have long consecutive runs at extremes
        if max_min_run >= min_run_length:
            clipping = True
            warnings.append(
                f"Signal clipping detected at minimum ({max_min_run} consecutive samples)"
            )
        if max_max_run >= min_run_length:
            clipping = True
            warnings.append(
                f"Signal clipping detected at maximum ({max_max_run} consecutive samples)"
            )

    # 2. Detect saturation (signal stuck at one level)
    # For digital signals, 2 unique values is normal, not saturation
    saturation = False
    unique_values = len(np.unique(data))
    classification = classify_signal(trace)

    # Different thresholds for digital vs analog signals
    if classification["type"] == "digital":
        # Digital signals should have 2+ levels; saturation is when stuck at 1 level
        if unique_values < 2:
            saturation = True
            warnings.append(f"Signal saturation detected (only {unique_values} unique value)")
    else:
        # Analog signals should have many unique values
        if unique_values < max(10, n // 1000):
            saturation = True
            warnings.append(f"Signal saturation detected (only {unique_values} unique values)")

    # 3. Estimate noise level
    noise_level = _estimate_noise_level(data)

    # 4. Calculate SNR
    snr = None
    if amplitude > noise_level * 10:  # Only calculate if signal > noise
        # Remove DC and calculate signal power
        data_ac = data - mean_val
        signal_power = np.mean(data_ac**2)
        noise_power = noise_level**2

        if noise_power > 1e-20:
            snr = 10 * np.log10(signal_power / noise_power)
        else:
            snr = float("inf")
    # 5. Calculate dynamic range (use magnitudes so the log argument stays positive)
    dynamic_range = None
    if min_val != 0 and max_val != 0:
        dynamic_range = 20 * np.log10(abs(max_val) / (abs(min_val) + 1e-20))
    # 6. Calculate crest factor (peak-to-RMS)
    crest_factor = None
    if rms_val > 1e-12:
        crest_factor = max(abs(max_val), abs(min_val)) / rms_val
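    # Reference points for interpreting the crest factor: a pure sine gives
    # sqrt(2) ≈ 1.414, an ideal 50% square wave gives 1.0, and Gaussian noise
    # typically lands around 3-4 for practical record lengths.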
    # 7. Check for quantization issues
    if n > 100:
        # Estimate quantization step
        sorted_data = np.sort(data)
        diffs = np.diff(sorted_data)
        diffs = diffs[diffs > 1e-15]  # Remove near-zero differences

        if len(diffs) > 10:
            min_step = np.min(diffs)
            if amplitude / min_step < 256:
                warnings.append(
                    f"Low resolution detected ({int(amplitude / min_step)} levels), "
                    "may affect measurement accuracy"
                )
    # 8. Check sample rate adequacy (reuses the classification from step 2)
    if classification["frequency_estimate"] is not None:
        # Check if sample rate is at least 10x the detected frequency
        nyquist_rate = 2 * classification["frequency_estimate"]
        if trace.metadata.sample_rate < nyquist_rate * 5:
            warnings.append(
                f"Sample rate ({trace.metadata.sample_rate:.3e} Hz) may be "
                f"insufficient for signal frequency ({classification['frequency_estimate']:.3e} Hz). "
                "Recommend at least 10x oversampling"
            )

        # Additional check: if samples per period is very low, we might be undersampling
        # This catches cases where frequency detection may be wrong due to aliasing
        samples_per_period = trace.metadata.sample_rate / classification["frequency_estimate"]
        if samples_per_period < 10 and "sample rate" not in "".join(warnings).lower():
            warnings.append(
                f"Very low oversampling detected ({samples_per_period:.1f} samples per period). "
                f"Signal may be undersampled or frequency detection may be inaccurate. "
                "Recommend at least 10 samples per period"
            )

    return {
        "snr": float(snr) if snr is not None else None,
        "noise_level": float(noise_level),
        "clipping": bool(clipping),
        "saturation": bool(saturation),
        "warnings": warnings,
        "dynamic_range": float(dynamic_range) if dynamic_range is not None else None,
        "crest_factor": float(crest_factor) if crest_factor is not None else None,
    }


def check_measurement_suitability(
    trace: WaveformTrace,
    measurement_name: str,
) -> dict[str, Any]:
    """Check if a measurement is suitable for this signal.

    Analyzes signal characteristics to determine if a specific measurement
    will produce valid results, and provides warnings and suggestions.

    Args:
        trace: Input waveform trace.
        measurement_name: Name of measurement to check (e.g., "frequency", "rise_time").

    Returns:
        Dictionary containing:
            - suitable: True if measurement is appropriate for this signal
            - confidence: Confidence in suitability assessment (0.0-1.0)
            - warnings: List of warning strings
            - suggestions: List of suggestion strings
            - expected_result: "valid", "nan", or "unreliable"

    Example:
        >>> trace = tk.load('dc_signal.wfm')
        >>> check = tk.check_measurement_suitability(trace, "frequency")
        >>> if not check['suitable']:
        ...     print(f"Warning: {check['warnings']}")
        Warning: ['Frequency measurement not suitable for DC signal']

    References:
        IEEE 181-2011: Measurement applicability
    """
    classification = classify_signal(trace)
    quality = assess_signal_quality(trace)

    warnings = []
    suggestions = []
    suitable = True
    confidence = 0.8
    expected_result = "valid"
    signal_type = classification["type"]
    characteristics = classification["characteristics"]

    # Define measurement requirements
    frequency_measurements = ["frequency", "period"]
    edge_measurements = ["rise_time", "fall_time"]
    amplitude_measurements = ["amplitude", "overshoot", "undershoot", "preshoot"]
    duty_measurements = ["duty_cycle", "pulse_width"]
    _statistical_measurements = ["mean", "rms"]
    spectral_measurements = ["thd", "snr", "sinad", "enob", "sfdr", "fft", "psd"]

    # Check DC signals
    if signal_type == "dc":
        if measurement_name in frequency_measurements:
            suitable = False
            warnings.append(f"{measurement_name} measurement not suitable for DC signal")
            suggestions.append("Use 'mean' or 'rms' measurements for DC signals")
            expected_result = "nan"
        elif measurement_name in edge_measurements:
            suitable = False
            warnings.append(f"{measurement_name} requires signal transitions")
            suggestions.append("Signal appears to be DC with no edges")
            expected_result = "nan"
        elif measurement_name in duty_measurements:
            suitable = False
            warnings.append(f"{measurement_name} requires periodic signal")
            expected_result = "nan"

    # Check aperiodic signals
    if "aperiodic" in characteristics:
        if measurement_name in frequency_measurements + duty_measurements:
            suitable = False
            confidence = 0.6
            warnings.append(f"{measurement_name} requires periodic signal")
            suggestions.append("Signal does not appear periodic")
            expected_result = "nan"
        elif measurement_name in spectral_measurements:
            warnings.append("Spectral measurements on aperiodic signals may not show clear peaks")
            suggestions.append("Consider time-domain or statistical analysis")
            expected_result = "unreliable"

    # Check digital vs analog
    if signal_type == "digital":
        if measurement_name in amplitude_measurements and measurement_name != "amplitude":
            warnings.append(
                f"{measurement_name} designed for analog signals with overshoot/ringing"
            )
            suggestions.append("Digital signals may show zero overshoot/undershoot")
            expected_result = "unreliable"
            confidence = 0.5

    # Check for sufficient transitions
    if measurement_name in edge_measurements + duty_measurements:
        data = trace.data
        edge_count = _count_edges(data, classification.get("levels"))
        if edge_count < 2:
            suitable = False
            warnings.append(f"{measurement_name} requires at least 2 signal edges")
            suggestions.append(f"Signal has only {edge_count} detected edge(s)")
            expected_result = "nan"

    # Check signal quality impacts
    if quality["clipping"]:
        if measurement_name in edge_measurements + amplitude_measurements:
            warnings.append("Signal clipping detected, may affect measurement accuracy")
            # Don't override "nan" - if measurement is fundamentally unsuitable, keep it as "nan"
            if expected_result != "nan":
                expected_result = "unreliable"
            confidence = min(confidence, 0.6)

    if quality["saturation"]:
        warnings.append("Signal saturation detected, measurements may be unreliable")
        # Don't override "nan" - if measurement is fundamentally unsuitable, keep it as "nan"
        if expected_result != "nan":
            expected_result = "unreliable"
        confidence = min(confidence, 0.5)

    if quality["snr"] is not None and quality["snr"] < 20:
        if measurement_name in edge_measurements:
            warnings.append(
                f"Low SNR ({quality['snr']:.1f} dB) may affect edge timing measurements"
            )
            suggestions.append("Consider filtering signal to improve SNR")
            confidence = min(confidence, 0.7)

    # Check sample rate for timing measurements
    if measurement_name in edge_measurements + frequency_measurements:
        if classification["frequency_estimate"] is not None:
            nyquist_rate = 2 * classification["frequency_estimate"]
            if trace.metadata.sample_rate < nyquist_rate * 5:
                warnings.append("Sample rate may be too low for accurate timing measurements")
                suggestions.append(
                    f"Recommend sample rate > {nyquist_rate * 10:.3e} Hz (10x signal frequency)"
                )
                expected_result = "unreliable"
                confidence = min(confidence, 0.6)

    # Check data length
    n = len(trace.data)
    if measurement_name in spectral_measurements:
        if n < 256:
            warnings.append(f"Signal length ({n} samples) may be too short for spectral analysis")
            suggestions.append("Recommend at least 1024 samples for FFT-based measurements")
            expected_result = "unreliable"
            confidence = min(confidence, 0.5)
    if measurement_name in frequency_measurements:
        if classification["frequency_estimate"] is not None:
            samples_per_period = trace.metadata.sample_rate / classification["frequency_estimate"]
            # Require at least 0.5 periods for basic detection
            # Having 1+ complete periods is ideal, but FFT can work with less
            if n < samples_per_period * 0.5:
                warnings.append(
                    f"Signal length ({n} samples) captures < 0.5 periods, "
                    "frequency measurement may fail"
                )
                suggestions.append("Capture at least 2 periods for reliable frequency measurement")
                expected_result = "unreliable"
                confidence = min(confidence, 0.5)
            elif n < samples_per_period * 2:
                # Between 0.5 and 2 periods: usable but not ideal
                suggestions.append("Capture at least 10 periods for best accuracy")
                confidence = min(confidence, 0.75)
    return {
        "suitable": suitable,
        "confidence": float(confidence),
        "warnings": warnings,
        "suggestions": suggestions,
        "expected_result": expected_result,
    }


def suggest_measurements(
    trace: WaveformTrace,
    *,
    max_suggestions: int = 10,
) -> list[dict[str, Any]]:
    """Suggest appropriate measurements for a signal.

    Analyzes signal characteristics and recommends the most suitable
    measurements, ranked by relevance and reliability.

    Args:
        trace: Input waveform trace.
        max_suggestions: Maximum number of suggestions to return.

    Returns:
        List of dictionaries, each containing:
            - name: Measurement name
            - category: Measurement category (e.g., "timing", "amplitude", "spectral")
            - priority: Priority ranking (1=highest)
            - rationale: Why this measurement is recommended
            - confidence: Confidence in recommendation (0.0-1.0)

    Example:
        >>> trace = tk.load('square_wave.wfm')
        >>> suggestions = tk.suggest_measurements(trace)
        >>> for s in suggestions[:3]:
        ...     print(f"{s['name']}: {s['rationale']}")
        frequency: Periodic digital signal detected
        duty_cycle: Suitable for pulse analysis
        rise_time: Digital edges detected

    References:
        Best practices for waveform analysis
    """
    classification = classify_signal(trace)
    quality = assess_signal_quality(trace)

    signal_type = classification["type"]
    characteristics = classification["characteristics"]

    suggestions = []

    # Always suggest basic statistical measurements
    suggestions.append(
        {
            "name": "mean",
            "category": "statistical",
            "priority": 1,
            "rationale": "Basic DC level measurement, always applicable",
            "confidence": 1.0,
        }
    )

    suggestions.append(
        {
            "name": "rms",
            "category": "statistical",
            "priority": 2,
            "rationale": "RMS voltage measurement, useful for all signal types",
            "confidence": 1.0,
        }
    )

    # DC signals
    if signal_type == "dc":
        suggestions.append(
            {
                "name": "amplitude",
                "category": "amplitude",
                "priority": 3,
                "rationale": "Measure noise/variation level in DC signal",
                "confidence": 0.9,
            }
        )
        # Don't suggest frequency, edges, etc.
        return sorted(suggestions, key=lambda x: cast("int", x["priority"]))[:max_suggestions]
    # Amplitude measurements
    suggestions.append(
        {
            "name": "amplitude",
            "category": "amplitude",
            "priority": 3,
            "rationale": f"Peak-to-peak amplitude for {signal_type} signal",
            "confidence": 0.95,
        }
    )

    # Periodic signals
    if "periodic" in characteristics:
        suggestions.append(
            {
                "name": "frequency",
                "category": "timing",
                "priority": 4,
                "rationale": "Periodic signal detected, frequency measurement applicable",
                "confidence": classification["confidence"],
            }
        )

        suggestions.append(
            {
                "name": "period",
                "category": "timing",
                "priority": 5,
                "rationale": "Period measurement for periodic signal",
                "confidence": classification["confidence"],
            }
        )

    # Digital signals with edges
    if signal_type in ("digital", "mixed"):
        edge_count = _count_edges(trace.data, classification.get("levels"))

        if edge_count >= 2:
            suggestions.append(
                {
                    "name": "rise_time",
                    "category": "timing",
                    "priority": 6,
                    "rationale": f"Digital edges detected ({edge_count} edges)",
                    "confidence": 0.9 if quality["snr"] and quality["snr"] > 20 else 0.7,
                }
            )

            suggestions.append(
                {
                    "name": "fall_time",
                    "category": "timing",
                    "priority": 7,
                    "rationale": f"Digital edges detected ({edge_count} edges)",
                    "confidence": 0.9 if quality["snr"] and quality["snr"] > 20 else 0.7,
                }
            )

        if "periodic" in characteristics and edge_count >= 2:
            # Need at least 2 edges (1 complete cycle) for duty cycle
            suggestions.append(
                {
                    "name": "duty_cycle",
                    "category": "timing",
                    "priority": 8,
                    "rationale": "Periodic pulse train detected",
                    "confidence": 0.85 if edge_count >= 4 else 0.75,
                }
            )

            suggestions.append(
                {
                    "name": "pulse_width",
                    "category": "timing",
                    "priority": 9,
                    "rationale": "Pulse measurements suitable for periodic digital signal",
                    "confidence": 0.85 if edge_count >= 4 else 0.75,
                }
            )

    # Analog signals
    if signal_type in ("analog", "mixed"):
        if not quality["clipping"]:
            suggestions.append(
                {
                    "name": "overshoot",
                    "category": "amplitude",
                    "priority": 10,
                    "rationale": "Analog signal, overshoot measurement applicable",
                    "confidence": 0.8,
                }
            )

            suggestions.append(
                {
                    "name": "undershoot",
                    "category": "amplitude",
                    "priority": 11,
                    "rationale": "Analog signal, undershoot measurement applicable",
                    "confidence": 0.8,
                }
            )

    # Spectral measurements for clean, periodic signals
    if "periodic" in characteristics and "clean" in characteristics:
        if len(trace.data) >= 256:
            suggestions.append(
                {
                    "name": "thd",
                    "category": "spectral",
                    "priority": 12,
                    "rationale": "Clean periodic signal suitable for harmonic analysis",
                    "confidence": 0.85,
                }
            )

            suggestions.append(
                {
                    "name": "snr",
                    "category": "spectral",
                    "priority": 13,
                    "rationale": "Spectral SNR measurement for signal quality",
                    "confidence": 0.8,
                }
            )

    # Sort by priority and limit
    suggestions = sorted(suggestions, key=lambda x: cast("int", x["priority"]))
    return suggestions[:max_suggestions]


# =============================================================================
# Helper Functions
# =============================================================================


def _detect_digital_signal(
    data: NDArray[np.floating[Any]],
    threshold_ratio: float,
) -> tuple[bool, dict[str, float] | None, float]:
    """Detect if signal is digital based on bimodal distribution.

    Args:
        data: Signal data array.
        threshold_ratio: Ratio of samples at two levels to consider digital.

    Returns:
        Tuple of (is_digital, levels_dict, confidence).
    """
    # Use histogram to find peaks
    # Use more bins for better resolution on digital signals
    n_bins = min(100, len(np.unique(data)))
    hist, bin_edges = np.histogram(data, bins=n_bins)
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2

    # Find peaks (local maxima or significant bins)
    peaks = []

    # Special case: if only 2 bins (perfect digital signal), both are peaks
    if len(hist) == 2:
        for i in range(len(hist)):
            if hist[i] > len(data) * 0.01:
                peaks.append((i, hist[i], bin_centers[i]))
    else:
        # Find local maxima in histogram
        for i in range(1, len(hist) - 1):
            if hist[i] > hist[i - 1] and hist[i] > hist[i + 1]:
                # Lower threshold for peak detection
                if hist[i] > len(data) * 0.01:  # At least 1% of samples
                    peaks.append((i, hist[i], bin_centers[i]))

    # If we have exactly 2 dominant peaks, likely digital
    if len(peaks) >= 2:
        # Sort by count
        peaks = sorted(peaks, key=lambda x: x[1], reverse=True)

        # Take top 2 peaks
        peak1, peak2 = peaks[0], peaks[1]

        # Check if these two peaks account for most samples
        total_in_peaks = peak1[1] + peak2[1]
        ratio = total_in_peaks / len(data)

        # Also check that peaks are well separated
        peak_separation = abs(peak1[2] - peak2[2])
        data_range = np.ptp(data)

        # Peaks should be separated by at least 30% of data range
        if ratio >= threshold_ratio and peak_separation > data_range * 0.3:
            low_level = min(peak1[2], peak2[2])
            high_level = max(peak1[2], peak2[2])

            confidence = min(0.95, ratio)

            return True, {"low": float(low_level), "high": float(high_level)}, confidence

    return False, None, 0.0
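

# A minimal sanity check for _detect_digital_signal (hypothetical values):
#
#     data = np.array([0.0, 3.3] * 500)          # ideal 3.3 V logic levels
#     is_dig, levels, conf = _detect_digital_signal(data, threshold_ratio=0.8)
#     # -> is_dig True, conf ≈ 0.95; note the reported levels are histogram
#     # bin centers (~0.8 and ~2.5 here), not the exact 0.0 / 3.3 rails.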


def _estimate_noise_level(data: NDArray[np.floating[Any]]) -> float:
    """Estimate noise level using median absolute deviation.

    Args:
        data: Signal data array.

    Returns:
        Estimated RMS noise level.
    """
    if len(data) < 10:
        return 0.0

    # Use differencing to remove slow variations
    diffs = np.diff(data)

    # MAD (Median Absolute Deviation) is robust to outliers
    median_diff = np.median(diffs)
    mad = np.median(np.abs(diffs - median_diff))

    # Convert MAD to RMS noise estimate
    # For Gaussian noise: sigma ≈ 1.4826 * MAD
    # Divide by sqrt(2) because diff amplifies noise by sqrt(2)
    noise_estimate = (1.4826 * mad) / np.sqrt(2)

    return float(noise_estimate)
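

# Where the constants come from: if x[i] carries white noise of standard
# deviation sigma, then d[i] = x[i+1] - x[i] has variance 2 * sigma**2, i.e.
# std sqrt(2) * sigma; and for Gaussian data sigma = MAD / 0.6745 ≈ 1.4826 * MAD.
# Quick check (hypothetical): _estimate_noise_level(np.random.normal(0, 0.01,
# 100_000)) should return roughly 0.01.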


def _detect_periodicity(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
    threshold: float,
) -> tuple[bool, float | None, float]:
    """Detect if signal is periodic using autocorrelation.

    Args:
        data: Signal data array.
        sample_rate: Sampling rate in Hz.
        threshold: Correlation threshold for periodic detection.

    Returns:
        Tuple of (is_periodic, period_seconds, confidence).
    """
    n = len(data)

    if n < 20:
        return False, None, 0.0

    # Remove DC for autocorrelation
    data_ac = data - np.mean(data)

    # Check if there's any variation
    if np.std(data_ac) < 1e-12:
        return False, None, 0.0

    # Compute autocorrelation for lags up to n-10 to detect signals with ~1 period
    # This allows finding periodicity even when we have just 1 period of data
    # Keep at least 10 samples of overlap for correlation
    max_lag = min(n - 10, 20000)  # Limit for performance

    autocorr = np.correlate(data_ac, data_ac, mode="full")
    autocorr = autocorr[n - 1 : n - 1 + max_lag]

    # Normalize
    if abs(autocorr[0]) > 1e-12:
        autocorr = autocorr / autocorr[0]
    else:
        return False, None, 0.0

    # Find peaks in autocorrelation (exclude lag=0 and very small lags)
    # Start searching from lag > n/100 to avoid noise
    min_lag = max(3, n // 100)
    peaks = []

    for i in range(min_lag, len(autocorr) - 2):
        # Use stronger peak detection
        if (
            autocorr[i] > autocorr[i - 1]
            and autocorr[i] > autocorr[i + 1]
            and autocorr[i] > autocorr[i - 2]
            and autocorr[i] > autocorr[i + 2]
        ):
            if autocorr[i] > threshold:
                peaks.append((i, autocorr[i]))

    if peaks:
        # Take first significant peak as period
        period_samples = peaks[0][0]
        confidence = float(peaks[0][1])

        period_seconds = period_samples / sample_rate

        return True, period_seconds, confidence

    return False, None, 0.0
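

# Worked example (hypothetical numbers): a 100 Hz sine sampled at 10 kHz has
# a period of 100 samples, so the normalized autocorrelation peaks near lag
# 100 with a value close to 1.0; the first peak above `threshold` then yields
# period_seconds = 100 / 10_000 = 0.01 s.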


def _count_edges(
    data: NDArray[np.floating[Any]],
    levels: dict[str, float] | None,
) -> int:
    """Count number of edges in signal.

    Args:
        data: Signal data array.
        levels: Optional digital levels dict with "low" and "high" keys.

    Returns:
        Number of edges detected.
    """
    if len(data) < 3:
        return 0

    if levels is not None:
        # Use provided levels
        threshold = (levels["low"] + levels["high"]) / 2
    else:
        # Use median as threshold
        threshold = float(np.median(data))

    # Find crossings
    above = data > threshold
    crossings = np.diff(above.astype(int))

    # Count non-zero crossings (both rising and falling)
    edge_count = np.sum(np.abs(crossings))

    return int(edge_count)
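

# Example (hypothetical): ten full cycles of a square wave contain ten rising
# and ten falling transitions, so _count_edges returns about 20 (plus or minus
# one depending on where the record starts and ends within a cycle).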


def _detect_periodicity_fft(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
) -> tuple[bool, float | None, float]:
    """Detect periodicity using FFT (frequency domain analysis).

    This method works well for signals with few periods where autocorrelation
    may fail. It finds the dominant frequency component in the signal.

    Args:
        data: Signal data array.
        sample_rate: Sampling rate in Hz.

    Returns:
        Tuple of (is_periodic, period_seconds, confidence).
    """
    n = len(data)

    if n < 64:
        return False, None, 0.0

    # Remove DC component
    data_ac = data - np.mean(data)

    # Check if there's any variation
    if np.std(data_ac) < 1e-12:
        return False, None, 0.0

    # Compute FFT
    fft = np.fft.rfft(data_ac)
    freqs = np.fft.rfftfreq(n, 1.0 / sample_rate)

    # Compute power spectrum
    power = np.abs(fft) ** 2

    # Skip DC component (index 0)
    if len(power) < 3:
        return False, None, 0.0

    power = power[1:]
    freqs = freqs[1:]

    # Find peak in power spectrum
    peak_idx = np.argmax(power)
    peak_power = power[peak_idx]
    peak_freq = freqs[peak_idx]

    # Check if peak is significant compared to total power
    total_power = np.sum(power)
    if total_power < 1e-20:
        return False, None, 0.0

    power_ratio = peak_power / total_power

    # For periodic signals, the dominant frequency should have significant power
    # Require at least 10% of total power in the peak
    if power_ratio < 0.1:
        return False, None, 0.0

    # Check that frequency is reasonable (not too low or too high)
    nyquist = sample_rate / 2
    if peak_freq < sample_rate / n or peak_freq > nyquist * 0.9:
        return False, None, 0.0

    # Estimate period
    period_seconds = 1.0 / peak_freq

    # Confidence based on how dominant the peak is
    # High power ratio -> high confidence
    confidence = min(0.95, 0.5 + power_ratio)

    return True, period_seconds, float(confidence)
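

# Note on resolution: rfftfreq spaces bins sample_rate / n apart, so the
# frequency estimate is quantized to that grid. For example (hypothetical),
# 1,000 samples at 10 kHz give 10 Hz bins, and a 105 Hz tone lands in either
# the 100 Hz or 110 Hz bin, bounding the period estimate's accuracy.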


def _detect_edge_periodicity(
    data: NDArray[np.floating[Any]],
    sample_rate: float,
    levels: dict[str, float] | None,
) -> tuple[bool, float | None, float]:
    """Detect periodicity in digital signals by analyzing edge spacing.

    This method works well for signals with few periods where autocorrelation
    may fail. It detects regular patterns in edge timing.

    Args:
        data: Signal data array.
        sample_rate: Sampling rate in Hz.
        levels: Digital levels dict with "low" and "high" keys.

    Returns:
        Tuple of (is_periodic, period_seconds, confidence).
    """
    if len(data) < 10 or levels is None:
        return False, None, 0.0

    threshold = (levels["low"] + levels["high"]) / 2

    # Find edge positions
    above = data > threshold
    crossings = np.diff(above.astype(int))
    edge_positions = np.where(crossings != 0)[0]

    if len(edge_positions) < 2:
        # Need at least 2 edges (1 complete cycle) for detection
        return False, None, 0.0

    # Calculate intervals between edges
    intervals = np.diff(edge_positions)

    if len(intervals) < 1:
        return False, None, 0.0

    # For a periodic signal, intervals should form a repeating pattern
    # For a square wave: intervals alternate between high-time and low-time
    # Check if intervals show regular pattern

    # Calculate coefficient of variation of intervals
    mean_interval = np.mean(intervals)
    std_interval = np.std(intervals)

    if mean_interval < 1:
        return False, None, 0.0

    cv = std_interval / mean_interval

    # Special case: exactly 1 interval (2 edges, half period of square wave)
    if len(intervals) == 1:
        # This represents half a period for a square wave
        period_samples = 2 * intervals[0]
        period_seconds = period_samples / sample_rate
        # Lower confidence since we only have half a period
        return True, period_seconds, 0.7

    # For highly periodic signals, CV should be low
    if cv > 0.3:
        # High variation - check if it's alternating pattern (square wave)
        if len(intervals) >= 4:
            # Check if odd and even intervals are each consistent
            odd_intervals = intervals[::2]
            even_intervals = intervals[1::2]

            odd_cv = np.std(odd_intervals) / (np.mean(odd_intervals) + 1e-12)
            even_cv = np.std(even_intervals) / (np.mean(even_intervals) + 1e-12)

            if odd_cv < 0.2 and even_cv < 0.2:
                # Alternating pattern detected (square wave)
                # Period is sum of two consecutive intervals
                period_samples = np.mean(odd_intervals) + np.mean(even_intervals)
                period_seconds = period_samples / sample_rate
                confidence = 1.0 - max(odd_cv, even_cv)
                return True, period_seconds, float(confidence)
        elif len(intervals) == 2:
            # Only 2 intervals - assume alternating pattern for square wave
            period_samples = intervals[0] + intervals[1]
            period_seconds = period_samples / sample_rate
            # Moderate confidence with only 2 intervals
            return True, period_seconds, 0.75

        return False, None, 0.0

    # Regular intervals detected
    # For square waves with 50% duty cycle, full period = 2 * interval
    # For other waveforms, check if all intervals are similar (uniform spacing)

    # Estimate period from intervals
    # If all intervals are similar, period might be 2*interval (square wave)
    # Check by seeing if we have roughly equal numbers of edges per inferred period
    period_samples = 2 * mean_interval  # Assume square wave initially
    num_periods = len(data) / period_samples

    # If we have at least 1 period, consider it periodic
    if num_periods >= 0.5:  # Allow detection with half a period
        period_seconds = period_samples / sample_rate
        confidence = 1.0 - min(cv / 0.3, 0.5)  # Scale confidence by CV
        return True, period_seconds, float(confidence)

    return False, None, 0.0
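

# Worked example (hypothetical): a 25% duty-cycle pulse train with edges at
# samples 0, 100, 400, 500, 800 yields intervals [100, 300, 100, 300] - a
# high overall CV, but odd/even interval groups that are each consistent -
# so the alternating-pattern branch reports a period of 100 + 300 = 400 samples.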


@dataclass
class AnalysisRecommendation:
    """Recommendation for an analysis to run.

    Attributes:
        domain: Analysis domain to run.
        priority: Priority ranking (1=highest).
        confidence: Expected confidence if run (0.0-1.0).
        reasoning: Human-readable explanation.
        estimated_runtime_ms: Estimated runtime in milliseconds.
        prerequisites_met: Whether all prerequisites are satisfied.
    """

    domain: AnalysisDomain
    priority: int  # 1=highest priority
    confidence: float  # Expected confidence if run
    reasoning: str
    estimated_runtime_ms: int = 100
    prerequisites_met: bool = True
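

# Constructing one by hand (a minimal sketch; the AnalysisDomain members are
# whatever tracekit.reporting.config defines):
#
#     rec = AnalysisRecommendation(
#         domain=AnalysisDomain.SPECTRAL,
#         priority=2,
#         confidence=0.85,
#         reasoning="Periodic signal - spectral analysis recommended",
#     )
#     # estimated_runtime_ms defaults to 100, prerequisites_met to True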


def recommend_analyses(
    data: NDArray[np.floating[Any]],
    sample_rate: float = 1.0,
    *,
    time_budget_seconds: float | None = None,
    confidence_target: float = 0.7,
    exclude_domains: list[AnalysisDomain] | None = None,
) -> list[AnalysisRecommendation]:
    """Recommend which analyses to run based on signal characteristics.

    Uses signal classification, quality metrics, and heuristics to
    recommend the most valuable analyses for a given signal.

    Args:
        data: Input signal data.
        sample_rate: Sample rate in Hz.
        time_budget_seconds: Optional time budget (prioritizes faster analyses).
        confidence_target: Minimum expected confidence threshold.
        exclude_domains: Domains to exclude from recommendations.

    Returns:
        List of AnalysisRecommendation sorted by priority.

    Example:
        >>> import numpy as np
        >>> import tracekit as tk
        >>> # Generate test signal
        >>> t = np.linspace(0, 1, 10000)
        >>> signal = np.sin(2 * np.pi * 100 * t)
        >>> recommendations = tk.recommend_analyses(signal, sample_rate=10000)
        >>> for rec in recommendations[:3]:
        ...     print(f"{rec.domain.value}: {rec.reasoning}")
        waveform: Basic waveform measurements are always applicable
        statistics: Statistical analysis provides foundational metrics
        spectral: Spectral analysis reveals frequency content - signal appears periodic
    """
    # Avoid circular import
    from tracekit.reporting.config import AnalysisDomain

    recommendations = []
    exclude = set(exclude_domains or [])

    # Classify signal
    classification = classify_signal(data, sample_rate)
    _signal_type = classification.get("signal_type", "unknown")  # Reserved for future use
    is_digital = classification.get("is_digital", False)
    is_periodic = classification.get("is_periodic", False)
    _snr_db = classification.get("snr_db", 20)  # Reserved for future use
    dominant_freq = classification.get("dominant_frequency")

    # Always recommend these foundational domains
    if AnalysisDomain.WAVEFORM not in exclude:
        recommendations.append(
            AnalysisRecommendation(
                domain=AnalysisDomain.WAVEFORM,
                priority=1,
                confidence=0.95,
                reasoning="Basic waveform measurements are always applicable",
                estimated_runtime_ms=50,
            )
        )

    if AnalysisDomain.STATISTICS not in exclude:
        recommendations.append(
            AnalysisRecommendation(
                domain=AnalysisDomain.STATISTICS,
                priority=1,
                confidence=0.95,
                reasoning="Statistical analysis provides foundational metrics",
                estimated_runtime_ms=30,
            )
        )

    # Spectral analysis - good for most signals
    if AnalysisDomain.SPECTRAL not in exclude:
        spectral_conf = 0.85 if is_periodic else 0.70
        recommendations.append(
            AnalysisRecommendation(
                domain=AnalysisDomain.SPECTRAL,
                priority=2 if is_periodic else 3,
                confidence=spectral_conf,
                reasoning="Spectral analysis reveals frequency content"
                + (" - signal appears periodic" if is_periodic else ""),
                estimated_runtime_ms=100,
            )
        )

    # Digital-specific analyses
    if is_digital:
        if AnalysisDomain.DIGITAL not in exclude:
            recommendations.append(
                AnalysisRecommendation(
                    domain=AnalysisDomain.DIGITAL,
                    priority=1,
                    confidence=0.90,
                    reasoning="Digital signal detected - edge and timing analysis recommended",
                    estimated_runtime_ms=80,
                )
            )

        if AnalysisDomain.TIMING not in exclude:
            recommendations.append(
                AnalysisRecommendation(
                    domain=AnalysisDomain.TIMING,
                    priority=2,
                    confidence=0.85,
                    reasoning="Timing analysis valuable for digital signals",
                    estimated_runtime_ms=60,
                )
            )

        if AnalysisDomain.PROTOCOLS not in exclude and dominant_freq:
            # Check if frequency matches common baud rates
            common_bauds = [9600, 19200, 38400, 57600, 115200]
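            # For an alternating bit pattern the fundamental sits at half the
            # baud rate, hence the factor of 2 below: e.g. a dominant frequency
            # of ~57.6 kHz maps to 115200 Bd within the 10% tolerance.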
            if any(abs(dominant_freq * 2 - b) / b < 0.1 for b in common_bauds):
                recommendations.append(
                    AnalysisRecommendation(
                        domain=AnalysisDomain.PROTOCOLS,
                        priority=3,
                        confidence=0.70,
                        reasoning=f"Frequency {dominant_freq:.0f} Hz suggests serial protocol",
                        estimated_runtime_ms=150,
                    )
                )

    # Periodic signal analyses
    if is_periodic:
        if AnalysisDomain.JITTER not in exclude and is_digital:
            recommendations.append(
                AnalysisRecommendation(
                    domain=AnalysisDomain.JITTER,
                    priority=3,
                    confidence=0.80,
                    reasoning="Periodic digital signal - jitter analysis applicable",
                    estimated_runtime_ms=120,
                )
            )

        if AnalysisDomain.EYE not in exclude and is_digital:
            recommendations.append(
                AnalysisRecommendation(
                    domain=AnalysisDomain.EYE,
                    priority=3,
                    confidence=0.75,
                    reasoning="Eye diagram analysis for signal integrity assessment",
                    estimated_runtime_ms=200,
                )
            )

    # Pattern analysis - good for complex signals
    if AnalysisDomain.PATTERNS not in exclude and len(data) > 1000:
        pattern_conf = 0.70 if is_periodic else 0.50
        recommendations.append(
            AnalysisRecommendation(
                domain=AnalysisDomain.PATTERNS,
                priority=4,
                confidence=pattern_conf,
                reasoning="Pattern analysis can reveal repeating structures",
                estimated_runtime_ms=500,
            )
        )

    # Entropy analysis - useful for random/encrypted data
    if AnalysisDomain.ENTROPY not in exclude:
        recommendations.append(
            AnalysisRecommendation(
                domain=AnalysisDomain.ENTROPY,
                priority=5,
                confidence=0.80,
                reasoning="Entropy analysis characterizes randomness and complexity",
                estimated_runtime_ms=100,
            )
        )

    # Apply confidence threshold filter
    recommendations = [r for r in recommendations if r.confidence >= confidence_target]

    # Apply time budget filter if specified
    if time_budget_seconds is not None:
        budget_ms = time_budget_seconds * 1000
        cumulative = 0
        filtered = []
        for rec in sorted(recommendations, key=lambda x: (x.priority, -x.confidence)):
            if cumulative + rec.estimated_runtime_ms <= budget_ms:
                filtered.append(rec)
                cumulative += rec.estimated_runtime_ms
        recommendations = filtered

    # Sort by priority, then by confidence
    recommendations.sort(key=lambda x: (x.priority, -x.confidence))

    return recommendations


def get_optimal_domain_order(
    recommendations: list[AnalysisRecommendation],
) -> list[AnalysisDomain]:
    """Get optimal order for running analyses.

    Considers dependencies and priorities to determine best order.

    Args:
        recommendations: List of analysis recommendations.

    Returns:
        Ordered list of domains to analyze.

    Example:
        >>> import numpy as np
        >>> import tracekit as tk
        >>> # Generate test signal
        >>> t = np.linspace(0, 1, 10000)
        >>> signal = np.sin(2 * np.pi * 100 * t)
        >>> recommendations = tk.recommend_analyses(signal, sample_rate=10000)
        >>> order = tk.get_optimal_domain_order(recommendations)
        >>> print([d.value for d in order])
        ['waveform', 'statistics', 'spectral', 'patterns', 'entropy']
    """
    # Avoid circular import
    from tracekit.reporting.config import AnalysisDomain

    # Define dependencies
    dependencies = {
        AnalysisDomain.JITTER: [AnalysisDomain.TIMING],
        AnalysisDomain.EYE: [AnalysisDomain.DIGITAL],
        AnalysisDomain.PROTOCOLS: [AnalysisDomain.DIGITAL],
        AnalysisDomain.INFERENCE: [AnalysisDomain.PATTERNS],
    }

    # Build order respecting dependencies
    ordered = []
    remaining = {r.domain for r in recommendations}

    while remaining:
        # Find domains with satisfied dependencies
        ready = []
        for domain in remaining:
            deps = dependencies.get(domain, [])
            if all(d not in remaining or d in ordered for d in deps):
                ready.append(domain)

        if not ready:
            # No ready domains - just add remaining (circular deps)
            ready = list(remaining)

        # Add highest priority ready domain
        for rec in sorted(recommendations, key=lambda x: (x.priority, -x.confidence)):
            if rec.domain in ready:
                ordered.append(rec.domain)
                remaining.discard(rec.domain)
                break

    return ordered
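

# This loop is effectively a greedy topological sort: each pass emits the
# highest-priority domain whose dependencies are no longer pending. For
# instance (hypothetical), if JITTER (priority 3) and TIMING (priority 2)
# are both recommended, TIMING comes out first because JITTER depends on it,
# even when JITTER's confidence is higher.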


__all__ = [
    "AnalysisRecommendation",
    "assess_signal_quality",
    "check_measurement_suitability",
    "classify_signal",
    "get_optimal_domain_order",
    "recommend_analyses",
    "suggest_measurements",
]