Coverage for src / tracekit / reporting / engine.py: 41%

574 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Analysis Engine for orchestrating comprehensive analysis execution. 

2 

3This module provides the AnalysisEngine class that orchestrates running all 

4applicable analyses on input data, handling progress tracking, timeouts, 

5and error collection. 

6""" 

7 

8from __future__ import annotations 

9 

10import importlib 

11import inspect 

12import logging 

13import time 

14import traceback 

15import types 

16from collections.abc import Callable 

17from pathlib import Path 

18from typing import Any 

19 

20import numpy as np 

21 

22from tracekit.core.types import TraceMetadata, WaveformTrace 

23from tracekit.reporting.config import ( 

24 ANALYSIS_CAPABILITIES, 

25 AnalysisConfig, 

26 AnalysisDomain, 

27 AnalysisError, 

28 InputType, 

29 ProgressInfo, 

30 get_available_analyses, 

31) 

32 

# Module-level logger shared by the whole engine module.
logger = logging.getLogger(__name__)


# Functions that require context-specific parameters that cannot be auto-detected.
# _execute_function refuses to run these (it raises ValueError for them), because
# their required inputs -- protocol specifications, PacketInfo objects, or
# voltage+current trace pairs -- cannot be inferred from a single input trace.
NON_INFERRABLE_FUNCTIONS: set[str] = {
    # INFERENCE domain - require specific data types
    "tracekit.inference.protocol_dsl.decode_protocol",
    "tracekit.inference.protocol_dsl.match_pattern",
    "tracekit.inference.protocol_dsl.validate_message",
    # PACKET domain - require PacketInfo objects
    "tracekit.analyzers.packet.timing.analyze_inter_packet_timing",
    "tracekit.analyzers.packet.timing.detect_bursts",
    # POWER domain - require voltage+current pairs
    "tracekit.analyzers.power.consumption.calculate_power",
    "tracekit.analyzers.power.consumption.analyze_power_efficiency",
}

49 

50 

51class AnalysisEngine: 

52 """Engine for orchestrating comprehensive analysis execution. 

53 

54 The AnalysisEngine accepts input data (from file or in-memory), detects 

55 the input type, determines applicable analysis domains, and executes 

56 all relevant analysis functions with progress tracking and error handling. 

57 

58 Example: 

59 >>> from tracekit.reporting import AnalysisEngine, AnalysisConfig 

60 >>> config = AnalysisConfig(timeout_per_analysis=30.0) 

61 >>> engine = AnalysisEngine(config) 

62 >>> result = engine.run(input_path=Path("data.wfm")) 

63 >>> print(f"Ran {result['stats']['total_analyses']} analyses") 

64 >>> print(f"Success rate: {result['stats']['success_rate']:.1f}%") 

65 """ 

66 

67 def __init__(self, config: AnalysisConfig | None = None) -> None: 

68 """Initialize the analysis engine. 

69 

70 Args: 

71 config: Analysis configuration. If None, uses defaults. 

72 """ 

73 self.config = config or AnalysisConfig() 

74 self._start_time = 0.0 

75 self._input_path: Path | None = None 

76 

77 def detect_input_type(self, input_path: Path | None, data: Any) -> InputType: 

78 """Detect input type from file path or data characteristics. 

79 

80 Args: 

81 input_path: Path to input file (None if in-memory data). 

82 data: Input data object. 

83 

84 Returns: 

85 Detected input type. 

86 

87 Raises: 

88 ValueError: If input type cannot be determined. 

89 """ 

90 # If path provided, detect from extension 

91 if input_path is not None: 

92 ext = input_path.suffix.lower() 

93 

94 # Waveform formats 

95 if ext in {".wfm", ".csv", ".npz", ".h5", ".hdf5", ".wav", ".tdms"}: 

96 return InputType.WAVEFORM 

97 # Digital formats 

98 elif ext in {".vcd", ".sr"}: 

99 return InputType.DIGITAL 

100 # Packet formats 

101 elif ext in {".pcap", ".pcapng"}: 

102 return InputType.PCAP 

103 # Binary formats 

104 elif ext in {".bin", ".raw"}: 104 ↛ 107line 104 didn't jump to line 107 because the condition on line 104 was always true

105 return InputType.BINARY 

106 # S-parameter/Touchstone formats 

107 elif ext in {".s1p", ".s2p", ".s3p", ".s4p", ".s5p", ".s6p", ".s7p", ".s8p"}: 

108 return InputType.SPARAMS 

109 

110 # Detect from data object characteristics 

111 if hasattr(data, "s_matrix") and hasattr(data, "frequencies"): 

112 # SParameterData 

113 return InputType.SPARAMS 

114 elif hasattr(data, "data") and hasattr(data, "metadata"): 

115 # WaveformTrace or DigitalTrace 

116 if hasattr(data.metadata, "is_digital") and data.metadata.is_digital: 

117 return InputType.DIGITAL 

118 return InputType.WAVEFORM 

119 elif isinstance(data, bytes | bytearray): 

120 return InputType.BINARY 

121 elif isinstance(data, list): 

122 # Assume packet list 

123 return InputType.PACKETS 

124 elif isinstance(data, np.ndarray): 124 ↛ 128line 124 didn't jump to line 128 because the condition on line 124 was always true

125 # Assume waveform 

126 return InputType.WAVEFORM 

127 

128 raise ValueError("Unable to determine input type from path or data characteristics") 

129 

130 def run( 

131 self, 

132 input_path: Path | None = None, 

133 data: Any = None, 

134 progress_callback: Callable[[ProgressInfo], None] | None = None, 

135 ) -> dict[str, Any]: 

136 """Run comprehensive analysis on input data. 

137 

138 Args: 

139 input_path: Path to input file (or None for in-memory data). 

140 data: Input data object (or None to load from input_path). 

141 progress_callback: Optional callback for progress updates. 

142 

143 Returns: 

144 Dictionary with keys: 

145 - 'results': Dict mapping AnalysisDomain to analysis results 

146 - 'errors': List of AnalysisError objects 

147 - 'stats': Execution statistics dict 

148 

149 Raises: 

150 ValueError: If neither input_path nor data provided. 

151 FileNotFoundError: If input_path doesn't exist. 

152 

153 Example: 

154 >>> def progress(info: ProgressInfo): 

155 ... print(f"{info.phase}: {info.percent:.1f}%") 

156 >>> result = engine.run(input_path=Path("data.wfm"), progress_callback=progress) 

157 """ 

158 if input_path is None and data is None: 

159 raise ValueError("Must provide either input_path or data") 

160 

161 self._start_time = time.time() 

162 self._input_path = input_path 

163 

164 # Check available memory and adjust parallelism if needed 

165 from tracekit.core.memory_guard import check_memory_available 

166 

167 min_required_mb = 500 # Minimum 500MB needed for analysis 

168 if not check_memory_available(min_required_mb): 168 ↛ 169line 168 didn't jump to line 169 because the condition on line 168 was never true

169 logger.warning( 

170 f"Low memory available (< {min_required_mb} MB). " 

171 f"Reducing parallel workers to conserve memory." 

172 ) 

173 # Temporarily reduce parallelism to conserve memory 

174 self.config.parallel_domains = False 

175 

176 # Phase 1: Load data 

177 if progress_callback: 

178 progress_callback( 

179 ProgressInfo( 

180 phase="loading", 

181 domain=None, 

182 function=None, 

183 percent=0.0, 

184 message="Loading input data", 

185 elapsed_seconds=0.0, 

186 estimated_remaining_seconds=None, 

187 ) 

188 ) 

189 

190 if data is None: 

191 if input_path is None or not input_path.exists(): 191 ↛ 195line 191 didn't jump to line 195 because the condition on line 191 was always true

192 raise FileNotFoundError(f"Input file not found: {input_path}") 

193 

194 # Load using tracekit loaders 

195 from tracekit.loaders import load 

196 

197 data = load(input_path) 

198 

199 # Phase 2: Detect input type 

200 input_type = self.detect_input_type(input_path, data) 

201 

202 if progress_callback: 

203 progress_callback( 

204 ProgressInfo( 

205 phase="detecting", 

206 domain=None, 

207 function=None, 

208 percent=5.0, 

209 message=f"Detected input type: {input_type.value}", 

210 elapsed_seconds=time.time() - self._start_time, 

211 estimated_remaining_seconds=None, 

212 ) 

213 ) 

214 

215 # Phase 3: Determine applicable domains 

216 applicable_domains = get_available_analyses(input_type) 

217 

218 # Filter by configuration 

219 enabled_domains = [d for d in applicable_domains if self.config.is_domain_enabled(d)] 

220 

221 if progress_callback: 

222 progress_callback( 

223 ProgressInfo( 

224 phase="planning", 

225 domain=None, 

226 function=None, 

227 percent=10.0, 

228 message=f"Planning analysis across {len(enabled_domains)} domains", 

229 elapsed_seconds=time.time() - self._start_time, 

230 estimated_remaining_seconds=None, 

231 ) 

232 ) 

233 

234 # Phase 4: Execute analyses 

235 results: dict[AnalysisDomain, dict[str, Any]] = {} 

236 errors: list[AnalysisError] = [] 

237 

238 total_domains = len(enabled_domains) 

239 

240 # Execute domains in parallel if enabled and multiple domains exist 

241 if self.config.parallel_domains and len(enabled_domains) > 1: 

242 import concurrent.futures 

243 

244 # Use ThreadPoolExecutor with bounded workers from config 

245 max_workers = min(self.config.max_parallel_workers, len(enabled_domains)) 

246 

247 with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: 

248 # Submit all domain executions 

249 futures = { 

250 executor.submit(self._execute_domain, domain, data): domain 

251 for domain in enabled_domains 

252 } 

253 

254 # Process results as they complete 

255 for completed, future in enumerate(concurrent.futures.as_completed(futures), 1): 

256 domain = futures[future] 

257 domain_percent = 10.0 + (completed / total_domains) * 80.0 

258 

259 if progress_callback: 259 ↛ 260line 259 didn't jump to line 260 because the condition on line 259 was never true

260 progress_callback( 

261 ProgressInfo( 

262 phase="analyzing", 

263 domain=domain, 

264 function=None, 

265 percent=domain_percent, 

266 message=f"Completed domain: {domain.value}", 

267 elapsed_seconds=time.time() - self._start_time, 

268 estimated_remaining_seconds=None, 

269 ) 

270 ) 

271 

272 try: 

273 # Retrieve result with timeout 

274 timeout_seconds = self.config.timeout_per_analysis or 30.0 

275 domain_results, domain_errors = future.result(timeout=timeout_seconds * 10) 

276 if domain_results: 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true

277 results[domain] = domain_results 

278 errors.extend(domain_errors) 

279 except concurrent.futures.TimeoutError: 

280 logger.error(f"Domain {domain.value} exceeded timeout") 

281 errors.append( 

282 AnalysisError( 

283 domain=domain, 

284 function=f"{domain.value}.*", 

285 error_type="TimeoutError", 

286 error_message="Domain execution exceeded timeout", 

287 traceback=None, 

288 duration_ms=timeout_seconds * 10 * 1000, 

289 ) 

290 ) 

291 except Exception as e: 

292 logger.error(f"Domain {domain.value} failed: {e}") 

293 errors.append( 

294 AnalysisError( 

295 domain=domain, 

296 function=f"{domain.value}.*", 

297 error_type=type(e).__name__, 

298 error_message=str(e), 

299 traceback=traceback.format_exc(), 

300 duration_ms=0.0, 

301 ) 

302 ) 

303 else: 

304 # Sequential fallback (existing code) 

305 for idx, domain in enumerate(enabled_domains): 

306 domain_percent = 10.0 + (idx / total_domains) * 80.0 

307 

308 if progress_callback: 

309 progress_callback( 

310 ProgressInfo( 

311 phase="analyzing", 

312 domain=domain, 

313 function=None, 

314 percent=domain_percent, 

315 message=f"Analyzing domain: {domain.value}", 

316 elapsed_seconds=time.time() - self._start_time, 

317 estimated_remaining_seconds=None, 

318 ) 

319 ) 

320 

321 domain_results, domain_errors = self._execute_domain(domain, data) 

322 if domain_results: 322 ↛ 323line 322 didn't jump to line 323 because the condition on line 322 was never true

323 results[domain] = domain_results 

324 errors.extend(domain_errors) 

325 

326 # Phase 5: Complete 

327 total_duration = time.time() - self._start_time 

328 

329 if progress_callback: 

330 progress_callback( 

331 ProgressInfo( 

332 phase="complete", 

333 domain=None, 

334 function=None, 

335 percent=100.0, 

336 message="Analysis complete", 

337 elapsed_seconds=total_duration, 

338 estimated_remaining_seconds=0.0, 

339 ) 

340 ) 

341 

342 # Calculate statistics 

343 total_analyses = sum(len(dr) for dr in results.values()) 

344 successful_analyses = sum( 

345 1 for dr in results.values() for v in dr.values() if not isinstance(v, Exception) 

346 ) 

347 failed_analyses = len(errors) 

348 

349 stats = { 

350 "input_type": input_type.value, 

351 "total_domains": len(enabled_domains), 

352 "total_analyses": total_analyses, 

353 "successful_analyses": successful_analyses, 

354 "failed_analyses": failed_analyses, 

355 "success_rate": (successful_analyses / total_analyses * 100.0) 

356 if total_analyses > 0 

357 else 0.0, 

358 "duration_seconds": total_duration, 

359 } 

360 

361 return { 

362 "results": results, 

363 "errors": errors, 

364 "stats": stats, 

365 } 

366 

    def _execute_domain(
        self, domain: AnalysisDomain, data: Any
    ) -> tuple[dict[str, Any], list[AnalysisError]]:
        """Execute all analyses for a specific domain.

        Discovers every public function in the domain's configured modules
        and runs each one once, collecting results and errors.

        Args:
            domain: Analysis domain to execute.
            data: Input data object.

        Returns:
            Tuple of (results_dict, errors_list). Result keys are
            fully-qualified "module.function" paths.
        """
        results: dict[str, Any] = {}
        errors: list[AnalysisError] = []

        # Preprocess data for specific domains (e.g. EYE needs an EyeDiagram).
        data = self._preprocess_for_domain(domain, data)

        # Get domain capabilities: list of module names to scan for analyses.
        cap = ANALYSIS_CAPABILITIES.get(domain, {})
        module_names = cap.get("modules", [])

        # Fallback to old single-module format ("module" key).
        if not module_names:
            single_module = cap.get("module", "")
            if single_module:
                module_names = [single_module]

        if not module_names:
            logger.debug(f"No modules configured for domain {domain.value}")
            return results, errors

        # Get domain-specific config; domain timeout overrides the global one.
        domain_config = self.config.get_domain_config(domain)
        timeout = domain_config.timeout or self.config.timeout_per_analysis

        # Track executed functions to prevent duplicates (the same function
        # can be re-exported by several modules in the list).
        executed_functions: set[str] = set()

        # Iterate through all modules for this domain
        for module_name in module_names:
            try:
                module = importlib.import_module(module_name)
            except ImportError as e:
                logger.warning(f"Failed to import module {module_name}: {e}")
                # NOTE(review): when continue_on_error is False the error is
                # recorded but the loop still advances to the next module
                # (rather than aborting); when it is True the failure is only
                # logged and never recorded. Confirm this asymmetry is
                # intentional.
                if not self.config.continue_on_error:
                    errors.append(
                        AnalysisError(
                            domain=domain,
                            function=module_name,
                            error_type="ImportError",
                            error_message=str(e),
                            traceback=traceback.format_exc(),
                            duration_ms=0.0,
                        )
                    )
                continue

            # Discover public functions in the module
            for func_name, func_obj in inspect.getmembers(module):
                # Skip private functions and non-functions
                if func_name.startswith("_") or not inspect.isfunction(func_obj):
                    continue

                # Skip functions not defined in this module (imported from elsewhere)
                if func_obj.__module__ != module_name:
                    continue

                # Skip if already executed (prevent duplicates)
                func_path = f"{module_name}.{func_name}"
                if func_path in executed_functions:
                    logger.debug(f"Skipping duplicate function: {func_path}")
                    continue
                executed_functions.add(func_path)

                # Execute the function; any exception (including the
                # ValueError raised for non-inferrable functions) becomes an
                # AnalysisError entry.
                try:
                    result = self._execute_function(module_name, func_name, data, timeout)
                    results[f"{module_name}.{func_name}"] = result
                except Exception as e:
                    error = AnalysisError(
                        domain=domain,
                        function=f"{module_name}.{func_name}",
                        error_type=type(e).__name__,
                        error_message=str(e),
                        traceback=traceback.format_exc(),
                        duration_ms=0.0,
                    )
                    errors.append(error)

                    if not self.config.continue_on_error:
                        # Stop execution for this domain
                        return results, errors

        return results, errors

462 

463 def _preprocess_for_domain(self, domain: AnalysisDomain, data: Any) -> Any: 

464 """Preprocess data for domain-specific requirements. 

465 

466 Some domains require specialized data structures. This method 

467 converts raw data into the appropriate format. 

468 

469 Args: 

470 domain: Target analysis domain. 

471 data: Input data object. 

472 

473 Returns: 

474 Preprocessed data suitable for the domain. 

475 """ 

476 if domain == AnalysisDomain.EYE: 

477 # EYE domain requires an EyeDiagram object 

478 # Try to generate one from waveform data 

479 return self._preprocess_for_eye_domain(data) 

480 

481 return data 

482 

483 def _get_effective_sample_rate(self, data: Any, context: str = "general") -> float: 

484 """Get effective sample rate from data metadata or config defaults. 

485 

486 Priority order: 

487 1. Data metadata (e.g., WaveformTrace.metadata.sample_rate) 

488 2. AnalysisConfig.default_sample_rate 

489 3. Context-appropriate default constant 

490 

491 Args: 

492 data: Input data object (may have .metadata.sample_rate). 

493 context: Analysis context for selecting appropriate default. 

494 Options: "general" (1 MHz), "highspeed" (1 GHz), "binary" (1 Hz). 

495 

496 Returns: 

497 Effective sample rate in Hz. 

498 

499 Note: 

500 This method logs a debug message when falling back to defaults, 

501 as sample rate should ideally be provided in the data metadata 

502 for accurate time-domain analysis. 

503 """ 

504 # Try to extract from data metadata 

505 data_sample_rate = None 

506 if hasattr(data, "metadata") and hasattr(data.metadata, "sample_rate"): 

507 data_sample_rate = data.metadata.sample_rate 

508 if data_sample_rate is not None and data_sample_rate > 0: 508 ↛ 512line 508 didn't jump to line 512 because the condition on line 508 was always true

509 return float(data_sample_rate) 

510 

511 # Use config's get_effective_sample_rate method 

512 effective_rate = self.config.get_effective_sample_rate( 

513 data_sample_rate=data_sample_rate, 

514 context=context, 

515 ) 

516 

517 # Log when using defaults (indicates missing metadata) 

518 logger.debug( 

519 f"Using default sample rate {effective_rate:.2e} Hz (context: {context}). " 

520 f"For accurate analysis, provide sample_rate in data metadata." 

521 ) 

522 

523 return effective_rate 

524 

    def _preprocess_for_eye_domain(self, data: Any) -> Any:
        """Preprocess data for eye diagram analysis.

        Attempts to generate an EyeDiagram from waveform data using
        automatic unit interval detection via FFT-based period detection
        with fallback to zero-crossing analysis. Any failure is swallowed
        and the original data is returned unchanged.

        Args:
            data: Input waveform data.

        Returns:
            EyeDiagram object if successful, original data otherwise.
        """
        # Check if already an EyeDiagram (duck-typed on two attributes).
        if hasattr(data, "samples_per_ui") and hasattr(data, "time_axis"):
            return data

        # Try to extract waveform data
        if hasattr(data, "data") and hasattr(data, "metadata"):
            # WaveformTrace
            raw_data = data.data
            sample_rate = getattr(data.metadata, "sample_rate", None)
        elif isinstance(data, np.ndarray):
            raw_data = data
            sample_rate = None
        else:
            # Can't preprocess, return as-is
            return data

        if raw_data is None or len(raw_data) == 0:
            return data

        try:
            # Lazy imports keep eye-diagram dependencies off the hot path.
            from tracekit.analyzers.eye.diagram import generate_eye
            from tracekit.core.types import TraceMetadata, WaveformTrace

            # Get effective sample rate using config-aware method.
            # Use "highspeed" context for eye diagram (typically high-speed serial).
            if sample_rate is None or sample_rate <= 0:
                sample_rate = self._get_effective_sample_rate(data, context="highspeed")

            # Estimate unit interval using FFT-based period detection
            unit_interval = self._detect_unit_interval_fft(raw_data, sample_rate)

            # If FFT detection fails, try zero-crossing analysis
            if unit_interval is None:
                unit_interval = self._detect_unit_interval_zero_crossing(raw_data, sample_rate)

            # If both methods fail, use default fallback
            if unit_interval is None:
                # Fallback: assume 100 UI in the data
                unit_interval = len(raw_data) / sample_rate / 100
                logger.debug("Using default unit interval fallback (100 UI in data)")

            # Ensure unit interval is reasonable
            min_ui = 10 / sample_rate  # At least 10 samples per UI
            max_ui = len(raw_data) / sample_rate / 10  # At least 10 UI in data
            # NOTE(review): for records shorter than 100 samples min_ui exceeds
            # max_ui and np.clip then returns max_ui -- confirm acceptable.
            unit_interval = np.clip(unit_interval, min_ui, max_ui)

            # Create a WaveformTrace if we only have raw data
            if not hasattr(data, "data"):
                metadata = TraceMetadata(sample_rate=sample_rate)
                trace = WaveformTrace(data=raw_data.astype(np.float64), metadata=metadata)
            else:
                trace = data

            # Generate eye diagram (2 UI wide, with histogram for metrics)
            eye_diagram = generate_eye(
                trace=trace,
                unit_interval=unit_interval,
                n_ui=2,
                generate_histogram=True,
            )

            logger.debug(
                f"Generated eye diagram: {eye_diagram.n_traces} traces, "
                f"{eye_diagram.samples_per_ui} samples/UI"
            )
            return eye_diagram

        except Exception as e:
            logger.debug(f"Could not generate eye diagram: {e}")
            # Return original data if preprocessing fails
            return data

609 

610 def _detect_unit_interval_fft( 

611 self, raw_data: np.ndarray[Any, Any], sample_rate: float 

612 ) -> float | None: 

613 """Detect unit interval using FFT-based period detection. 

614 

615 Computes the FFT of the waveform, finds the dominant frequency 

616 (excluding DC), and calculates the unit interval for NRZ data. 

617 

618 Args: 

619 raw_data: Input waveform samples. 

620 sample_rate: Sample rate in Hz. 

621 

622 Returns: 

623 Estimated unit interval in seconds, or None if detection fails. 

624 """ 

625 try: 

626 # Remove DC component 

627 data_ac = raw_data - np.mean(raw_data) 

628 

629 # Compute FFT 

630 fft_result = np.fft.rfft(data_ac) 

631 fft_freqs = np.fft.rfftfreq(len(data_ac), d=1.0 / sample_rate) 

632 

633 # Get magnitude spectrum (exclude DC bin at index 0) 

634 magnitude = np.abs(fft_result[1:]) 

635 freqs = fft_freqs[1:] 

636 

637 if len(magnitude) == 0: 637 ↛ 638line 637 didn't jump to line 638 because the condition on line 637 was never true

638 return None 

639 

640 # Find dominant frequency (peak in magnitude spectrum) 

641 peak_idx = np.argmax(magnitude) 

642 dominant_freq = freqs[peak_idx] 

643 

644 # For NRZ data, unit interval = 1 / (2 * dominant_freq) 

645 # For periodic signals like sine waves, unit interval = 1 / dominant_freq 

646 # We'll use the period as the unit interval for general signals 

647 if dominant_freq > 0: 647 ↛ 667line 647 didn't jump to line 667 because the condition on line 647 was always true

648 unit_interval = float(1.0 / dominant_freq) 

649 

650 # Sanity check: dominant frequency should be reasonable 

651 min_freq = sample_rate / len(raw_data) # At least one full cycle 

652 max_freq = sample_rate / 20 # At least 20 samples per cycle 

653 

654 if min_freq <= dominant_freq <= max_freq: 

655 logger.debug( 

656 f"FFT detected dominant frequency: {dominant_freq:.2f} Hz, " 

657 f"unit interval: {unit_interval * 1e6:.3f} us" 

658 ) 

659 return unit_interval 

660 else: 

661 logger.debug( 

662 f"FFT dominant frequency {dominant_freq:.2f} Hz out of range " 

663 f"[{min_freq:.2f}, {max_freq:.2f}] Hz" 

664 ) 

665 return None 

666 

667 return None 

668 

669 except Exception as e: 

670 logger.debug(f"FFT-based unit interval detection failed: {e}") 

671 return None 

672 

673 def _detect_unit_interval_zero_crossing( 

674 self, raw_data: np.ndarray[Any, Any], sample_rate: float 

675 ) -> float | None: 

676 """Detect unit interval using zero-crossing analysis. 

677 

678 Estimates the signal period from the average interval between 

679 zero crossings. 

680 

681 Args: 

682 raw_data: Input waveform samples. 

683 sample_rate: Sample rate in Hz. 

684 

685 Returns: 

686 Estimated unit interval in seconds, or None if detection fails. 

687 """ 

688 try: 

689 # Find zero crossings 

690 zero_crossings = np.where(np.diff(np.sign(raw_data - np.mean(raw_data))))[0] 

691 

692 if len(zero_crossings) > 10: 

693 # Estimate period from average crossing interval 

694 avg_half_period = float(np.mean(np.diff(zero_crossings))) / sample_rate 

695 unit_interval = avg_half_period * 2 # Full period 

696 

697 logger.debug( 

698 f"Zero-crossing detected unit interval: {unit_interval * 1e6:.3f} us " 

699 f"({len(zero_crossings)} crossings)" 

700 ) 

701 return unit_interval 

702 else: 

703 logger.debug(f"Insufficient zero crossings ({len(zero_crossings)}) for detection") 

704 return None 

705 

706 except Exception as e: 

707 logger.debug(f"Zero-crossing unit interval detection failed: {e}") 

708 return None 

709 

710 def _detect_baud_rate_from_filename(self, path: Path | None) -> float | None: 

711 """Extract baud rate from filename patterns like 'capture_9600baud.vcd'. 

712 

713 Supports patterns such as: 

714 - 9600baud, 115200baud 

715 - 9600_baud, 115200_baud 

716 - 1Mbaud, 1.5Mbaud (with M/m prefix for megabaud) 

717 - 9600bps, 115200bps 

718 

719 Args: 

720 path: Path to the input file (None if in-memory data). 

721 

722 Returns: 

723 Detected baud rate in bps, or None if not detected. 

724 """ 

725 if path is None: 

726 return None 

727 

728 import re 

729 

730 # Match patterns: 9600baud, 115200_baud, 1Mbaud, etc. 

731 patterns = [ 

732 r"(\d+(?:\.\d+)?)[_\s]*[Mm]?baud", 

733 r"(\d+(?:\.\d+)?)[_\s]*bps", 

734 r"baud[_-]?(\d+)", 

735 ] 

736 filename = path.stem.lower() 

737 

738 for pattern in patterns: 

739 match = re.search(pattern, filename, re.IGNORECASE) 

740 if match: 

741 value = float(match.group(1)) 

742 # Handle M prefix (megabaud) 

743 matched_text = filename[match.start() : match.end()].lower() 

744 if "m" in matched_text and "baud" in matched_text: 

745 value *= 1_000_000 

746 logger.debug(f"Detected baud rate from filename '{path.name}': {value} bps") 

747 return value 

748 

749 return None 

750 

751 def _detect_logic_family(self, data: np.ndarray[Any, Any]) -> str: 

752 """Detect logic family from voltage levels. 

753 

754 Analyzes the voltage swing in the data to determine the likely 

755 logic family standard. 

756 

757 Args: 

758 data: Input waveform samples (voltage levels). 

759 

760 Returns: 

761 Detected logic family name (e.g., "TTL", "LVCMOS33", "LVDS"). 

762 """ 

763 vmax = float(np.max(data)) 

764 vmin = float(np.min(data)) 

765 voltage_swing = vmax - vmin 

766 

767 # Classify based on voltage swing 

768 if voltage_swing < 1.0: 

769 logic_family = "LVDS" # ~0.35V swing 

770 elif voltage_swing < 2.0: 

771 logic_family = "LVCMOS18" # 1.8V 

772 elif voltage_swing < 3.0: 

773 logic_family = "LVCMOS25" # 2.5V 

774 elif voltage_swing < 4.0: 

775 logic_family = "LVCMOS33" # 3.3V 

776 else: 

777 logic_family = "TTL" # 5V 

778 

779 logger.debug( 

780 f"Detected logic family from voltage swing {voltage_swing:.2f}V: {logic_family}" 

781 ) 

782 return logic_family 

783 

784 def _detect_frequency_range( 

785 self, data: np.ndarray[Any, Any], sample_rate: float 

786 ) -> tuple[float, float] | None: 

787 """Detect dominant frequency range from FFT analysis. 

788 

789 Computes the FFT of the input signal and identifies the range 

790 of frequencies containing significant spectral content. 

791 

792 Args: 

793 data: Input waveform samples. 

794 sample_rate: Sample rate in Hz. 

795 

796 Returns: 

797 Tuple of (min_freq, max_freq) in Hz for significant spectral content, 

798 or None if detection fails. 

799 """ 

800 try: 

801 # Compute FFT 

802 fft_result = np.fft.rfft(data - np.mean(data)) 

803 freqs = np.fft.rfftfreq(len(data), d=1.0 / sample_rate) 

804 magnitude = np.abs(fft_result) 

805 

806 # Find frequencies with significant power (> 10% of max) 

807 threshold = 0.1 * np.max(magnitude) 

808 significant = freqs[magnitude > threshold] 

809 

810 if len(significant) > 0: 

811 min_freq = float(np.min(significant)) 

812 max_freq = float(np.max(significant)) 

813 logger.debug(f"Detected frequency range: {min_freq:.2f} Hz - {max_freq:.2f} Hz") 

814 return (min_freq, max_freq) 

815 return None 

816 except Exception as e: 

817 logger.debug(f"Frequency range detection failed: {e}") 

818 return None 

819 

820 def _detect_noise_floor(self, data: np.ndarray[Any, Any]) -> float | None: 

821 """Estimate noise floor using median absolute deviation. 

822 

823 Uses robust statistical methods to estimate the noise level in 

824 the signal, which is useful for setting thresholds in various 

825 analysis functions. 

826 

827 Args: 

828 data: Input waveform samples. 

829 

830 Returns: 

831 Estimated noise level (standard deviation of noise), or None if detection fails. 

832 """ 

833 try: 

834 try: 

835 from scipy import stats 

836 

837 # Use MAD for robust noise estimation 

838 mad = stats.median_abs_deviation(data, scale="normal") 

839 logger.debug(f"Detected noise floor (scipy MAD): {mad:.6f}") 

840 return float(mad) 

841 except ImportError: 

842 # Fallback without scipy 

843 median = np.median(data) 

844 mad = np.median(np.abs(data - median)) * 1.4826 

845 logger.debug(f"Detected noise floor (numpy MAD): {mad:.6f}") 

846 return float(mad) 

847 except Exception as e: 

848 logger.debug(f"Noise floor detection failed: {e}") 

849 return None 

850 

851 def _detect_protocol_hints( 

852 self, data: np.ndarray[Any, Any], sample_rate: float 

853 ) -> dict[str, Any]: 

854 """Detect hints about potential protocols in the signal. 

855 

856 Analyzes signal characteristics such as edge timing and periodicity 

857 to provide hints about the communication protocol that may be present. 

858 

859 Args: 

860 data: Input waveform samples. 

861 sample_rate: Sample rate in Hz. 

862 

863 Returns: 

864 Dictionary with detected characteristics such as: 

865 - 'detected_baud': Estimated baud rate (if detected) 

866 - 'clock_regularity': 'high', 'medium', or 'low' 

867 """ 

868 hints: dict[str, Any] = {} 

869 try: 

870 # Check for common baud rates by looking at edge timing 

871 zero_crossings = np.where(np.diff(np.sign(data - np.mean(data))))[0] 

872 if len(zero_crossings) > 10: 

873 intervals = np.diff(zero_crossings) / sample_rate 

874 avg_interval = float(np.median(intervals)) 

875 

876 # Map to standard baud rates 

877 common_bauds = [ 

878 300, 

879 1200, 

880 2400, 

881 4800, 

882 9600, 

883 19200, 

884 38400, 

885 57600, 

886 115200, 

887 ] 

888 for baud in common_bauds: 

889 expected_interval = 1.0 / baud 

890 if 0.8 < avg_interval / expected_interval < 1.2: 

891 hints["detected_baud"] = baud 

892 logger.debug(f"Protocol hint: detected baud rate {baud} bps") 

893 break 

894 

895 # Check for clock-like periodicity 

896 if len(zero_crossings) > 20: 

897 interval_std = float(np.std(np.diff(zero_crossings))) 

898 if interval_std < 2: 

899 regularity = "high" 

900 elif interval_std < 5: 

901 regularity = "medium" 

902 else: 

903 regularity = "low" 

904 hints["clock_regularity"] = regularity 

905 logger.debug(f"Protocol hint: clock regularity {regularity}") 

906 

907 except Exception as e: 

908 logger.debug(f"Protocol hints detection failed: {e}") 

909 

910 return hints 

911 

def _execute_function(
    self, module_name: str, func_name: str, data: Any, timeout: float | None
) -> Any:
    """Execute a single analysis function with quality scoring.

    Args:
        module_name: Name of the module containing the function.
        func_name: Name of the function to execute.
        data: Input data object.
        timeout: Timeout in seconds (None for no timeout).

    Returns:
        Analysis result with optional quality score attached.

    Raises:
        ValueError: If function is non-inferrable or invalid.
    """
    # Reject functions whose parameters cannot be auto-inferred.
    qualified = f"{module_name}.{func_name}"
    if qualified in NON_INFERRABLE_FUNCTIONS:
        logger.debug(f"Skipping non-inferrable function: {qualified}")
        raise ValueError(
            f"Function {qualified} requires context-specific parameters that cannot be auto-detected"
        )

    target = getattr(importlib.import_module(module_name), func_name)

    # Build the call arguments from the input data.
    call_args, call_kwargs = self._prepare_arguments(target, data)
    if call_args is None:
        # Function not applicable to this data type.
        raise ValueError(f"Function {func_name} not applicable to data type")

    started = time.time()
    result = target(*call_args, **call_kwargs)

    if timeout is not None:
        # No true preemption: the call always runs to completion and an
        # overrun is only reported afterwards. A production implementation
        # would use threading.Timer or signal.alarm.
        elapsed = time.time() - started
        if elapsed > timeout:
            logger.warning(
                f"Function {module_name}.{func_name} exceeded timeout "
                f"({elapsed:.2f}s > {timeout:.2f}s)"
            )

    # Materialize generator results so they can be serialized later.
    if isinstance(result, types.GeneratorType):
        try:
            result = list(result)
            logger.debug(f"Consumed generator from {module_name}.{func_name}")
        except Exception as exc:
            logger.warning(f"Failed to consume generator from {module_name}.{func_name}: {exc}")
            result = f"<generator error: {type(exc).__name__}>"

    # Optionally annotate the result with a quality score.
    if self.config.enable_quality_scoring:
        result = self._add_quality_score(result, qualified, data)

    return result

979 

def _add_quality_score(self, result: Any, method_name: str, data: Any) -> Any:
    """Add quality score to analysis result.

    Best-effort: any failure (including an unavailable scorer) is logged
    at debug level and the original result is returned unchanged.

    Args:
        result: Analysis result to score.
        method_name: Name of the analysis method.
        data: Input data object.

    Returns:
        Result with quality score attached (if applicable). Dict results
        are mutated in place; other non-None results are wrapped in a
        dict with 'value' and '_quality_score' keys.
    """
    try:
        from tracekit.quality import score_analysis_result

        # Extract raw data array for quality assessment.
        # BUGFIX: check for np.ndarray before the generic `.data` probe —
        # np.ndarray itself exposes a `.data` memoryview attribute, so the
        # hasattr branch would otherwise capture arrays and hand a
        # memoryview (not the array) to the scorer.
        if isinstance(data, np.ndarray):
            raw_data = data
        elif hasattr(data, "data"):
            # Trace-like object carrying its samples in `.data`.
            raw_data = data.data
        else:
            # Can't assess quality for non-array data
            return result

        # Score the result
        quality_score = score_analysis_result(
            result=result,
            method_name=method_name,
            data=raw_data,
        )

        # Attach quality score to result if it's a dict
        if isinstance(result, dict):
            result["_quality_score"] = quality_score.to_dict()
        # For other types, wrap in dict
        elif result is not None:
            return {
                "value": result,
                "_quality_score": quality_score.to_dict(),
            }

    except Exception as e:
        logger.debug(f"Failed to add quality score: {e}")

    return result

1024 

def _prepare_arguments(
    self, func: Callable[..., Any], data: Any
) -> tuple[list[Any] | None, dict[str, Any]]:
    """Prepare arguments for an analysis function.

    Examines the function signature and prepares appropriate arguments
    from the input data. Dispatch is keyed primarily on the name and
    annotation of the function's first parameter, with auto-detection
    of common keyword arguments (sample_rate, baud_rate, thresholds,
    window sizes, ...) where the parameter has no usable default.

    Args:
        func: Function to prepare arguments for.
        data: Input data object (trace-like, ndarray, bytes, list of
            packet dicts/objects, or other iterable).

    Returns:
        Tuple of (args_list, kwargs_dict), or (None, {}) if not applicable.
    """
    sig = inspect.signature(func)
    params = list(sig.parameters.keys())

    # Zero-argument functions need nothing prepared.
    if not params:
        return [], {}

    first_param = params[0]

    # Check for EyeDiagram - these functions expect 'eye' parameter
    # (duck-typed on samples_per_ui/time_axis attributes).
    if hasattr(data, "samples_per_ui") and hasattr(data, "time_axis"):
        # EyeDiagram object
        if first_param == "eye" or "EyeDiagram" in str(sig.parameters.get(first_param, "")):
            return [data], {}
        # Skip functions that don't work with EyeDiagram
        return None, {}

    # Check for SParameterData (duck-typed on s_matrix/frequencies).
    if hasattr(data, "s_matrix") and hasattr(data, "frequencies"):
        # S-parameter object
        if first_param in ("s_params", "s_param", "s_data", "sparams"):
            return [data], {}
        if "SParameter" in str(sig.parameters.get(first_param, "")):
            return [data], {}
        # Skip functions that don't work with S-params
        return None, {}

    # Check type annotation for packet-specific handling
    first_param_info = sig.parameters.get(first_param)
    param_annotation = first_param_info.annotation if first_param_info else None
    annotation_str = str(param_annotation) if param_annotation else ""

    # Handle PACKET domain - convert to PacketInfo objects if needed
    if "PacketInfo" in annotation_str or first_param == "packets":
        if isinstance(data, list):
            # Check if already PacketInfo objects
            if data and hasattr(data[0], "timestamp"):
                return [data], {}
            # Convert list of dicts to PacketInfo objects
            elif data and isinstance(data[0], dict):
                try:
                    from tracekit.analyzers.packet.metrics import PacketInfo

                    packets = [
                        PacketInfo(
                            timestamp=p.get("timestamp", 0.0),
                            size=p.get("size", 0),
                            sequence=p.get("sequence"),
                        )
                        for p in data
                    ]
                    return [packets], {}
                except Exception as e:
                    logger.debug(f"Failed to convert to PacketInfo: {e}")
                    return None, {}
        # Packet function but data is not a convertible list: not applicable.
        return None, {}

    # Check if data is a trace-like object (has .data and .metadata).
    # NOTE: requiring .metadata too keeps plain ndarrays (which expose a
    # `.data` memoryview) out of this branch.
    is_trace = hasattr(data, "data") and hasattr(data, "metadata")

    # Extract raw data and sample rate using config-aware method
    if is_trace:
        raw_data = data.data
        sample_rate = self._get_effective_sample_rate(data, context="general")
    elif isinstance(data, np.ndarray):
        raw_data = data
        sample_rate = self._get_effective_sample_rate(data, context="general")
    elif isinstance(data, bytes | bytearray):
        # Reinterpret raw bytes as a uint8 sample stream.
        raw_data = np.frombuffer(data, dtype=np.uint8)
        sample_rate = self._get_effective_sample_rate(data, context="binary")
    else:
        # Try to convert to array
        try:
            raw_data = np.array(data) if hasattr(data, "__iter__") else None
        except (ValueError, TypeError):
            raw_data = None
        sample_rate = self._get_effective_sample_rate(data, context="general")

    # Nothing usable to analyze: empty or unconvertible input.
    if raw_data is None or (hasattr(raw_data, "__len__") and len(raw_data) == 0):
        return None, {}

    kwargs: dict[str, Any] = {}

    # Common parameter mappings: the same sample rate under the aliases
    # different analyzers use.
    if "sample_rate" in params:
        kwargs["sample_rate"] = sample_rate
    if "fs" in params:
        kwargs["fs"] = sample_rate
    if "rate" in params:
        kwargs["rate"] = sample_rate

    # DIGITAL domain parameter auto-detection
    if "baud_rate" in params:
        # Check if baud_rate parameter has a default value
        param_info = sig.parameters.get("baud_rate")
        has_default = (
            param_info is not None and param_info.default is not inspect.Parameter.empty
        )

        # Only auto-detect if no default or default is None
        if not has_default or (param_info and param_info.default is None):
            # Try to detect from filename
            detected_baud = self._detect_baud_rate_from_filename(self._input_path)
            if detected_baud is not None:
                kwargs["baud_rate"] = detected_baud

    if "logic_family" in params:
        # Check if logic_family parameter has a default value
        param_info = sig.parameters.get("logic_family")
        has_default = (
            param_info is not None and param_info.default is not inspect.Parameter.empty
        )

        # Only auto-detect if no default or default is "auto"
        if not has_default or (param_info and param_info.default in (None, "auto")):
            # Try to detect from voltage levels
            try:
                detected_family = self._detect_logic_family(raw_data)
                kwargs["logic_family"] = detected_family
            except Exception as e:
                logger.debug(f"Could not auto-detect logic family: {e}")

    # Auto-detect frequency range for frequency-related parameters
    # (only fills parameters that lack a usable default).
    if "freq_min" in params or "freq_max" in params:
        try:
            freq_range = self._detect_frequency_range(raw_data, sample_rate)
            if freq_range is not None:
                min_freq, max_freq = freq_range
                if "freq_min" in params:
                    param_info = sig.parameters.get("freq_min")
                    has_default = (
                        param_info is not None
                        and param_info.default is not inspect.Parameter.empty
                    )
                    if not has_default or (param_info and param_info.default is None):
                        kwargs["freq_min"] = min_freq
                if "freq_max" in params:
                    param_info = sig.parameters.get("freq_max")
                    has_default = (
                        param_info is not None
                        and param_info.default is not inspect.Parameter.empty
                    )
                    if not has_default or (param_info and param_info.default is None):
                        kwargs["freq_max"] = max_freq
        except Exception as e:
            logger.debug(f"Could not auto-detect frequency range: {e}")

    # Auto-detect noise floor for threshold parameters
    if "noise_threshold" in params or "snr_threshold" in params:
        try:
            noise_floor = self._detect_noise_floor(raw_data)
            if noise_floor is not None:
                if "noise_threshold" in params:
                    param_info = sig.parameters.get("noise_threshold")
                    has_default = (
                        param_info is not None
                        and param_info.default is not inspect.Parameter.empty
                    )
                    if not has_default or (param_info and param_info.default is None):
                        # Set threshold to 3 sigma (99.7% confidence)
                        kwargs["noise_threshold"] = noise_floor * 3.0
                if "snr_threshold" in params:
                    param_info = sig.parameters.get("snr_threshold")
                    has_default = (
                        param_info is not None
                        and param_info.default is not inspect.Parameter.empty
                    )
                    if not has_default or (param_info and param_info.default is None):
                        # Calculate signal RMS and set reasonable SNR threshold
                        signal_rms = float(np.std(raw_data))
                        if noise_floor > 0:
                            detected_snr = signal_rms / noise_floor
                            # Use half the detected SNR as threshold
                            kwargs["snr_threshold"] = detected_snr / 2.0
        except Exception as e:
            logger.debug(f"Could not auto-detect noise floor: {e}")

    # Use protocol hints to assist with baud rate detection if not already set
    # (filename-based detection above takes precedence).
    if "baud_rate" in params and "baud_rate" not in kwargs:
        try:
            protocol_hints = self._detect_protocol_hints(raw_data, sample_rate)
            if "detected_baud" in protocol_hints:
                param_info = sig.parameters.get("baud_rate")
                has_default = (
                    param_info is not None and param_info.default is not inspect.Parameter.empty
                )
                if not has_default or (param_info and param_info.default is None):
                    kwargs["baud_rate"] = protocol_hints["detected_baud"]
                    logger.debug(
                        f"Using protocol-detected baud rate: {protocol_hints['detected_baud']} bps"
                    )
        except Exception as e:
            logger.debug(f"Could not use protocol hints for baud detection: {e}")

    # Add intelligent defaults for common missing parameters (data-dependent)
    data_length = len(raw_data) if hasattr(raw_data, "__len__") else 0

    if "window_size" in params:
        param_info = sig.parameters.get("window_size")
        has_default = (
            param_info is not None and param_info.default is not inspect.Parameter.empty
        )
        if not has_default and "window_size" not in kwargs:
            # Default to 10% of signal length, minimum 10 samples
            kwargs["window_size"] = max(10, data_length // 10)
            logger.debug(f"Using auto-detected window_size: {kwargs['window_size']}")

    if "min_width" in params:
        param_info = sig.parameters.get("min_width")
        has_default = (
            param_info is not None and param_info.default is not inspect.Parameter.empty
        )
        if not has_default and "min_width" not in kwargs:
            # Default to 10 samples in time, minimum 1ns
            kwargs["min_width"] = max(1e-9, 10.0 / sample_rate)
            logger.debug(f"Using auto-detected min_width: {kwargs['min_width']:.2e}s")

    if "max_width" in params:
        param_info = sig.parameters.get("max_width")
        has_default = (
            param_info is not None and param_info.default is not inspect.Parameter.empty
        )
        if not has_default and "max_width" not in kwargs:
            # Default to signal duration, maximum 1ms
            total_duration = data_length / sample_rate if data_length > 0 else 1e-3
            kwargs["max_width"] = min(1e-3, total_duration)
            logger.debug(f"Using auto-detected max_width: {kwargs['max_width']:.2e}s")

    if "threshold" in params and "threshold" not in kwargs:
        param_info = sig.parameters.get("threshold")
        has_default = (
            param_info is not None and param_info.default is not inspect.Parameter.empty
        )
        if not has_default or (param_info and param_info.default in (None, "auto")):
            # Auto-detect threshold from histogram or use midpoint
            try:
                if isinstance(raw_data, np.ndarray) and raw_data.size > 0:
                    kwargs["threshold"] = float(np.median(raw_data))
                    logger.debug(f"Using auto-detected threshold: {kwargs['threshold']:.3f}")
            except Exception as e:
                logger.debug(f"Could not auto-detect threshold: {e}")

    if "window_duration" in params:
        param_info = sig.parameters.get("window_duration")
        has_default = (
            param_info is not None and param_info.default is not inspect.Parameter.empty
        )
        if not has_default and "window_duration" not in kwargs:
            # Default to 1 second or total duration / 10
            total_duration = data_length / sample_rate if data_length > 0 else 1.0
            kwargs["window_duration"] = min(1.0, total_duration / 10.0)
            logger.debug(
                f"Using auto-detected window_duration: {kwargs['window_duration']:.3f}s"
            )

    # Create trace wrapper if function expects trace but we have raw array
    if not is_trace and (
        "Trace" in annotation_str or "WaveformTrace" in annotation_str or first_param == "trace"
    ):
        # Wrap raw array in WaveformTrace with default metadata
        try:
            # Convert memoryview to ndarray if needed
            trace_data = np.asarray(raw_data) if isinstance(raw_data, memoryview) else raw_data
            metadata = TraceMetadata(sample_rate=sample_rate)
            data = WaveformTrace(data=trace_data, metadata=metadata)
            is_trace = True
            logger.debug("Created WaveformTrace wrapper for raw array data")
        except Exception as e:
            logger.debug(f"Could not create trace wrapper: {e}")
            return None, {}

    # If function expects WaveformTrace and we have a trace, pass it directly
    if is_trace and (
        "Trace" in annotation_str or "WaveformTrace" in annotation_str or first_param == "trace"
    ):
        return [data], kwargs

    # NOTE(review): first_param == "data" is intercepted here, so the
    # bytes-conversion branches below that also test for "data"
    # (stream/bytes handling) appear unreachable for that name — functions
    # with a bytes-annotated parameter named "data" receive the raw array
    # instead. Confirm intent before reordering.
    if first_param in ("data", "signal", "x", "samples", "waveform"):
        return [raw_data], kwargs
    elif first_param == "trace" and not is_trace:
        # Function expects trace but we don't have one, skip
        return None, {}
    elif first_param == "edges":
        # Jitter/timing functions need edge timestamps
        # Try to detect edges from the data
        try:
            from tracekit.analyzers.digital import detect_edges

            if is_trace:
                edges = detect_edges(data)
                edge_times = edges.tolist() if len(edges) > 0 else []
                if len(edge_times) < 3:
                    return None, {}
                return [edge_times], kwargs
            # Non-trace input falls through to the default return below.
        except Exception:
            return None, {}
    elif first_param == "periods":
        # Timing functions need period measurements
        # Compute periods from waveform or digital trace
        try:
            if is_trace:
                from tracekit.analyzers.waveform.measurements import period

                # Get all periods
                periods_result = period(data, return_all=True)
                if isinstance(periods_result, np.ndarray) and len(periods_result) >= 3:
                    return [periods_result], kwargs
                return None, {}
        except Exception as e:
            logger.debug(f"Could not compute periods: {e}")
            return None, {}
    elif first_param in ("stream", "data") and "bytes" in annotation_str:
        # Functions expecting bytes - convert ndarray to bytes if needed
        if isinstance(data, bytes | bytearray):
            return [data], kwargs
        elif isinstance(raw_data, np.ndarray):
            return [raw_data.astype(np.uint8).tobytes()], kwargs
        else:
            return None, {}
    elif first_param == "bytes" or (first_param == "data" and "bytes" in str(sig)):
        # Entropy/binary functions need bytes
        if isinstance(data, bytes | bytearray):
            return [data], kwargs
        elif hasattr(raw_data, "astype"):
            return [raw_data.astype(np.uint8).tobytes()], kwargs
        else:
            return None, {}

    # Default: pass raw data
    return [raw_data], kwargs

1369 

1370 

# Public API of this module: only the engine class is exported.
__all__ = [
    "AnalysisEngine",
]