Coverage for src / tracekit / compliance / testing.py: 86%
102 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""EMC compliance testing implementation.
3This module provides compliance testing against regulatory limit masks.
6Example:
7 >>> from tracekit.compliance import load_limit_mask, test_compliance
8 >>> mask = load_limit_mask('FCC_Part15_ClassB')
9 >>> result = test_compliance(trace, mask)
10 >>> print(f"Status: {result.status}")
12References:
13 CISPR 16-1-1 (Measuring Apparatus)
14 ANSI C63.2 (Instrumentation)
15"""
17from __future__ import annotations
19from dataclasses import dataclass, field
20from enum import Enum
21from typing import TYPE_CHECKING, Any
23import numpy as np
25if TYPE_CHECKING:
26 from numpy.typing import NDArray
28 from tracekit.compliance.masks import LimitMask
29 from tracekit.core.types import WaveformTrace
32class DetectorType(Enum):
33 """EMC measurement detector types."""
35 PEAK = "peak"
36 QUASI_PEAK = "quasi-peak"
37 AVERAGE = "average"
38 RMS = "rms"
41@dataclass
42class ComplianceViolation:
43 """Single compliance violation record.
45 Attributes:
46 frequency: Violation frequency in Hz
47 measured_level: Measured level in mask unit (dBuV, etc.)
48 limit_level: Limit level at this frequency
49 excess_db: Amount exceeding limit (positive = violation)
50 detector: Detector type used
51 severity: Severity classification
52 """
54 frequency: float
55 measured_level: float
56 limit_level: float
57 excess_db: float
58 detector: str = "peak"
59 severity: str = "FAIL"
61 def __str__(self) -> str:
62 """Format violation as string."""
63 freq_mhz = self.frequency / 1e6
64 return (
65 f"{freq_mhz:.3f} MHz: {self.measured_level:.1f} dB "
66 f"(limit: {self.limit_level:.1f} dB, excess: {self.excess_db:.1f} dB)"
67 )
70@dataclass
71class ComplianceResult:
72 """Compliance test result.
74 Attributes:
75 status: Overall status ('PASS' or 'FAIL')
76 mask_name: Name of limit mask used
77 violations: List of violations
78 margin_to_limit: Minimum margin in dB (negative = failing)
79 worst_frequency: Frequency with worst margin
80 worst_margin: Worst margin value in dB
81 spectrum_freq: Tested frequency array
82 spectrum_level: Measured level array
83 limit_level: Limit level array (interpolated to spectrum frequencies)
84 detector: Detector type used
85 metadata: Additional result metadata
86 """
88 status: str
89 mask_name: str
90 violations: list[ComplianceViolation]
91 margin_to_limit: float
92 worst_frequency: float
93 worst_margin: float
94 spectrum_freq: NDArray[np.float64]
95 spectrum_level: NDArray[np.float64]
96 limit_level: NDArray[np.float64]
97 detector: str = "peak"
98 metadata: dict[str, Any] = field(default_factory=dict)
100 @property
101 def passed(self) -> bool:
102 """Return True if compliance test passed."""
103 return self.status == "PASS"
105 @property
106 def violation_count(self) -> int:
107 """Return number of violations."""
108 return len(self.violations)
110 def summary(self) -> str:
111 """Generate text summary of result."""
112 lines = [
113 f"EMC Compliance Test: {self.mask_name}",
114 f"Status: {self.status}",
115 f"Margin to limit: {self.margin_to_limit:.1f} dB",
116 f"Worst frequency: {self.worst_frequency / 1e6:.3f} MHz",
117 f"Worst margin: {self.worst_margin:.1f} dB",
118 ]
120 if self.violations:
121 lines.append(f"\nViolations ({len(self.violations)}):")
122 for v in self.violations[:10]: # Limit to first 10
123 lines.append(f" - {v}")
124 if len(self.violations) > 10:
125 lines.append(f" ... and {len(self.violations) - 10} more")
127 return "\n".join(lines)
130def check_compliance(
131 trace_or_spectrum: WaveformTrace | tuple[NDArray[np.float64], NDArray[np.float64]],
132 mask: LimitMask,
133 *,
134 detector: DetectorType | str = DetectorType.PEAK,
135 frequency_range: tuple[float, float] | None = None,
136 unit_conversion: str | None = None,
137) -> ComplianceResult:
138 """Check signal against EMC limit mask.
140 Args:
141 trace_or_spectrum: Either a WaveformTrace to analyze, or a tuple of
142 (frequency_array, magnitude_array) if spectrum already computed.
143 mask: LimitMask to test against.
144 detector: Detector type to use ('peak', 'quasi-peak', 'average', 'rms').
145 frequency_range: Optional (min, max) frequency range to test.
146 unit_conversion: Optional unit conversion ('V_to_dBuV', 'W_to_dBm', etc.)
148 Returns:
149 ComplianceResult with pass/fail status and violation details.
151 Example:
152 >>> mask = load_limit_mask('FCC_Part15_ClassB')
153 >>> result = check_compliance(trace, mask)
154 >>> print(result.summary())
155 """
156 from tracekit.core.types import WaveformTrace
158 # Handle detector type
159 if isinstance(detector, str):
160 detector = DetectorType(detector.lower().replace("-", "_").replace(" ", "_"))
162 # Get spectrum
163 if isinstance(trace_or_spectrum, WaveformTrace):
164 freq, mag = _compute_spectrum(trace_or_spectrum, detector)
165 else:
166 freq, mag = trace_or_spectrum
168 # Convert to dB if needed
169 if unit_conversion == "V_to_dBuV": 169 ↛ 171line 169 didn't jump to line 171 because the condition on line 169 was never true
170 # dBuV = 20*log10(V * 1e6)
171 spectrum_level = 20 * np.log10(np.abs(mag) * 1e6 + 1e-12)
172 elif unit_conversion == "W_to_dBm": 172 ↛ 174line 172 didn't jump to line 174 because the condition on line 172 was never true
173 # dBm = 10*log10(W * 1000)
174 spectrum_level = 10 * np.log10(np.abs(mag) * 1000 + 1e-12)
175 elif mag.max() > 0 and mag.max() < 10: 175 ↛ 177line 175 didn't jump to line 177 because the condition on line 175 was never true
176 # Assume linear voltage, convert to dBuV
177 spectrum_level = 20 * np.log10(np.abs(mag) * 1e6 + 1e-12)
178 else:
179 # Assume already in dB
180 spectrum_level = mag
182 # Apply frequency range filter
183 if frequency_range is not None:
184 f_min, f_max = frequency_range
185 mask_filter = (freq >= f_min) & (freq <= f_max)
186 freq = freq[mask_filter]
187 spectrum_level = spectrum_level[mask_filter]
189 # Limit to mask frequency range
190 mask_f_min, mask_f_max = mask.frequency_range
191 in_range = (freq >= mask_f_min) & (freq <= mask_f_max)
192 freq = freq[in_range]
193 spectrum_level = spectrum_level[in_range]
195 if len(freq) == 0:
196 # No data in mask range
197 return ComplianceResult(
198 status="PASS",
199 mask_name=mask.name,
200 violations=[],
201 margin_to_limit=np.inf,
202 worst_frequency=0.0,
203 worst_margin=np.inf,
204 spectrum_freq=np.array([]),
205 spectrum_level=np.array([]),
206 limit_level=np.array([]),
207 detector=detector.value,
208 )
210 # Interpolate limit to spectrum frequencies
211 limit_level = mask.interpolate(freq)
213 # Calculate margin (positive = passing)
214 margin = limit_level - spectrum_level
216 # Find violations
217 violations: list[ComplianceViolation] = []
218 violation_mask = margin < 0
219 if np.any(violation_mask):
220 violation_indices = np.where(violation_mask)[0]
221 for idx in violation_indices:
222 violations.append(
223 ComplianceViolation(
224 frequency=float(freq[idx]),
225 measured_level=float(spectrum_level[idx]),
226 limit_level=float(limit_level[idx]),
227 excess_db=float(-margin[idx]),
228 detector=detector.value,
229 severity="FAIL",
230 )
231 )
233 # Overall results
234 status = "FAIL" if violations else "PASS"
235 margin_to_limit = float(np.min(margin))
236 worst_idx = int(np.argmin(margin))
237 worst_frequency = float(freq[worst_idx])
238 worst_margin = float(margin[worst_idx])
240 return ComplianceResult(
241 status=status,
242 mask_name=mask.name,
243 violations=violations,
244 margin_to_limit=margin_to_limit,
245 worst_frequency=worst_frequency,
246 worst_margin=worst_margin,
247 spectrum_freq=freq,
248 spectrum_level=spectrum_level,
249 limit_level=limit_level,
250 detector=detector.value,
251 metadata={
252 "unit": mask.unit,
253 "distance": mask.distance,
254 "regulatory_body": mask.regulatory_body,
255 },
256 )
259def _compute_spectrum(
260 trace: WaveformTrace,
261 detector: DetectorType,
262) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
263 """Compute spectrum from trace with specified detector.
265 Args:
266 trace: Input waveform trace.
267 detector: Detector type.
269 Returns:
270 (frequency, magnitude) arrays.
271 """
272 from tracekit.analyzers.waveform.spectral import fft, psd
274 if detector == DetectorType.PEAK: 274 ↛ 278line 274 didn't jump to line 278 because the condition on line 274 was always true
275 # Use FFT for peak detection
276 freq, mag = fft(trace) # type: ignore[misc]
277 return freq, np.abs(mag)
278 elif detector == DetectorType.AVERAGE:
279 # Use Welch PSD for averaging
280 freq, mag = psd(trace, method="welch") # type: ignore[call-arg]
281 return freq, np.sqrt(mag) # Convert PSD to magnitude
282 elif detector == DetectorType.QUASI_PEAK:
283 # Quasi-peak requires special weighting (simplified here)
284 # Real implementation would use CISPR 16 weighting network
285 freq, mag = fft(trace) # type: ignore[misc]
286 # Apply simplified quasi-peak envelope
287 return freq, np.abs(mag) * 0.8 # Approximate QP < peak
288 else: # RMS
289 freq, mag = psd(trace, method="welch") # type: ignore[call-arg]
290 return freq, np.sqrt(mag)
293__all__ = [
294 "ComplianceResult",
295 "ComplianceViolation",
296 "DetectorType",
297 "check_compliance",
298]