Coverage for src / tracekit / workflows / compliance.py: 100%

49 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""EMC/EMI compliance testing workflow. 

2 

3This module implements spectral compliance testing against regulatory limits. 

4 

5 

6Example: 

7 >>> import tracekit as tk 

8 >>> trace = tk.load('emissions.wfm') 

9 >>> result = tk.emc_compliance_test(trace, standard='FCC_Part15_ClassB') 

10 >>> print(f"Status: {result['status']}") 

11 >>> print(f"Violations: {len(result['violations'])}") 

12 

13References: 

14 FCC Part 15: Radio Frequency Devices 

15 CISPR 22/32: Information Technology Equipment 

16 MIL-STD-461: EMI/EMC Requirements 

17""" 

18 

19from __future__ import annotations 

20 

21from typing import TYPE_CHECKING, Any, Literal 

22 

23import numpy as np 

24 

25from tracekit.core.exceptions import AnalysisError 

26 

27if TYPE_CHECKING: 

28 from tracekit.core.types import WaveformTrace 

29 

30 

def emc_compliance_test(
    trace: WaveformTrace,
    *,
    standard: str = "FCC_Part15_ClassB",
    frequency_range: tuple[float, float] | None = None,
    detector: Literal["peak", "quasi-peak", "average"] = "peak",
    report: str | None = None,
) -> dict[str, Any]:
    """EMC/EMI compliance testing against regulatory limits.

    Performs spectral compliance testing:
        - Computes the spectrum of the trace (FFT)
        - Loads the regulatory limit mask for ``standard``
        - Interpolates the limit line onto the spectrum frequencies
        - Identifies violations (spectrum above the limit)
        - Optionally writes an HTML compliance report

    Args:
        trace: Signal to test for emissions.
        standard: Regulatory standard to test against:
            'FCC_Part15_ClassA', 'FCC_Part15_ClassB',
            'CE_CISPR22_ClassA', 'CE_CISPR22_ClassB',
            'CE_CISPR32_ClassA', 'CE_CISPR32_ClassB',
            'MIL_STD_461G_CE102', 'MIL_STD_461G_RE102'
        frequency_range: Optional frequency range (f_min, f_max) in Hz.
            Both bounds are inclusive.
        detector: Detector type ('peak', 'quasi-peak', 'average'). The value
            is recorded in the result under ``detector``; this function does
            not otherwise use it when computing the spectrum.
        report: Optional path to save HTML compliance report.

    Returns:
        Dictionary containing:
            - status: 'PASS' or 'FAIL'
            - standard: Standard tested against
            - violations: List of dicts, one per violating spectrum bin, with
              keys 'frequency', 'measured_dbuv', 'limit_dbuv' and 'excess_db'
            - margin_to_limit: Minimum margin in dB (negative if failing)
            - worst_frequency: Frequency with worst margin
            - worst_margin: Worst margin value in dB
            - spectrum_freq: Frequency array for spectrum
            - spectrum_mag: Magnitude array for spectrum (dBµV)
            - limit_freq: Frequency array for limit mask
            - limit_mag: Magnitude array for limit mask
            - detector: Detector type passed in by the caller

    Raises:
        AnalysisError: If ``standard`` is unknown, or if there are no
            spectrum bins to evaluate (for example because
            ``frequency_range`` excludes every bin).

    Example:
        >>> trace = tk.load('radiated_emissions.wfm')
        >>> result = tk.emc_compliance_test(trace, standard='FCC_Part15_ClassB')
        >>> print(f"Compliance: {result['status']}")
        >>> print(f"Margin: {result['margin_to_limit']:.1f} dB")
        >>> if result['violations']:
        ...     print(f"Violations at: {[v['frequency']/1e6 for v in result['violations']]} MHz")

    References:
        FCC Part 15 Subpart B (Unintentional Radiators)
        CISPR 22/32 (Information Technology Equipment EMC)
        MIL-STD-461G (Military EMC Requirements)
    """
    # Imported here rather than at module level, as in the original code.
    from tracekit.analyzers.waveform.spectral import fft

    # Calculate spectrum
    freq, mag = fft(trace)  # type: ignore[misc]

    # Convert to dBµV (typical EMC unit).
    # Assuming mag is in V: dBµV = 20*log10(V * 1e6). The small offset keeps
    # log10 finite for bins that are exactly zero.
    spectrum_dbuv = 20 * np.log10(np.abs(mag) * 1e6 + 1e-12)

    # Load limit mask for standard (raises AnalysisError if unknown)
    limit_freq, limit_mag = _load_emc_mask(standard)

    # Restrict the spectrum to the requested band, bounds inclusive
    if frequency_range is not None:
        f_min, f_max = frequency_range
        in_band = (freq >= f_min) & (freq <= f_max)
        freq = freq[in_band]
        spectrum_dbuv = spectrum_dbuv[in_band]

    # Without this guard the min/argmin reductions below fail with an opaque
    # NumPy "zero-size array" ValueError.
    if np.size(freq) == 0:
        if frequency_range is not None:
            raise AnalysisError(
                f"No spectrum bins within frequency range {frequency_range} Hz"
            )
        raise AnalysisError("No spectrum bins available for compliance test")

    # Interpolate limit to spectrum frequencies. np.interp holds the first and
    # last mask values for frequencies outside the mask's span, so such bins
    # are compared against the nearest edge limit.
    limit_interp = np.interp(freq, limit_freq, limit_mag)

    # Margin is positive when the spectrum is below the limit
    margin = limit_interp - spectrum_dbuv

    # One entry per bin whose level exceeds the limit
    violations = [
        {
            "frequency": freq[idx],
            "measured_dbuv": spectrum_dbuv[idx],
            "limit_dbuv": limit_interp[idx],
            "excess_db": -margin[idx],  # Positive value for excess
        }
        for idx in np.where(margin < 0)[0]
    ]

    # Overall status
    status = "FAIL" if violations else "PASS"

    # Margin analysis: the smallest margin is the worst case
    worst_idx = np.argmin(margin)
    worst_margin = margin[worst_idx]

    result = {
        "status": status,
        "standard": standard,
        "violations": violations,
        "margin_to_limit": worst_margin,
        "worst_frequency": freq[worst_idx],
        "worst_margin": worst_margin,
        "spectrum_freq": freq,
        "spectrum_mag": spectrum_dbuv,
        "limit_freq": limit_freq,
        "limit_mag": limit_mag,
        "detector": detector,
    }

    # Generate report if requested
    if report is not None:
        _generate_compliance_report(result, report)

    return result

167 

168 

def _load_emc_mask(
    standard: str,
) -> tuple[np.ndarray[Any, np.dtype[np.float64]], np.ndarray[Any, np.dtype[np.float64]]]:
    """Load EMC limit mask for a standard.

    The mask is a piecewise-linear limit line given as breakpoints. The arrays
    are rebuilt on every call, so callers receive fresh copies.

    Args:
        standard: Standard name, e.g. 'FCC_Part15_ClassB'.

    Returns:
        Tuple of (frequency array in Hz, limit array in dBµV). Both arrays
        are float64 and have the same length.

    Raises:
        AnalysisError: If unknown EMC standard.
    """

    def _mask(freq_mhz: list[float], limit_dbuv: list[float]) -> dict[str, Any]:
        # Breakpoints are written in MHz for readability and stored in Hz.
        # Both arrays are forced to float64 so the result matches the
        # declared return type even when the literals are integers.
        return {
            "freq": np.array(freq_mhz, dtype=np.float64) * 1e6,
            "limit": np.array(limit_dbuv, dtype=np.float64),
        }

    # Breakpoint grids (MHz) shared by the Class A / Class B variants
    fcc_freq = [0.15, 0.5, 5.0, 30.0, 88.0, 216.0, 1000.0]
    cispr_freq = [0.15, 0.5, 5.0, 30.0, 230.0, 1000.0]

    # Simplified mask data - real implementation would load from data files
    masks = {
        # Frequencies in MHz, limits in dBµV at 3m (dBµV/m)
        "FCC_Part15_ClassB": _mask(fcc_freq, [60, 60, 56, 46, 46, 46, 46]),
        "FCC_Part15_ClassA": _mask(fcc_freq, [70, 70, 66, 56, 56, 56, 56]),
        "CE_CISPR22_ClassB": _mask(cispr_freq, [66, 56, 56, 47, 47, 47]),
        "CE_CISPR22_ClassA": _mask(cispr_freq, [79, 73, 73, 60, 60, 60]),
        # The CISPR 32 entries use the same values as CISPR 22 in this table
        "CE_CISPR32_ClassB": _mask(cispr_freq, [66, 56, 56, 47, 47, 47]),
        "CE_CISPR32_ClassA": _mask(cispr_freq, [79, 73, 73, 60, 60, 60]),
        "MIL_STD_461G_CE102": _mask([0.01, 0.15, 10.0, 50.0], [90, 80, 80, 80]),
        "MIL_STD_461G_RE102": _mask([2, 30, 200, 1000, 18000], [54, 54, 34, 34, 34]),
    }

    try:
        mask_data = masks[standard]
    except KeyError:
        raise AnalysisError(f"Unknown EMC standard: {standard}") from None

    return mask_data["freq"], mask_data["limit"]

225 

226 

def _generate_compliance_report(result: dict[str, Any], output_path: str) -> None:
    """Generate HTML compliance report.

    Writes a small standalone HTML page with the standard, the overall status,
    a summary table and, when there are violations, one table row per
    violation. The file is written as UTF-8 and declares that charset, because
    the page contains the non-ASCII character 'µ'.

    Args:
        result: Compliance test result dictionary. The keys read here are
            'status', 'standard', 'margin_to_limit', 'worst_frequency',
            'worst_margin' and 'violations' (a list of dicts with
            'frequency', 'measured_dbuv', 'limit_dbuv' and 'excess_db').
        output_path: Path to save HTML report. An existing file is
            overwritten.
    """
    status_color = "green" if result["status"] == "PASS" else "red"
    violations = result["violations"]

    # Collect fragments and join once instead of repeated string +=
    parts = [
        f"""
    <html>
    <head><meta charset="utf-8"><title>EMC Compliance Report</title></head>
    <body>
    <h1>EMC Compliance Test Report</h1>
    <h2>Standard: {result["standard"]}</h2>
    <h2 style="color: {status_color}">Status: {result["status"]}</h2>

    <h3>Summary</h3>
    <table>
    <tr><th>Parameter</th><th>Value</th></tr>
    <tr><td>Margin to Limit</td><td>{result["margin_to_limit"]:.2f} dB</td></tr>
    <tr><td>Worst Frequency</td><td>{result["worst_frequency"] / 1e6:.2f} MHz</td></tr>
    <tr><td>Worst Margin</td><td>{result["worst_margin"]:.2f} dB</td></tr>
    <tr><td>Violations</td><td>{len(violations)}</td></tr>
    </table>
    """
    ]

    if violations:
        parts.append(
            """
    <h3>Violations</h3>
    <table>
    <tr><th>Frequency (MHz)</th><th>Measured (dBµV)</th><th>Limit (dBµV)</th><th>Excess (dB)</th></tr>
    """
        )
        # Frequencies are stored in Hz and shown in MHz
        parts.extend(
            f"""
    <tr>
    <td>{v["frequency"] / 1e6:.2f}</td>
    <td>{v["measured_dbuv"]:.2f}</td>
    <td>{v["limit_dbuv"]:.2f}</td>
    <td>{v["excess_db"]:.2f}</td>
    </tr>
    """
            for v in violations
        )
        parts.append("</table>")

    parts.append(
        """
    </body>
    </html>
    """
    )

    # Explicit encoding: the platform default may be unable to encode 'µ'
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("".join(parts))

276 

277 

278__all__ = ["emc_compliance_test"]