Coverage for src / tracekit / analyzers / power / basic.py: 100%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Basic power analysis for TraceKit. 

2 

3Provides fundamental power calculations including instantaneous power, 

4average power, RMS power, peak power, and energy. 

5 

6 

7Example: 

8 >>> from tracekit.analyzers.power.basic import instantaneous_power, power_statistics 

9 >>> power_trace = instantaneous_power(voltage_trace, current_trace) 

10 >>> stats = power_statistics(power_trace) 

11 >>> print(f"Average: {stats['average']:.2f} W, Peak: {stats['peak']:.2f} W") 

12""" 

13 

14from __future__ import annotations 

15 

16from typing import Any 

17 

18import numpy as np 

19from scipy import interpolate 

20 

21from tracekit.core.exceptions import AnalysisError 

22from tracekit.core.types import TraceMetadata, WaveformTrace 

23 

24 

25def instantaneous_power( 

26 voltage: WaveformTrace, 

27 current: WaveformTrace, 

28 *, 

29 interpolate_if_needed: bool = True, 

30) -> WaveformTrace: 

31 """Calculate instantaneous power from voltage and current traces. 

32 

33 P(t) = V(t) * I(t) 

34 

35 Args: 

36 voltage: Voltage waveform trace. 

37 current: Current waveform trace. 

38 interpolate_if_needed: If True, interpolate if sample rates differ. 

39 

40 Returns: 

41 Power waveform trace (in Watts if inputs are V and A). 

42 

43 Raises: 

44 AnalysisError: If sample rates mismatch and interpolation disabled. 

45 

46 Example: 

47 >>> power = instantaneous_power(v_trace, i_trace) 

48 >>> print(f"Peak power: {np.max(power.data):.2f} W") 

49 

50 References: 

51 IEC 61000-4-7 (power measurements) 

52 """ 

53 v_data = voltage.data 

54 i_data = current.data 

55 v_rate = voltage.metadata.sample_rate 

56 i_rate = current.metadata.sample_rate 

57 

58 # Handle different sample rates 

59 if v_rate != i_rate: 

60 if not interpolate_if_needed: 

61 raise AnalysisError( 

62 f"Sample rate mismatch: voltage={v_rate}, current={i_rate}. " 

63 "Set interpolate_if_needed=True to interpolate." 

64 ) 

65 

66 # Use higher sample rate and interpolate the other 

67 if v_rate > i_rate: 

68 # Interpolate current to voltage sample rate 

69 t_current = np.arange(len(i_data)) / i_rate 

70 t_voltage = np.arange(len(v_data)) / v_rate 

71 interp = interpolate.interp1d( 

72 t_current, i_data, kind="linear", fill_value="extrapolate" 

73 ) 

74 i_data = interp(t_voltage) 

75 sample_rate = v_rate 

76 else: 

77 # Interpolate voltage to current sample rate 

78 t_voltage = np.arange(len(v_data)) / v_rate 

79 t_current = np.arange(len(i_data)) / i_rate 

80 interp = interpolate.interp1d( 

81 t_voltage, v_data, kind="linear", fill_value="extrapolate" 

82 ) 

83 v_data = interp(t_current) 

84 sample_rate = i_rate 

85 else: 

86 sample_rate = v_rate 

87 

88 # Handle different lengths 

89 min_len = min(len(v_data), len(i_data)) 

90 v_data = v_data[:min_len] 

91 i_data = i_data[:min_len] 

92 

93 # Calculate instantaneous power 

94 power_data = v_data * i_data 

95 

96 return WaveformTrace( 

97 data=power_data.astype(np.float64), 

98 metadata=TraceMetadata(sample_rate=sample_rate), 

99 ) 

100 

101 

102def average_power( 

103 power: WaveformTrace | None = None, 

104 *, 

105 voltage: WaveformTrace | None = None, 

106 current: WaveformTrace | None = None, 

107) -> float: 

108 """Calculate average (mean) power. 

109 

110 P_avg = (1/T) * integral(P(t) dt) 

111 

112 Args: 

113 power: Power trace (if already calculated). 

114 voltage: Voltage trace (alternative to power). 

115 current: Current trace (alternative to power). 

116 

117 Returns: 

118 Average power in Watts. 

119 

120 Raises: 

121 AnalysisError: If neither power nor both voltage and current provided. 

122 

123 Example: 

124 >>> p_avg = average_power(power_trace) 

125 >>> # or 

126 >>> p_avg = average_power(voltage=v, current=i) 

127 """ 

128 if power is None: 

129 if voltage is None or current is None: 

130 raise AnalysisError("Either power trace or both voltage and current traces required") 

131 power = instantaneous_power(voltage, current) 

132 

133 return float(np.mean(power.data)) 

134 

135 

136def rms_power( 

137 power: WaveformTrace | None = None, 

138 *, 

139 voltage: WaveformTrace | None = None, 

140 current: WaveformTrace | None = None, 

141) -> float: 

142 """Calculate RMS power. 

143 

144 P_rms = sqrt(mean(P(t)^2)) 

145 

146 Args: 

147 power: Power trace. 

148 voltage: Voltage trace (alternative). 

149 current: Current trace (alternative). 

150 

151 Returns: 

152 RMS power in Watts. 

153 

154 Raises: 

155 AnalysisError: If neither power nor both voltage and current provided. 

156 

157 Example: 

158 >>> p_rms = rms_power(power_trace) 

159 """ 

160 if power is None: 

161 if voltage is None or current is None: 

162 raise AnalysisError("Either power trace or both voltage and current traces required") 

163 power = instantaneous_power(voltage, current) 

164 

165 return float(np.sqrt(np.mean(power.data**2))) 

166 

167 

168def peak_power( 

169 power: WaveformTrace | None = None, 

170 *, 

171 voltage: WaveformTrace | None = None, 

172 current: WaveformTrace | None = None, 

173 absolute: bool = True, 

174) -> float: 

175 """Calculate peak power. 

176 

177 Args: 

178 power: Power trace. 

179 voltage: Voltage trace (alternative). 

180 current: Current trace (alternative). 

181 absolute: If True, return absolute maximum. If False, maximum value. 

182 

183 Returns: 

184 Peak power in Watts. 

185 

186 Raises: 

187 AnalysisError: If neither power nor both voltage and current provided. 

188 

189 Example: 

190 >>> p_peak = peak_power(power_trace) 

191 """ 

192 if power is None: 

193 if voltage is None or current is None: 

194 raise AnalysisError("Either power trace or both voltage and current traces required") 

195 power = instantaneous_power(voltage, current) 

196 

197 if absolute: 

198 return float(np.max(np.abs(power.data))) 

199 return float(np.max(power.data)) 

200 

201 

202def energy( 

203 power: WaveformTrace | None = None, 

204 *, 

205 voltage: WaveformTrace | None = None, 

206 current: WaveformTrace | None = None, 

207 start_time: float | None = None, 

208 end_time: float | None = None, 

209) -> float: 

210 """Calculate energy (integral of power over time). 

211 

212 E = integral(P(t) dt) 

213 

214 Args: 

215 power: Power trace. 

216 voltage: Voltage trace (alternative). 

217 current: Current trace (alternative). 

218 start_time: Start time for integration (seconds). 

219 end_time: End time for integration (seconds). 

220 

221 Returns: 

222 Energy in Joules. 

223 

224 Raises: 

225 AnalysisError: If neither power nor both voltage and current provided. 

226 

227 Example: 

228 >>> e = energy(power_trace) 

229 >>> print(f"Total energy: {e*1e3:.2f} mJ") 

230 """ 

231 if power is None: 

232 if voltage is None or current is None: 

233 raise AnalysisError("Either power trace or both voltage and current traces required") 

234 power = instantaneous_power(voltage, current) 

235 

236 data = power.data 

237 sample_period = power.metadata.time_base 

238 

239 # Apply time limits 

240 if start_time is not None or end_time is not None: 

241 time_vector = np.arange(len(data)) * sample_period 

242 if start_time is not None: 

243 mask = time_vector >= start_time 

244 else: 

245 mask = np.ones(len(data), dtype=bool) 

246 if end_time is not None: 

247 mask &= time_vector <= end_time 

248 data = data[mask] 

249 

250 # Integrate using trapezoidal rule (scipy is stable across versions) 

251 from scipy.integrate import trapezoid 

252 

253 return float(trapezoid(data, dx=sample_period)) 

254 

255 

256def power_statistics( 

257 power: WaveformTrace | None = None, 

258 *, 

259 voltage: WaveformTrace | None = None, 

260 current: WaveformTrace | None = None, 

261) -> dict[str, float]: 

262 """Calculate comprehensive power statistics. 

263 

264 Args: 

265 power: Power trace. 

266 voltage: Voltage trace (alternative). 

267 current: Current trace (alternative). 

268 

269 Returns: 

270 Dictionary with: 

271 - average: Mean power 

272 - rms: RMS power 

273 - peak: Peak power (absolute) 

274 - peak_positive: Maximum positive power 

275 - peak_negative: Maximum negative power (regeneration) 

276 - energy: Total energy 

277 - min: Minimum power value 

278 - std: Standard deviation 

279 

280 Raises: 

281 AnalysisError: If neither power nor both voltage and current provided. 

282 

283 Example: 

284 >>> stats = power_statistics(voltage=v, current=i) 

285 >>> print(f"Average: {stats['average']:.2f} W") 

286 >>> print(f"Peak: {stats['peak']:.2f} W") 

287 >>> print(f"Energy: {stats['energy']*1e3:.2f} mJ") 

288 """ 

289 if power is None: 

290 if voltage is None or current is None: 

291 raise AnalysisError("Either power trace or both voltage and current traces required") 

292 power = instantaneous_power(voltage, current) 

293 

294 data = power.data 

295 sample_period = power.metadata.time_base 

296 

297 # Use scipy trapezoid for stable API across NumPy versions 

298 from scipy.integrate import trapezoid 

299 

300 return { 

301 "average": float(np.mean(data)), 

302 "rms": float(np.sqrt(np.mean(data**2))), 

303 "peak": float(np.max(np.abs(data))), 

304 "peak_positive": float(np.max(data)), 

305 "peak_negative": float(np.min(data)), 

306 "energy": float(trapezoid(data, dx=sample_period)), 

307 "min": float(np.min(data)), 

308 "std": float(np.std(data)), 

309 } 

310 

311 

312def power_profile( 

313 voltage: WaveformTrace, 

314 current: WaveformTrace, 

315 *, 

316 window_size: int | None = None, 

317) -> dict[str, Any]: 

318 """Generate power profile with rolling statistics. 

319 

320 Args: 

321 voltage: Voltage trace. 

322 current: Current trace. 

323 window_size: Window size for rolling calculations. If None, auto-select. 

324 

325 Returns: 

326 Dictionary with: 

327 - power_trace: Instantaneous power trace 

328 - rolling_avg: Rolling average power 

329 - rolling_peak: Rolling peak power 

330 - cumulative_energy: Cumulative energy over time 

331 - statistics: Overall power statistics 

332 """ 

333 power = instantaneous_power(voltage, current) 

334 data = power.data 

335 sample_period = power.metadata.time_base 

336 

337 if window_size is None: 

338 # Auto-select: ~1% of total samples or 100, whichever is larger 

339 window_size = max(100, len(data) // 100) 

340 

341 # Ensure window size is odd for centered window 

342 if window_size % 2 == 0: 

343 window_size += 1 

344 

345 # Rolling average 

346 kernel = np.ones(window_size) / window_size 

347 rolling_avg = np.convolve(data, kernel, mode="same") 

348 

349 # Rolling peak (using scipy.ndimage.maximum_filter would be faster but numpy-only) 

350 from scipy.ndimage import maximum_filter1d 

351 

352 rolling_peak = maximum_filter1d(np.abs(data), size=window_size) 

353 

354 # Cumulative energy 

355 cumulative_energy = np.cumsum(data) * sample_period 

356 

357 return { 

358 "power_trace": power, 

359 "rolling_avg": WaveformTrace( 

360 data=rolling_avg.astype(np.float64), 

361 metadata=power.metadata, 

362 ), 

363 "rolling_peak": WaveformTrace( 

364 data=rolling_peak.astype(np.float64), 

365 metadata=power.metadata, 

366 ), 

367 "cumulative_energy": WaveformTrace( 

368 data=cumulative_energy.astype(np.float64), 

369 metadata=power.metadata, 

370 ), 

371 "statistics": power_statistics(power), 

372 } 

373 

374 

375__all__ = [ 

376 "average_power", 

377 "energy", 

378 "instantaneous_power", 

379 "peak_power", 

380 "power_profile", 

381 "power_statistics", 

382 "rms_power", 

383]