Coverage for src/tracekit/visualization/optimization.py: 86% (331 statements; coverage.py v7.13.1, created at 2026-01-11 23:04 +0000)

1"""Visualization optimization functions for automatic plot parameter selection. 

2 

3This module provides intelligent optimization algorithms for plot parameters 

4including axis ranges, decimation, grid spacing, dynamic range, and 

5interesting region detection. 

6 

7 

8Example: 

9 >>> from tracekit.visualization.optimization import calculate_optimal_y_range 

10 >>> y_min, y_max = calculate_optimal_y_range(signal_data) 

11 >>> ax.set_ylim(y_min, y_max) 

12 

13References: 

14 - Wilkinson's tick placement algorithm (1999) 

15 - LTTB (Largest Triangle Three Buckets) decimation 

16 - Percentile-based outlier detection 

17 - Edge detection using Sobel operators 

18 - Statistical outlier detection using MAD (Median Absolute Deviation) 

19""" 

20 

21from __future__ import annotations 

22 

23from dataclasses import dataclass 

24from typing import TYPE_CHECKING, Literal 

25 

26import numpy as np 

27from scipy import signal as sp_signal 

28 

29if TYPE_CHECKING: 

30 from numpy.typing import NDArray 

31 

32 

def calculate_optimal_y_range(
    data: NDArray[np.float64],
    *,
    outlier_threshold: float = 3.0,
    margin_percent: float = 5.0,
    symmetric: bool = False,
    clip_warning_threshold: float = 0.01,
) -> tuple[float, float]:
    """Calculate optimal Y-axis range with outlier exclusion.

    Uses robust MAD-based outlier detection and smart margins to ensure
    data visibility without wasted space. Warns when so many samples are
    excluded that the range would clip the data.

    Args:
        data: Signal data array.
        outlier_threshold: Number of robust standard deviations for outlier
            exclusion (default 3.0).
        margin_percent: Margin as a percentage of the data range (default 5%;
            auto-adjusts for very dense or very sparse data).
        symmetric: If True, use a symmetric ±max range for bipolar signals.
        clip_warning_threshold: Warn if this fraction of samples is excluded
            (default 1%).

    Returns:
        Tuple of (y_min, y_max) for axis limits.

    Raises:
        ValueError: If data is empty or all NaN.

    Example:
        >>> signal = np.random.randn(1000)
        >>> y_min, y_max = calculate_optimal_y_range(signal, symmetric=True)
        >>> # For a bipolar signal: y_min ≈ -y_max

    References:
        VIS-013: Auto Y-Axis Range Optimization
    """
    if len(data) == 0:
        raise ValueError("Data array is empty")

    # Remove NaN values
    clean_data = data[~np.isnan(data)]

    if len(clean_data) == 0:
        raise ValueError("Data contains only NaN values")

    # Robust outlier detection: a MAD-based z-score is insensitive to the
    # outliers themselves, unlike mean/std. A threshold of 3 keeps roughly
    # the central 99.7% of normally distributed data.
    median = np.median(clean_data)
    mad = np.median(np.abs(clean_data - median))
    robust_std = 1.4826 * mad  # MAD-to-std conversion for normal data

    if robust_std > 0:
        z_scores = np.abs(clean_data - median) / robust_std
        inlier_mask = z_scores <= outlier_threshold
        filtered_data = clean_data[inlier_mask]
    else:
        filtered_data = clean_data

    # Detect clipping
    clipped_fraction = 1.0 - (len(filtered_data) / len(clean_data))
    if clipped_fraction > clip_warning_threshold:
        import warnings

        warnings.warn(
            f"Clipping detected: {clipped_fraction * 100:.1f}% of samples "
            f"excluded by range limits (threshold: {clip_warning_threshold * 100:.1f}%)",
            UserWarning,
            stacklevel=2,
        )

    # Calculate data range
    data_min = np.min(filtered_data)
    data_max = np.max(filtered_data)
    data_range = data_max - data_min

    # Smart margin selection based on data density
    if len(filtered_data) > 10000:
        # Dense data: smaller margin (2%)
        margin = 0.02
    elif len(filtered_data) < 100:
        # Sparse data: larger margin (10%)
        margin = 0.10
    else:
        # Default margin (5%)
        margin = margin_percent / 100.0

    # Apply symmetric range mode for bipolar signals
    if symmetric:
        max_abs = max(abs(data_min), abs(data_max))
        margin_value = max_abs * margin
        y_min = -(max_abs + margin_value)
        y_max = max_abs + margin_value
    else:
        margin_value = data_range * margin
        y_min = data_min - margin_value
        y_max = data_max + margin_value

    return (float(y_min), float(y_max))
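
# A minimal usage sketch for calculate_optimal_y_range on synthetic data;
# the `_example_y_range` helper and its values are illustrative only. A
# real caller would feed the result to e.g. matplotlib's ax.set_ylim.
def _example_y_range() -> None:
    rng = np.random.default_rng(0)
    sig = rng.standard_normal(1000)
    sig[::200] += 50.0  # inject a few outliers (0.5% of samples)
    y_min, y_max = calculate_optimal_y_range(sig, symmetric=True)
    # The MAD-based filter ignores the spikes, so the range stays near
    # +/-3.5 instead of stretching to +/-50.
    print(f"y range: [{y_min:.2f}, {y_max:.2f}]")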

def calculate_optimal_x_window(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    *,
    target_features: int = 5,
    samples_per_pixel: float = 2.0,
    screen_width: int = 1000,
    activity_threshold: float = 0.1,
) -> tuple[float, float]:
    """Calculate optimal X-axis time window with activity detection.

    Intelligently determines the time window based on signal activity and
    features. Detects regions with significant activity and zooms to show
    N complete features.

    Args:
        time: Time axis array in seconds.
        data: Signal data array.
        target_features: Number of complete features to display (default 5).
        samples_per_pixel: Threshold for decimation (default 2 samples/pixel).
        screen_width: Screen width in pixels for decimation calculation.
        activity_threshold: Relative threshold for activity detection (0-1).

    Returns:
        Tuple of (t_start, t_end) for the time window in seconds.

    Raises:
        ValueError: If arrays are empty or mismatched.

    Example:
        >>> time = np.linspace(0, 1e-3, 10000)
        >>> signal = np.sin(2 * np.pi * 1000 * time)
        >>> t_start, t_end = calculate_optimal_x_window(time, signal, target_features=5)

    References:
        VIS-014: Adaptive X-Axis Time Window
    """
    if len(time) == 0 or len(data) == 0:
        raise ValueError("Time or data array is empty")

    if len(time) != len(data):
        raise ValueError(f"Time and data arrays must match: {len(time)} vs {len(data)}")

    # Detect signal activity using RMS windowing
    window_size = max(10, len(data) // 100)
    rms = np.sqrt(np.convolve(data**2, np.ones(window_size) / window_size, mode="same"))

    # Find activity threshold
    rms_threshold = activity_threshold * np.max(rms)
    active_regions = rms > rms_threshold

    if not np.any(active_regions):
        # No significant activity, return full range
        return (float(time[0]), float(time[-1]))

    # Find first active region
    active_indices = np.where(active_regions)[0]
    first_active = active_indices[0]

    # Detect features using zero crossings on a subset, for efficiency
    subset_size = min(5000, len(data) - first_active)
    subset_start = first_active
    subset_end = subset_start + subset_size

    if subset_end > len(data):
        subset_end = len(data)
        subset_start = max(0, subset_end - subset_size)

    subset = data[subset_start:subset_end]

    # Try to detect periodicity
    if len(subset) > 20:
        # Use zero crossings (about the mean) to detect the period
        mean_val = np.mean(subset)
        crossings = np.where(np.diff(np.sign(subset - mean_val)))[0]

        if len(crossings) >= 4:
            # The signal crosses its mean twice per cycle, so the spacing
            # between every other crossing is already one full period.
            periods = np.diff(crossings[::2])
            if len(periods) > 0:
                median_period = np.median(periods)
                samples_per_feature = int(median_period)  # One full cycle

                # Calculate window to show target_features
                total_samples = samples_per_feature * target_features
                window_start = first_active
                window_end = min(window_start + total_samples, len(time) - 1)

                return (float(time[window_start]), float(time[window_end]))

    # Fallback: zoom to the start of the active region
    active_duration = len(active_indices)
    zoom_samples = min(active_duration, int(screen_width * samples_per_pixel))
    window_end = min(first_active + zoom_samples, len(time) - 1)

    return (float(time[first_active]), float(time[window_end]))
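
# A minimal usage sketch: a 1 kHz tone that starts halfway through the
# record. The hypothetical `_example_x_window` helper shows the window
# snapping to the onset of activity and spanning about five 1 ms periods.
def _example_x_window() -> None:
    t = np.linspace(0.0, 10e-3, 10_000)
    sig = np.where(t >= 5e-3, np.sin(2 * np.pi * 1e3 * t), 0.0)
    t_start, t_end = calculate_optimal_x_window(t, sig, target_features=5)
    print(f"window: {t_start * 1e3:.2f} ms to {t_end * 1e3:.2f} ms")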

def calculate_grid_spacing(
    axis_min: float,
    axis_max: float,
    *,
    target_major_ticks: int = 7,
    log_scale: bool = False,
    time_axis: bool = False,
) -> tuple[float, float]:
    """Calculate optimal grid spacing using nice numbers.

    Implements Wilkinson's tick placement algorithm to generate
    aesthetically pleasing major and minor grid line spacing.

    Args:
        axis_min: Minimum axis value.
        axis_max: Maximum axis value.
        target_major_ticks: Target number of major gridlines (default 7).
        log_scale: Use logarithmic spacing for log-scale axes.
        time_axis: Align spacing to natural time units (ns, μs, ms, s).

    Returns:
        Tuple of (major_spacing, minor_spacing).

    Raises:
        ValueError: If axis_max <= axis_min.

    Example:
        >>> major, minor = calculate_grid_spacing(0, 100, target_major_ticks=5)
        >>> # Returns nice numbers like (20.0, 4.0)

    References:
        VIS-019: Grid Auto-Spacing
        Wilkinson (1999): The Grammar of Graphics
    """
    if axis_max <= axis_min:
        raise ValueError(f"Invalid axis range: [{axis_min}, {axis_max}]")

    if log_scale:
        # Logarithmic spacing: major grids at decade boundaries
        log_min = np.log10(max(axis_min, 1e-100))
        log_max = np.log10(axis_max)
        n_decades = log_max - log_min

        if n_decades < 1:
            # Less than one decade: fall back to linear spacing
            major_spacing = _calculate_nice_number((axis_max - axis_min) / target_major_ticks)
            minor_spacing = major_spacing / 5
        else:
            # Major grids every whole number of decades
            major_spacing = 10.0 ** np.ceil(n_decades / target_major_ticks)
            minor_spacing = major_spacing / 5

        return (float(major_spacing), float(minor_spacing))

    # Linear spacing with nice numbers
    axis_range = axis_max - axis_min
    rough_spacing = axis_range / target_major_ticks

    # Find nice number for major spacing
    major_spacing = _calculate_nice_number(rough_spacing)

    # Minor spacing: 1/5 of the major spacing when it is a multiple of 5
    # (5, 10, 20, ...), 1/2 otherwise. (A multiple of 10 is already a
    # multiple of 5, so no separate check is needed.)
    if major_spacing % 5 == 0:
        minor_spacing = major_spacing / 5
    else:
        minor_spacing = major_spacing / 2

    # Time axis alignment
    if time_axis:
        # Align to natural time units
        time_units = [
            1e-9, 2e-9, 5e-9,  # ns
            1e-6, 2e-6, 5e-6,  # μs
            1e-3, 2e-3, 5e-3,  # ms
            1.0, 2.0, 5.0,  # s
        ]

        # Snap to the closest time unit
        closest_idx = np.argmin(np.abs(np.array(time_units) - major_spacing))
        major_spacing = time_units[closest_idx]
        minor_spacing = major_spacing / 5

    # Check if grid would be too dense
    actual_major_ticks = axis_range / major_spacing
    if actual_major_ticks > 15:
        # Disable minor grids (set equal to major)
        minor_spacing = major_spacing

    return (float(major_spacing), float(minor_spacing))
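
# A minimal usage sketch matching the docstring example: for a 0-100 axis
# with five target ticks, the rough spacing 100 / 5 = 20 is already a nice
# number (2 x 10^1), and 20 is a multiple of 5, so the minor spacing is 4.
def _example_grid_spacing() -> None:
    major, minor = calculate_grid_spacing(0.0, 100.0, target_major_ticks=5)
    print(f"major={major}, minor={minor}")  # major=20.0, minor=4.0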

def _calculate_nice_number(value: float) -> float:
    """Calculate nice number using powers of 10 × (1, 2, 5).  # noqa: RUF002

    Args:
        value: Input value.

    Returns:
        Nice number (1, 2, or 5 × 10^n).  # noqa: RUF002
    """
    if value <= 0:
        return 1.0

    # Find exponent
    exponent = np.floor(np.log10(value))
    fraction = value / (10**exponent)

    # Round to nice fraction (1, 2, 5)
    if fraction < 1.5:
        nice_fraction = 1.0
    elif fraction < 3.5:
        nice_fraction = 2.0
    elif fraction < 7.5:
        nice_fraction = 5.0
    else:
        nice_fraction = 10.0

    return nice_fraction * (10**exponent)  # type: ignore[no-any-return]
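
# Worked examples of the 1-2-5 rounding rule above: the thresholds 1.5,
# 3.5, and 7.5 are the midpoints between the candidate mantissas 1, 2, 5,
# and 10. The helper name and inputs are illustrative only.
def _example_nice_numbers() -> None:
    for v in (0.013, 0.7, 3.0, 42.0, 900.0):
        print(v, "->", _calculate_nice_number(v))
    # 0.013 -> 0.01   (fraction 1.3 rounds down to 1)
    # 0.7   -> 0.5    (fraction 7.0 rounds down to 5)
    # 3.0   -> 2.0    (fraction 3.0 rounds down to 2)
    # 42.0  -> 50.0   (fraction 4.2 rounds up to 5)
    # 900.0 -> 1000.0 (fraction 9.0 rounds up to 10)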

def optimize_db_range(
    spectrum: NDArray[np.float64],
    *,
    noise_floor_percentile: float = 5.0,
    peak_threshold_db: float = 10.0,
    margin_db: float = 10.0,
    max_dynamic_range_db: float = 100.0,
) -> tuple[float, float]:
    """Optimize dB range for spectrum plots with noise floor detection.

    Automatically detects the noise floor and calculates an optimal dynamic
    range for maximum information visibility in frequency-domain plots.

    Args:
        spectrum: Spectrum magnitude array (linear or dB).
        noise_floor_percentile: Percentile for noise floor estimation (default 5%).
        peak_threshold_db: Threshold above noise floor for peak detection (default 10 dB).
        margin_db: Margin below noise floor (default 10 dB).
        max_dynamic_range_db: Maximum dynamic range to display (default 100 dB).

    Returns:
        Tuple of (db_min, db_max) for spectrum plot limits.

    Raises:
        ValueError: If spectrum is empty or all zero.

    Example:
        >>> spectrum_db = 20 * np.log10(np.abs(fft_result))
        >>> db_min, db_max = optimize_db_range(spectrum_db)
        >>> ax.set_ylim(db_min, db_max)

    References:
        VIS-022: Spectrum dB Range Optimization
    """
    if len(spectrum) == 0:
        raise ValueError("Spectrum array is empty")

    # Convert to dB if needed: dB spectra rarely exceed 100, so a maximum
    # above that suggests linear-scale input.
    if np.max(spectrum) > 100:
        spectrum_db = 20 * np.log10(np.maximum(spectrum, 1e-100))
    else:
        # Assume already in dB
        spectrum_db = spectrum

    # Detect noise floor using percentile method
    noise_floor = np.percentile(spectrum_db, noise_floor_percentile)

    # Find signal peaks using scipy
    peak_indices, peak_properties = sp_signal.find_peaks(
        spectrum_db,
        height=noise_floor + peak_threshold_db,
        prominence=peak_threshold_db / 2,
    )

    if len(peak_indices) > 0:
        peak_max = np.max(peak_properties["peak_heights"])
    else:
        # No peaks detected, use maximum value
        peak_max = np.max(spectrum_db)

    # Calculate dB range
    db_max = float(peak_max)
    db_min = float(noise_floor - margin_db)

    # Apply dynamic range compression if too wide
    dynamic_range = db_max - db_min
    if dynamic_range > max_dynamic_range_db:
        db_min = db_max - max_dynamic_range_db

    return (db_min, db_max)
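
# A minimal usage sketch on a synthetic dB spectrum: a -80 dB noise floor
# with one strong tone. The exact numbers are illustrative; the returned
# limits should hug the peak and sit margin_db below the detected floor,
# roughly [-95, -20] here.
def _example_db_range() -> None:
    rng = np.random.default_rng(1)
    spectrum_db = -80.0 + 3.0 * rng.standard_normal(2048)
    spectrum_db[512] = -20.0  # a single strong tone
    db_min, db_max = optimize_db_range(spectrum_db)
    print(f"dB range: [{db_min:.1f}, {db_max:.1f}]")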

def decimate_for_display(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    *,
    max_points: int = 2000,
    method: Literal["lttb", "minmax", "uniform"] = "lttb",
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Decimate signal data for display using LTTB or min-max envelope.

    Intelligently reduces the number of points while preserving visual
    appearance and important features like edges and peaks.

    Args:
        time: Time axis array.
        data: Signal data array.
        max_points: Maximum number of points to retain.
        method: Decimation method ("lttb", "minmax", "uniform").

    Returns:
        Tuple of (decimated_time, decimated_data).

    Raises:
        ValueError: If arrays are empty or method is invalid.

    Example:
        >>> time_dec, data_dec = decimate_for_display(time, data, max_points=1000)
        >>> # Reduced from 100k to 1k points while preserving shape

    References:
        VIS-014: Adaptive X-Axis Time Window
        LTTB: Largest Triangle Three Buckets algorithm
    """
    if len(time) == 0 or len(data) == 0:
        raise ValueError("Time or data array is empty")

    if len(time) != len(data):
        raise ValueError(f"Time and data arrays must match: {len(time)} vs {len(data)}")

    # Don't decimate if already below threshold
    if len(data) <= max_points:
        return (time, data)

    # Never decimate very small signals
    if len(data) < 10:
        return (time, data)

    if method == "uniform":
        # Simple uniform stride decimation
        stride = len(data) // max_points
        indices = np.arange(0, len(data), stride)
        return (time[indices], data[indices])

    elif method == "minmax":
        # Min-max envelope: preserve peaks and valleys
        return _decimate_minmax(time, data, max_points)

    elif method == "lttb":
        # Largest Triangle Three Buckets
        return _decimate_lttb(time, data, max_points)

    else:
        raise ValueError(f"Invalid decimation method: {method}")
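
# A minimal usage sketch: decimating a noisy 100k-sample trace for display.
# LTTB returns exactly max_points samples and always keeps the first and
# last original points; the data here is synthetic.
def _example_decimation() -> None:
    rng = np.random.default_rng(2)
    t = np.linspace(0.0, 1.0, 100_000)
    x = np.sin(2 * np.pi * 5 * t) + 0.05 * rng.standard_normal(t.size)
    t_dec, x_dec = decimate_for_display(t, x, max_points=1000, method="lttb")
    print(len(t_dec))  # 1000
    assert t_dec[0] == t[0] and t_dec[-1] == t[-1]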

def _decimate_minmax(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    max_points: int,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Decimate using a min-max envelope to preserve peaks/valleys.

    Args:
        time: Time array.
        data: Signal data array.
        max_points: Maximum number of points to retain.

    Returns:
        Tuple of (decimated_time, decimated_data).
    """
    # Each bucket contributes two points (its min and its max)
    bucket_size = len(data) // (max_points // 2)

    decimated_time = []
    decimated_data = []

    for i in range(0, len(data), bucket_size):
        bucket = data[i : i + bucket_size]
        time_bucket = time[i : i + bucket_size]

        if len(bucket) > 0:
            # Add min and max from each bucket
            min_idx = np.argmin(bucket)
            max_idx = np.argmax(bucket)

            # Add in chronological order
            if min_idx < max_idx:
                decimated_time.extend([time_bucket[min_idx], time_bucket[max_idx]])
                decimated_data.extend([bucket[min_idx], bucket[max_idx]])
            else:
                decimated_time.extend([time_bucket[max_idx], time_bucket[min_idx]])
                decimated_data.extend([bucket[max_idx], bucket[min_idx]])

    return (np.array(decimated_time), np.array(decimated_data))

def _decimate_lttb(
    time: NDArray[np.float64],
    data: NDArray[np.float64],
    max_points: int,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Decimate using the Largest Triangle Three Buckets algorithm.

    Preserves visual appearance by selecting points that maximize
    the area of triangles formed with neighboring buckets.

    Args:
        time: Time array.
        data: Signal data array.
        max_points: Maximum number of points to retain.

    Returns:
        Tuple of (decimated_time, decimated_data).
    """
    if len(data) <= max_points:
        return (time, data)

    # Always include first and last points
    sampled_time = [time[0]]
    sampled_data = [data[0]]

    # Calculate bucket size
    bucket_size = (len(data) - 2) / (max_points - 2)

    # Previous selected point
    prev_idx = 0

    for i in range(max_points - 2):
        # Average point of the next bucket (third vertex of the triangle)
        avg_range_start = int((i + 1) * bucket_size) + 1
        avg_range_end = int((i + 2) * bucket_size) + 1
        avg_range_end = min(avg_range_end, len(data))

        if avg_range_start < avg_range_end:
            avg_time = np.mean(time[avg_range_start:avg_range_end])
            avg_data = np.mean(data[avg_range_start:avg_range_end])
        else:
            avg_time = time[-1]
            avg_data = data[-1]

        # Current bucket range
        range_start = int(i * bucket_size) + 1
        range_end = int((i + 1) * bucket_size) + 1
        range_end = min(range_end, len(data) - 1)

        # Find the point in the current bucket that forms the largest triangle
        prev_time = time[prev_idx]
        prev_data = data[prev_idx]

        max_area = -1.0
        max_idx = range_start

        for idx in range(range_start, range_end):
            # Twice the triangle area, via the cross product; the constant
            # factor does not affect the argmax, so the 1/2 is omitted
            area = abs(
                (prev_time - avg_time) * (data[idx] - prev_data)
                - (prev_time - time[idx]) * (avg_data - prev_data)
            )

            if area > max_area:
                max_area = area
                max_idx = idx

        sampled_time.append(time[max_idx])
        sampled_data.append(data[max_idx])
        prev_idx = max_idx

    # Always include last point
    sampled_time.append(time[-1])
    sampled_data.append(data[-1])

    return (np.array(sampled_time), np.array(sampled_data))
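
# A quick comparison sketch of the three decimators on a pure sine wave.
# minmax keeps two points per bucket and so captures the bucket extremes by
# construction; LTTB usually lands very close; uniform striding can step
# over the peaks entirely. The helper and values are illustrative.
def _example_compare_decimators() -> None:
    t = np.linspace(0.0, 1.0, 50_000)
    x = np.sin(2 * np.pi * 50 * t)
    for method in ("minmax", "lttb", "uniform"):
        td, xd = decimate_for_display(t, x, max_points=500, method=method)
        print(method, len(td), round(float(xd.min()), 3), round(float(xd.max()), 3))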

@dataclass
class InterestingRegion:
    """Represents an interesting region in a signal.

    Attributes:
        start_idx: Starting sample index.
        end_idx: Ending sample index.
        start_time: Starting time in seconds.
        end_time: Ending time in seconds.
        type: Type of interesting feature.
        significance: Significance score (0-1, higher is more significant).
        metadata: Additional metadata about the region.
    """

    start_idx: int
    end_idx: int
    start_time: float
    end_time: float
    type: Literal["edge", "glitch", "anomaly", "pattern_change"]
    significance: float
    metadata: dict  # type: ignore[type-arg]

def detect_interesting_regions(
    signal: NDArray[np.float64],
    sample_rate: float,
    *,
    edge_threshold: float | None = None,
    glitch_sigma: float = 3.0,
    anomaly_threshold: float = 3.0,
    min_region_samples: int = 10,
    max_regions: int = 10,
) -> list[InterestingRegion]:
    """Detect interesting regions in a signal for automatic zoom/focus.

    Automatically detects interesting signal regions such as edges,
    glitches, anomalies, or pattern changes, so a viewer can zoom to them.

    Args:
        signal: Input signal array.
        sample_rate: Sample rate in Hz.
        edge_threshold: Edge detection threshold (default: auto from gradient stddev).
        glitch_sigma: Sigma threshold for glitch detection (default: 3.0).
        anomaly_threshold: Threshold for anomaly detection in sigma (default: 3.0).
        min_region_samples: Minimum samples per region (default: 10).
        max_regions: Maximum number of regions to return (default: 10).

    Returns:
        List of InterestingRegion objects, sorted by significance (descending).

    Raises:
        ValueError: If signal is empty or sample_rate is invalid.

    Example:
        >>> t = np.linspace(0.0, 1e-3, 1000)
        >>> signal = np.sin(2 * np.pi * 1000 * t) + 0.1 * np.random.randn(len(t))
        >>> regions = detect_interesting_regions(signal, 1e6)
        >>> print(f"Found {len(regions)} interesting regions")

    References:
        VIS-020: Zoom to Interesting Regions
        Edge detection: first-derivative (gradient) thresholding
        Glitch detection: z-score outliers; anomaly detection: MAD-based scores
    """
    if len(signal) == 0:
        raise ValueError("Signal cannot be empty")
    if sample_rate <= 0:
        raise ValueError("Sample rate must be positive")
    if min_region_samples < 1:
        raise ValueError("min_region_samples must be >= 1")

    regions: list[InterestingRegion] = []

    # 1. Edge detection using first derivative
    edges = _detect_edges(signal, sample_rate, edge_threshold)
    regions.extend(edges)

    # 2. Glitch detection using statistical outliers
    glitches = _detect_glitches(signal, sample_rate, glitch_sigma)
    regions.extend(glitches)

    # 3. Anomaly detection using MAD
    anomalies = _detect_anomalies(signal, sample_rate, anomaly_threshold)
    regions.extend(anomalies)

    # 4. Pattern change detection (simplified using variance changes)
    pattern_changes = _detect_pattern_changes(signal, sample_rate)
    regions.extend(pattern_changes)

    # Filter out regions that are too small
    regions = [r for r in regions if (r.end_idx - r.start_idx) >= min_region_samples]

    # Sort by significance (descending)
    regions.sort(key=lambda r: r.significance, reverse=True)

    # Return top N regions
    return regions[:max_regions]
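
# A minimal usage sketch: a flat trace with one step and one isolated
# spike. min_region_samples=1 is passed because these transitions are only
# a sample or two wide; the spike trips the edge, glitch, and anomaly
# detectors, while the step registers as an edge. Values are illustrative.
def _example_detect_regions() -> None:
    sig = np.zeros(5000)
    sig[2500:] = 1.0  # step
    sig[1000] = 10.0  # spike
    regions = detect_interesting_regions(sig, sample_rate=1e6, min_region_samples=1)
    for r in regions[:5]:
        print(f"{r.type:>14} [{r.start_idx}:{r.end_idx}] significance={r.significance:.2f}")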

def _detect_edges(
    signal: NDArray[np.float64],
    sample_rate: float,
    threshold: float | None,
) -> list[InterestingRegion]:
    """Detect edge transitions using the first derivative.

    Args:
        signal: Input signal.
        sample_rate: Sample rate in Hz.
        threshold: Edge threshold (auto if None).

    Returns:
        List of edge regions.
    """
    # Calculate first derivative (gradient)
    gradient = np.gradient(signal)

    # Auto threshold based on gradient statistics
    if threshold is None:
        threshold = np.std(gradient) * 2.0

    # Find where gradient exceeds threshold
    edge_mask = np.abs(gradient) > threshold

    # Find continuous edge regions
    regions: list[InterestingRegion] = []
    in_edge = False
    start_idx = 0

    for i, is_edge in enumerate(edge_mask):
        if is_edge and not in_edge:
            # Start of edge
            start_idx = i
            in_edge = True
        elif not is_edge and in_edge:
            # End of edge
            end_idx = i

            # Calculate significance based on gradient magnitude
            edge_gradient = gradient[start_idx:end_idx]
            significance = min(1.0, np.max(np.abs(edge_gradient)) / (threshold * 5))

            time_base = 1.0 / sample_rate
            regions.append(
                InterestingRegion(
                    start_idx=start_idx,
                    end_idx=end_idx,
                    start_time=start_idx * time_base,
                    end_time=end_idx * time_base,
                    type="edge",
                    significance=significance,
                    metadata={
                        "max_gradient": float(np.max(np.abs(edge_gradient))),
                        "threshold": threshold,
                    },
                )
            )
            in_edge = False

    # Handle an edge still open at the end of the signal
    if in_edge:
        end_idx = len(signal)
        edge_gradient = gradient[start_idx:end_idx]
        significance = min(1.0, np.max(np.abs(edge_gradient)) / (threshold * 5))
        time_base = 1.0 / sample_rate
        regions.append(
            InterestingRegion(
                start_idx=start_idx,
                end_idx=end_idx,
                start_time=start_idx * time_base,
                end_time=end_idx * time_base,
                type="edge",
                significance=significance,
                metadata={
                    "max_gradient": float(np.max(np.abs(edge_gradient))),
                    "threshold": threshold,
                },
            )
        )

    return regions

def _detect_glitches(
    signal: NDArray[np.float64],
    sample_rate: float,
    sigma_threshold: float,
) -> list[InterestingRegion]:
    """Detect isolated spikes (glitches) using z-scores.

    Args:
        signal: Input signal.
        sample_rate: Sample rate in Hz.
        sigma_threshold: Sigma threshold for outlier detection.

    Returns:
        List of glitch regions.
    """
    # Calculate z-scores
    mean = np.mean(signal)
    std = np.std(signal)

    if std == 0:
        return []

    z_scores = np.abs((signal - mean) / std)

    # Find outliers
    outlier_mask = z_scores > sigma_threshold

    # Find isolated glitches (single sample or very short bursts)
    regions: list[InterestingRegion] = []
    time_base = 1.0 / sample_rate

    i = 0
    while i < len(outlier_mask):
        if outlier_mask[i]:
            # Start of potential glitch
            start_idx = i

            # Find end of glitch (max 5 samples to be considered a glitch)
            while i < len(outlier_mask) and outlier_mask[i] and (i - start_idx) < 5:
                i += 1

            end_idx = i

            # Calculate significance based on z-score
            glitch_z_scores = z_scores[start_idx:end_idx]
            significance = min(1.0, np.max(glitch_z_scores) / (sigma_threshold * 3))

            regions.append(
                InterestingRegion(
                    start_idx=start_idx,
                    end_idx=end_idx,
                    start_time=start_idx * time_base,
                    end_time=end_idx * time_base,
                    type="glitch",
                    significance=significance,
                    metadata={
                        "max_z_score": float(np.max(glitch_z_scores)),
                        "threshold_sigma": sigma_threshold,
                    },
                )
            )
        else:
            i += 1

    return regions

def _detect_anomalies(
    signal: NDArray[np.float64],
    sample_rate: float,
    threshold_sigma: float,
) -> list[InterestingRegion]:
    """Detect anomalies using MAD (Median Absolute Deviation).

    Args:
        signal: Input signal.
        sample_rate: Sample rate in Hz.
        threshold_sigma: Sigma threshold for MAD.

    Returns:
        List of anomaly regions.
    """
    # Calculate MAD
    median = np.median(signal)
    mad = np.median(np.abs(signal - median))

    if mad == 0:
        return []

    # Modified z-score using MAD (more robust than the standard z-score)
    modified_z_scores = 0.6745 * (signal - median) / mad

    # Find anomalies
    anomaly_mask = np.abs(modified_z_scores) > threshold_sigma

    # Find continuous anomaly regions
    regions: list[InterestingRegion] = []
    in_anomaly = False
    start_idx = 0
    time_base = 1.0 / sample_rate

    for i, is_anomaly in enumerate(anomaly_mask):
        if is_anomaly and not in_anomaly:
            start_idx = i
            in_anomaly = True
        elif not is_anomaly and in_anomaly:
            end_idx = i

            # Calculate significance
            anomaly_scores = modified_z_scores[start_idx:end_idx]
            significance = min(1.0, np.max(np.abs(anomaly_scores)) / (threshold_sigma * 3))

            regions.append(
                InterestingRegion(
                    start_idx=start_idx,
                    end_idx=end_idx,
                    start_time=start_idx * time_base,
                    end_time=end_idx * time_base,
                    type="anomaly",
                    significance=significance,
                    metadata={
                        "max_mad_score": float(np.max(np.abs(anomaly_scores))),
                        "threshold_sigma": threshold_sigma,
                    },
                )
            )
            in_anomaly = False

    # Handle an anomaly still open at the end of the signal
    if in_anomaly:
        end_idx = len(signal)
        anomaly_scores = modified_z_scores[start_idx:end_idx]
        significance = min(1.0, np.max(np.abs(anomaly_scores)) / (threshold_sigma * 3))
        regions.append(
            InterestingRegion(
                start_idx=start_idx,
                end_idx=end_idx,
                start_time=start_idx * time_base,
                end_time=end_idx * time_base,
                type="anomaly",
                significance=significance,
                metadata={
                    "max_mad_score": float(np.max(np.abs(anomaly_scores))),
                    "threshold_sigma": threshold_sigma,
                },
            )
        )

    return regions
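
# A worked example of the modified z-score used above. The constant 0.6745
# is the 75th percentile of the standard normal distribution, so
# 0.6745 * (x - median) / MAD behaves like an ordinary z-score on normal
# data while staying robust to the very outliers being hunted.
def _example_mad_score() -> None:
    x = np.array([1.0, 1.1, 0.9, 1.0, 1.05, 0.95, 8.0])
    median = np.median(x)  # 1.0
    mad = np.median(np.abs(x - median))  # 0.05
    score = 0.6745 * (8.0 - median) / mad
    print(f"modified z-score of the outlier: {score:.1f}")  # ~94.4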

def _detect_pattern_changes(
    signal: NDArray[np.float64],
    sample_rate: float,
) -> list[InterestingRegion]:
    """Detect pattern changes using windowed variance analysis.

    Args:
        signal: Input signal.
        sample_rate: Sample rate in Hz.

    Returns:
        List of pattern change regions.
    """
    # Use sliding window to detect variance changes
    window_size = min(100, len(signal) // 10)

    if window_size < 10:
        return []

    # Calculate windowed variance
    variances = np.zeros(len(signal) - window_size + 1)
    for i in range(len(variances)):
        variances[i] = np.var(signal[i : i + window_size])

    # Detect changes in variance
    if len(variances) < 2:
        return []

    variance_gradient = np.gradient(variances)
    threshold = np.std(variance_gradient) * 2.0

    change_mask = np.abs(variance_gradient) > threshold

    # Find change regions. Each above-threshold sample yields a candidate
    # window, so candidates can overlap; the caller ranks by significance
    # and keeps only the top max_regions.
    regions: list[InterestingRegion] = []
    time_base = 1.0 / sample_rate

    for i in range(len(change_mask)):
        if change_mask[i]:
            start_idx = i
            end_idx = min(i + window_size, len(signal))

            # Calculate significance
            significance = min(1.0, np.abs(variance_gradient[i]) / (threshold * 5))

            regions.append(
                InterestingRegion(
                    start_idx=start_idx,
                    end_idx=end_idx,
                    start_time=start_idx * time_base,
                    end_time=end_idx * time_base,
                    type="pattern_change",
                    significance=significance,
                    metadata={
                        "variance_change": float(variance_gradient[i]),
                        "threshold": threshold,
                    },
                )
            )

    return regions

__all__ = [
    "InterestingRegion",
    "calculate_grid_spacing",
    "calculate_optimal_x_window",
    "calculate_optimal_y_range",
    "decimate_for_display",
    "detect_interesting_regions",
    "optimize_db_range",
]