Coverage for src / tracekit / compliance / testing.py: 86%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""EMC compliance testing implementation. 

2 

3This module provides compliance testing against regulatory limit masks. 

4 

5 

6Example: 

7 >>> from tracekit.compliance import load_limit_mask, test_compliance 

8 >>> mask = load_limit_mask('FCC_Part15_ClassB') 

9 >>> result = test_compliance(trace, mask) 

10 >>> print(f"Status: {result.status}") 

11 

12References: 

13 CISPR 16-1-1 (Measuring Apparatus) 

14 ANSI C63.2 (Instrumentation) 

15""" 

16 

17from __future__ import annotations 

18 

19from dataclasses import dataclass, field 

20from enum import Enum 

21from typing import TYPE_CHECKING, Any 

22 

23import numpy as np 

24 

25if TYPE_CHECKING: 

26 from numpy.typing import NDArray 

27 

28 from tracekit.compliance.masks import LimitMask 

29 from tracekit.core.types import WaveformTrace 

30 

31 

32class DetectorType(Enum): 

33 """EMC measurement detector types.""" 

34 

35 PEAK = "peak" 

36 QUASI_PEAK = "quasi-peak" 

37 AVERAGE = "average" 

38 RMS = "rms" 

39 

40 

41@dataclass 

42class ComplianceViolation: 

43 """Single compliance violation record. 

44 

45 Attributes: 

46 frequency: Violation frequency in Hz 

47 measured_level: Measured level in mask unit (dBuV, etc.) 

48 limit_level: Limit level at this frequency 

49 excess_db: Amount exceeding limit (positive = violation) 

50 detector: Detector type used 

51 severity: Severity classification 

52 """ 

53 

54 frequency: float 

55 measured_level: float 

56 limit_level: float 

57 excess_db: float 

58 detector: str = "peak" 

59 severity: str = "FAIL" 

60 

61 def __str__(self) -> str: 

62 """Format violation as string.""" 

63 freq_mhz = self.frequency / 1e6 

64 return ( 

65 f"{freq_mhz:.3f} MHz: {self.measured_level:.1f} dB " 

66 f"(limit: {self.limit_level:.1f} dB, excess: {self.excess_db:.1f} dB)" 

67 ) 

68 

69 

70@dataclass 

71class ComplianceResult: 

72 """Compliance test result. 

73 

74 Attributes: 

75 status: Overall status ('PASS' or 'FAIL') 

76 mask_name: Name of limit mask used 

77 violations: List of violations 

78 margin_to_limit: Minimum margin in dB (negative = failing) 

79 worst_frequency: Frequency with worst margin 

80 worst_margin: Worst margin value in dB 

81 spectrum_freq: Tested frequency array 

82 spectrum_level: Measured level array 

83 limit_level: Limit level array (interpolated to spectrum frequencies) 

84 detector: Detector type used 

85 metadata: Additional result metadata 

86 """ 

87 

88 status: str 

89 mask_name: str 

90 violations: list[ComplianceViolation] 

91 margin_to_limit: float 

92 worst_frequency: float 

93 worst_margin: float 

94 spectrum_freq: NDArray[np.float64] 

95 spectrum_level: NDArray[np.float64] 

96 limit_level: NDArray[np.float64] 

97 detector: str = "peak" 

98 metadata: dict[str, Any] = field(default_factory=dict) 

99 

100 @property 

101 def passed(self) -> bool: 

102 """Return True if compliance test passed.""" 

103 return self.status == "PASS" 

104 

105 @property 

106 def violation_count(self) -> int: 

107 """Return number of violations.""" 

108 return len(self.violations) 

109 

110 def summary(self) -> str: 

111 """Generate text summary of result.""" 

112 lines = [ 

113 f"EMC Compliance Test: {self.mask_name}", 

114 f"Status: {self.status}", 

115 f"Margin to limit: {self.margin_to_limit:.1f} dB", 

116 f"Worst frequency: {self.worst_frequency / 1e6:.3f} MHz", 

117 f"Worst margin: {self.worst_margin:.1f} dB", 

118 ] 

119 

120 if self.violations: 

121 lines.append(f"\nViolations ({len(self.violations)}):") 

122 for v in self.violations[:10]: # Limit to first 10 

123 lines.append(f" - {v}") 

124 if len(self.violations) > 10: 

125 lines.append(f" ... and {len(self.violations) - 10} more") 

126 

127 return "\n".join(lines) 

128 

129 

130def check_compliance( 

131 trace_or_spectrum: WaveformTrace | tuple[NDArray[np.float64], NDArray[np.float64]], 

132 mask: LimitMask, 

133 *, 

134 detector: DetectorType | str = DetectorType.PEAK, 

135 frequency_range: tuple[float, float] | None = None, 

136 unit_conversion: str | None = None, 

137) -> ComplianceResult: 

138 """Check signal against EMC limit mask. 

139 

140 Args: 

141 trace_or_spectrum: Either a WaveformTrace to analyze, or a tuple of 

142 (frequency_array, magnitude_array) if spectrum already computed. 

143 mask: LimitMask to test against. 

144 detector: Detector type to use ('peak', 'quasi-peak', 'average', 'rms'). 

145 frequency_range: Optional (min, max) frequency range to test. 

146 unit_conversion: Optional unit conversion ('V_to_dBuV', 'W_to_dBm', etc.) 

147 

148 Returns: 

149 ComplianceResult with pass/fail status and violation details. 

150 

151 Example: 

152 >>> mask = load_limit_mask('FCC_Part15_ClassB') 

153 >>> result = check_compliance(trace, mask) 

154 >>> print(result.summary()) 

155 """ 

156 from tracekit.core.types import WaveformTrace 

157 

158 # Handle detector type 

159 if isinstance(detector, str): 

160 detector = DetectorType(detector.lower().replace("-", "_").replace(" ", "_")) 

161 

162 # Get spectrum 

163 if isinstance(trace_or_spectrum, WaveformTrace): 

164 freq, mag = _compute_spectrum(trace_or_spectrum, detector) 

165 else: 

166 freq, mag = trace_or_spectrum 

167 

168 # Convert to dB if needed 

169 if unit_conversion == "V_to_dBuV": 169 ↛ 171line 169 didn't jump to line 171 because the condition on line 169 was never true

170 # dBuV = 20*log10(V * 1e6) 

171 spectrum_level = 20 * np.log10(np.abs(mag) * 1e6 + 1e-12) 

172 elif unit_conversion == "W_to_dBm": 172 ↛ 174line 172 didn't jump to line 174 because the condition on line 172 was never true

173 # dBm = 10*log10(W * 1000) 

174 spectrum_level = 10 * np.log10(np.abs(mag) * 1000 + 1e-12) 

175 elif mag.max() > 0 and mag.max() < 10: 175 ↛ 177line 175 didn't jump to line 177 because the condition on line 175 was never true

176 # Assume linear voltage, convert to dBuV 

177 spectrum_level = 20 * np.log10(np.abs(mag) * 1e6 + 1e-12) 

178 else: 

179 # Assume already in dB 

180 spectrum_level = mag 

181 

182 # Apply frequency range filter 

183 if frequency_range is not None: 

184 f_min, f_max = frequency_range 

185 mask_filter = (freq >= f_min) & (freq <= f_max) 

186 freq = freq[mask_filter] 

187 spectrum_level = spectrum_level[mask_filter] 

188 

189 # Limit to mask frequency range 

190 mask_f_min, mask_f_max = mask.frequency_range 

191 in_range = (freq >= mask_f_min) & (freq <= mask_f_max) 

192 freq = freq[in_range] 

193 spectrum_level = spectrum_level[in_range] 

194 

195 if len(freq) == 0: 

196 # No data in mask range 

197 return ComplianceResult( 

198 status="PASS", 

199 mask_name=mask.name, 

200 violations=[], 

201 margin_to_limit=np.inf, 

202 worst_frequency=0.0, 

203 worst_margin=np.inf, 

204 spectrum_freq=np.array([]), 

205 spectrum_level=np.array([]), 

206 limit_level=np.array([]), 

207 detector=detector.value, 

208 ) 

209 

210 # Interpolate limit to spectrum frequencies 

211 limit_level = mask.interpolate(freq) 

212 

213 # Calculate margin (positive = passing) 

214 margin = limit_level - spectrum_level 

215 

216 # Find violations 

217 violations: list[ComplianceViolation] = [] 

218 violation_mask = margin < 0 

219 if np.any(violation_mask): 

220 violation_indices = np.where(violation_mask)[0] 

221 for idx in violation_indices: 

222 violations.append( 

223 ComplianceViolation( 

224 frequency=float(freq[idx]), 

225 measured_level=float(spectrum_level[idx]), 

226 limit_level=float(limit_level[idx]), 

227 excess_db=float(-margin[idx]), 

228 detector=detector.value, 

229 severity="FAIL", 

230 ) 

231 ) 

232 

233 # Overall results 

234 status = "FAIL" if violations else "PASS" 

235 margin_to_limit = float(np.min(margin)) 

236 worst_idx = int(np.argmin(margin)) 

237 worst_frequency = float(freq[worst_idx]) 

238 worst_margin = float(margin[worst_idx]) 

239 

240 return ComplianceResult( 

241 status=status, 

242 mask_name=mask.name, 

243 violations=violations, 

244 margin_to_limit=margin_to_limit, 

245 worst_frequency=worst_frequency, 

246 worst_margin=worst_margin, 

247 spectrum_freq=freq, 

248 spectrum_level=spectrum_level, 

249 limit_level=limit_level, 

250 detector=detector.value, 

251 metadata={ 

252 "unit": mask.unit, 

253 "distance": mask.distance, 

254 "regulatory_body": mask.regulatory_body, 

255 }, 

256 ) 

257 

258 

259def _compute_spectrum( 

260 trace: WaveformTrace, 

261 detector: DetectorType, 

262) -> tuple[NDArray[np.float64], NDArray[np.float64]]: 

263 """Compute spectrum from trace with specified detector. 

264 

265 Args: 

266 trace: Input waveform trace. 

267 detector: Detector type. 

268 

269 Returns: 

270 (frequency, magnitude) arrays. 

271 """ 

272 from tracekit.analyzers.waveform.spectral import fft, psd 

273 

274 if detector == DetectorType.PEAK: 274 ↛ 278line 274 didn't jump to line 278 because the condition on line 274 was always true

275 # Use FFT for peak detection 

276 freq, mag = fft(trace) # type: ignore[misc] 

277 return freq, np.abs(mag) 

278 elif detector == DetectorType.AVERAGE: 

279 # Use Welch PSD for averaging 

280 freq, mag = psd(trace, method="welch") # type: ignore[call-arg] 

281 return freq, np.sqrt(mag) # Convert PSD to magnitude 

282 elif detector == DetectorType.QUASI_PEAK: 

283 # Quasi-peak requires special weighting (simplified here) 

284 # Real implementation would use CISPR 16 weighting network 

285 freq, mag = fft(trace) # type: ignore[misc] 

286 # Apply simplified quasi-peak envelope 

287 return freq, np.abs(mag) * 0.8 # Approximate QP < peak 

288 else: # RMS 

289 freq, mag = psd(trace, method="welch") # type: ignore[call-arg] 

290 return freq, np.sqrt(mag) 

291 

292 

293__all__ = [ 

294 "ComplianceResult", 

295 "ComplianceViolation", 

296 "DetectorType", 

297 "check_compliance", 

298]