Coverage for src / tracekit / workflows / compliance.py: 100%
49 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""EMC/EMI compliance testing workflow.
3This module implements spectral compliance testing against regulatory limits.
6Example:
7 >>> import tracekit as tk
8 >>> trace = tk.load('emissions.wfm')
9 >>> result = tk.emc_compliance_test(trace, standard='FCC_Part15_ClassB')
10 >>> print(f"Status: {result['status']}")
11 >>> print(f"Violations: {len(result['violations'])}")
13References:
14 FCC Part 15: Radio Frequency Devices
15 CISPR 22/32: Information Technology Equipment
16 MIL-STD-461: EMI/EMC Requirements
17"""
19from __future__ import annotations
21from typing import TYPE_CHECKING, Any, Literal
23import numpy as np
25from tracekit.core.exceptions import AnalysisError
27if TYPE_CHECKING:
28 from tracekit.core.types import WaveformTrace
31def emc_compliance_test(
32 trace: WaveformTrace,
33 *,
34 standard: str = "FCC_Part15_ClassB",
35 frequency_range: tuple[float, float] | None = None,
36 detector: Literal["peak", "quasi-peak", "average"] = "peak",
37 report: str | None = None,
38) -> dict[str, Any]:
39 """EMC/EMI compliance testing against regulatory limits.
41 Performs spectral compliance testing:
42 - Computes spectrum (FFT or welch)
43 - Loads regulatory limit mask
44 - Overlays limit lines on spectrum
45 - Identifies violations
46 - Generates compliance report
48 Args:
49 trace: Signal to test for emissions.
50 standard: Regulatory standard to test against:
51 'FCC_Part15_ClassA', 'FCC_Part15_ClassB',
52 'CE_CISPR22_ClassA', 'CE_CISPR22_ClassB',
53 'CE_CISPR32_ClassA', 'CE_CISPR32_ClassB',
54 'MIL_STD_461G_CE102', 'MIL_STD_461G_RE102'
55 frequency_range: Optional frequency range (f_min, f_max) in Hz.
56 detector: Detector type ('peak', 'quasi-peak', 'average').
57 report: Optional path to save HTML compliance report.
59 Returns:
60 Dictionary containing:
61 - status: 'PASS' or 'FAIL'
62 - standard: Standard tested against
63 - violations: List of frequency violations
64 - margin_to_limit: Minimum margin in dB (negative if failing)
65 - worst_frequency: Frequency with worst margin
66 - worst_margin: Worst margin value in dB
67 - spectrum_freq: Frequency array for spectrum
68 - spectrum_mag: Magnitude array for spectrum (dBµV or dBm)
69 - limit_freq: Frequency array for limit mask
70 - limit_mag: Magnitude array for limit mask
72 Returns:
73 Dictionary containing:
74 - status: 'PASS' or 'FAIL'
75 - standard: Standard tested against
76 - violations: List of frequency violations
77 - margin_to_limit: Minimum margin in dB (negative if failing)
78 - worst_frequency: Frequency with worst margin
79 - worst_margin: Worst margin value in dB
80 - spectrum_freq: Frequency array for spectrum
81 - spectrum_mag: Magnitude array for spectrum (dBµV or dBm)
82 - limit_freq: Frequency array for limit mask
83 - limit_mag: Magnitude array for limit mask
85 Example:
86 >>> trace = tk.load('radiated_emissions.wfm')
87 >>> result = tk.emc_compliance_test(trace, standard='FCC_Part15_ClassB')
88 >>> print(f"Compliance: {result['status']}")
89 >>> print(f"Margin: {result['margin_to_limit']:.1f} dB")
90 >>> if result['violations']:
91 ... print(f"Violations at: {[v['frequency']/1e6 for v in result['violations']]} MHz")
93 References:
94 FCC Part 15 Subpart B (Unintentional Radiators)
95 CISPR 22/32 (Information Technology Equipment EMC)
96 MIL-STD-461G (Military EMC Requirements)
97 """
98 # Import spectral analysis
99 from tracekit.analyzers.waveform.spectral import fft
101 # Calculate spectrum
102 freq, mag = fft(trace) # type: ignore[misc]
104 # Convert to dBµV (typical EMC unit)
105 # Assuming mag is in V, convert to dBµV = 20*log10(V*1e6)
106 spectrum_dbuv = 20 * np.log10(np.abs(mag) * 1e6 + 1e-12) # Add small value to avoid log(0)
108 # Load limit mask for standard
109 limit_freq, limit_mag = _load_emc_mask(standard)
111 # Apply frequency range if specified
112 if frequency_range is not None:
113 f_min, f_max = frequency_range
114 mask = (freq >= f_min) & (freq <= f_max)
115 freq = freq[mask]
116 spectrum_dbuv = spectrum_dbuv[mask]
118 # Interpolate limit to spectrum frequencies
119 limit_interp = np.interp(freq, limit_freq, limit_mag)
121 # Find violations (spectrum exceeds limit)
122 margin = limit_interp - spectrum_dbuv
123 violations_mask = margin < 0
125 # Build violations list
126 violations = []
127 if np.any(violations_mask):
128 violation_indices = np.where(violations_mask)[0]
129 for idx in violation_indices:
130 violations.append(
131 {
132 "frequency": freq[idx],
133 "measured_dbuv": spectrum_dbuv[idx],
134 "limit_dbuv": limit_interp[idx],
135 "excess_db": -margin[idx], # Positive value for excess
136 }
137 )
139 # Overall status
140 status = "FAIL" if violations else "PASS"
142 # Margin analysis
143 margin_to_limit = np.min(margin)
144 worst_idx = np.argmin(margin)
145 worst_frequency = freq[worst_idx]
146 worst_margin = margin[worst_idx]
148 result = {
149 "status": status,
150 "standard": standard,
151 "violations": violations,
152 "margin_to_limit": margin_to_limit,
153 "worst_frequency": worst_frequency,
154 "worst_margin": worst_margin,
155 "spectrum_freq": freq,
156 "spectrum_mag": spectrum_dbuv,
157 "limit_freq": limit_freq,
158 "limit_mag": limit_mag,
159 "detector": detector,
160 }
162 # Generate report if requested
163 if report is not None:
164 _generate_compliance_report(result, report)
166 return result
169def _load_emc_mask(
170 standard: str,
171) -> tuple[np.ndarray[Any, np.dtype[np.float64]], np.ndarray[Any, np.dtype[np.float64]]]:
172 """Load EMC limit mask for a standard.
174 Args:
175 standard: Standard name.
177 Returns:
178 Tuple of (frequency array, limit array in dBµV).
180 Raises:
181 AnalysisError: If unknown EMC standard.
182 """
183 # Simplified mask data - real implementation would load from data files
184 masks = {
185 "FCC_Part15_ClassB": {
186 # Frequencies in MHz, limits in dBµV at 3m
187 "freq": np.array([0.15, 0.5, 5.0, 30.0, 88.0, 216.0, 1000.0]) * 1e6,
188 "limit": np.array([60, 60, 56, 46, 46, 46, 46]), # dBµV/m
189 },
190 "FCC_Part15_ClassA": {
191 "freq": np.array([0.15, 0.5, 5.0, 30.0, 88.0, 216.0, 1000.0]) * 1e6,
192 "limit": np.array([70, 70, 66, 56, 56, 56, 56]),
193 },
194 "CE_CISPR22_ClassB": {
195 "freq": np.array([0.15, 0.5, 5.0, 30.0, 230.0, 1000.0]) * 1e6,
196 "limit": np.array([66, 56, 56, 47, 47, 47]),
197 },
198 "CE_CISPR22_ClassA": {
199 "freq": np.array([0.15, 0.5, 5.0, 30.0, 230.0, 1000.0]) * 1e6,
200 "limit": np.array([79, 73, 73, 60, 60, 60]),
201 },
202 "CE_CISPR32_ClassB": {
203 "freq": np.array([0.15, 0.5, 5.0, 30.0, 230.0, 1000.0]) * 1e6,
204 "limit": np.array([66, 56, 56, 47, 47, 47]),
205 },
206 "CE_CISPR32_ClassA": {
207 "freq": np.array([0.15, 0.5, 5.0, 30.0, 230.0, 1000.0]) * 1e6,
208 "limit": np.array([79, 73, 73, 60, 60, 60]),
209 },
210 "MIL_STD_461G_CE102": {
211 "freq": np.array([0.01, 0.15, 10.0, 50.0]) * 1e6,
212 "limit": np.array([90, 80, 80, 80]),
213 },
214 "MIL_STD_461G_RE102": {
215 "freq": np.array([2, 30, 200, 1000, 18000]) * 1e6,
216 "limit": np.array([54, 54, 34, 34, 34]),
217 },
218 }
220 if standard not in masks:
221 raise AnalysisError(f"Unknown EMC standard: {standard}")
223 mask_data = masks[standard]
224 return mask_data["freq"], mask_data["limit"]
227def _generate_compliance_report(result: dict[str, Any], output_path: str) -> None:
228 """Generate HTML compliance report.
230 Args:
231 result: Compliance test result dictionary.
232 output_path: Path to save HTML report.
233 """
234 status_color = "green" if result["status"] == "PASS" else "red"
236 html = f"""
237 <html>
238 <head><title>EMC Compliance Report</title></head>
239 <body>
240 <h1>EMC Compliance Test Report</h1>
241 <h2>Standard: {result["standard"]}</h2>
242 <h2 style="color: {status_color}">Status: {result["status"]}</h2>
244 <h3>Summary</h3>
245 <table>
246 <tr><th>Parameter</th><th>Value</th></tr>
247 <tr><td>Margin to Limit</td><td>{result["margin_to_limit"]:.2f} dB</td></tr>
248 <tr><td>Worst Frequency</td><td>{result["worst_frequency"] / 1e6:.2f} MHz</td></tr>
249 <tr><td>Worst Margin</td><td>{result["worst_margin"]:.2f} dB</td></tr>
250 <tr><td>Violations</td><td>{len(result["violations"])}</td></tr>
251 </table>
252 """
253 if result["violations"]:
254 html += """
255 <h3>Violations</h3>
256 <table>
257 <tr><th>Frequency (MHz)</th><th>Measured (dBµV)</th><th>Limit (dBµV)</th><th>Excess (dB)</th></tr>
258 """
259 for v in result["violations"]:
260 html += f"""
261 <tr>
262 <td>{v["frequency"] / 1e6:.2f}</td>
263 <td>{v["measured_dbuv"]:.2f}</td>
264 <td>{v["limit_dbuv"]:.2f}</td>
265 <td>{v["excess_db"]:.2f}</td>
266 </tr>
267 """
268 html += "</table>"
270 html += """
271 </body>
272 </html>
273 """
274 with open(output_path, "w") as f:
275 f.write(html)
278__all__ = ["emc_compliance_test"]