Coverage for src / tracekit / reporting / engine.py: 41%
574 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Analysis Engine for orchestrating comprehensive analysis execution.
3This module provides the AnalysisEngine class that orchestrates running all
4applicable analyses on input data, handling progress tracking, timeouts,
5and error collection.
6"""
8from __future__ import annotations
10import importlib
11import inspect
12import logging
13import time
14import traceback
15import types
16from collections.abc import Callable
17from pathlib import Path
18from typing import Any
20import numpy as np
22from tracekit.core.types import TraceMetadata, WaveformTrace
23from tracekit.reporting.config import (
24 ANALYSIS_CAPABILITIES,
25 AnalysisConfig,
26 AnalysisDomain,
27 AnalysisError,
28 InputType,
29 ProgressInfo,
30 get_available_analyses,
31)
33logger = logging.getLogger(__name__)
# Functions that require context-specific parameters that cannot be auto-detected.
# Entries are fully qualified "module.function" paths. _execute_function()
# refuses to run anything listed here (it raises ValueError instead of
# guessing arguments), so these never execute during a comprehensive pass.
NON_INFERRABLE_FUNCTIONS: set[str] = {
    # INFERENCE domain - require specific data types
    "tracekit.inference.protocol_dsl.decode_protocol",
    "tracekit.inference.protocol_dsl.match_pattern",
    "tracekit.inference.protocol_dsl.validate_message",
    # PACKET domain - require PacketInfo objects
    "tracekit.analyzers.packet.timing.analyze_inter_packet_timing",
    "tracekit.analyzers.packet.timing.detect_bursts",
    # POWER domain - require voltage+current pairs
    "tracekit.analyzers.power.consumption.calculate_power",
    "tracekit.analyzers.power.consumption.analyze_power_efficiency",
}
51class AnalysisEngine:
52 """Engine for orchestrating comprehensive analysis execution.
54 The AnalysisEngine accepts input data (from file or in-memory), detects
55 the input type, determines applicable analysis domains, and executes
56 all relevant analysis functions with progress tracking and error handling.
58 Example:
59 >>> from tracekit.reporting import AnalysisEngine, AnalysisConfig
60 >>> config = AnalysisConfig(timeout_per_analysis=30.0)
61 >>> engine = AnalysisEngine(config)
62 >>> result = engine.run(input_path=Path("data.wfm"))
63 >>> print(f"Ran {result['stats']['total_analyses']} analyses")
64 >>> print(f"Success rate: {result['stats']['success_rate']:.1f}%")
65 """
67 def __init__(self, config: AnalysisConfig | None = None) -> None:
68 """Initialize the analysis engine.
70 Args:
71 config: Analysis configuration. If None, uses defaults.
72 """
73 self.config = config or AnalysisConfig()
74 self._start_time = 0.0
75 self._input_path: Path | None = None
77 def detect_input_type(self, input_path: Path | None, data: Any) -> InputType:
78 """Detect input type from file path or data characteristics.
80 Args:
81 input_path: Path to input file (None if in-memory data).
82 data: Input data object.
84 Returns:
85 Detected input type.
87 Raises:
88 ValueError: If input type cannot be determined.
89 """
90 # If path provided, detect from extension
91 if input_path is not None:
92 ext = input_path.suffix.lower()
94 # Waveform formats
95 if ext in {".wfm", ".csv", ".npz", ".h5", ".hdf5", ".wav", ".tdms"}:
96 return InputType.WAVEFORM
97 # Digital formats
98 elif ext in {".vcd", ".sr"}:
99 return InputType.DIGITAL
100 # Packet formats
101 elif ext in {".pcap", ".pcapng"}:
102 return InputType.PCAP
103 # Binary formats
104 elif ext in {".bin", ".raw"}: 104 ↛ 107line 104 didn't jump to line 107 because the condition on line 104 was always true
105 return InputType.BINARY
106 # S-parameter/Touchstone formats
107 elif ext in {".s1p", ".s2p", ".s3p", ".s4p", ".s5p", ".s6p", ".s7p", ".s8p"}:
108 return InputType.SPARAMS
110 # Detect from data object characteristics
111 if hasattr(data, "s_matrix") and hasattr(data, "frequencies"):
112 # SParameterData
113 return InputType.SPARAMS
114 elif hasattr(data, "data") and hasattr(data, "metadata"):
115 # WaveformTrace or DigitalTrace
116 if hasattr(data.metadata, "is_digital") and data.metadata.is_digital:
117 return InputType.DIGITAL
118 return InputType.WAVEFORM
119 elif isinstance(data, bytes | bytearray):
120 return InputType.BINARY
121 elif isinstance(data, list):
122 # Assume packet list
123 return InputType.PACKETS
124 elif isinstance(data, np.ndarray): 124 ↛ 128line 124 didn't jump to line 128 because the condition on line 124 was always true
125 # Assume waveform
126 return InputType.WAVEFORM
128 raise ValueError("Unable to determine input type from path or data characteristics")
    def run(
        self,
        input_path: Path | None = None,
        data: Any = None,
        progress_callback: Callable[[ProgressInfo], None] | None = None,
    ) -> dict[str, Any]:
        """Run comprehensive analysis on input data.

        Execution proceeds in five phases: load, detect input type, plan
        (select enabled domains), analyze (parallel or sequential), and
        summarize. Progress percentages reported to ``progress_callback``
        are fixed milestones: 0% load, 5% detect, 10% plan, 10-90% spread
        evenly across domains, 100% complete.

        Args:
            input_path: Path to input file (or None for in-memory data).
            data: Input data object (or None to load from input_path).
            progress_callback: Optional callback for progress updates.

        Returns:
            Dictionary with keys:
                - 'results': Dict mapping AnalysisDomain to analysis results
                - 'errors': List of AnalysisError objects
                - 'stats': Execution statistics dict

        Raises:
            ValueError: If neither input_path nor data provided.
            FileNotFoundError: If input_path doesn't exist.

        Example:
            >>> def progress(info: ProgressInfo):
            ...     print(f"{info.phase}: {info.percent:.1f}%")
            >>> result = engine.run(input_path=Path("data.wfm"), progress_callback=progress)
        """
        if input_path is None and data is None:
            raise ValueError("Must provide either input_path or data")

        self._start_time = time.time()
        self._input_path = input_path

        # Check available memory and adjust parallelism if needed
        from tracekit.core.memory_guard import check_memory_available

        min_required_mb = 500  # Minimum 500MB needed for analysis
        if not check_memory_available(min_required_mb):
            logger.warning(
                f"Low memory available (< {min_required_mb} MB). "
                f"Reducing parallel workers to conserve memory."
            )
            # Temporarily reduce parallelism to conserve memory
            # NOTE(review): the flag is never restored afterwards, so this
            # persists for later run() calls on the same engine — confirm.
            self.config.parallel_domains = False

        # Phase 1: Load data
        if progress_callback:
            progress_callback(
                ProgressInfo(
                    phase="loading",
                    domain=None,
                    function=None,
                    percent=0.0,
                    message="Loading input data",
                    elapsed_seconds=0.0,
                    estimated_remaining_seconds=None,
                )
            )

        if data is None:
            # data is None implies input_path was given (both-None was
            # rejected above), so this is effectively an existence check.
            if input_path is None or not input_path.exists():
                raise FileNotFoundError(f"Input file not found: {input_path}")

            # Load using tracekit loaders
            from tracekit.loaders import load

            data = load(input_path)

        # Phase 2: Detect input type
        input_type = self.detect_input_type(input_path, data)

        if progress_callback:
            progress_callback(
                ProgressInfo(
                    phase="detecting",
                    domain=None,
                    function=None,
                    percent=5.0,
                    message=f"Detected input type: {input_type.value}",
                    elapsed_seconds=time.time() - self._start_time,
                    estimated_remaining_seconds=None,
                )
            )

        # Phase 3: Determine applicable domains
        applicable_domains = get_available_analyses(input_type)

        # Filter by configuration
        enabled_domains = [d for d in applicable_domains if self.config.is_domain_enabled(d)]

        if progress_callback:
            progress_callback(
                ProgressInfo(
                    phase="planning",
                    domain=None,
                    function=None,
                    percent=10.0,
                    message=f"Planning analysis across {len(enabled_domains)} domains",
                    elapsed_seconds=time.time() - self._start_time,
                    estimated_remaining_seconds=None,
                )
            )

        # Phase 4: Execute analyses
        results: dict[AnalysisDomain, dict[str, Any]] = {}
        errors: list[AnalysisError] = []

        total_domains = len(enabled_domains)

        # Execute domains in parallel if enabled and multiple domains exist
        if self.config.parallel_domains and len(enabled_domains) > 1:
            import concurrent.futures

            # Use ThreadPoolExecutor with bounded workers from config
            max_workers = min(self.config.max_parallel_workers, len(enabled_domains))

            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all domain executions
                futures = {
                    executor.submit(self._execute_domain, domain, data): domain
                    for domain in enabled_domains
                }

                # Process results as they complete
                for completed, future in enumerate(concurrent.futures.as_completed(futures), 1):
                    domain = futures[future]
                    # Domains share the 10%-90% progress band equally.
                    domain_percent = 10.0 + (completed / total_domains) * 80.0

                    # NOTE(review): "Completed domain" is reported before
                    # future.result() is retrieved below — the domain may
                    # still fail or time out after this callback; confirm.
                    if progress_callback:
                        progress_callback(
                            ProgressInfo(
                                phase="analyzing",
                                domain=domain,
                                function=None,
                                percent=domain_percent,
                                message=f"Completed domain: {domain.value}",
                                elapsed_seconds=time.time() - self._start_time,
                                estimated_remaining_seconds=None,
                            )
                        )

                    try:
                        # Retrieve result with timeout: the whole domain gets
                        # a budget of 10x the single-analysis timeout.
                        timeout_seconds = self.config.timeout_per_analysis or 30.0
                        domain_results, domain_errors = future.result(timeout=timeout_seconds * 10)
                        # Empty result dicts are dropped, not recorded.
                        if domain_results:
                            results[domain] = domain_results
                        errors.extend(domain_errors)
                    except concurrent.futures.TimeoutError:
                        logger.error(f"Domain {domain.value} exceeded timeout")
                        errors.append(
                            AnalysisError(
                                domain=domain,
                                function=f"{domain.value}.*",
                                error_type="TimeoutError",
                                error_message="Domain execution exceeded timeout",
                                traceback=None,
                                duration_ms=timeout_seconds * 10 * 1000,
                            )
                        )
                    except Exception as e:
                        logger.error(f"Domain {domain.value} failed: {e}")
                        errors.append(
                            AnalysisError(
                                domain=domain,
                                function=f"{domain.value}.*",
                                error_type=type(e).__name__,
                                error_message=str(e),
                                traceback=traceback.format_exc(),
                                duration_ms=0.0,
                            )
                        )
        else:
            # Sequential fallback (existing code)
            for idx, domain in enumerate(enabled_domains):
                domain_percent = 10.0 + (idx / total_domains) * 80.0

                if progress_callback:
                    progress_callback(
                        ProgressInfo(
                            phase="analyzing",
                            domain=domain,
                            function=None,
                            percent=domain_percent,
                            message=f"Analyzing domain: {domain.value}",
                            elapsed_seconds=time.time() - self._start_time,
                            estimated_remaining_seconds=None,
                        )
                    )

                domain_results, domain_errors = self._execute_domain(domain, data)
                # Empty result dicts are dropped, not recorded.
                if domain_results:
                    results[domain] = domain_results
                errors.extend(domain_errors)

        # Phase 5: Complete
        total_duration = time.time() - self._start_time

        if progress_callback:
            progress_callback(
                ProgressInfo(
                    phase="complete",
                    domain=None,
                    function=None,
                    percent=100.0,
                    message="Analysis complete",
                    elapsed_seconds=total_duration,
                    estimated_remaining_seconds=0.0,
                )
            )

        # Calculate statistics
        total_analyses = sum(len(dr) for dr in results.values())
        successful_analyses = sum(
            1 for dr in results.values() for v in dr.values() if not isinstance(v, Exception)
        )
        failed_analyses = len(errors)

        stats = {
            "input_type": input_type.value,
            "total_domains": len(enabled_domains),
            "total_analyses": total_analyses,
            "successful_analyses": successful_analyses,
            "failed_analyses": failed_analyses,
            "success_rate": (successful_analyses / total_analyses * 100.0)
            if total_analyses > 0
            else 0.0,
            "duration_seconds": total_duration,
        }

        return {
            "results": results,
            "errors": errors,
            "stats": stats,
        }
    def _execute_domain(
        self, domain: AnalysisDomain, data: Any
    ) -> tuple[dict[str, Any], list[AnalysisError]]:
        """Execute all analyses for a specific domain.

        Discovers every public function *defined in* the domain's configured
        modules and runs each through _execute_function(), collecting results
        keyed by "module.function" plus per-function errors.

        Args:
            domain: Analysis domain to execute.
            data: Input data object.

        Returns:
            Tuple of (results_dict, errors_list).
        """
        results: dict[str, Any] = {}
        errors: list[AnalysisError] = []

        # Preprocess data for specific domains (e.g. EYE builds an EyeDiagram)
        data = self._preprocess_for_domain(domain, data)

        # Get domain capabilities
        cap = ANALYSIS_CAPABILITIES.get(domain, {})
        module_names = cap.get("modules", [])

        # Fallback to old single-module format ("module" key)
        if not module_names:
            single_module = cap.get("module", "")
            if single_module:
                module_names = [single_module]

        if not module_names:
            logger.debug(f"No modules configured for domain {domain.value}")
            return results, errors

        # Get domain-specific config; a domain-level timeout overrides the
        # global per-analysis timeout.
        domain_config = self.config.get_domain_config(domain)
        timeout = domain_config.timeout or self.config.timeout_per_analysis

        # Track executed functions to prevent duplicates
        executed_functions: set[str] = set()

        # Iterate through all modules for this domain
        for module_name in module_names:
            try:
                module = importlib.import_module(module_name)
            except ImportError as e:
                logger.warning(f"Failed to import module {module_name}: {e}")
                # NOTE(review): the import failure is recorded as an error
                # only when continue_on_error is disabled; otherwise it is
                # logged and skipped silently — confirm this is intended.
                if not self.config.continue_on_error:
                    errors.append(
                        AnalysisError(
                            domain=domain,
                            function=module_name,
                            error_type="ImportError",
                            error_message=str(e),
                            traceback=traceback.format_exc(),
                            duration_ms=0.0,
                        )
                    )
                continue

            # Discover public functions in the module
            for func_name, func_obj in inspect.getmembers(module):
                # Skip private functions and non-functions
                if func_name.startswith("_") or not inspect.isfunction(func_obj):
                    continue

                # Skip functions not defined in this module (imported from elsewhere)
                if func_obj.__module__ != module_name:
                    continue

                # Skip if already executed (prevent duplicates)
                func_path = f"{module_name}.{func_name}"
                if func_path in executed_functions:
                    logger.debug(f"Skipping duplicate function: {func_path}")
                    continue
                executed_functions.add(func_path)

                # Execute the function; any exception becomes an AnalysisError.
                try:
                    result = self._execute_function(module_name, func_name, data, timeout)
                    results[f"{module_name}.{func_name}"] = result
                except Exception as e:
                    error = AnalysisError(
                        domain=domain,
                        function=f"{module_name}.{func_name}",
                        error_type=type(e).__name__,
                        error_message=str(e),
                        traceback=traceback.format_exc(),
                        duration_ms=0.0,
                    )
                    errors.append(error)

                    if not self.config.continue_on_error:
                        # Stop execution for this domain
                        return results, errors

        return results, errors
463 def _preprocess_for_domain(self, domain: AnalysisDomain, data: Any) -> Any:
464 """Preprocess data for domain-specific requirements.
466 Some domains require specialized data structures. This method
467 converts raw data into the appropriate format.
469 Args:
470 domain: Target analysis domain.
471 data: Input data object.
473 Returns:
474 Preprocessed data suitable for the domain.
475 """
476 if domain == AnalysisDomain.EYE:
477 # EYE domain requires an EyeDiagram object
478 # Try to generate one from waveform data
479 return self._preprocess_for_eye_domain(data)
481 return data
483 def _get_effective_sample_rate(self, data: Any, context: str = "general") -> float:
484 """Get effective sample rate from data metadata or config defaults.
486 Priority order:
487 1. Data metadata (e.g., WaveformTrace.metadata.sample_rate)
488 2. AnalysisConfig.default_sample_rate
489 3. Context-appropriate default constant
491 Args:
492 data: Input data object (may have .metadata.sample_rate).
493 context: Analysis context for selecting appropriate default.
494 Options: "general" (1 MHz), "highspeed" (1 GHz), "binary" (1 Hz).
496 Returns:
497 Effective sample rate in Hz.
499 Note:
500 This method logs a debug message when falling back to defaults,
501 as sample rate should ideally be provided in the data metadata
502 for accurate time-domain analysis.
503 """
504 # Try to extract from data metadata
505 data_sample_rate = None
506 if hasattr(data, "metadata") and hasattr(data.metadata, "sample_rate"):
507 data_sample_rate = data.metadata.sample_rate
508 if data_sample_rate is not None and data_sample_rate > 0: 508 ↛ 512line 508 didn't jump to line 512 because the condition on line 508 was always true
509 return float(data_sample_rate)
511 # Use config's get_effective_sample_rate method
512 effective_rate = self.config.get_effective_sample_rate(
513 data_sample_rate=data_sample_rate,
514 context=context,
515 )
517 # Log when using defaults (indicates missing metadata)
518 logger.debug(
519 f"Using default sample rate {effective_rate:.2e} Hz (context: {context}). "
520 f"For accurate analysis, provide sample_rate in data metadata."
521 )
523 return effective_rate
    def _preprocess_for_eye_domain(self, data: Any) -> Any:
        """Preprocess data for eye diagram analysis.

        Attempts to generate an EyeDiagram from waveform data using
        automatic unit interval detection via FFT-based period detection
        with fallback to zero-crossing analysis, and a fixed "100 UI"
        assumption as the last resort.

        Args:
            data: Input waveform data.

        Returns:
            EyeDiagram object if successful, original data otherwise.
        """
        # Check if already an EyeDiagram (duck-typed on its attributes)
        if hasattr(data, "samples_per_ui") and hasattr(data, "time_axis"):
            return data

        # Try to extract waveform data
        if hasattr(data, "data") and hasattr(data, "metadata"):
            # WaveformTrace-like: samples plus metadata
            raw_data = data.data
            sample_rate = getattr(data.metadata, "sample_rate", None)
        elif isinstance(data, np.ndarray):
            raw_data = data
            sample_rate = None
        else:
            # Can't preprocess, return as-is
            return data

        if raw_data is None or len(raw_data) == 0:
            return data

        try:
            from tracekit.analyzers.eye.diagram import generate_eye
            from tracekit.core.types import TraceMetadata, WaveformTrace

            # Get effective sample rate using config-aware method
            # Use "highspeed" context for eye diagram (typically high-speed serial)
            if sample_rate is None or sample_rate <= 0:
                sample_rate = self._get_effective_sample_rate(data, context="highspeed")

            # Estimate unit interval using FFT-based period detection
            unit_interval = self._detect_unit_interval_fft(raw_data, sample_rate)

            # If FFT detection fails, try zero-crossing analysis
            if unit_interval is None:
                unit_interval = self._detect_unit_interval_zero_crossing(raw_data, sample_rate)

            # If both methods fail, use default fallback
            if unit_interval is None:
                # Fallback: assume 100 UI in the data
                unit_interval = len(raw_data) / sample_rate / 100
                logger.debug("Using default unit interval fallback (100 UI in data)")

            # Ensure unit interval is reasonable
            min_ui = 10 / sample_rate  # At least 10 samples per UI
            max_ui = len(raw_data) / sample_rate / 10  # At least 10 UI in data
            # NOTE(review): for records shorter than 100 samples min_ui exceeds
            # max_ui; np.clip then yields a_max — confirm that is acceptable.
            unit_interval = np.clip(unit_interval, min_ui, max_ui)

            # Create a WaveformTrace if we only have raw data
            if not hasattr(data, "data"):
                metadata = TraceMetadata(sample_rate=sample_rate)
                trace = WaveformTrace(data=raw_data.astype(np.float64), metadata=metadata)
            else:
                trace = data

            # Generate eye diagram (2 UI wide, with histogram)
            eye_diagram = generate_eye(
                trace=trace,
                unit_interval=unit_interval,
                n_ui=2,
                generate_histogram=True,
            )

            logger.debug(
                f"Generated eye diagram: {eye_diagram.n_traces} traces, "
                f"{eye_diagram.samples_per_ui} samples/UI"
            )
            return eye_diagram

        except Exception as e:
            logger.debug(f"Could not generate eye diagram: {e}")
            # Return original data if preprocessing fails
            return data
610 def _detect_unit_interval_fft(
611 self, raw_data: np.ndarray[Any, Any], sample_rate: float
612 ) -> float | None:
613 """Detect unit interval using FFT-based period detection.
615 Computes the FFT of the waveform, finds the dominant frequency
616 (excluding DC), and calculates the unit interval for NRZ data.
618 Args:
619 raw_data: Input waveform samples.
620 sample_rate: Sample rate in Hz.
622 Returns:
623 Estimated unit interval in seconds, or None if detection fails.
624 """
625 try:
626 # Remove DC component
627 data_ac = raw_data - np.mean(raw_data)
629 # Compute FFT
630 fft_result = np.fft.rfft(data_ac)
631 fft_freqs = np.fft.rfftfreq(len(data_ac), d=1.0 / sample_rate)
633 # Get magnitude spectrum (exclude DC bin at index 0)
634 magnitude = np.abs(fft_result[1:])
635 freqs = fft_freqs[1:]
637 if len(magnitude) == 0: 637 ↛ 638line 637 didn't jump to line 638 because the condition on line 637 was never true
638 return None
640 # Find dominant frequency (peak in magnitude spectrum)
641 peak_idx = np.argmax(magnitude)
642 dominant_freq = freqs[peak_idx]
644 # For NRZ data, unit interval = 1 / (2 * dominant_freq)
645 # For periodic signals like sine waves, unit interval = 1 / dominant_freq
646 # We'll use the period as the unit interval for general signals
647 if dominant_freq > 0: 647 ↛ 667line 647 didn't jump to line 667 because the condition on line 647 was always true
648 unit_interval = float(1.0 / dominant_freq)
650 # Sanity check: dominant frequency should be reasonable
651 min_freq = sample_rate / len(raw_data) # At least one full cycle
652 max_freq = sample_rate / 20 # At least 20 samples per cycle
654 if min_freq <= dominant_freq <= max_freq:
655 logger.debug(
656 f"FFT detected dominant frequency: {dominant_freq:.2f} Hz, "
657 f"unit interval: {unit_interval * 1e6:.3f} us"
658 )
659 return unit_interval
660 else:
661 logger.debug(
662 f"FFT dominant frequency {dominant_freq:.2f} Hz out of range "
663 f"[{min_freq:.2f}, {max_freq:.2f}] Hz"
664 )
665 return None
667 return None
669 except Exception as e:
670 logger.debug(f"FFT-based unit interval detection failed: {e}")
671 return None
673 def _detect_unit_interval_zero_crossing(
674 self, raw_data: np.ndarray[Any, Any], sample_rate: float
675 ) -> float | None:
676 """Detect unit interval using zero-crossing analysis.
678 Estimates the signal period from the average interval between
679 zero crossings.
681 Args:
682 raw_data: Input waveform samples.
683 sample_rate: Sample rate in Hz.
685 Returns:
686 Estimated unit interval in seconds, or None if detection fails.
687 """
688 try:
689 # Find zero crossings
690 zero_crossings = np.where(np.diff(np.sign(raw_data - np.mean(raw_data))))[0]
692 if len(zero_crossings) > 10:
693 # Estimate period from average crossing interval
694 avg_half_period = float(np.mean(np.diff(zero_crossings))) / sample_rate
695 unit_interval = avg_half_period * 2 # Full period
697 logger.debug(
698 f"Zero-crossing detected unit interval: {unit_interval * 1e6:.3f} us "
699 f"({len(zero_crossings)} crossings)"
700 )
701 return unit_interval
702 else:
703 logger.debug(f"Insufficient zero crossings ({len(zero_crossings)}) for detection")
704 return None
706 except Exception as e:
707 logger.debug(f"Zero-crossing unit interval detection failed: {e}")
708 return None
710 def _detect_baud_rate_from_filename(self, path: Path | None) -> float | None:
711 """Extract baud rate from filename patterns like 'capture_9600baud.vcd'.
713 Supports patterns such as:
714 - 9600baud, 115200baud
715 - 9600_baud, 115200_baud
716 - 1Mbaud, 1.5Mbaud (with M/m prefix for megabaud)
717 - 9600bps, 115200bps
719 Args:
720 path: Path to the input file (None if in-memory data).
722 Returns:
723 Detected baud rate in bps, or None if not detected.
724 """
725 if path is None:
726 return None
728 import re
730 # Match patterns: 9600baud, 115200_baud, 1Mbaud, etc.
731 patterns = [
732 r"(\d+(?:\.\d+)?)[_\s]*[Mm]?baud",
733 r"(\d+(?:\.\d+)?)[_\s]*bps",
734 r"baud[_-]?(\d+)",
735 ]
736 filename = path.stem.lower()
738 for pattern in patterns:
739 match = re.search(pattern, filename, re.IGNORECASE)
740 if match:
741 value = float(match.group(1))
742 # Handle M prefix (megabaud)
743 matched_text = filename[match.start() : match.end()].lower()
744 if "m" in matched_text and "baud" in matched_text:
745 value *= 1_000_000
746 logger.debug(f"Detected baud rate from filename '{path.name}': {value} bps")
747 return value
749 return None
751 def _detect_logic_family(self, data: np.ndarray[Any, Any]) -> str:
752 """Detect logic family from voltage levels.
754 Analyzes the voltage swing in the data to determine the likely
755 logic family standard.
757 Args:
758 data: Input waveform samples (voltage levels).
760 Returns:
761 Detected logic family name (e.g., "TTL", "LVCMOS33", "LVDS").
762 """
763 vmax = float(np.max(data))
764 vmin = float(np.min(data))
765 voltage_swing = vmax - vmin
767 # Classify based on voltage swing
768 if voltage_swing < 1.0:
769 logic_family = "LVDS" # ~0.35V swing
770 elif voltage_swing < 2.0:
771 logic_family = "LVCMOS18" # 1.8V
772 elif voltage_swing < 3.0:
773 logic_family = "LVCMOS25" # 2.5V
774 elif voltage_swing < 4.0:
775 logic_family = "LVCMOS33" # 3.3V
776 else:
777 logic_family = "TTL" # 5V
779 logger.debug(
780 f"Detected logic family from voltage swing {voltage_swing:.2f}V: {logic_family}"
781 )
782 return logic_family
784 def _detect_frequency_range(
785 self, data: np.ndarray[Any, Any], sample_rate: float
786 ) -> tuple[float, float] | None:
787 """Detect dominant frequency range from FFT analysis.
789 Computes the FFT of the input signal and identifies the range
790 of frequencies containing significant spectral content.
792 Args:
793 data: Input waveform samples.
794 sample_rate: Sample rate in Hz.
796 Returns:
797 Tuple of (min_freq, max_freq) in Hz for significant spectral content,
798 or None if detection fails.
799 """
800 try:
801 # Compute FFT
802 fft_result = np.fft.rfft(data - np.mean(data))
803 freqs = np.fft.rfftfreq(len(data), d=1.0 / sample_rate)
804 magnitude = np.abs(fft_result)
806 # Find frequencies with significant power (> 10% of max)
807 threshold = 0.1 * np.max(magnitude)
808 significant = freqs[magnitude > threshold]
810 if len(significant) > 0:
811 min_freq = float(np.min(significant))
812 max_freq = float(np.max(significant))
813 logger.debug(f"Detected frequency range: {min_freq:.2f} Hz - {max_freq:.2f} Hz")
814 return (min_freq, max_freq)
815 return None
816 except Exception as e:
817 logger.debug(f"Frequency range detection failed: {e}")
818 return None
820 def _detect_noise_floor(self, data: np.ndarray[Any, Any]) -> float | None:
821 """Estimate noise floor using median absolute deviation.
823 Uses robust statistical methods to estimate the noise level in
824 the signal, which is useful for setting thresholds in various
825 analysis functions.
827 Args:
828 data: Input waveform samples.
830 Returns:
831 Estimated noise level (standard deviation of noise), or None if detection fails.
832 """
833 try:
834 try:
835 from scipy import stats
837 # Use MAD for robust noise estimation
838 mad = stats.median_abs_deviation(data, scale="normal")
839 logger.debug(f"Detected noise floor (scipy MAD): {mad:.6f}")
840 return float(mad)
841 except ImportError:
842 # Fallback without scipy
843 median = np.median(data)
844 mad = np.median(np.abs(data - median)) * 1.4826
845 logger.debug(f"Detected noise floor (numpy MAD): {mad:.6f}")
846 return float(mad)
847 except Exception as e:
848 logger.debug(f"Noise floor detection failed: {e}")
849 return None
851 def _detect_protocol_hints(
852 self, data: np.ndarray[Any, Any], sample_rate: float
853 ) -> dict[str, Any]:
854 """Detect hints about potential protocols in the signal.
856 Analyzes signal characteristics such as edge timing and periodicity
857 to provide hints about the communication protocol that may be present.
859 Args:
860 data: Input waveform samples.
861 sample_rate: Sample rate in Hz.
863 Returns:
864 Dictionary with detected characteristics such as:
865 - 'detected_baud': Estimated baud rate (if detected)
866 - 'clock_regularity': 'high', 'medium', or 'low'
867 """
868 hints: dict[str, Any] = {}
869 try:
870 # Check for common baud rates by looking at edge timing
871 zero_crossings = np.where(np.diff(np.sign(data - np.mean(data))))[0]
872 if len(zero_crossings) > 10:
873 intervals = np.diff(zero_crossings) / sample_rate
874 avg_interval = float(np.median(intervals))
876 # Map to standard baud rates
877 common_bauds = [
878 300,
879 1200,
880 2400,
881 4800,
882 9600,
883 19200,
884 38400,
885 57600,
886 115200,
887 ]
888 for baud in common_bauds:
889 expected_interval = 1.0 / baud
890 if 0.8 < avg_interval / expected_interval < 1.2:
891 hints["detected_baud"] = baud
892 logger.debug(f"Protocol hint: detected baud rate {baud} bps")
893 break
895 # Check for clock-like periodicity
896 if len(zero_crossings) > 20:
897 interval_std = float(np.std(np.diff(zero_crossings)))
898 if interval_std < 2:
899 regularity = "high"
900 elif interval_std < 5:
901 regularity = "medium"
902 else:
903 regularity = "low"
904 hints["clock_regularity"] = regularity
905 logger.debug(f"Protocol hint: clock regularity {regularity}")
907 except Exception as e:
908 logger.debug(f"Protocol hints detection failed: {e}")
910 return hints
    def _execute_function(
        self, module_name: str, func_name: str, data: Any, timeout: float | None
    ) -> Any:
        """Execute a single analysis function with quality scoring.

        Args:
            module_name: Name of the module containing the function.
            func_name: Name of the function to execute.
            data: Input data object.
            timeout: Timeout in seconds (None for no timeout).

        Returns:
            Analysis result with optional quality score attached.

        Raises:
            ValueError: If function is non-inferrable or invalid.
        """
        # Check if function is in non-inferrable skip list
        func_path = f"{module_name}.{func_name}"
        if func_path in NON_INFERRABLE_FUNCTIONS:
            logger.debug(f"Skipping non-inferrable function: {func_path}")
            # NOTE(review): raising here makes the caller record the skip as
            # an AnalysisError rather than a plain skip — confirm intended.
            raise ValueError(
                f"Function {func_path} requires context-specific parameters that cannot be auto-detected"
            )

        module = importlib.import_module(module_name)
        func = getattr(module, func_name)

        # Prepare function arguments from the data via signature inspection
        args, kwargs = self._prepare_arguments(func, data)

        if args is None:
            # Function not applicable to this data type
            raise ValueError(f"Function {func_name} not applicable to data type")

        start_time = time.time()

        # Execute with timeout if specified
        if timeout is not None:
            # Note: Python doesn't have built-in function timeout without threads/processes
            # For simplicity, we'll just execute directly and check elapsed time afterward
            # A production implementation would use threading.Timer or signal.alarm
            # (i.e. the timeout is advisory: overruns are logged, never aborted)
            result = func(*args, **kwargs)

            elapsed = time.time() - start_time
            if elapsed > timeout:
                logger.warning(
                    f"Function {module_name}.{func_name} exceeded timeout "
                    f"({elapsed:.2f}s > {timeout:.2f}s)"
                )
        else:
            result = func(*args, **kwargs)

        # Consume generators to avoid serialization issues
        if isinstance(result, types.GeneratorType):
            try:
                result = list(result)
                logger.debug(f"Consumed generator from {module_name}.{func_name}")
            except Exception as e:
                logger.warning(f"Failed to consume generator from {module_name}.{func_name}: {e}")
                result = f"<generator error: {type(e).__name__}>"

        # Add quality scoring if enabled in config
        if self.config.enable_quality_scoring:
            result = self._add_quality_score(result, func_path, data)

        return result
980 def _add_quality_score(self, result: Any, method_name: str, data: Any) -> Any:
981 """Add quality score to analysis result.
983 Args:
984 result: Analysis result to score.
985 method_name: Name of the analysis method.
986 data: Input data object.
988 Returns:
989 Result with quality score attached (if applicable).
990 """
991 try:
992 from tracekit.quality import score_analysis_result
994 # Extract raw data array for quality assessment
995 if hasattr(data, "data"):
996 raw_data = data.data
997 elif isinstance(data, np.ndarray):
998 raw_data = data
999 else:
1000 # Can't assess quality for non-array data
1001 return result
1003 # Score the result
1004 quality_score = score_analysis_result(
1005 result=result,
1006 method_name=method_name,
1007 data=raw_data,
1008 )
1010 # Attach quality score to result if it's a dict
1011 if isinstance(result, dict):
1012 result["_quality_score"] = quality_score.to_dict()
1013 # For other types, wrap in dict
1014 elif result is not None:
1015 return {
1016 "value": result,
1017 "_quality_score": quality_score.to_dict(),
1018 }
1020 except Exception as e:
1021 logger.debug(f"Failed to add quality score: {e}")
1023 return result
    def _prepare_arguments(
        self, func: Callable[..., Any], data: Any
    ) -> tuple[list[Any] | None, dict[str, Any]]:
        """Prepare positional and keyword arguments for an analysis function.

        Examines the function signature, routes the input data onto the
        function's first parameter by duck-typing and annotation inspection,
        and auto-fills well-known keyword parameters (sample rate, baud
        rate, thresholds, window sizes, ...) where they have no usable
        default.

        Args:
            func: Function to prepare arguments for.
            data: Input data object (trace-like, ndarray, bytes,
                EyeDiagram-like, S-parameter-like, or packet list).

        Returns:
            Tuple of (args_list, kwargs_dict), or (None, {}) if the
            function is not applicable to this data type.
        """
        sig = inspect.signature(func)
        params = list(sig.parameters.keys())

        # Zero-argument functions are called as-is.
        if not params:
            return [], {}

        first_param = params[0]

        # Check for EyeDiagram - these functions expect an 'eye' parameter.
        # Duck-typed: anything exposing samples_per_ui + time_axis is
        # treated as an eye diagram.
        if hasattr(data, "samples_per_ui") and hasattr(data, "time_axis"):
            if first_param == "eye" or "EyeDiagram" in str(sig.parameters.get(first_param, "")):
                return [data], {}
            # Skip functions that don't work with EyeDiagram
            return None, {}

        # Check for SParameterData (duck-typed: s_matrix + frequencies).
        if hasattr(data, "s_matrix") and hasattr(data, "frequencies"):
            if first_param in ("s_params", "s_param", "s_data", "sparams"):
                return [data], {}
            if "SParameter" in str(sig.parameters.get(first_param, "")):
                return [data], {}
            # Skip functions that don't work with S-params
            return None, {}

        # Capture the first parameter's annotation in string form so the
        # routing below can match on type names.
        first_param_info = sig.parameters.get(first_param)
        param_annotation = first_param_info.annotation if first_param_info else None
        annotation_str = str(param_annotation) if param_annotation else ""

        # Handle PACKET domain - convert to PacketInfo objects if needed.
        if "PacketInfo" in annotation_str or first_param == "packets":
            if isinstance(data, list):
                # Already PacketInfo-like objects (duck-typed on .timestamp).
                if data and hasattr(data[0], "timestamp"):
                    return [data], {}
                # Convert list of dicts to PacketInfo objects
                elif data and isinstance(data[0], dict):
                    try:
                        from tracekit.analyzers.packet.metrics import PacketInfo

                        packets = [
                            PacketInfo(
                                timestamp=p.get("timestamp", 0.0),
                                size=p.get("size", 0),
                                sequence=p.get("sequence"),
                            )
                            for p in data
                        ]
                        return [packets], {}
                    except Exception as e:
                        logger.debug(f"Failed to convert to PacketInfo: {e}")
                        return None, {}
            # Non-list input (or empty/unconvertible list): not applicable.
            return None, {}

        # Check if data is a trace-like object (has .data and .metadata).
        is_trace = hasattr(data, "data") and hasattr(data, "metadata")

        # Extract the raw sample array and the effective sample rate using
        # the config-aware helper.
        if is_trace:
            raw_data = data.data
            sample_rate = self._get_effective_sample_rate(data, context="general")
        elif isinstance(data, np.ndarray):
            raw_data = data
            sample_rate = self._get_effective_sample_rate(data, context="general")
        elif isinstance(data, bytes | bytearray):
            raw_data = np.frombuffer(data, dtype=np.uint8)
            sample_rate = self._get_effective_sample_rate(data, context="binary")
        else:
            # Last resort: try to coerce any iterable into an array.
            try:
                raw_data = np.array(data) if hasattr(data, "__iter__") else None
            except (ValueError, TypeError):
                raw_data = None
            sample_rate = self._get_effective_sample_rate(data, context="general")

        # Nothing usable to analyze.
        if raw_data is None or (hasattr(raw_data, "__len__") and len(raw_data) == 0):
            return None, {}

        kwargs: dict[str, Any] = {}

        # Common sample-rate parameter aliases.
        if "sample_rate" in params:
            kwargs["sample_rate"] = sample_rate
        if "fs" in params:
            kwargs["fs"] = sample_rate
        if "rate" in params:
            kwargs["rate"] = sample_rate

        # DIGITAL domain: auto-detect baud rate from the input filename,
        # but only when the parameter has no default (or defaults to None).
        if "baud_rate" in params:
            param_info = sig.parameters.get("baud_rate")
            has_default = (
                param_info is not None and param_info.default is not inspect.Parameter.empty
            )
            if not has_default or (param_info and param_info.default is None):
                detected_baud = self._detect_baud_rate_from_filename(self._input_path)
                if detected_baud is not None:
                    kwargs["baud_rate"] = detected_baud

        # Auto-detect logic family from voltage levels when the parameter
        # has no default or defaults to None/"auto".
        if "logic_family" in params:
            param_info = sig.parameters.get("logic_family")
            has_default = (
                param_info is not None and param_info.default is not inspect.Parameter.empty
            )
            if not has_default or (param_info and param_info.default in (None, "auto")):
                try:
                    detected_family = self._detect_logic_family(raw_data)
                    kwargs["logic_family"] = detected_family
                except Exception as e:
                    logger.debug(f"Could not auto-detect logic family: {e}")

        # Auto-detect frequency range for frequency-related parameters.
        if "freq_min" in params or "freq_max" in params:
            try:
                freq_range = self._detect_frequency_range(raw_data, sample_rate)
                if freq_range is not None:
                    min_freq, max_freq = freq_range
                    if "freq_min" in params:
                        param_info = sig.parameters.get("freq_min")
                        has_default = (
                            param_info is not None
                            and param_info.default is not inspect.Parameter.empty
                        )
                        if not has_default or (param_info and param_info.default is None):
                            kwargs["freq_min"] = min_freq
                    if "freq_max" in params:
                        param_info = sig.parameters.get("freq_max")
                        has_default = (
                            param_info is not None
                            and param_info.default is not inspect.Parameter.empty
                        )
                        if not has_default or (param_info and param_info.default is None):
                            kwargs["freq_max"] = max_freq
            except Exception as e:
                logger.debug(f"Could not auto-detect frequency range: {e}")

        # Auto-detect noise floor for threshold parameters.
        if "noise_threshold" in params or "snr_threshold" in params:
            try:
                noise_floor = self._detect_noise_floor(raw_data)
                if noise_floor is not None:
                    if "noise_threshold" in params:
                        param_info = sig.parameters.get("noise_threshold")
                        has_default = (
                            param_info is not None
                            and param_info.default is not inspect.Parameter.empty
                        )
                        if not has_default or (param_info and param_info.default is None):
                            # Set threshold to 3 sigma (99.7% confidence)
                            kwargs["noise_threshold"] = noise_floor * 3.0
                    if "snr_threshold" in params:
                        param_info = sig.parameters.get("snr_threshold")
                        has_default = (
                            param_info is not None
                            and param_info.default is not inspect.Parameter.empty
                        )
                        if not has_default or (param_info and param_info.default is None):
                            # Calculate signal RMS and set reasonable SNR threshold
                            signal_rms = float(np.std(raw_data))
                            if noise_floor > 0:
                                detected_snr = signal_rms / noise_floor
                                # Use half the detected SNR as threshold
                                kwargs["snr_threshold"] = detected_snr / 2.0
            except Exception as e:
                logger.debug(f"Could not auto-detect noise floor: {e}")

        # Fall back to protocol hints for baud rate detection if the
        # filename-based detection above did not set one.
        if "baud_rate" in params and "baud_rate" not in kwargs:
            try:
                protocol_hints = self._detect_protocol_hints(raw_data, sample_rate)
                if "detected_baud" in protocol_hints:
                    param_info = sig.parameters.get("baud_rate")
                    has_default = (
                        param_info is not None and param_info.default is not inspect.Parameter.empty
                    )
                    if not has_default or (param_info and param_info.default is None):
                        kwargs["baud_rate"] = protocol_hints["detected_baud"]
                        logger.debug(
                            f"Using protocol-detected baud rate: {protocol_hints['detected_baud']} bps"
                        )
            except Exception as e:
                logger.debug(f"Could not use protocol hints for baud detection: {e}")

        # Data-dependent defaults for common windowing/width parameters.
        data_length = len(raw_data) if hasattr(raw_data, "__len__") else 0

        if "window_size" in params:
            param_info = sig.parameters.get("window_size")
            has_default = (
                param_info is not None and param_info.default is not inspect.Parameter.empty
            )
            if not has_default and "window_size" not in kwargs:
                # Default to 10% of signal length, minimum 10 samples
                kwargs["window_size"] = max(10, data_length // 10)
                logger.debug(f"Using auto-detected window_size: {kwargs['window_size']}")

        if "min_width" in params:
            param_info = sig.parameters.get("min_width")
            has_default = (
                param_info is not None and param_info.default is not inspect.Parameter.empty
            )
            if not has_default and "min_width" not in kwargs:
                # Default to 10 samples in time, minimum 1ns
                kwargs["min_width"] = max(1e-9, 10.0 / sample_rate)
                logger.debug(f"Using auto-detected min_width: {kwargs['min_width']:.2e}s")

        if "max_width" in params:
            param_info = sig.parameters.get("max_width")
            has_default = (
                param_info is not None and param_info.default is not inspect.Parameter.empty
            )
            if not has_default and "max_width" not in kwargs:
                # Default to signal duration, maximum 1ms
                total_duration = data_length / sample_rate if data_length > 0 else 1e-3
                kwargs["max_width"] = min(1e-3, total_duration)
                logger.debug(f"Using auto-detected max_width: {kwargs['max_width']:.2e}s")

        if "threshold" in params and "threshold" not in kwargs:
            param_info = sig.parameters.get("threshold")
            has_default = (
                param_info is not None and param_info.default is not inspect.Parameter.empty
            )
            if not has_default or (param_info and param_info.default in (None, "auto")):
                # Auto-detect threshold from histogram or use midpoint
                try:
                    if isinstance(raw_data, np.ndarray) and raw_data.size > 0:
                        kwargs["threshold"] = float(np.median(raw_data))
                        logger.debug(f"Using auto-detected threshold: {kwargs['threshold']:.3f}")
                except Exception as e:
                    logger.debug(f"Could not auto-detect threshold: {e}")

        if "window_duration" in params:
            param_info = sig.parameters.get("window_duration")
            has_default = (
                param_info is not None and param_info.default is not inspect.Parameter.empty
            )
            if not has_default and "window_duration" not in kwargs:
                # Default to 1 second or total duration / 10
                total_duration = data_length / sample_rate if data_length > 0 else 1.0
                kwargs["window_duration"] = min(1.0, total_duration / 10.0)
                logger.debug(
                    f"Using auto-detected window_duration: {kwargs['window_duration']:.3f}s"
                )

        # Create a trace wrapper if the function expects a trace but we
        # only have a raw array.
        if not is_trace and (
            "Trace" in annotation_str or "WaveformTrace" in annotation_str or first_param == "trace"
        ):
            try:
                # Convert memoryview to ndarray if needed
                trace_data = np.asarray(raw_data) if isinstance(raw_data, memoryview) else raw_data
                metadata = TraceMetadata(sample_rate=sample_rate)
                data = WaveformTrace(data=trace_data, metadata=metadata)
                is_trace = True
                logger.debug("Created WaveformTrace wrapper for raw array data")
            except Exception as e:
                logger.debug(f"Could not create trace wrapper: {e}")
                return None, {}

        # If function expects WaveformTrace and we have a trace, pass it directly
        if is_trace and (
            "Trace" in annotation_str or "WaveformTrace" in annotation_str or first_param == "trace"
        ):
            return [data], kwargs

        if first_param in ("data", "signal", "x", "samples", "waveform"):
            # NOTE(review): this generic branch also captures
            # first_param == "data", so the bytes-specific "data" checks in
            # the branches below appear unreachable — confirm and consider
            # reordering if bytes-annotated 'data' parameters matter.
            return [raw_data], kwargs
        elif first_param == "trace" and not is_trace:
            # NOTE(review): the wrapper block above either sets is_trace or
            # returns (None, {}) whenever first_param == "trace", so this
            # branch looks unreachable — verify before relying on it.
            # Function expects trace but we don't have one, skip
            return None, {}
        elif first_param == "edges":
            # Jitter/timing functions need edge timestamps;
            # try to detect edges from the data.
            # (If data is not a trace, this branch falls through to the
            # default return at the bottom.)
            try:
                from tracekit.analyzers.digital import detect_edges

                if is_trace:
                    edges = detect_edges(data)
                    edge_times = edges.tolist() if len(edges) > 0 else []
                    # Too few edges for meaningful statistics.
                    if len(edge_times) < 3:
                        return None, {}
                    return [edge_times], kwargs
            except Exception:
                return None, {}
        elif first_param == "periods":
            # Timing functions need period measurements;
            # compute periods from the waveform trace.
            try:
                if is_trace:
                    from tracekit.analyzers.waveform.measurements import period

                    # Get all periods
                    periods_result = period(data, return_all=True)
                    if isinstance(periods_result, np.ndarray) and len(periods_result) >= 3:
                        return [periods_result], kwargs
                return None, {}
            except Exception as e:
                logger.debug(f"Could not compute periods: {e}")
                return None, {}
        elif first_param in ("stream", "data") and "bytes" in annotation_str:
            # Functions expecting bytes - convert ndarray to bytes if needed
            if isinstance(data, bytes | bytearray):
                return [data], kwargs
            elif isinstance(raw_data, np.ndarray):
                return [raw_data.astype(np.uint8).tobytes()], kwargs
            else:
                return None, {}
        elif first_param == "bytes" or (first_param == "data" and "bytes" in str(sig)):
            # Entropy/binary functions need bytes
            if isinstance(data, bytes | bytearray):
                return [data], kwargs
            elif hasattr(raw_data, "astype"):
                return [raw_data.astype(np.uint8).tobytes()], kwargs
            else:
                return None, {}

        # Default: pass raw data
        return [raw_data], kwargs
# Public API of this module.
__all__ = ["AnalysisEngine"]