Coverage for src / tracekit / visualization / optimization.py: 86%
331 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Visualization optimization functions for automatic plot parameter selection.
3This module provides intelligent optimization algorithms for plot parameters
4including axis ranges, decimation, grid spacing, dynamic range, and
5interesting region detection.
8Example:
9 >>> from tracekit.visualization.optimization import calculate_optimal_y_range
10 >>> y_min, y_max = calculate_optimal_y_range(signal_data)
11 >>> ax.set_ylim(y_min, y_max)
13References:
14 - Wilkinson's tick placement algorithm (1999)
15 - LTTB (Largest Triangle Three Buckets) decimation
16 - Percentile-based outlier detection
17 - Edge detection using Sobel operators
18 - Statistical outlier detection using MAD (Median Absolute Deviation)
19"""
21from __future__ import annotations
23from dataclasses import dataclass
24from typing import TYPE_CHECKING, Literal
26import numpy as np
27from scipy import signal as sp_signal
29if TYPE_CHECKING:
30 from numpy.typing import NDArray
def calculate_optimal_y_range(
    data: NDArray[np.float64],
    *,
    outlier_threshold: float = 3.0,
    margin_percent: float = 5.0,
    symmetric: bool = False,
    clip_warning_threshold: float = 0.01,
) -> tuple[float, float]:
    """Calculate optimal Y-axis range with outlier exclusion.

    Uses MAD-based robust outlier detection and smart margins to ensure
    data visibility without wasted space. Warns when too many samples are
    excluded (clipping). A degenerate zero-height range (e.g. constant
    data) is padded so the result is always usable as axis limits.

    Args:
        data: Signal data array.
        outlier_threshold: Number of (robust) standard deviations for
            outlier exclusion (default 3.0).
        margin_percent: Margin as percentage of data range (default 5%;
            auto-adjusts to 2% for >10k samples and 10% for <100 samples).
        symmetric: If True, use symmetric range ±max for bipolar signals.
        clip_warning_threshold: Warn if this fraction of samples are clipped (default 1%).

    Returns:
        Tuple of (y_min, y_max) for axis limits, with y_min < y_max.

    Raises:
        ValueError: If data is empty or all NaN.

    Example:
        >>> signal = np.random.randn(1000)
        >>> y_min, y_max = calculate_optimal_y_range(signal, symmetric=True)
        >>> # For bipolar signal: y_min ≈ -y_max

    References:
        VIS-013: Auto Y-Axis Range Optimization
    """
    if len(data) == 0:
        raise ValueError("Data array is empty")

    # Remove NaN values
    clean_data = data[~np.isnan(data)]
    if len(clean_data) == 0:
        raise ValueError("Data contains only NaN values")

    # Robust outlier rejection: 1.4826 converts MAD to an estimate of the
    # standard deviation for normally-distributed data.
    median = np.median(clean_data)
    mad = np.median(np.abs(clean_data - median))
    robust_std = 1.4826 * mad

    if robust_std > 0:
        z_scores = np.abs(clean_data - median) / robust_std
        filtered_data = clean_data[z_scores <= outlier_threshold]
    else:
        # Zero spread (constant data): nothing to reject.
        filtered_data = clean_data

    # Detect clipping: fraction of samples discarded by the outlier filter.
    clipped_fraction = 1.0 - (len(filtered_data) / len(clean_data))
    if clipped_fraction > clip_warning_threshold:
        import warnings

        warnings.warn(
            f"Clipping detected: {clipped_fraction * 100:.1f}% of samples "
            f"excluded by range limits (threshold: {clip_warning_threshold * 100:.1f}%)",
            UserWarning,
            stacklevel=2,
        )

    data_min = np.min(filtered_data)
    data_max = np.max(filtered_data)
    data_range = data_max - data_min

    # Smart margin selection based on sample count.
    if len(filtered_data) > 10000:
        margin = 0.02  # dense data: smaller margin
    elif len(filtered_data) < 100:
        margin = 0.10  # sparse data: larger margin
    else:
        margin = margin_percent / 100.0

    if symmetric:
        # Bipolar mode: center the range on zero.
        max_abs = max(abs(data_min), abs(data_max))
        margin_value = max_abs * margin
        y_min = -(max_abs + margin_value)
        y_max = max_abs + margin_value
    else:
        margin_value = data_range * margin
        y_min = data_min - margin_value
        y_max = data_max + margin_value

    # Guard against a zero-height range (constant data would otherwise
    # produce y_min == y_max, which breaks axis limits).
    if y_min == y_max:
        pad = abs(y_max) * 0.05 if y_max != 0 else 1.0
        y_min -= pad
        y_max += pad

    return (float(y_min), float(y_max))
def calculate_optimal_x_window(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    *,
    target_features: int = 5,
    samples_per_pixel: float = 2.0,
    screen_width: int = 1000,
    activity_threshold: float = 0.1,
) -> tuple[float, float]:
    """Calculate optimal X-axis time window with activity detection.

    Intelligently determines time window based on signal activity and features.
    Detects regions with significant activity and zooms to show N complete features.

    Args:
        time: Time axis array in seconds.
        data: Signal data array.
        target_features: Number of complete features to display (default 5-10).
        samples_per_pixel: Budget of samples per pixel for the fallback zoom
            (default 2.0; may be fractional).
        screen_width: Screen width in pixels for decimation calculation.
        activity_threshold: Relative threshold for activity detection (0-1).

    Returns:
        Tuple of (t_start, t_end) for time window in seconds.

    Raises:
        ValueError: If arrays are empty or mismatched.

    Example:
        >>> time = np.linspace(0, 1e-3, 10000)
        >>> signal = np.sin(2 * np.pi * 1000 * time)
        >>> t_start, t_end = calculate_optimal_x_window(time, signal, target_features=5)

    References:
        VIS-014: Adaptive X-Axis Time Window
    """
    if len(time) == 0 or len(data) == 0:
        raise ValueError("Time or data array is empty")

    if len(time) != len(data):
        raise ValueError(f"Time and data arrays must match: {len(time)} vs {len(data)}")

    # Windowed RMS acts as an activity envelope of the signal.
    window_size = max(10, len(data) // 100)
    rms = np.sqrt(np.convolve(data**2, np.ones(window_size) / window_size, mode="same"))

    rms_threshold = activity_threshold * np.max(rms)
    active_regions = rms > rms_threshold

    if not np.any(active_regions):
        # No significant activity anywhere: show the full record.
        return (float(time[0]), float(time[-1]))

    active_indices = np.where(active_regions)[0]
    first_active = active_indices[0]

    # Analyze a bounded subset starting at the first active sample for
    # efficiency on long records.
    subset_size = min(5000, len(data) - first_active)
    subset_start = first_active
    subset_end = subset_start + subset_size

    if subset_end > len(data):
        subset_end = len(data)
        subset_start = max(0, subset_end - subset_size)

    subset = data[subset_start:subset_end]

    # Try to detect periodicity via mean-level zero crossings.
    if len(subset) > 20:
        mean_val = np.mean(subset)
        crossings = np.where(np.diff(np.sign(subset - mean_val)))[0]

        if len(crossings) >= 4:
            # Two crossings per cycle -> every other crossing spans one period.
            periods = np.diff(crossings[::2])
            if len(periods) > 0:
                median_period = np.median(periods)
                samples_per_feature = int(median_period * 2)  # full cycle

                # Window sized to show target_features complete cycles.
                total_samples = samples_per_feature * target_features
                window_start = first_active
                window_end = min(window_start + total_samples, len(time) - 1)

                return (float(time[window_start]), float(time[window_end]))

    # Fallback: zoom to the start of the active region, capped by the pixel
    # budget. NOTE: the product is truncated, not samples_per_pixel alone —
    # int(samples_per_pixel) would turn fractional budgets (e.g. 0.5) into 0.
    active_duration = len(active_indices)
    zoom_samples = min(active_duration, int(screen_width * samples_per_pixel))
    window_end = min(first_active + zoom_samples, len(time) - 1)

    return (float(time[first_active]), float(time[window_end]))
def calculate_grid_spacing(
    axis_min: float,
    axis_max: float,
    *,
    target_major_ticks: int = 7,
    log_scale: bool = False,
    time_axis: bool = False,
) -> tuple[float, float]:
    """Compute major/minor grid spacing from "nice" numbers.

    Implements Wilkinson-style tick placement: the major step is rounded
    to 1, 2, or 5 times a power of ten, with optional decade-based spacing
    for log axes and snapping to natural units for time axes.

    Args:
        axis_min: Minimum axis value.
        axis_max: Maximum axis value.
        target_major_ticks: Target number of major gridlines (default 5-10).
        log_scale: Use logarithmic spacing for log-scale axes.
        time_axis: Use time-unit alignment (ms, μs, ns).

    Returns:
        Tuple of (major_spacing, minor_spacing).

    Raises:
        ValueError: If axis_max <= axis_min.

    Example:
        >>> major, minor = calculate_grid_spacing(0, 100, target_major_ticks=5)
        >>> # Returns nice numbers like (20.0, 4.0)

    References:
        VIS-019: Grid Auto-Spacing
        Wilkinson (1999): The Grammar of Graphics
    """
    if axis_max <= axis_min:
        raise ValueError(f"Invalid axis range: [{axis_min}, {axis_max}]")

    if log_scale:
        # Decade-based spacing; fall back to linear nice numbers when the
        # span covers less than one decade.
        decades = np.log10(axis_max) - np.log10(max(axis_min, 1e-100))
        if decades < 1:
            major = _calculate_nice_number((axis_max - axis_min) / target_major_ticks)
        else:
            major = 10.0 ** np.ceil(decades / target_major_ticks)
        return (float(major), float(major / 5))

    span = axis_max - axis_min
    major = _calculate_nice_number(span / target_major_ticks)

    # Minor step: one fifth of the major step when it divides evenly by 5
    # (or 10), otherwise one half.
    minor = major / 5 if (major % 5 == 0 or major % 10 == 0) else major / 2

    if time_axis:
        # Snap the major step to the nearest natural time unit
        # (1/2/5 × ns, μs, ms, s).
        units = [
            1e-9, 2e-9, 5e-9,  # ns
            1e-6, 2e-6, 5e-6,  # μs
            1e-3, 2e-3, 5e-3,  # ms
            1.0, 2.0, 5.0,     # s
        ]
        major = units[int(np.argmin(np.abs(np.array(units) - major)))]
        minor = major / 5

    # If the grid would be too dense, make minors coincide with majors
    # (effectively disabling them).
    if span / major > 15:
        minor = major

    return (float(major), float(minor))
338def _calculate_nice_number(value: float) -> float:
339 """Calculate nice number using powers of 10 × (1, 2, 5). # noqa: RUF002
341 Args:
342 value: Input value.
344 Returns:
345 Nice number (1, 2, or 5 × 10^n). # noqa: RUF002
346 """
347 if value <= 0: 347 ↛ 348line 347 didn't jump to line 348 because the condition on line 347 was never true
348 return 1.0
350 # Find exponent
351 exponent = np.floor(np.log10(value))
352 fraction = value / (10**exponent)
354 # Round to nice fraction (1, 2, 5)
355 if fraction < 1.5:
356 nice_fraction = 1.0
357 elif fraction < 3.5: 357 ↛ 359line 357 didn't jump to line 359 because the condition on line 357 was always true
358 nice_fraction = 2.0
359 elif fraction < 7.5:
360 nice_fraction = 5.0
361 else:
362 nice_fraction = 10.0
364 return nice_fraction * (10**exponent) # type: ignore[no-any-return]
def optimize_db_range(
    spectrum: NDArray[np.float64],
    *,
    noise_floor_percentile: float = 5.0,
    peak_threshold_db: float = 10.0,
    margin_db: float = 10.0,
    max_dynamic_range_db: float = 100.0,
) -> tuple[float, float]:
    """Optimize dB range for spectrum plots with noise floor detection.

    Estimates the noise floor from a low percentile of the spectrum, finds
    the tallest peak above it, and returns display limits spanning from a
    margin below the floor up to that peak, compressed to at most
    ``max_dynamic_range_db``.

    Args:
        spectrum: Spectrum magnitude array (linear or dB; values above 100
            are heuristically treated as linear and converted to dB).
        noise_floor_percentile: Percentile for noise floor estimation (default 5%).
        peak_threshold_db: Threshold above noise floor for peak detection (default 10 dB).
        margin_db: Margin below noise floor (default 10 dB).
        max_dynamic_range_db: Maximum dynamic range to display (default 100 dB).

    Returns:
        Tuple of (db_min, db_max) for spectrum plot limits.

    Raises:
        ValueError: If spectrum is empty.

    Example:
        >>> spectrum_db = 20 * np.log10(np.abs(fft_result))
        >>> db_min, db_max = optimize_db_range(spectrum_db)
        >>> ax.set_ylim(db_min, db_max)

    References:
        VIS-022: Spectrum dB Range Optimization
    """
    if len(spectrum) == 0:
        raise ValueError("Spectrum array is empty")

    # Heuristic linear-vs-dB detection: dB magnitudes rarely exceed 100.
    if np.max(spectrum) > 100:
        spectrum_db = 20 * np.log10(np.maximum(spectrum, 1e-100))
    else:
        spectrum_db = spectrum

    # Noise floor from a low percentile of the (dB) spectrum.
    noise_floor = np.percentile(spectrum_db, noise_floor_percentile)

    # Peaks must clear the floor by peak_threshold_db with some prominence.
    peak_indices, peak_properties = sp_signal.find_peaks(
        spectrum_db,
        height=noise_floor + peak_threshold_db,
        prominence=peak_threshold_db / 2,
    )

    # Fall back to the global maximum when no peak qualifies.
    peak_max = (
        np.max(peak_properties["peak_heights"])
        if len(peak_indices) > 0
        else np.max(spectrum_db)
    )

    db_max = float(peak_max)
    db_min = float(noise_floor - margin_db)

    # Compress overly wide ranges from the bottom.
    if db_max - db_min > max_dynamic_range_db:
        db_min = db_max - max_dynamic_range_db

    return (db_min, db_max)
def decimate_for_display(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    *,
    max_points: int = 2000,
    method: Literal["lttb", "minmax", "uniform"] = "lttb",
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Decimate signal data for display using LTTB or min-max envelope.

    Intelligently reduces number of points while preserving visual appearance
    and important features like edges and peaks.

    Args:
        time: Time axis array.
        data: Signal data array.
        max_points: Maximum number of points to retain.
        method: Decimation method ("lttb", "minmax", "uniform").

    Returns:
        Tuple of (decimated_time, decimated_data).

    Raises:
        ValueError: If arrays are empty, mismatched, or method is invalid.

    Example:
        >>> time_dec, data_dec = decimate_for_display(time, data, max_points=1000)
        >>> # Reduced from 100k to 1k points while preserving shape

    References:
        VIS-014: Adaptive X-Axis Time Window
        LTTB: Largest Triangle Three Buckets algorithm
    """
    if len(time) == 0 or len(data) == 0:
        raise ValueError("Time or data array is empty")

    if len(time) != len(data):
        raise ValueError(f"Time and data arrays must match: {len(time)} vs {len(data)}")

    # Don't decimate if already below threshold.
    if len(data) <= max_points:
        return (time, data)

    # Never decimate very small signals (even when max_points is tiny).
    if len(data) < 10:
        return (time, data)

    if method == "uniform":
        # Simple uniform stride decimation. Ceil-divide so the result never
        # exceeds max_points — floor division could yield stride=1 (e.g.
        # 2500 samples with max_points=2000) and return nearly all samples.
        stride = -(-len(data) // max_points)
        indices = np.arange(0, len(data), stride)
        return (time[indices], data[indices])

    elif method == "minmax":
        # Min-max envelope: preserve peaks and valleys.
        return _decimate_minmax(time, data, max_points)

    elif method == "lttb":
        # Largest Triangle Three Buckets.
        return _decimate_lttb(time, data, max_points)

    else:
        raise ValueError(f"Invalid decimation method: {method}")
504def _decimate_minmax(
505 time: NDArray[np.float64],
506 data: NDArray[np.float64],
507 max_points: int,
508) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
509 """Decimate using min-max envelope to preserve peaks/valleys.
511 Args:
512 time: Time array.
513 data: Signal data array.
514 max_points: Maximum number of points to retain.
516 Returns:
517 Tuple of (decimated_time, decimated_data).
518 """
519 # Calculate bucket size
520 bucket_size = len(data) // (max_points // 2)
522 decimated_time = []
523 decimated_data = []
525 for i in range(0, len(data), bucket_size):
526 bucket = data[i : i + bucket_size]
527 time_bucket = time[i : i + bucket_size]
529 if len(bucket) > 0: 529 ↛ 525line 529 didn't jump to line 525 because the condition on line 529 was always true
530 # Add min and max from each bucket
531 min_idx = np.argmin(bucket)
532 max_idx = np.argmax(bucket)
534 # Add in chronological order
535 if min_idx < max_idx:
536 decimated_time.extend([time_bucket[min_idx], time_bucket[max_idx]])
537 decimated_data.extend([bucket[min_idx], bucket[max_idx]])
538 else:
539 decimated_time.extend([time_bucket[max_idx], time_bucket[min_idx]])
540 decimated_data.extend([bucket[max_idx], bucket[min_idx]])
542 return (np.array(decimated_time), np.array(decimated_data))
545def _decimate_lttb(
546 time: NDArray[np.float64],
547 data: NDArray[np.float64],
548 max_points: int,
549) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
550 """Decimate using Largest Triangle Three Buckets algorithm.
552 Preserves visual appearance by selecting points that maximize
553 the area of triangles formed with neighboring buckets.
555 Args:
556 time: Time array.
557 data: Signal data array.
558 max_points: Maximum number of points to retain.
560 Returns:
561 Tuple of (decimated_time, decimated_data).
562 """
563 if len(data) <= max_points: 563 ↛ 564line 563 didn't jump to line 564 because the condition on line 563 was never true
564 return (time, data)
566 # Always include first and last points
567 sampled_time = [time[0]]
568 sampled_data = [data[0]]
570 # Calculate bucket size
571 bucket_size = (len(data) - 2) / (max_points - 2)
573 # Previous selected point
574 prev_idx = 0
576 for i in range(max_points - 2):
577 # Calculate average point of next bucket (for triangle area calculation)
578 avg_range_start = int((i + 1) * bucket_size) + 1
579 avg_range_end = int((i + 2) * bucket_size) + 1
580 avg_range_end = min(avg_range_end, len(data))
582 if avg_range_start < avg_range_end: 582 ↛ 586line 582 didn't jump to line 586 because the condition on line 582 was always true
583 avg_time = np.mean(time[avg_range_start:avg_range_end])
584 avg_data = np.mean(data[avg_range_start:avg_range_end])
585 else:
586 avg_time = time[-1]
587 avg_data = data[-1]
589 # Current bucket range
590 range_start = int(i * bucket_size) + 1
591 range_end = int((i + 1) * bucket_size) + 1
592 range_end = min(range_end, len(data) - 1)
594 # Find point in current bucket that forms largest triangle
595 prev_time = time[prev_idx]
596 prev_data = data[prev_idx]
598 max_area = -1.0
599 max_idx = range_start
601 for idx in range(range_start, range_end):
602 # Calculate triangle area
603 area = abs(
604 (prev_time - avg_time) * (data[idx] - prev_data)
605 - (prev_time - time[idx]) * (avg_data - prev_data)
606 )
608 if area > max_area:
609 max_area = area
610 max_idx = idx
612 sampled_time.append(time[max_idx])
613 sampled_data.append(data[max_idx])
614 prev_idx = max_idx
616 # Always include last point
617 sampled_time.append(time[-1])
618 sampled_data.append(data[-1])
620 return (np.array(sampled_time), np.array(sampled_data))
@dataclass
class InterestingRegion:
    """Represents an interesting region in a signal.

    Attributes:
        start_idx: Starting sample index.
        end_idx: Ending sample index (exclusive, slice-style).
        start_time: Starting time in seconds.
        end_time: Ending time in seconds.
        type: Type of interesting feature.
        significance: Significance score (0-1, higher is more significant).
        metadata: Additional detector-specific metadata about the region.
    """

    # Sample-index span [start_idx, end_idx)
    start_idx: int
    end_idx: int
    # Same span expressed in seconds (index / sample_rate)
    start_time: float
    end_time: float
    type: Literal["edge", "glitch", "anomaly", "pattern_change"]
    significance: float
    # Parameterized instead of bare ``dict`` so no ``type: ignore`` is needed.
    metadata: dict[str, object]
def detect_interesting_regions(
    signal: NDArray[np.float64],
    sample_rate: float,
    *,
    edge_threshold: float | None = None,
    glitch_sigma: float = 3.0,
    anomaly_threshold: float = 3.0,
    min_region_samples: int = 10,
    max_regions: int = 10,
) -> list[InterestingRegion]:
    """Detect interesting regions in a signal for automatic zoom/focus.

    Automatically detects signal regions worth zooming to, such as edges,
    glitches, anomalies, or pattern changes.

    Args:
        signal: Input signal array
        sample_rate: Sample rate in Hz
        edge_threshold: Edge detection threshold (default: auto from signal stddev)
        glitch_sigma: Sigma threshold for glitch detection (default: 3.0)
        anomaly_threshold: Threshold for anomaly detection in sigma (default: 3.0)
        min_region_samples: Minimum samples per region (default: 10)
        max_regions: Maximum number of regions to return (default: 10)

    Returns:
        List of InterestingRegion objects, sorted by significance (descending)

    Raises:
        ValueError: If signal is empty or sample_rate is invalid

    Example:
        >>> signal = np.sin(2*np.pi*1000*t) + 0.1*np.random.randn(len(t))
        >>> regions = detect_interesting_regions(signal, 1e6)
        >>> print(f"Found {len(regions)} interesting regions")

    References:
        VIS-020: Zoom to Interesting Regions
        Edge detection: Sobel operator on signal derivative
        Glitch detection: MAD-based outlier detection
    """
    if len(signal) == 0:
        raise ValueError("Signal cannot be empty")
    if sample_rate <= 0:
        raise ValueError("Sample rate must be positive")
    if min_region_samples < 1:
        raise ValueError("min_region_samples must be >= 1")

    # Each detector contributes candidate regions independently.
    regions: list[InterestingRegion] = []

    # 1. Edge detection using first derivative
    regions.extend(_detect_edges(signal, sample_rate, edge_threshold))

    # 2. Glitch detection using statistical outliers
    regions.extend(_detect_glitches(signal, sample_rate, glitch_sigma))

    # 3. Anomaly detection using MAD
    regions.extend(_detect_anomalies(signal, sample_rate, anomaly_threshold))

    # 4. Pattern change detection (simplified using variance changes)
    regions.extend(_detect_pattern_changes(signal, sample_rate))

    # Filter out regions that are too small.
    regions = [r for r in regions if (r.end_idx - r.start_idx) >= min_region_samples]

    # Sort by significance (descending) and return the top N.
    regions.sort(key=lambda r: r.significance, reverse=True)
    return regions[:max_regions]
def _detect_edges(
    signal: NDArray[np.float64],
    sample_rate: float,
    threshold: float | None,
) -> list[InterestingRegion]:
    """Detect edge transitions using the first derivative.

    Args:
        signal: Input signal
        sample_rate: Sample rate in Hz
        threshold: Edge threshold on |gradient| (auto = 2× gradient stddev if None)

    Returns:
        List of edge regions
    """
    # Calculate first derivative (gradient).
    gradient = np.gradient(signal)

    # Auto threshold from the gradient's own spread.
    if threshold is None:
        threshold = np.std(gradient) * 2.0

    edge_mask = np.abs(gradient) > threshold
    time_base = 1.0 / sample_rate
    regions: list[InterestingRegion] = []

    def _make_region(start_idx: int, end_idx: int) -> InterestingRegion:
        # Significance scales with peak gradient, saturating at 5× threshold.
        # (Shared by the in-loop and end-of-signal cases, which previously
        # duplicated this construction verbatim.)
        peak = float(np.max(np.abs(gradient[start_idx:end_idx])))
        return InterestingRegion(
            start_idx=start_idx,
            end_idx=end_idx,
            start_time=start_idx * time_base,
            end_time=end_idx * time_base,
            type="edge",
            significance=min(1.0, peak / (threshold * 5)),
            metadata={
                "max_gradient": peak,
                "threshold": threshold,
            },
        )

    # Scan for continuous runs of above-threshold gradient.
    in_edge = False
    start_idx = 0
    for i, is_edge in enumerate(edge_mask):
        if is_edge and not in_edge:
            start_idx = i
            in_edge = True
        elif not is_edge and in_edge:
            regions.append(_make_region(start_idx, i))
            in_edge = False

    # Close an edge that runs to the end of the signal.
    if in_edge:
        regions.append(_make_region(start_idx, len(signal)))

    return regions
def _detect_glitches(
    signal: NDArray[np.float64],
    sample_rate: float,
    sigma_threshold: float,
) -> list[InterestingRegion]:
    """Detect short spike bursts (glitches) via standard z-scores.

    Args:
        signal: Input signal
        sample_rate: Sample rate in Hz
        sigma_threshold: Sigma threshold for outlier detection

    Returns:
        List of glitch regions (each capped at 5 samples)
    """
    mean = np.mean(signal)
    std = np.std(signal)

    if std == 0:
        # Flat signal: no outliers possible.
        return []

    z_scores = np.abs((signal - mean) / std)
    outliers = z_scores > sigma_threshold

    dt = 1.0 / sample_rate
    found: list[InterestingRegion] = []
    total = len(outliers)

    idx = 0
    while idx < total:
        if not outliers[idx]:
            idx += 1
            continue

        # Start of a potential glitch; a burst is capped at 5 consecutive
        # outlier samples (longer runs split into multiple regions).
        begin = idx
        while idx < total and outliers[idx] and (idx - begin) < 5:
            idx += 1

        burst = z_scores[begin:idx]
        found.append(
            InterestingRegion(
                start_idx=begin,
                end_idx=idx,
                start_time=begin * dt,
                end_time=idx * dt,
                type="glitch",
                # Significance saturates at 3× the sigma threshold.
                significance=min(1.0, np.max(burst) / (sigma_threshold * 3)),
                metadata={
                    "max_z_score": float(np.max(burst)),
                    "threshold_sigma": sigma_threshold,
                },
            )
        )

    return found
def _detect_anomalies(
    signal: NDArray[np.float64],
    sample_rate: float,
    threshold_sigma: float,
) -> list[InterestingRegion]:
    """Detect anomalies using MAD (Median Absolute Deviation).

    Args:
        signal: Input signal
        sample_rate: Sample rate in Hz
        threshold_sigma: Sigma threshold for MAD

    Returns:
        List of anomaly regions
    """
    center = np.median(signal)
    mad = np.median(np.abs(signal - center))

    if mad == 0:
        # Degenerate spread: modified z-score is undefined.
        return []

    # Modified z-score (0.6745 scales MAD to be comparable with sigma);
    # more robust than the standard z-score.
    scores = 0.6745 * (signal - center) / mad
    flagged = np.abs(scores) > threshold_sigma

    dt = 1.0 / sample_rate
    found: list[InterestingRegion] = []

    def _close_region(lo: int, hi: int) -> None:
        # Build a region for the run [lo, hi); significance saturates at
        # 3× the sigma threshold.
        seg = np.abs(scores[lo:hi])
        found.append(
            InterestingRegion(
                start_idx=lo,
                end_idx=hi,
                start_time=lo * dt,
                end_time=hi * dt,
                type="anomaly",
                significance=min(1.0, np.max(seg) / (threshold_sigma * 3)),
                metadata={
                    "max_mad_score": float(np.max(seg)),
                    "threshold_sigma": threshold_sigma,
                },
            )
        )

    # Scan for continuous runs of flagged samples.
    open_at: int | None = None
    for i, hit in enumerate(flagged):
        if hit and open_at is None:
            open_at = i
        elif not hit and open_at is not None:
            _close_region(open_at, i)
            open_at = None

    # Close an anomaly that runs to the end of the signal.
    if open_at is not None:
        _close_region(open_at, len(signal))

    return found
def _detect_pattern_changes(
    signal: NDArray[np.float64],
    sample_rate: float,
) -> list[InterestingRegion]:
    """Detect pattern changes using windowed variance analysis.

    Args:
        signal: Input signal
        sample_rate: Sample rate in Hz

    Returns:
        List of pattern change regions
    """
    win = min(100, len(signal) // 10)

    if win < 10:
        # Signal too short for meaningful windowed statistics.
        return []

    # Rolling variance over a sliding window.
    local_var = np.array(
        [np.var(signal[k : k + win]) for k in range(len(signal) - win + 1)]
    )

    if len(local_var) < 2:
        return []

    # Abrupt variance shifts show up as large gradient magnitudes.
    var_grad = np.gradient(local_var)
    limit = np.std(var_grad) * 2.0
    hits = np.abs(var_grad) > limit

    dt = 1.0 / sample_rate
    found: list[InterestingRegion] = []

    for k, hit in enumerate(hits):
        if not hit:
            continue

        stop = min(k + win, len(signal))
        found.append(
            InterestingRegion(
                start_idx=k,
                end_idx=stop,
                start_time=k * dt,
                end_time=stop * dt,
                type="pattern_change",
                # Significance saturates at 5× the gradient threshold.
                significance=min(1.0, np.abs(var_grad[k]) / (limit * 5)),
                metadata={
                    "variance_change": float(var_grad[k]),
                    "threshold": limit,
                },
            )
        )

    return found
# Explicit public API; underscore-prefixed helpers are intentionally excluded.
__all__ = [
    "InterestingRegion",
    "calculate_grid_spacing",
    "calculate_optimal_x_window",
    "calculate_optimal_y_range",
    "decimate_for_display",
    "detect_interesting_regions",
    "optimize_db_range",
]