Coverage for src / tracekit / compliance / advanced.py: 78%

219 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Advanced EMC compliance features. 

2 

3This module provides advanced compliance testing capabilities including 

4limit interpolation, compliance test execution, and quasi-peak detection. 

5 

6 

7References: 

8 CISPR 16-1-1: Measuring Apparatus 

9 FCC Part 15: Unintentional Radiators 

10 EN 55032: EMC Standard for Multimedia Equipment 

11""" 

12 

13from __future__ import annotations 

14 

15import logging 

16from dataclasses import dataclass, field 

17from enum import Enum 

18from typing import TYPE_CHECKING, Any 

19 

20import numpy as np 

21 

22if TYPE_CHECKING: 

23 from numpy.typing import NDArray 

24 

25 from tracekit.compliance.masks import LimitMask 

26 

27logger = logging.getLogger(__name__) 

28 

29__all__ = [ 

30 "ComplianceTestConfig", 

31 "ComplianceTestRunner", 

32 "ComplianceTestSuite", 

33 "InterpolationMethod", 

34 "LimitInterpolator", 

35 "QPDetectorBand", 

36 "QuasiPeakDetector", 

37 "interpolate_limit", 

38 "run_compliance_suite", 

39] 

40 

41 

42# ============================================================================= 

43# ============================================================================= 

44 

45 

class InterpolationMethod(Enum):
    """Interpolation methods for limit masks.

    References:
        COMP-005: Limit Interpolation
    """

    LINEAR = "linear"  # Linear interpolation
    LOG_LINEAR = "log-linear"  # Log-linear (dB) interpolation
    CUBIC = "cubic"  # Cubic spline
    STEP = "step"  # Step function (no interpolation)


class LimitInterpolator:
    """Limit mask interpolator.

    Provides interpolation of EMC limits between the frequency points
    defined by a limit mask.

    The mask only needs to expose ``frequency`` and ``limit`` (array-likes of
    equal length, frequency ascending) and ``frequency_range`` (a
    ``(f_min, f_max)`` pair); those are the only attributes read here.

    Example:
        >>> from tracekit.compliance import load_limit_mask
        >>> mask = load_limit_mask('FCC_Part15_ClassB')
        >>> interp = LimitInterpolator(mask)
        >>> limit_at_100mhz = interp.interpolate(100e6)

    References:
        COMP-005: Limit Interpolation
    """

    def __init__(
        self,
        mask: LimitMask,
        method: InterpolationMethod = InterpolationMethod.LOG_LINEAR,
        extrapolate: bool = False,
    ) -> None:
        """Initialize interpolator.

        Args:
            mask: Limit mask to interpolate
            method: Interpolation method
            extrapolate: Allow frequencies beyond the mask range. When False,
                :meth:`interpolate` raises for out-of-range frequencies.
        """
        self._mask = mask
        self._method = method
        self._extrapolate = extrapolate

        # Precompute log frequencies for log-linear and cubic interpolation
        self._log_freq = np.log10(mask.frequency)
        self._log_limit = mask.limit  # Already in dB

    def interpolate(
        self,
        frequency: float | NDArray[np.float64],
    ) -> NDArray[np.float64]:
        """Interpolate limit at given frequency/frequencies.

        Args:
            frequency: Frequency or array of frequencies in Hz

        Returns:
            Interpolated limit value(s). Always an array with at least one
            dimension, so a scalar input yields a one-element array.

        Raises:
            ValueError: If any frequency is not positive, or if a frequency
                is outside the mask range and extrapolation is disabled.

        Note:
            With ``extrapolate=True`` the LINEAR, LOG_LINEAR and STEP methods
            hold the first/last mask value outside the mask range (that is
            what ``np.interp`` and the clamped step lookup do); only CUBIC
            genuinely extrapolates the fitted curve.
        """
        freq_array = np.atleast_1d(np.asarray(frequency, dtype=np.float64))

        # Validate positive frequencies first (log10 is taken further down)
        if np.any(freq_array <= 0):
            raise ValueError("Frequency must be positive")

        # Check range
        f_min, f_max = self._mask.frequency_range
        if not self._extrapolate:
            outside = (freq_array < f_min) | (freq_array > f_max)
            if np.any(outside):
                raise ValueError(
                    f"Frequency {freq_array[outside][0]:.2e} Hz outside mask range "
                    f"[{f_min:.2e}, {f_max:.2e}] Hz. "
                    f"Set extrapolate=True to allow extrapolation."
                )

        if self._method == InterpolationMethod.LINEAR:
            return self._interp_linear(freq_array)
        if self._method == InterpolationMethod.LOG_LINEAR:
            return self._interp_log_linear(freq_array)
        if self._method == InterpolationMethod.CUBIC:
            return self._interp_cubic(freq_array)
        # STEP
        return self._interp_step(freq_array)

    def _interp_linear(self, freq: NDArray[np.float64]) -> NDArray[np.float64]:
        """Linear interpolation in (frequency, limit) space."""
        return np.interp(freq, self._mask.frequency, self._mask.limit)

    def _interp_log_linear(self, freq: NDArray[np.float64]) -> NDArray[np.float64]:
        """Log-linear interpolation (linear in log-frequency space)."""
        log_freq = np.log10(freq)
        return np.interp(log_freq, self._log_freq, self._log_limit)

    def _interp_cubic(self, freq: NDArray[np.float64]) -> NDArray[np.float64]:
        """Cubic spline interpolation."""
        # Imported lazily so scipy is only required when CUBIC is used.
        from scipy.interpolate import CubicSpline

        # Use log-frequency for better behavior
        log_freq = np.log10(freq)
        spline = CubicSpline(self._log_freq, self._log_limit, extrapolate=self._extrapolate)
        result: NDArray[np.float64] = spline(log_freq)
        return result

    def _interp_step(self, freq: NDArray[np.float64]) -> NDArray[np.float64]:
        """Step function (value of the nearest defined point at or below)."""
        # One vectorized lookup: index of the last mask point <= f, clamped so
        # frequencies below the first point use the first limit.
        idx = np.searchsorted(self._mask.frequency, freq, side="right") - 1
        idx = np.clip(idx, 0, len(self._mask.limit) - 1)
        return np.asarray(self._mask.limit, dtype=np.float64)[idx]

    def get_limit_at(
        self,
        frequency: float,
        warn_on_extrapolation: bool = True,
    ) -> tuple[float, dict[str, Any]]:
        """Get limit at specific frequency with metadata.

        Args:
            frequency: Frequency in Hz
            warn_on_extrapolation: Emit warning if the frequency lies outside
                the mask range

        Returns:
            (limit_value, metadata) tuple. ``limit_value`` is NaN when the
            frequency is outside the mask range and this interpolator was
            created with ``extrapolate=False``. ``metadata`` has the keys
            ``frequency``, ``method``, ``is_extrapolated``,
            ``is_at_defined_point``, ``lower_defined_freq`` and
            ``upper_defined_freq`` (the last two are None when there is no
            defined point on that side).
        """
        f_min, f_max = self._mask.frequency_range
        # bool() keeps the metadata free of numpy scalar types.
        is_extrapolated = bool(frequency < f_min or frequency > f_max)

        if is_extrapolated and warn_on_extrapolation:
            logger.warning(
                "Extrapolating limit at %.2e Hz (mask range: %.2e to %.2e Hz)",
                frequency,
                f_min,
                f_max,
            )

        limit = (
            float(self.interpolate(frequency)[0])
            if not is_extrapolated or self._extrapolate
            else np.nan
        )

        # Find nearest defined points
        idx = np.searchsorted(self._mask.frequency, frequency)
        if idx == 0:
            lower_freq = None
            upper_freq = self._mask.frequency[0]
        elif idx >= len(self._mask.frequency):
            lower_freq = self._mask.frequency[-1]
            upper_freq = None
        else:
            lower_freq = self._mask.frequency[idx - 1]
            upper_freq = self._mask.frequency[idx]

        is_at_defined_point = bool(np.any(np.asarray(self._mask.frequency) == frequency))

        return limit, {
            "frequency": frequency,
            "method": self._method.value,
            "is_extrapolated": is_extrapolated,
            "is_at_defined_point": is_at_defined_point,
            "lower_defined_freq": float(lower_freq) if lower_freq is not None else None,
            "upper_defined_freq": float(upper_freq) if upper_freq is not None else None,
        }

214 

215 

def interpolate_limit(
    mask: LimitMask,
    frequency: float | NDArray[np.float64],
    method: str = "log-linear",
) -> NDArray[np.float64]:
    """Convenience function for limit interpolation.

    Builds a one-off :class:`LimitInterpolator` with extrapolation enabled,
    so frequencies outside the mask range do not raise.

    Args:
        mask: Limit mask
        frequency: Frequency or frequencies in Hz
        method: Interpolation method, given as the value of an
            :class:`InterpolationMethod` member ("linear", "log-linear",
            "cubic" or "step")

    Returns:
        Interpolated limit value(s) as an array (one element for a scalar
        frequency)

    Raises:
        ValueError: If ``method`` is not a valid InterpolationMethod value,
            or if any frequency is not positive.

    Example:
        >>> limit = interpolate_limit(mask, 100e6)
    """
    # Enum lookup by value; an unknown string raises ValueError here.
    selected_method = InterpolationMethod(method)
    interpolator = LimitInterpolator(mask, method=selected_method, extrapolate=True)
    return interpolator.interpolate(frequency)

240 

241 

242# ============================================================================= 

243# ============================================================================= 

244 

245 

@dataclass
class ComplianceTestConfig:
    """Configuration for compliance test.

    Attributes:
        mask_names: List of mask names to test against
        detector_type: Detector type label; recorded on each test result
        frequency_range: Optional (f_min, f_max) in Hz; when set, only
            measurement points inside this range are tested
        margin_required_db: Required margin to limit; points with less
            margin than this count as violations
        include_quasi_peak: Apply the quasi-peak correction for masks whose
            detector is "quasi-peak"
        generate_report: Generate detailed report

    References:
        COMP-006: Compliance Test Execution
    """

    # NOTE(review): within this module `mask_names` and `generate_report` are
    # never read by ComplianceTestRunner (masks are registered via add_mask) -
    # confirm whether another module consumes them.
    mask_names: list[str] = field(default_factory=lambda: ["FCC_Part15_ClassB"])
    detector_type: str = "peak"
    frequency_range: tuple[float, float] | None = None
    margin_required_db: float = 0.0
    include_quasi_peak: bool = True
    generate_report: bool = True

268 

269 

@dataclass
class ComplianceTestResult:
    """Result of a single compliance test.

    Attributes:
        mask_name: Mask tested against
        passed: Whether test passed
        margin_db: Margin to limit (negative = fail)
        worst_frequency: Worst-case frequency
        violations: List of violations
        detector_used: Detector type used
        metadata: Free-form extra information about the test (defaults to
            an empty dict, created per instance)
    """

    mask_name: str
    passed: bool
    margin_db: float
    worst_frequency: float
    violations: list[dict[str, Any]]
    detector_used: str
    metadata: dict[str, Any] = field(default_factory=dict)

290 

291 

292@dataclass 

293class ComplianceTestSuiteResult: 

294 """Result of compliance test suite. 

295 

296 Attributes: 

297 overall_passed: True if all tests passed 

298 results: Individual test results 

299 summary: Test summary 

300 """ 

301 

302 overall_passed: bool 

303 results: list[ComplianceTestResult] 

304 summary: dict[str, Any] 

305 

306 

class ComplianceTestRunner:
    """Compliance test execution engine.

    Executes compliance tests against multiple masks with
    configurable detection methods.

    Example:
        >>> runner = ComplianceTestRunner()
        >>> runner.add_mask('FCC_Part15_ClassB')
        >>> runner.add_mask('CE_CISPR32_ClassB')
        >>> result = runner.run(spectrum_freq, spectrum_level)

    References:
        COMP-006: Compliance Test Execution
    """

    def __init__(self, config: ComplianceTestConfig | None = None) -> None:
        """Initialize test runner.

        Args:
            config: Test configuration; a default ComplianceTestConfig is
                used when omitted
        """
        self._config = config or ComplianceTestConfig()
        # (registered name, loaded mask) pairs, in registration order
        self._masks: list[tuple[str, Any]] = []
        self._qp_detector = QuasiPeakDetector()

    def add_mask(self, mask_name: str) -> ComplianceTestRunner:
        """Add mask to test suite.

        Args:
            mask_name: Mask name to add

        Returns:
            Self for chaining
        """
        # Imported here rather than at module level (the module only imports
        # from masks under TYPE_CHECKING).
        from tracekit.compliance.masks import load_limit_mask

        mask = load_limit_mask(mask_name)
        self._masks.append((mask_name, mask))
        return self

    def run(
        self,
        frequencies: NDArray[np.float64],
        levels: NDArray[np.float64],
        unit: str = "dBuV",
    ) -> ComplianceTestSuiteResult:
        """Run compliance test suite.

        Args:
            frequencies: Frequency array in Hz
            levels: Level array in specified unit
            unit: Unit of level measurements (recorded in result metadata)

        Returns:
            Test suite result. With no masks registered the suite passes
            vacuously and the summary's worst margin is 0.
        """
        results = [
            self._test_against_mask(frequencies, levels, mask, unit)
            for _registered_name, mask in self._masks
        ]

        passed_count = sum(1 for r in results if r.passed)
        summary = {
            "total_tests": len(results),
            "passed": passed_count,
            "failed": len(results) - passed_count,
            "worst_margin_db": min(r.margin_db for r in results) if results else 0,
            "masks_tested": [r.mask_name for r in results],
        }

        return ComplianceTestSuiteResult(
            overall_passed=passed_count == len(results),
            results=results,
            summary=summary,
        )

    def _test_against_mask(
        self,
        frequencies: NDArray[np.float64],
        levels: NDArray[np.float64],
        mask: Any,
        unit: str,
    ) -> ComplianceTestResult:
        """Test the measurement against a single mask.

        Points outside the configured frequency range or outside the mask's
        own range are ignored. If nothing remains, the test passes with an
        infinite margin.
        """
        # Accept any array-like (e.g. plain lists) for the boolean filtering below.
        frequencies = np.asarray(frequencies, dtype=np.float64)
        levels = np.asarray(levels, dtype=np.float64)

        # Apply frequency range filter
        if self._config.frequency_range:
            f_min, f_max = self._config.frequency_range
            in_range = (frequencies >= f_min) & (frequencies <= f_max)
            frequencies = frequencies[in_range]
            levels = levels[in_range]

        # Limit to mask range
        mask_f_min, mask_f_max = mask.frequency_range
        in_mask = (frequencies >= mask_f_min) & (frequencies <= mask_f_max)
        frequencies = frequencies[in_mask]
        levels = levels[in_mask]

        if len(frequencies) == 0:
            return ComplianceTestResult(
                mask_name=mask.name,
                passed=True,
                margin_db=np.inf,
                worst_frequency=0.0,
                violations=[],
                detector_used=self._config.detector_type,
                metadata={"unit": unit},
            )

        # Interpolate limits
        interp = LimitInterpolator(mask)
        limits = interp.interpolate(frequencies)

        # Apply quasi-peak if requested
        if self._config.include_quasi_peak and mask.detector == "quasi-peak":
            levels = self._qp_detector.apply(levels, frequencies)

        # Calculate margin (positive = below the limit)
        margin = limits - levels
        min_margin = float(np.min(margin))
        worst_idx = int(np.argmin(margin))

        # Find violations (considering required margin)
        violation_mask = margin < self._config.margin_required_db
        violations = [
            {
                "frequency": float(frequencies[idx]),
                "measured": float(levels[idx]),
                "limit": float(limits[idx]),
                "excess_db": float(-margin[idx]),
            }
            for idx in np.where(violation_mask)[0]
        ]

        return ComplianceTestResult(
            mask_name=mask.name,
            passed=not violations,
            margin_db=min_margin,
            worst_frequency=float(frequencies[worst_idx]),
            violations=violations,
            detector_used=self._config.detector_type,
            metadata={"unit": unit},
        )

455 

456 

class ComplianceTestSuite:
    """Pre-configured compliance test suites.

    Provides standard test configurations for common scenarios. Each factory
    returns a fresh ComplianceTestRunner with its masks already loaded.

    References:
        COMP-006: Compliance Test Execution
    """

    @staticmethod
    def _build(mask_names: tuple[str, ...], *, include_quasi_peak: bool) -> ComplianceTestRunner:
        """Create a runner with the given QP setting and load each mask in order."""
        runner = ComplianceTestRunner(ComplianceTestConfig(include_quasi_peak=include_quasi_peak))
        for mask_name in mask_names:
            runner.add_mask(mask_name)
        return runner

    @staticmethod
    def residential() -> ComplianceTestRunner:
        """Get residential (Class B) test suite."""
        return ComplianceTestSuite._build(
            ("FCC_Part15_ClassB", "CE_CISPR32_ClassB"),
            include_quasi_peak=True,
        )

    @staticmethod
    def commercial() -> ComplianceTestRunner:
        """Get commercial (Class A) test suite."""
        return ComplianceTestSuite._build(
            ("FCC_Part15_ClassA", "CE_CISPR32_ClassA"),
            include_quasi_peak=True,
        )

    @staticmethod
    def military() -> ComplianceTestRunner:
        """Get military (MIL-STD) test suite (quasi-peak correction disabled)."""
        return ComplianceTestSuite._build(
            ("MIL_STD_461G_RE102", "MIL_STD_461G_CE102"),
            include_quasi_peak=False,
        )

489 

490 

def run_compliance_suite(
    frequencies: NDArray[np.float64],
    levels: NDArray[np.float64],
    suite: str = "residential",
) -> ComplianceTestSuiteResult:
    """Run standard compliance test suite.

    Args:
        frequencies: Frequency array in Hz
        levels: Level array in dB
        suite: Suite name ('residential', 'commercial', 'military')

    Returns:
        Test suite result

    Raises:
        ValueError: If suite name is unknown.

    Example:
        >>> result = run_compliance_suite(freq, levels, suite='residential')
        >>> print(f"Passed: {result.overall_passed}")
    """
    # Validate before building anything, so a bad name never loads masks.
    if suite not in ("residential", "commercial", "military"):
        raise ValueError(f"Unknown suite: {suite}")

    # Suite names map one-to-one onto the ComplianceTestSuite factory methods.
    runner = getattr(ComplianceTestSuite, suite)()
    return runner.run(frequencies, levels)

523 

524 

525# ============================================================================= 

526# ============================================================================= 

527 

528 

529class QPDetectorBand(Enum): 

530 """CISPR 16-1-1 quasi-peak detector bands. 

531 

532 References: 

533 CISPR 16-1-1 Table 1 

534 COMP-007: Quasi-Peak Detection 

535 """ 

536 

537 BAND_A = "A" # 9 kHz - 150 kHz 

538 BAND_B = "B" # 150 kHz - 30 MHz 

539 BAND_C = "C" # 30 MHz - 300 MHz 

540 BAND_D = "D" # 300 MHz - 1 GHz 

541 

542 

543@dataclass 

544class QPDetectorParams: 

545 """Quasi-peak detector parameters per CISPR 16-1-1. 

546 

547 Attributes: 

548 bandwidth: Measurement bandwidth in Hz 

549 charge_time: Charge time constant in ms 

550 discharge_time: Discharge time constant in ms 

551 mechanical_time: Meter mechanical time constant in ms 

552 """ 

553 

554 bandwidth: float 

555 charge_time: float 

556 discharge_time: float 

557 mechanical_time: float 

558 

559 

560class QuasiPeakDetector: 

561 """CISPR 16-1-1 quasi-peak detector. 

562 

563 Implements quasi-peak detection per CISPR 16-1-1 standard for 

564 EMC compliance measurements. 

565 

566 Example: 

567 >>> detector = QuasiPeakDetector() 

568 >>> qp_levels = detector.apply(peak_levels, frequencies) 

569 

570 References: 

571 CISPR 16-1-1: Measuring Apparatus 

572 COMP-007: Quasi-Peak Detection 

573 """ 

574 

575 # CISPR 16-1-1 detector parameters by band 

576 BAND_PARAMS = { # noqa: RUF012 

577 QPDetectorBand.BAND_A: QPDetectorParams( 

578 bandwidth=200, # 200 Hz 

579 charge_time=45, # ms 

580 discharge_time=500, # ms 

581 mechanical_time=160, # ms 

582 ), 

583 QPDetectorBand.BAND_B: QPDetectorParams( 

584 bandwidth=9000, # 9 kHz 

585 charge_time=1, # ms 

586 discharge_time=160, # ms 

587 mechanical_time=160, # ms 

588 ), 

589 QPDetectorBand.BAND_C: QPDetectorParams( 

590 bandwidth=120000, # 120 kHz 

591 charge_time=1, # ms 

592 discharge_time=550, # ms 

593 mechanical_time=100, # ms 

594 ), 

595 QPDetectorBand.BAND_D: QPDetectorParams( 

596 bandwidth=1000000, # 1 MHz 

597 charge_time=1, # ms 

598 discharge_time=550, # ms 

599 mechanical_time=100, # ms 

600 ), 

601 } 

602 

603 # Frequency ranges for bands (Hz) 

604 BAND_RANGES = { # noqa: RUF012 

605 QPDetectorBand.BAND_A: (9e3, 150e3), 

606 QPDetectorBand.BAND_B: (150e3, 30e6), 

607 QPDetectorBand.BAND_C: (30e6, 300e6), 

608 QPDetectorBand.BAND_D: (300e6, 1e9), 

609 } 

610 

611 def __init__(self) -> None: 

612 """Initialize quasi-peak detector.""" 

613 self._lookup_table: dict[str, NDArray[np.float64]] = {} 

614 

615 def get_band(self, frequency: float) -> QPDetectorBand | None: 

616 """Get CISPR band for frequency. 

617 

618 Args: 

619 frequency: Frequency in Hz 

620 

621 Returns: 

622 Band or None if outside all bands 

623 """ 

624 for band, (f_min, f_max) in self.BAND_RANGES.items(): 624 ↛ 627line 624 didn't jump to line 627 because the loop on line 624 didn't complete

625 if f_min <= frequency <= f_max: 

626 return band 

627 return None 

628 

629 def get_params(self, frequency: float) -> QPDetectorParams | None: 

630 """Get detector parameters for frequency. 

631 

632 Args: 

633 frequency: Frequency in Hz 

634 

635 Returns: 

636 Detector parameters or None 

637 """ 

638 band = self.get_band(frequency) 

639 if band is None: 639 ↛ 640line 639 didn't jump to line 640 because the condition on line 639 was never true

640 return None 

641 return self.BAND_PARAMS[band] 

642 

643 def apply( 

644 self, 

645 peak_levels: NDArray[np.float64], 

646 frequencies: NDArray[np.float64], 

647 ) -> NDArray[np.float64]: 

648 """Apply quasi-peak detection to peak levels. 

649 

650 Args: 

651 peak_levels: Peak detector levels in dB 

652 frequencies: Corresponding frequencies in Hz 

653 

654 Returns: 

655 Quasi-peak levels in dB 

656 

657 Note: 

658 Quasi-peak is always <= peak for repetitive signals. 

659 The correction factor depends on pulse repetition rate. 

660 """ 

661 qp_levels = np.copy(peak_levels) 

662 

663 for i, (level, freq) in enumerate(zip(peak_levels, frequencies, strict=False)): 

664 band = self.get_band(freq) 

665 if band is not None: 665 ↛ 663line 665 didn't jump to line 663 because the condition on line 665 was always true

666 # Apply approximate QP correction 

667 # Real implementation would need actual signal for time-domain processing 

668 correction = self._get_qp_correction(band) 

669 qp_levels[i] = level - correction 

670 

671 return qp_levels 

672 

673 def _get_qp_correction(self, band: QPDetectorBand) -> float: 

674 """Get approximate QP correction factor. 

675 

676 This is a simplified model. Real QP detection requires 

677 time-domain processing of the actual signal. 

678 

679 Args: 

680 band: CISPR band 

681 

682 Returns: 

683 Correction factor in dB 

684 """ 

685 # Approximate corrections for periodic signals 

686 # Actual correction depends on pulse rate and duty cycle 

687 corrections = { 

688 QPDetectorBand.BAND_A: 3.0, 

689 QPDetectorBand.BAND_B: 6.0, 

690 QPDetectorBand.BAND_C: 4.0, 

691 QPDetectorBand.BAND_D: 4.0, 

692 } 

693 return corrections.get(band, 0.0) 

694 

695 def compare_peak_qp( 

696 self, 

697 peak_levels: NDArray[np.float64], 

698 frequencies: NDArray[np.float64], 

699 ) -> dict[str, Any]: 

700 """Compare peak and quasi-peak readings. 

701 

702 Args: 

703 peak_levels: Peak detector levels 

704 frequencies: Frequencies 

705 

706 Returns: 

707 Comparison results 

708 """ 

709 qp_levels = self.apply(peak_levels, frequencies) 

710 difference = peak_levels - qp_levels 

711 

712 return { 

713 "peak_levels": peak_levels, 

714 "qp_levels": qp_levels, 

715 "difference_db": difference, 

716 "max_difference_db": float(np.max(difference)), 

717 "avg_difference_db": float(np.mean(difference)), 

718 "description": ( 

719 "Quasi-peak is lower than peak for pulsed/repetitive signals. " 

720 "For CW signals, QP equals peak." 

721 ), 

722 } 

723 

724 def get_bandwidth(self, frequency: float) -> float: 

725 """Get measurement bandwidth for frequency. 

726 

727 Args: 

728 frequency: Frequency in Hz 

729 

730 Returns: 

731 Bandwidth in Hz 

732 """ 

733 params = self.get_params(frequency) 

734 if params is None: 734 ↛ 736line 734 didn't jump to line 736 because the condition on line 734 was never true

735 # Default to Band B 

736 return 9000 

737 return params.bandwidth 

738 

739 def validate_bandwidth(self, bandwidth: float) -> None: 

740 """Validate measurement bandwidth. 

741 

742 Args: 

743 bandwidth: Bandwidth to validate 

744 

745 Raises: 

746 ValueError: If bandwidth is invalid 

747 """ 

748 if bandwidth <= 0: 748 ↛ 751line 748 didn't jump to line 751 because the condition on line 748 was always true

749 raise ValueError("Bandwidth must be positive") 

750 

751 valid_bandwidths = [p.bandwidth for p in self.BAND_PARAMS.values()] 

752 if bandwidth not in valid_bandwidths: 

753 logger.warning( 

754 f"Non-standard bandwidth {bandwidth} Hz. " 

755 f"Standard CISPR bandwidths: {valid_bandwidths}" 

756 )