Coverage for src / tracekit / compliance / advanced.py: 78%
219 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Advanced EMC compliance features.
3This module provides advanced compliance testing capabilities including
4limit interpolation, compliance test execution, and quasi-peak detection.
7References:
8 CISPR 16-1-1: Measuring Apparatus
9 FCC Part 15: Unintentional Radiators
10 EN 55032: EMC Standard for Multimedia Equipment
11"""
13from __future__ import annotations
15import logging
16from dataclasses import dataclass, field
17from enum import Enum
18from typing import TYPE_CHECKING, Any
20import numpy as np
22if TYPE_CHECKING:
23 from numpy.typing import NDArray
25 from tracekit.compliance.masks import LimitMask
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Names exported by a star-import of this module.
__all__ = [
    "ComplianceTestConfig",
    "ComplianceTestRunner",
    "ComplianceTestSuite",
    "InterpolationMethod",
    "LimitInterpolator",
    "QPDetectorBand",
    "QuasiPeakDetector",
    "interpolate_limit",
    "run_compliance_suite",
]
42# =============================================================================
43# =============================================================================
class InterpolationMethod(Enum):
    """Strategies for evaluating a limit mask between its defined points.

    Each member's value is the string identifier of the method, so a member
    can be looked up with ``InterpolationMethod("log-linear")``.

    References:
        COMP-005: Limit Interpolation
    """

    # Linear interpolation.
    LINEAR = "linear"
    # Log-linear (dB) interpolation.
    LOG_LINEAR = "log-linear"
    # Cubic spline.
    CUBIC = "cubic"
    # Step function (no interpolation).
    STEP = "step"
class LimitInterpolator:
    """Limit mask interpolator.

    Evaluates an EMC limit mask at frequencies other than the points at
    which the mask is defined, using a selectable interpolation method.

    Example:
        >>> from tracekit.compliance import load_limit_mask
        >>> mask = load_limit_mask('FCC_Part15_ClassB')
        >>> interp = LimitInterpolator(mask)
        >>> limit_at_100mhz = interp.interpolate(100e6)

    References:
        COMP-005: Limit Interpolation
    """

    def __init__(
        self,
        mask: LimitMask,
        method: InterpolationMethod = InterpolationMethod.LOG_LINEAR,
        extrapolate: bool = False,
    ) -> None:
        """Initialize interpolator.

        Args:
            mask: Limit mask to interpolate. Its ``frequency``, ``limit`` and
                ``frequency_range`` attributes are read by this class.
            method: Interpolation method
            extrapolate: Allow frequencies beyond the mask range. When False,
                ``interpolate`` raises for out-of-range frequencies.
        """
        self._mask = mask
        self._method = method
        self._extrapolate = extrapolate

        # Precompute log frequencies for log-linear and cubic interpolation.
        self._log_freq = np.log10(mask.frequency)
        self._log_limit = mask.limit  # Already in dB

    def interpolate(
        self,
        frequency: float | NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """Interpolate limit at given frequency/frequencies.

        Args:
            frequency: Frequency or array of frequencies in Hz

        Returns:
            Interpolated limit value(s). A scalar input yields a
            one-element array.

        Raises:
            ValueError: If any frequency is not positive, or if a frequency
                is outside the mask range and extrapolation is disabled.
        """
        freq_array = np.atleast_1d(np.asarray(frequency, dtype=np.float64))

        # Validate positive frequencies first (log10 is taken further down).
        if np.any(freq_array <= 0):
            raise ValueError("Frequency must be positive")

        # Range check: build the out-of-range mask once and reuse it for the
        # error message.
        if not self._extrapolate:
            f_min, f_max = self._mask.frequency_range
            outside = (freq_array < f_min) | (freq_array > f_max)
            if np.any(outside):
                first_bad = freq_array[outside][0]
                raise ValueError(
                    f"Frequency {first_bad:.2e} Hz outside mask range "
                    f"[{f_min:.2e}, {f_max:.2e}] Hz. "
                    f"Set extrapolate=True to allow extrapolation."
                )

        if self._method == InterpolationMethod.LINEAR:
            return self._interp_linear(freq_array)
        elif self._method == InterpolationMethod.LOG_LINEAR:
            return self._interp_log_linear(freq_array)
        elif self._method == InterpolationMethod.CUBIC:
            return self._interp_cubic(freq_array)
        else:  # STEP
            return self._interp_step(freq_array)

    def _interp_linear(self, freq: NDArray[np.float64]) -> NDArray[np.float64]:
        """Linear interpolation of limit versus frequency."""
        return np.interp(freq, self._mask.frequency, self._mask.limit)

    def _interp_log_linear(self, freq: NDArray[np.float64]) -> NDArray[np.float64]:
        """Log-linear interpolation (linear in log-frequency space)."""
        log_freq = np.log10(freq)
        return np.interp(log_freq, self._log_freq, self._log_limit)

    def _interp_cubic(self, freq: NDArray[np.float64]) -> NDArray[np.float64]:
        """Cubic spline interpolation."""
        # Imported lazily so scipy is only needed when this method is used.
        from scipy.interpolate import CubicSpline

        # Use log-frequency for better behavior
        log_freq = np.log10(freq)
        spline = CubicSpline(self._log_freq, self._log_limit, extrapolate=self._extrapolate)
        result: NDArray[np.float64] = spline(log_freq)
        return result

    def _interp_step(self, freq: NDArray[np.float64]) -> NDArray[np.float64]:
        """Step function (nearest lower defined point).

        Each frequency takes the limit of the last defined point that is
        less than or equal to it. Frequencies below the first point take the
        first limit; frequencies above the last point take the last limit.
        """
        limits = np.asarray(self._mask.limit, dtype=np.float64)
        # One vectorised search replaces a per-element Python loop.
        idx = np.searchsorted(self._mask.frequency, freq, side="right") - 1
        # Clamp so out-of-table frequencies map to the first/last limit.
        idx = np.clip(idx, 0, len(limits) - 1)
        return limits[idx]

    def get_limit_at(
        self,
        frequency: float,
        warn_on_extrapolation: bool = True,
    ) -> tuple[float, dict[str, Any]]:
        """Get limit at specific frequency with metadata.

        Args:
            frequency: Frequency in Hz
            warn_on_extrapolation: Emit warning if the frequency is outside
                the mask range

        Returns:
            (limit_value, metadata) tuple. ``limit_value`` is NaN when the
            frequency is outside the mask range and extrapolation is
            disabled. ``metadata`` holds the keys ``frequency``, ``method``,
            ``is_extrapolated``, ``is_at_defined_point``,
            ``lower_defined_freq`` and ``upper_defined_freq``.
        """
        f_min, f_max = self._mask.frequency_range
        # Cast to a plain bool so the metadata never carries a NumPy scalar.
        is_extrapolated = bool(frequency < f_min or frequency > f_max)

        if is_extrapolated and warn_on_extrapolation:
            logger.warning(
                f"Extrapolating limit at {frequency:.2e} Hz "
                f"(mask range: {f_min:.2e} to {f_max:.2e} Hz)"
            )

        limit = (
            float(self.interpolate(frequency)[0])
            if not is_extrapolated or self._extrapolate
            else np.nan
        )

        # Find neighbouring defined points. The search is left-sided, so a
        # frequency equal to a defined point is reported as the upper
        # neighbour (and has no lower neighbour when it is the first point).
        idx = np.searchsorted(self._mask.frequency, frequency)
        if idx == 0:
            lower_freq = None
            upper_freq = self._mask.frequency[0]
        elif idx >= len(self._mask.frequency):
            lower_freq = self._mask.frequency[-1]
            upper_freq = None
        else:
            lower_freq = self._mask.frequency[idx - 1]
            upper_freq = self._mask.frequency[idx]

        return limit, {
            "frequency": frequency,
            "method": self._method.value,
            "is_extrapolated": is_extrapolated,
            "is_at_defined_point": frequency in self._mask.frequency,
            "lower_defined_freq": float(lower_freq) if lower_freq is not None else None,
            "upper_defined_freq": float(upper_freq) if upper_freq is not None else None,
        }
def interpolate_limit(
    mask: LimitMask,
    frequency: float | NDArray[np.float64],
    method: str = "log-linear",
) -> NDArray[np.float64]:
    """Interpolate a limit mask in a single call.

    Builds a ``LimitInterpolator`` with extrapolation enabled and evaluates
    it at ``frequency``.

    Args:
        mask: Limit mask
        frequency: Frequency or frequencies in Hz
        method: Interpolation method, given as the string value of an
            ``InterpolationMethod`` member

    Returns:
        Interpolated limit value(s)

    Example:
        >>> limit = interpolate_limit(mask, 100e6)
    """
    selected_method = InterpolationMethod(method)
    interpolator = LimitInterpolator(mask, method=selected_method, extrapolate=True)
    return interpolator.interpolate(frequency)
242# =============================================================================
243# =============================================================================
@dataclass
class ComplianceTestConfig:
    """Settings that control a compliance test run.

    Attributes:
        mask_names: List of mask names to test against
        detector_type: Detector type to use
        frequency_range: Frequency range to test, or None for no restriction
        margin_required_db: Required margin to limit
        include_quasi_peak: Include QP detection
        generate_report: Generate detailed report

    References:
        COMP-006: Compliance Test Execution
    """

    # A factory is used so every instance gets its own list.
    mask_names: list[str] = field(default_factory=lambda: ["FCC_Part15_ClassB"])
    detector_type: str = "peak"
    frequency_range: tuple[float, float] | None = None
    margin_required_db: float = 0.0
    include_quasi_peak: bool = True
    generate_report: bool = True
@dataclass
class ComplianceTestResult:
    """Outcome of testing a spectrum against one limit mask.

    Attributes:
        mask_name: Mask tested against
        passed: Whether test passed
        margin_db: Margin to limit (negative = fail)
        worst_frequency: Worst-case frequency
        violations: List of violations
        detector_used: Detector type used
        metadata: Additional information about the test; empty by default
    """

    mask_name: str
    passed: bool
    margin_db: float
    worst_frequency: float
    violations: list[dict[str, Any]]
    detector_used: str
    # A factory is used so every instance gets its own dict.
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class ComplianceTestSuiteResult:
    """Aggregated outcome of a compliance test suite.

    Attributes:
        overall_passed: True if all tests passed
        results: Individual test results
        summary: Test summary
    """

    overall_passed: bool
    results: list[ComplianceTestResult]
    summary: dict[str, Any]
class ComplianceTestRunner:
    """Compliance test execution engine.

    Executes compliance tests against multiple masks with
    configurable detection methods.

    Example:
        >>> runner = ComplianceTestRunner()
        >>> runner.add_mask('FCC_Part15_ClassB')
        >>> runner.add_mask('CE_CISPR32_ClassB')
        >>> result = runner.run(spectrum_freq, spectrum_level)

    References:
        COMP-006: Compliance Test Execution
    """

    def __init__(self, config: ComplianceTestConfig | None = None) -> None:
        """Initialize test runner.

        Args:
            config: Test configuration; a default ``ComplianceTestConfig``
                is created when omitted.
        """
        self._config = config or ComplianceTestConfig()
        # (mask_name, loaded mask) pairs, in the order they were added.
        self._masks: list[tuple[str, Any]] = []
        self._qp_detector = QuasiPeakDetector()

    def add_mask(self, mask_name: str) -> ComplianceTestRunner:
        """Add mask to test suite.

        Args:
            mask_name: Mask name to add

        Returns:
            Self for chaining
        """
        # Imported lazily, at call time rather than at module import.
        from tracekit.compliance.masks import load_limit_mask

        mask = load_limit_mask(mask_name)
        self._masks.append((mask_name, mask))
        return self

    def run(
        self,
        frequencies: NDArray[np.float64],
        levels: NDArray[np.float64],
        unit: str = "dBuV",
    ) -> ComplianceTestSuiteResult:
        """Run compliance test suite.

        Args:
            frequencies: Frequency array in Hz
            levels: Level array in specified unit
            unit: Unit of level measurements

        Returns:
            Test suite result. With no masks added, the suite passes
            vacuously and ``worst_margin_db`` is reported as 0.
        """
        results: list[ComplianceTestResult] = [
            self._test_against_mask(frequencies, levels, mask, unit)
            for _mask_name, mask in self._masks
        ]

        overall_passed = all(r.passed for r in results)

        summary = {
            "total_tests": len(results),
            "passed": sum(1 for r in results if r.passed),
            "failed": sum(1 for r in results if not r.passed),
            "worst_margin_db": min(r.margin_db for r in results) if results else 0,
            "masks_tested": [r.mask_name for r in results],
        }

        return ComplianceTestSuiteResult(
            overall_passed=overall_passed,
            results=results,
            summary=summary,
        )

    def _test_against_mask(
        self,
        frequencies: NDArray[np.float64],
        levels: NDArray[np.float64],
        mask: Any,
        unit: str,
    ) -> ComplianceTestResult:
        """Test a spectrum against a single mask.

        Args:
            frequencies: Frequencies in Hz
            levels: Measured levels, one per frequency
            mask: Loaded limit mask; its ``frequency_range``, ``name`` and
                ``detector`` attributes are read here
            unit: Unit of the levels, recorded in the result metadata

        Returns:
            Result for this mask. A point is a violation when its margin
            (limit minus level) is below the configured required margin.
        """
        # Coerce up front so plain sequences work with the boolean-mask
        # indexing and array arithmetic below.
        frequencies = np.asarray(frequencies, dtype=np.float64)
        levels = np.asarray(levels, dtype=np.float64)

        # Apply frequency range filter
        if self._config.frequency_range:
            f_min, f_max = self._config.frequency_range
            in_range = (frequencies >= f_min) & (frequencies <= f_max)
            frequencies = frequencies[in_range]
            levels = levels[in_range]

        # Limit to mask range
        mask_f_min, mask_f_max = mask.frequency_range
        in_mask = (frequencies >= mask_f_min) & (frequencies <= mask_f_max)
        frequencies = frequencies[in_mask]
        levels = levels[in_mask]

        if len(frequencies) == 0:
            # Nothing to compare against this mask: report a pass with
            # infinite margin, carrying the same metadata as a normal result.
            return ComplianceTestResult(
                mask_name=mask.name,
                passed=True,
                margin_db=np.inf,
                worst_frequency=0.0,
                violations=[],
                detector_used=self._config.detector_type,
                metadata={"unit": unit},
            )

        # Interpolate limits
        interp = LimitInterpolator(mask)
        limits = interp.interpolate(frequencies)

        # Apply quasi-peak if requested
        # NOTE(review): detector_used below still reports the configured
        # detector type even when this correction is applied - confirm intent.
        if self._config.include_quasi_peak and mask.detector == "quasi-peak":
            levels = self._qp_detector.apply(levels, frequencies)

        # Calculate margin (positive = below the limit)
        margin = limits - levels
        min_margin = float(np.min(margin))
        worst_idx = int(np.argmin(margin))

        # Find violations (considering required margin)
        violation_mask = margin < self._config.margin_required_db
        violations = [
            {
                "frequency": float(frequencies[idx]),
                "measured": float(levels[idx]),
                "limit": float(limits[idx]),
                "excess_db": float(-margin[idx]),
            }
            for idx in np.where(violation_mask)[0]
        ]

        return ComplianceTestResult(
            mask_name=mask.name,
            passed=not violations,
            margin_db=min_margin,
            worst_frequency=float(frequencies[worst_idx]),
            violations=violations,
            detector_used=self._config.detector_type,
            metadata={"unit": unit},
        )
class ComplianceTestSuite:
    """Factory for pre-configured compliance test runners.

    Each static method returns a ``ComplianceTestRunner`` that already has
    its masks loaded.

    References:
        COMP-006: Compliance Test Execution
    """

    @staticmethod
    def residential() -> ComplianceTestRunner:
        """Get residential (Class B) test suite."""
        config = ComplianceTestConfig(include_quasi_peak=True)
        runner = ComplianceTestRunner(config)
        for mask_name in ("FCC_Part15_ClassB", "CE_CISPR32_ClassB"):
            runner.add_mask(mask_name)
        return runner

    @staticmethod
    def commercial() -> ComplianceTestRunner:
        """Get commercial (Class A) test suite."""
        config = ComplianceTestConfig(include_quasi_peak=True)
        runner = ComplianceTestRunner(config)
        for mask_name in ("FCC_Part15_ClassA", "CE_CISPR32_ClassA"):
            runner.add_mask(mask_name)
        return runner

    @staticmethod
    def military() -> ComplianceTestRunner:
        """Get military (MIL-STD) test suite; quasi-peak detection is off."""
        config = ComplianceTestConfig(include_quasi_peak=False)
        runner = ComplianceTestRunner(config)
        for mask_name in ("MIL_STD_461G_RE102", "MIL_STD_461G_CE102"):
            runner.add_mask(mask_name)
        return runner
491def run_compliance_suite(
492 frequencies: NDArray[np.float64],
493 levels: NDArray[np.float64],
494 suite: str = "residential",
495) -> ComplianceTestSuiteResult:
496 """Run standard compliance test suite.
498 Args:
499 frequencies: Frequency array in Hz
500 levels: Level array in dB
501 suite: Suite name ('residential', 'commercial', 'military')
503 Returns:
504 Test suite result
506 Raises:
507 ValueError: If suite name is unknown.
509 Example:
510 >>> result = run_compliance_suite(freq, levels, suite='residential')
511 >>> print(f"Passed: {result.overall_passed}")
512 """
513 if suite == "residential": 513 ↛ 515line 513 didn't jump to line 515 because the condition on line 513 was always true
514 runner = ComplianceTestSuite.residential()
515 elif suite == "commercial":
516 runner = ComplianceTestSuite.commercial()
517 elif suite == "military":
518 runner = ComplianceTestSuite.military()
519 else:
520 raise ValueError(f"Unknown suite: {suite}")
522 return runner.run(frequencies, levels)
525# =============================================================================
526# =============================================================================
class QPDetectorBand(Enum):
    """Frequency bands used by the CISPR 16-1-1 quasi-peak detector.

    References:
        CISPR 16-1-1 Table 1
        COMP-007: Quasi-Peak Detection
    """

    # 9 kHz - 150 kHz
    BAND_A = "A"
    # 150 kHz - 30 MHz
    BAND_B = "B"
    # 30 MHz - 300 MHz
    BAND_C = "C"
    # 300 MHz - 1 GHz
    BAND_D = "D"
@dataclass
class QPDetectorParams:
    """Parameter set of a quasi-peak detector per CISPR 16-1-1.

    Attributes:
        bandwidth: Measurement bandwidth in Hz
        charge_time: Charge time constant in ms
        discharge_time: Discharge time constant in ms
        mechanical_time: Meter mechanical time constant in ms
    """

    bandwidth: float
    charge_time: float
    discharge_time: float
    mechanical_time: float
class QuasiPeakDetector:
    """CISPR 16-1-1 quasi-peak detector.

    Approximates quasi-peak readings from peak readings by subtracting a
    fixed, band-dependent correction, and exposes the per-band detector
    parameters.

    Example:
        >>> detector = QuasiPeakDetector()
        >>> qp_levels = detector.apply(peak_levels, frequencies)

    References:
        CISPR 16-1-1: Measuring Apparatus
        COMP-007: Quasi-Peak Detection
    """

    # CISPR 16-1-1 detector parameters by band (bandwidth in Hz, times in ms)
    BAND_PARAMS = {  # noqa: RUF012
        QPDetectorBand.BAND_A: QPDetectorParams(
            bandwidth=200,  # 200 Hz
            charge_time=45,
            discharge_time=500,
            mechanical_time=160,
        ),
        QPDetectorBand.BAND_B: QPDetectorParams(
            bandwidth=9000,  # 9 kHz
            charge_time=1,
            discharge_time=160,
            mechanical_time=160,
        ),
        QPDetectorBand.BAND_C: QPDetectorParams(
            bandwidth=120000,  # 120 kHz
            charge_time=1,
            discharge_time=550,
            mechanical_time=100,
        ),
        QPDetectorBand.BAND_D: QPDetectorParams(
            bandwidth=1000000,  # 1 MHz
            charge_time=1,
            discharge_time=550,
            mechanical_time=100,
        ),
    }

    # Frequency ranges for bands (Hz), both ends inclusive
    BAND_RANGES = {  # noqa: RUF012
        QPDetectorBand.BAND_A: (9e3, 150e3),
        QPDetectorBand.BAND_B: (150e3, 30e6),
        QPDetectorBand.BAND_C: (30e6, 300e6),
        QPDetectorBand.BAND_D: (300e6, 1e9),
    }

    def __init__(self) -> None:
        """Initialize quasi-peak detector."""
        self._lookup_table: dict[str, NDArray[np.float64]] = {}

    def get_band(self, frequency: float) -> QPDetectorBand | None:
        """Get CISPR band for frequency.

        Bands are checked in definition order and both range ends are
        inclusive, so a frequency on a shared boundary maps to the lower
        band.

        Args:
            frequency: Frequency in Hz

        Returns:
            Band or None if outside all bands
        """
        matches = (
            band
            for band, (lower, upper) in self.BAND_RANGES.items()
            if lower <= frequency <= upper
        )
        return next(matches, None)

    def get_params(self, frequency: float) -> QPDetectorParams | None:
        """Get detector parameters for frequency.

        Args:
            frequency: Frequency in Hz

        Returns:
            Detector parameters or None
        """
        band = self.get_band(frequency)
        return None if band is None else self.BAND_PARAMS[band]

    def apply(
        self,
        peak_levels: NDArray[np.float64],
        frequencies: NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """Apply quasi-peak detection to peak levels.

        Args:
            peak_levels: Peak detector levels in dB
            frequencies: Corresponding frequencies in Hz

        Returns:
            Quasi-peak levels in dB. Levels whose frequency lies outside
            every band are returned unchanged.

        Note:
            Quasi-peak is always <= peak for repetitive signals.
            The correction factor depends on pulse repetition rate.
        """
        qp_levels = np.copy(peak_levels)

        for position, (peak, freq) in enumerate(zip(peak_levels, frequencies, strict=False)):
            band = self.get_band(freq)
            if band is None:
                continue
            # Approximate QP correction; a real implementation would need
            # the actual signal for time-domain processing.
            qp_levels[position] = peak - self._get_qp_correction(band)

        return qp_levels

    def _get_qp_correction(self, band: QPDetectorBand) -> float:
        """Get approximate QP correction factor.

        This is a simplified model. Real QP detection requires
        time-domain processing of the actual signal.

        Args:
            band: CISPR band

        Returns:
            Correction factor in dB (0.0 for a band with no entry)
        """
        # Approximate corrections for periodic signals; the actual
        # correction depends on pulse rate and duty cycle.
        correction_by_band = {
            QPDetectorBand.BAND_A: 3.0,
            QPDetectorBand.BAND_B: 6.0,
            QPDetectorBand.BAND_C: 4.0,
            QPDetectorBand.BAND_D: 4.0,
        }
        return correction_by_band.get(band, 0.0)

    def compare_peak_qp(
        self,
        peak_levels: NDArray[np.float64],
        frequencies: NDArray[np.float64],
    ) -> dict[str, Any]:
        """Compare peak and quasi-peak readings.

        Args:
            peak_levels: Peak detector levels
            frequencies: Frequencies

        Returns:
            Comparison results: the peak and quasi-peak levels, their
            per-point difference, its maximum and mean, and a description.
        """
        qp_levels = self.apply(peak_levels, frequencies)
        difference = peak_levels - qp_levels
        largest = float(np.max(difference))
        average = float(np.mean(difference))

        return {
            "peak_levels": peak_levels,
            "qp_levels": qp_levels,
            "difference_db": difference,
            "max_difference_db": largest,
            "avg_difference_db": average,
            "description": (
                "Quasi-peak is lower than peak for pulsed/repetitive signals. "
                "For CW signals, QP equals peak."
            ),
        }

    def get_bandwidth(self, frequency: float) -> float:
        """Get measurement bandwidth for frequency.

        Args:
            frequency: Frequency in Hz

        Returns:
            Bandwidth in Hz; 9000 (the Band B value) when the frequency is
            outside every band.
        """
        params = self.get_params(frequency)
        # Default to Band B
        return 9000 if params is None else params.bandwidth

    def validate_bandwidth(self, bandwidth: float) -> None:
        """Validate measurement bandwidth.

        A positive bandwidth that is not one of the band bandwidths only
        logs a warning.

        Args:
            bandwidth: Bandwidth to validate

        Raises:
            ValueError: If bandwidth is not positive
        """
        if bandwidth <= 0:
            raise ValueError("Bandwidth must be positive")

        valid_bandwidths = [params.bandwidth for params in self.BAND_PARAMS.values()]
        if bandwidth in valid_bandwidths:
            return
        logger.warning(
            f"Non-standard bandwidth {bandwidth} Hz. "
            f"Standard CISPR bandwidths: {valid_bandwidths}"
        )