Coverage for src / tracekit / analyzers / power / basic.py: 100%
92 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Basic power analysis for TraceKit.
3Provides fundamental power calculations including instantaneous power,
4average power, RMS power, peak power, and energy.
7Example:
8 >>> from tracekit.analyzers.power.basic import instantaneous_power, power_statistics
9 >>> power_trace = instantaneous_power(voltage_trace, current_trace)
10 >>> stats = power_statistics(power_trace)
11 >>> print(f"Average: {stats['average']:.2f} W, Peak: {stats['peak']:.2f} W")
12"""
14from __future__ import annotations
16from typing import Any
18import numpy as np
19from scipy import interpolate
21from tracekit.core.exceptions import AnalysisError
22from tracekit.core.types import TraceMetadata, WaveformTrace
def instantaneous_power(
    voltage: WaveformTrace,
    current: WaveformTrace,
    *,
    interpolate_if_needed: bool = True,
) -> WaveformTrace:
    """Multiply a voltage trace by a current trace sample by sample.

    P(t) = V(t) * I(t)

    When the two traces have different sample rates, the slower one is
    linearly resampled (with linear extrapolation past its last sample) onto
    the sample instants of the faster one. When the resulting arrays differ
    in length, both are truncated to the shorter length before multiplying.

    Args:
        voltage: Voltage waveform trace.
        current: Current waveform trace.
        interpolate_if_needed: If True, resample when the sample rates differ;
            if False, a rate mismatch is reported as an error.

    Returns:
        Power waveform trace (in Watts if inputs are V and A). Its metadata is
        a fresh ``TraceMetadata`` holding only the output sample rate.

    Raises:
        AnalysisError: If sample rates mismatch and interpolation disabled.

    Example:
        >>> power = instantaneous_power(v_trace, i_trace)
        >>> print(f"Peak power: {np.max(power.data):.2f} W")

    References:
        IEC 61000-4-7 (power measurements)
    """
    volts = voltage.data
    amps = current.data
    volt_rate = voltage.metadata.sample_rate
    amp_rate = current.metadata.sample_rate

    if volt_rate == amp_rate:
        # Same time base: nothing to resample.
        out_rate = volt_rate
    elif not interpolate_if_needed:
        raise AnalysisError(
            f"Sample rate mismatch: voltage={volt_rate}, current={amp_rate}. "
            "Set interpolate_if_needed=True to interpolate."
        )
    else:
        # Sample instants of each trace, both starting at t = 0.
        volt_times = np.arange(len(volts)) / volt_rate
        amp_times = np.arange(len(amps)) / amp_rate

        if volt_rate > amp_rate:
            # Voltage is the faster trace: bring current onto its time grid.
            resample = interpolate.interp1d(
                amp_times, amps, kind="linear", fill_value="extrapolate"
            )
            amps = resample(volt_times)
            out_rate = volt_rate
        else:
            # Current is the faster trace: bring voltage onto its time grid.
            resample = interpolate.interp1d(
                volt_times, volts, kind="linear", fill_value="extrapolate"
            )
            volts = resample(amp_times)
            out_rate = amp_rate

    # Only the overlapping leading samples can be multiplied.
    common_len = min(len(volts), len(amps))
    product = volts[:common_len] * amps[:common_len]

    return WaveformTrace(
        data=product.astype(np.float64),
        metadata=TraceMetadata(sample_rate=out_rate),
    )
def average_power(
    power: WaveformTrace | None = None,
    *,
    voltage: WaveformTrace | None = None,
    current: WaveformTrace | None = None,
) -> float:
    """Return the arithmetic mean of a power trace.

    P_avg = (1/T) * integral(P(t) dt)

    Args:
        power: Power trace (if already calculated). Takes precedence over
            ``voltage``/``current`` when given.
        voltage: Voltage trace (used with ``current`` when ``power`` is None).
        current: Current trace (used with ``voltage`` when ``power`` is None).

    Returns:
        Average power in Watts.

    Raises:
        AnalysisError: If neither power nor both voltage and current provided.

    Example:
        >>> p_avg = average_power(power_trace)
        >>> # or
        >>> p_avg = average_power(voltage=v, current=i)
    """
    if power is not None:
        trace = power
    elif voltage is not None and current is not None:
        # No precomputed power: derive it from the V/I pair.
        trace = instantaneous_power(voltage, current)
    else:
        raise AnalysisError("Either power trace or both voltage and current traces required")

    mean_value = np.mean(trace.data)
    return float(mean_value)
def rms_power(
    power: WaveformTrace | None = None,
    *,
    voltage: WaveformTrace | None = None,
    current: WaveformTrace | None = None,
) -> float:
    """Return the root-mean-square value of a power trace.

    P_rms = sqrt(mean(P(t)^2))

    Args:
        power: Power trace. Takes precedence over ``voltage``/``current``.
        voltage: Voltage trace (used with ``current`` when ``power`` is None).
        current: Current trace (used with ``voltage`` when ``power`` is None).

    Returns:
        RMS power in Watts.

    Raises:
        AnalysisError: If neither power nor both voltage and current provided.

    Example:
        >>> p_rms = rms_power(power_trace)
    """
    if power is not None:
        trace = power
    elif voltage is not None and current is not None:
        # No precomputed power: derive it from the V/I pair.
        trace = instantaneous_power(voltage, current)
    else:
        raise AnalysisError("Either power trace or both voltage and current traces required")

    mean_square = np.mean(trace.data**2)
    return float(np.sqrt(mean_square))
def peak_power(
    power: WaveformTrace | None = None,
    *,
    voltage: WaveformTrace | None = None,
    current: WaveformTrace | None = None,
    absolute: bool = True,
) -> float:
    """Return the largest value of a power trace.

    Args:
        power: Power trace. Takes precedence over ``voltage``/``current``.
        voltage: Voltage trace (used with ``current`` when ``power`` is None).
        current: Current trace (used with ``voltage`` when ``power`` is None).
        absolute: If True, return the largest magnitude ``max(|P|)``. If
            False, return the largest signed value ``max(P)``.

    Returns:
        Peak power in Watts.

    Raises:
        AnalysisError: If neither power nor both voltage and current provided.

    Example:
        >>> p_peak = peak_power(power_trace)
    """
    if power is not None:
        trace = power
    elif voltage is not None and current is not None:
        # No precomputed power: derive it from the V/I pair.
        trace = instantaneous_power(voltage, current)
    else:
        raise AnalysisError("Either power trace or both voltage and current traces required")

    # Rectify first when the caller wants the peak magnitude.
    candidates = np.abs(trace.data) if absolute else trace.data
    return float(np.max(candidates))
def energy(
    power: WaveformTrace | None = None,
    *,
    voltage: WaveformTrace | None = None,
    current: WaveformTrace | None = None,
    start_time: float | None = None,
    end_time: float | None = None,
) -> float:
    """Integrate a power trace over time to obtain energy.

    E = integral(P(t) dt)

    Sample times are counted from zero at the first sample, spaced by the
    trace's ``metadata.time_base``. Both integration limits are inclusive.

    Args:
        power: Power trace. Takes precedence over ``voltage``/``current``.
        voltage: Voltage trace (used with ``current`` when ``power`` is None).
        current: Current trace (used with ``voltage`` when ``power`` is None).
        start_time: Start time for integration (seconds). None means from the
            first sample.
        end_time: End time for integration (seconds). None means through the
            last sample.

    Returns:
        Energy in Joules.

    Raises:
        AnalysisError: If neither power nor both voltage and current provided.

    Example:
        >>> e = energy(power_trace)
        >>> print(f"Total energy: {e*1e3:.2f} mJ")
    """
    if power is not None:
        trace = power
    elif voltage is not None and current is not None:
        # No precomputed power: derive it from the V/I pair.
        trace = instantaneous_power(voltage, current)
    else:
        raise AnalysisError("Either power trace or both voltage and current traces required")

    samples = trace.data
    dt = trace.metadata.time_base

    # Restrict to the requested time window, if any limit was given.
    if not (start_time is None and end_time is None):
        times = np.arange(len(samples)) * dt
        if start_time is None:
            keep = np.ones(len(samples), dtype=bool)
        else:
            keep = times >= start_time
        if end_time is not None:
            keep &= times <= end_time
        samples = samples[keep]

    # Trapezoidal rule from scipy (stable API across versions).
    from scipy.integrate import trapezoid

    return float(trapezoid(samples, dx=dt))
def power_statistics(
    power: WaveformTrace | None = None,
    *,
    voltage: WaveformTrace | None = None,
    current: WaveformTrace | None = None,
) -> dict[str, float]:
    """Summarise a power trace with a set of scalar statistics.

    Args:
        power: Power trace. Takes precedence over ``voltage``/``current``.
        voltage: Voltage trace (used with ``current`` when ``power`` is None).
        current: Current trace (used with ``voltage`` when ``power`` is None).

    Returns:
        Dictionary with:
        - average: Mean power
        - rms: RMS power
        - peak: Peak power (largest magnitude)
        - peak_positive: Largest signed value
        - peak_negative: Smallest signed value (regeneration); same number
          as ``min``
        - energy: Total energy (trapezoidal integral over the whole trace)
        - min: Minimum power value
        - std: Standard deviation

    Raises:
        AnalysisError: If neither power nor both voltage and current provided.

    Example:
        >>> stats = power_statistics(voltage=v, current=i)
        >>> print(f"Average: {stats['average']:.2f} W")
        >>> print(f"Peak: {stats['peak']:.2f} W")
        >>> print(f"Energy: {stats['energy']*1e3:.2f} mJ")
    """
    if power is not None:
        trace = power
    elif voltage is not None and current is not None:
        # No precomputed power: derive it from the V/I pair.
        trace = instantaneous_power(voltage, current)
    else:
        raise AnalysisError("Either power trace or both voltage and current traces required")

    samples = trace.data
    dt = trace.metadata.time_base

    # Trapezoidal rule from scipy (stable API across NumPy versions).
    from scipy.integrate import trapezoid

    mean_value = float(np.mean(samples))
    rms_value = float(np.sqrt(np.mean(samples**2)))
    largest_magnitude = float(np.max(np.abs(samples)))
    highest = float(np.max(samples))
    lowest = float(np.min(samples))
    total_energy = float(trapezoid(samples, dx=dt))
    spread = float(np.std(samples))

    summary: dict[str, float] = {}
    summary["average"] = mean_value
    summary["rms"] = rms_value
    summary["peak"] = largest_magnitude
    summary["peak_positive"] = highest
    summary["peak_negative"] = lowest
    summary["energy"] = total_energy
    summary["min"] = lowest
    summary["std"] = spread
    return summary
def power_profile(
    voltage: WaveformTrace,
    current: WaveformTrace,
    *,
    window_size: int | None = None,
) -> dict[str, Any]:
    """Generate power profile with rolling statistics.

    Every returned trace has the same number of samples as the instantaneous
    power trace, including when the rolling window is longer than the data
    (which happens with the default window on traces shorter than 101
    samples).

    Args:
        voltage: Voltage trace.
        current: Current trace.
        window_size: Window size, in samples, for rolling calculations. If
            None, uses ~1% of the trace length or 100, whichever is larger.
            Even sizes are bumped up by one so the window is centered.

    Returns:
        Dictionary with:
        - power_trace: Instantaneous power trace
        - rolling_avg: Rolling average power. Samples outside the trace are
          treated as zero, so values within half a window of either end are
          attenuated.
        - rolling_peak: Rolling peak of the absolute power
        - cumulative_energy: Running sum of power times the sample period
          (rectangle rule), so its last value can differ slightly from the
          trapezoidal ``statistics["energy"]``
        - statistics: Overall power statistics from ``power_statistics``
    """
    power = instantaneous_power(voltage, current)
    data = power.data
    sample_period = power.metadata.time_base

    if window_size is None:
        # Auto-select: ~1% of total samples or 100, whichever is larger
        window_size = max(100, len(data) // 100)

    # Ensure window size is odd for centered window
    if window_size % 2 == 0:
        window_size += 1

    # Rolling average. mode="same" would return max(len(data), window_size)
    # samples, i.e. a trace longer than the input whenever the window exceeds
    # the data. Taking the centered len(data) slice of the full convolution
    # gives the identical result when the window fits and the correct length
    # when it does not.
    kernel = np.ones(window_size) / window_size
    half_window = window_size // 2
    full_avg = np.convolve(data, kernel, mode="full")
    rolling_avg = full_avg[half_window : half_window + len(data)]

    # Rolling peak of |P| via scipy's sliding-window maximum filter; its
    # output always matches the input length.
    from scipy.ndimage import maximum_filter1d

    rolling_peak = maximum_filter1d(np.abs(data), size=window_size)

    # Cumulative energy
    cumulative_energy = np.cumsum(data) * sample_period

    return {
        "power_trace": power,
        "rolling_avg": WaveformTrace(
            data=rolling_avg.astype(np.float64),
            metadata=power.metadata,
        ),
        "rolling_peak": WaveformTrace(
            data=rolling_peak.astype(np.float64),
            metadata=power.metadata,
        ),
        "cumulative_energy": WaveformTrace(
            data=cumulative_energy.astype(np.float64),
            metadata=power.metadata,
        ),
        "statistics": power_statistics(power),
    }
# Public API of this module, listed alphabetically.
__all__ = [
    "average_power",
    "energy",
    "instantaneous_power",
    "peak_power",
    "power_profile",
    "power_statistics",
    "rms_power",
]