Coverage for src / tracekit / analyzers / power / ac_power.py: 92%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""AC power analysis for TraceKit. 

2 

3Provides AC power calculations including reactive power, apparent power, 

4power factor, and harmonic analysis. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.power.ac_power import power_factor, reactive_power 

9 >>> pf = power_factor(voltage_trace, current_trace) 

10 >>> q = reactive_power(voltage_trace, current_trace) 

11 >>> print(f"Power factor: {pf:.3f}, Reactive power: {q:.2f} VAR") 

12""" 

13 

14from __future__ import annotations 

15 

16import numpy as np 

17 

18from tracekit.analyzers.power.basic import average_power 

19from tracekit.core.types import WaveformTrace 

20 

21 

22def phase_angle( 

23 voltage: WaveformTrace, 

24 current: WaveformTrace, 

25) -> float: 

26 """Calculate phase angle between voltage and current. 

27 

28 Uses cross-correlation to determine the phase shift. 

29 

30 Args: 

31 voltage: Voltage waveform trace. 

32 current: Current waveform trace. 

33 

34 Returns: 

35 Phase angle in radians (positive = current lags voltage). 

36 

37 Example: 

38 >>> phi = phase_angle(v_trace, i_trace) 

39 >>> print(f"Phase angle: {np.degrees(phi):.1f} degrees") 

40 """ 

41 v_data = voltage.data 

42 i_data = current.data 

43 

44 # Ensure same length 

45 min_len = min(len(v_data), len(i_data)) 

46 v_data = v_data[:min_len] 

47 i_data = i_data[:min_len] 

48 

49 # Remove DC offset 

50 v_ac = v_data - np.mean(v_data) 

51 i_ac = i_data - np.mean(i_data) 

52 

53 # Cross-correlation to find phase shift 

54 # correlate(v, i) tells us how much to shift i to align with v 

55 # A positive lag means i needs to be shifted right (i lags v) 

56 correlation = np.correlate(v_ac, i_ac, mode="full") 

57 lags = np.arange(-len(i_ac) + 1, len(v_ac)) 

58 

59 # Find peak correlation 

60 peak_idx = np.argmax(np.abs(correlation)) 

61 lag_samples = lags[peak_idx] 

62 

63 # Convert lag to phase angle 

64 # Estimate frequency using zero crossings 

65 v_crossings = np.where(np.diff(np.signbit(v_ac)))[0] 

66 if len(v_crossings) >= 2: 66 ↛ 74line 66 didn't jump to line 74 because the condition on line 66 was always true

67 period_samples = 2 * np.mean(np.diff(v_crossings)) 

68 # Negative lag because correlation gives us how much to shift i to align with v 

69 # If lag is negative, i is ahead of v (capacitive), phase should be negative 

70 # If lag is positive, i is behind v (inductive), phase should be positive 

71 phase = -2 * np.pi * lag_samples / period_samples 

72 else: 

73 # Fallback: use FFT to find fundamental frequency 

74 fft_v = np.fft.rfft(v_ac) 

75 freq_idx = np.argmax(np.abs(fft_v[1:])) + 1 

76 period_samples = len(v_ac) / freq_idx 

77 phase = -2 * np.pi * lag_samples / period_samples 

78 

79 return float(phase) 

80 

81 

82def reactive_power( 

83 voltage: WaveformTrace, 

84 current: WaveformTrace, 

85 *, 

86 frequency: float | None = None, 

87) -> float: 

88 """Calculate reactive power (Q) in VAR. 

89 

90 Q = V_rms * I_rms * sin(phi) 

91 

92 Args: 

93 voltage: Voltage waveform trace. 

94 current: Current waveform trace. 

95 frequency: Fundamental frequency in Hz. If None, auto-detect. 

96 

97 Returns: 

98 Reactive power in VAR (positive = inductive, negative = capacitive). 

99 

100 Example: 

101 >>> q = reactive_power(v_trace, i_trace) 

102 >>> print(f"Reactive power: {q:.2f} VAR") 

103 

104 References: 

105 IEEE 1459-2010 (Power quality definitions) 

106 """ 

107 v_data = voltage.data 

108 i_data = current.data 

109 

110 # Ensure same length 

111 min_len = min(len(v_data), len(i_data)) 

112 v_data = v_data[:min_len] 

113 i_data = i_data[:min_len] 

114 

115 # Calculate RMS values 

116 v_rms = float(np.sqrt(np.mean(v_data**2))) 

117 i_rms = float(np.sqrt(np.mean(i_data**2))) 

118 

119 # Calculate phase angle 

120 phi = phase_angle(voltage, current) 

121 

122 return v_rms * i_rms * np.sin(phi) # type: ignore[no-any-return] 

123 

124 

125def apparent_power( 

126 voltage: WaveformTrace, 

127 current: WaveformTrace, 

128) -> float: 

129 """Calculate apparent power (S) in VA. 

130 

131 S = V_rms * I_rms 

132 

133 Args: 

134 voltage: Voltage waveform trace. 

135 current: Current waveform trace. 

136 

137 Returns: 

138 Apparent power in VA. 

139 

140 Example: 

141 >>> s = apparent_power(v_trace, i_trace) 

142 >>> print(f"Apparent power: {s:.2f} VA") 

143 

144 References: 

145 IEEE 1459-2010 (Power quality definitions) 

146 """ 

147 v_data = voltage.data 

148 i_data = current.data 

149 

150 # Ensure same length 

151 min_len = min(len(v_data), len(i_data)) 

152 v_data = v_data[:min_len] 

153 i_data = i_data[:min_len] 

154 

155 v_rms = float(np.sqrt(np.mean(v_data**2))) 

156 i_rms = float(np.sqrt(np.mean(i_data**2))) 

157 

158 return v_rms * i_rms 

159 

160 

161def power_factor( 

162 voltage: WaveformTrace, 

163 current: WaveformTrace, 

164) -> float: 

165 """Calculate power factor (PF = P / S). 

166 

167 For sinusoidal waveforms, PF = cos(phi). 

168 For non-sinusoidal waveforms, includes distortion effects. 

169 

170 Args: 

171 voltage: Voltage waveform trace. 

172 current: Current waveform trace. 

173 

174 Returns: 

175 Power factor (0 to 1, can be negative for regeneration). 

176 

177 Example: 

178 >>> pf = power_factor(v_trace, i_trace) 

179 >>> print(f"Power factor: {pf:.3f}") 

180 

181 References: 

182 IEEE 1459-2010 

183 """ 

184 p = average_power(voltage=voltage, current=current) 

185 s = apparent_power(voltage, current) 

186 

187 if s == 0: 

188 return 0.0 

189 

190 return p / s 

191 

192 

193def displacement_power_factor( 

194 voltage: WaveformTrace, 

195 current: WaveformTrace, 

196) -> float: 

197 """Calculate displacement power factor (DPF). 

198 

199 DPF = cos(phi) where phi is the phase angle between fundamental 

200 components of voltage and current. 

201 

202 Args: 

203 voltage: Voltage waveform trace. 

204 current: Current waveform trace. 

205 

206 Returns: 

207 Displacement power factor. 

208 

209 Example: 

210 >>> dpf = displacement_power_factor(v_trace, i_trace) 

211 """ 

212 # Extract fundamental components 

213 v_fund = _extract_fundamental(voltage) 

214 i_fund = _extract_fundamental(current) 

215 

216 # Calculate phase angle of fundamentals 

217 phi = phase_angle(v_fund, i_fund) 

218 

219 return float(np.cos(phi)) 

220 

221 

222def distortion_power_factor( 

223 voltage: WaveformTrace, 

224 current: WaveformTrace, 

225) -> float: 

226 """Calculate distortion power factor. 

227 

228 Distortion PF = True PF / Displacement PF 

229 = sqrt(1 / (1 + THD_i^2)) (for sinusoidal voltage) 

230 

231 Args: 

232 voltage: Voltage waveform trace. 

233 current: Current waveform trace. 

234 

235 Returns: 

236 Distortion power factor. 

237 

238 Example: 

239 >>> dist_pf = distortion_power_factor(v_trace, i_trace) 

240 """ 

241 pf = power_factor(voltage, current) 

242 dpf = displacement_power_factor(voltage, current) 

243 

244 if dpf == 0: 244 ↛ 245line 244 didn't jump to line 245 because the condition on line 244 was never true

245 return 0.0 

246 

247 return pf / dpf 

248 

249 

250def total_harmonic_distortion_power( 

251 trace: WaveformTrace, 

252 fundamental_freq: float | None = None, 

253 max_harmonic: int = 50, 

254) -> float: 

255 """Calculate Total Harmonic Distortion for power analysis. 

256 

257 THD = sqrt(sum(V_h^2 for h=2..N)) / V_1 

258 

259 Args: 

260 trace: Voltage or current waveform. 

261 fundamental_freq: Fundamental frequency. If None, auto-detect. 

262 max_harmonic: Maximum harmonic order to include. 

263 

264 Returns: 

265 THD as a ratio (not percentage). 

266 

267 Example: 

268 >>> thd = total_harmonic_distortion_power(current_trace) 

269 >>> print(f"Current THD: {thd*100:.1f}%") 

270 """ 

271 data = trace.data 

272 sample_rate = trace.metadata.sample_rate 

273 

274 # FFT 

275 n = len(data) 

276 fft_result = np.fft.rfft(data) 

277 freqs = np.fft.rfftfreq(n, 1 / sample_rate) 

278 

279 # Find fundamental 

280 if fundamental_freq is None: 

281 # Auto-detect: find largest peak above DC 

282 magnitudes = np.abs(fft_result[1:]) 

283 fund_idx = int(np.argmax(magnitudes)) + 1 

284 fundamental_freq = freqs[fund_idx] 

285 else: 

286 fund_idx = int(np.round(fundamental_freq * n / sample_rate)) 

287 

288 # Get fundamental magnitude 

289 v1 = np.abs(fft_result[fund_idx]) 

290 if v1 == 0: 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true

291 return 0.0 

292 

293 # Sum harmonic magnitudes 

294 harmonic_sum_sq = 0.0 

295 for h in range(2, max_harmonic + 1): 

296 h_idx = h * fund_idx 

297 if h_idx < len(fft_result): 297 ↛ 295line 297 didn't jump to line 295 because the condition on line 297 was always true

298 harmonic_sum_sq += np.abs(fft_result[h_idx]) ** 2 

299 

300 return float(np.sqrt(harmonic_sum_sq) / v1) 

301 

302 

303def _extract_fundamental(trace: WaveformTrace) -> WaveformTrace: 

304 """Extract fundamental component of a waveform.""" 

305 data = trace.data 

306 n = len(data) 

307 

308 # FFT 

309 fft_result = np.fft.rfft(data) 

310 

311 # Find fundamental peak 

312 magnitudes = np.abs(fft_result[1:]) 

313 fund_idx = np.argmax(magnitudes) + 1 

314 

315 # Zero out all but fundamental 

316 filtered_fft = np.zeros_like(fft_result) 

317 filtered_fft[fund_idx] = fft_result[fund_idx] 

318 

319 # Inverse FFT 

320 fundamental = np.fft.irfft(filtered_fft, n=n) 

321 

322 return WaveformTrace( 

323 data=fundamental.astype(np.float64), 

324 metadata=trace.metadata, 

325 ) 

326 

327 

328def three_phase_power( 

329 v_a: WaveformTrace, 

330 v_b: WaveformTrace, 

331 v_c: WaveformTrace, 

332 i_a: WaveformTrace, 

333 i_b: WaveformTrace, 

334 i_c: WaveformTrace, 

335) -> dict[str, float]: 

336 """Calculate three-phase power quantities. 

337 

338 Args: 

339 v_a: Phase A voltage trace. 

340 v_b: Phase B voltage trace. 

341 v_c: Phase C voltage trace. 

342 i_a: Phase A current trace. 

343 i_b: Phase B current trace. 

344 i_c: Phase C current trace. 

345 

346 Returns: 

347 Dictionary with: 

348 - total_active: Total active power (W) 

349 - total_reactive: Total reactive power (VAR) 

350 - total_apparent: Total apparent power (VA) 

351 - power_factor: Three-phase power factor 

352 - phase_a_power: Phase A active power 

353 - phase_b_power: Phase B active power 

354 - phase_c_power: Phase C active power 

355 """ 

356 # Calculate per-phase powers 

357 p_a = average_power(voltage=v_a, current=i_a) 

358 p_b = average_power(voltage=v_b, current=i_b) 

359 p_c = average_power(voltage=v_c, current=i_c) 

360 

361 q_a = reactive_power(v_a, i_a) 

362 q_b = reactive_power(v_b, i_b) 

363 q_c = reactive_power(v_c, i_c) 

364 

365 total_p = p_a + p_b + p_c 

366 total_q = q_a + q_b + q_c 

367 total_s = np.sqrt(total_p**2 + total_q**2) 

368 

369 pf = total_p / total_s if total_s != 0 else 0.0 

370 

371 return { 

372 "total_active": total_p, 

373 "total_reactive": total_q, 

374 "total_apparent": total_s, 

375 "power_factor": pf, 

376 "phase_a_power": p_a, 

377 "phase_b_power": p_b, 

378 "phase_c_power": p_c, 

379 } 

380 

381 

382__all__ = [ 

383 "apparent_power", 

384 "displacement_power_factor", 

385 "distortion_power_factor", 

386 "phase_angle", 

387 "power_factor", 

388 "reactive_power", 

389 "three_phase_power", 

390 "total_harmonic_distortion_power", 

391]