Coverage for src / tracekit / api / operators.py: 99%
139 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Pythonic operators and utilities for signal analysis.
3This module provides Pythonic operators, time-based indexing,
4automatic unit conversion, and convenience utilities.
5"""
7from __future__ import annotations
9import re
10from dataclasses import dataclass
11from typing import TYPE_CHECKING, Any
13import numpy as np
15if TYPE_CHECKING:
16 from collections.abc import Callable
18 from numpy.typing import NDArray
20__all__ = [
21 "TimeIndex",
22 "UnitConverter",
23 "convert_units",
24 "make_pipeable",
25]
28# =============================================================================
29# =============================================================================
class TimeIndex:
    """Time-based indexing for trace data.

    Allows slicing trace data using time values instead of sample indices.
    Times are given either as floats (seconds) or as strings consisting of
    a number and an optional unit suffix from ``TIME_UNITS``; a missing
    unit means seconds.

    Example:
        >>> ti = TimeIndex(data, sample_rate=1e9)
        >>> # Get first 1 millisecond
        >>> segment = ti["0ms":"1ms"]
        >>> # Get from 100us to 200us
        >>> segment = ti["100us":"200us"]

    References:
        API-016: Time-Based Indexing
    """

    # Time unit multipliers to seconds
    TIME_UNITS = {  # noqa: RUF012
        "s": 1.0,
        "ms": 1e-3,
        "us": 1e-6,
        "ns": 1e-9,
        "ps": 1e-12,
    }

    # Signed decimal number (optional fraction and exponent) followed by an
    # optional alphabetic unit. Used with fullmatch so trailing garbage such
    # as "1.5.3ms" is rejected instead of being silently truncated to "1.5".
    _TIME_PATTERN = re.compile(
        r"([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)\s*([a-zA-Z]*)"
    )

    # Relative tolerance (in samples) used to absorb floating-point round-off
    # when converting a time to a sample position.
    _SNAP_TOLERANCE = 1e-9

    def __init__(self, data: NDArray[np.float64], sample_rate: float, start_time: float = 0.0):
        """Initialize time indexer.

        Args:
            data: Trace data array
            sample_rate: Sample rate in Hz
            start_time: Start time offset in seconds
        """
        self._data = data
        self._sample_rate = sample_rate
        self._start_time = start_time

    @property
    def duration(self) -> float:
        """Get trace duration in seconds."""
        return len(self._data) / self._sample_rate

    @property
    def time_axis(self) -> NDArray[np.float64]:
        """Get time axis array."""
        return np.arange(len(self._data)) / self._sample_rate + self._start_time

    def _parse_time(self, time_str: str) -> float:
        """Parse time string to seconds.

        Args:
            time_str: Time string (e.g., "100ms", "1.5us", "1e-3s")

        Returns:
            Time in seconds

        Raises:
            ValueError: If time format is invalid or unit is unknown.
        """
        match = self._TIME_PATTERN.fullmatch(time_str.strip())
        if not match:
            raise ValueError(f"Invalid time format: {time_str}")

        value = float(match.group(1))
        # Units are case-insensitive; a bare number is taken as seconds.
        unit = match.group(2).lower() or "s"

        if unit not in self.TIME_UNITS:
            raise ValueError(
                f"Unknown time unit: {unit}. Valid units: {list(self.TIME_UNITS.keys())}"
            )

        return value * self.TIME_UNITS[unit]

    def _time_to_sample(self, time_seconds: float) -> int:
        """Convert time to an unclamped sample position.

        The position is truncated toward zero, except that a value lying
        within floating-point round-off of a whole number snaps to that
        number (e.g. 99999.99999999999 becomes 100000, not 99999).

        Args:
            time_seconds: Time in seconds

        Returns:
            Sample position relative to the first sample (may lie outside
            the data).
        """
        position = (time_seconds - self._start_time) * self._sample_rate
        nearest = round(position)
        if abs(position - nearest) <= self._SNAP_TOLERANCE * max(1.0, abs(position)):
            return int(nearest)
        return int(position)

    def _time_to_index(self, time_seconds: float) -> int:
        """Convert time to sample index.

        Args:
            time_seconds: Time in seconds

        Returns:
            Sample index, clamped to the range of valid element indices
        """
        index = self._time_to_sample(time_seconds)
        return max(0, min(index, len(self._data) - 1))

    def _slice_bound(self, time: str | float) -> int:
        """Convert a time to a slice boundary clamped to ``[0, len(data)]``."""
        if isinstance(time, str):
            time = self._parse_time(time)
        return max(0, min(self._time_to_sample(time), len(self._data)))

    def at(self, time: str | float) -> float:
        """Get value at specific time.

        Times outside the trace are clamped to the first/last sample.

        Args:
            time: Time as string or float (seconds)

        Returns:
            Value at that time
        """
        if isinstance(time, str):
            time = self._parse_time(time)
        index = self._time_to_index(time)
        return float(self._data[index])

    def slice(
        self, start: str | float | None = None, end: str | float | None = None
    ) -> NDArray[np.float64]:
        """Slice data by time range.

        The end boundary is exclusive. Boundaries are clamped to the trace,
        so an end at or beyond the trace duration includes the last sample
        and a start beyond the trace yields an empty result.

        Args:
            start: Start time (string or seconds); None means trace start
            end: End time (string or seconds); None means trace end

        Returns:
            Sliced data array
        """
        start_idx = 0 if start is None else self._slice_bound(start)
        end_idx = len(self._data) if end is None else self._slice_bound(end)
        return self._data[start_idx:end_idx]

    def __getitem__(self, key: slice | str | float) -> NDArray[np.float64] | float:  # type: ignore[valid-type]
        """Enable bracket notation for time-based indexing.

        Args:
            key: Slice with time strings, or single time

        Returns:
            Sliced data or single value
        """
        if isinstance(key, slice):
            return self.slice(key.start, key.stop)
        return self.at(key)
176# =============================================================================
177# =============================================================================
@dataclass
class Unit:
    """Describe a unit by its conversion factor to a base unit.

    Attributes:
        name: Unit name.
        symbol: Unit symbol.
        factor: Conversion factor from this unit to the base unit.
        base_unit: Name of the base unit.
    """

    name: str
    symbol: str
    factor: float
    base_unit: str
class UnitConverter:
    """Automatic unit conversion for measurements.

    Supports common electrical and signal analysis units with
    automatic prefix handling (mV, uV, MHz, etc.).

    Example:
        >>> converter = UnitConverter()
        >>> converter.convert(1000, "mV", "V")
        1.0
        >>> converter.auto_scale(0.000001, "V")
        (1.0, 'uV')

    References:
        API-018: Automatic Unit Conversion
    """

    # SI prefixes
    SI_PREFIXES = {  # noqa: RUF012
        "P": 1e15,  # peta
        "T": 1e12,  # tera
        "G": 1e9,  # giga
        "M": 1e6,  # mega
        "k": 1e3,  # kilo
        "": 1.0,  # base
        "m": 1e-3,  # milli
        "u": 1e-6,  # micro
        "n": 1e-9,  # nano
        "p": 1e-12,  # pico
        "f": 1e-15,  # femto
    }

    # Base units
    BASE_UNITS = {  # noqa: RUF012
        "V": "voltage",
        "A": "current",
        "W": "power",
        "Hz": "frequency",
        "s": "time",
        "F": "capacitance",
        "H": "inductance",
        "Ohm": "resistance",
        "dB": "decibel",
        "dBm": "power_dbm",
        "dBV": "voltage_dbv",
    }

    def __init__(self) -> None:
        """Initialize converter."""
        self._custom_units: dict[str, Unit] = {}

    def _parse_unit(self, unit_str: str) -> tuple[float, str]:
        """Parse unit string into prefix multiplier and base unit.

        A base unit is only recognized when whatever precedes it is a known
        SI prefix (or empty). Otherwise the whole string is returned as an
        opaque unit with multiplier 1.0, so e.g. "samples" is not mistaken
        for seconds.

        Args:
            unit_str: Unit string (e.g., "mV", "MHz")

        Returns:
            Tuple of (multiplier, base_unit)
        """
        # Longest suffix first so "dBm"/"dBV"/"Ohm"/"Hz" win over the
        # single-letter units they end with.
        for base in sorted(self.BASE_UNITS, key=len, reverse=True):
            if not unit_str.endswith(base):
                continue
            prefix = unit_str[: -len(base)]
            if prefix in self.SI_PREFIXES:
                return self.SI_PREFIXES[prefix], base

        # No recognized prefix + base unit combination
        return 1.0, unit_str

    def convert(self, value: float, from_unit: str, to_unit: str) -> float:
        """Convert value between units.

        Besides prefix rescaling within one base unit, the pairs
        dBm <-> W and dBV <-> V are converted logarithmically.

        Args:
            value: Value to convert
            from_unit: Source unit
            to_unit: Target unit

        Returns:
            Converted value

        Raises:
            ValueError: If units are incompatible
        """
        from_mult, from_base = self._parse_unit(from_unit)
        to_mult, to_base = self._parse_unit(to_unit)

        # Same base unit: only the prefixes differ
        if from_base == to_base:
            return value * from_mult / to_mult

        source = value * from_mult
        pair = (from_base, to_base)

        # Special handling for dB conversions; float() keeps NumPy scalars
        # from leaking out of the log10 paths.
        if pair == ("dBm", "W"):
            return float(10 ** ((source - 30) / 10) / to_mult)
        if pair == ("W", "dBm"):
            return float((10 * np.log10(source) + 30) / to_mult)
        if pair == ("dBV", "V"):
            return float(10 ** (source / 20) / to_mult)
        if pair == ("V", "dBV"):
            return float(20 * np.log10(source) / to_mult)

        raise ValueError(f"Cannot convert between {from_base} and {to_base}")

    def auto_scale(self, value: float, base_unit: str) -> tuple[float, str]:
        """Automatically scale value to appropriate prefix.

        Picks the largest SI prefix for which the scaled magnitude is at
        least 1. Magnitudes beyond the table are pinned to its ends: very
        large values use the largest prefix, very small ones the smallest.
        Zero is reported in the base unit.

        Args:
            value: Value in base units
            base_unit: Base unit string

        Returns:
            Tuple of (scaled_value, unit_string)

        Example:
            >>> converter.auto_scale(0.000001, "V")
            (1.0, 'uV')
        """
        # Treat zero as magnitude 1 so it lands on the unprefixed unit
        magnitude = abs(value) if value != 0 else 1

        # Largest factor first
        ordered = sorted(self.SI_PREFIXES.items(), key=lambda item: item[1], reverse=True)

        for prefix, factor in ordered:
            if magnitude / factor >= 1.0:
                return value / factor, f"{prefix}{base_unit}"

        # Smaller than the smallest prefix (or not comparable, e.g. NaN)
        smallest_prefix, smallest_factor = ordered[-1]
        return value / smallest_factor, f"{smallest_prefix}{base_unit}"

    def format_value(self, value: float, base_unit: str, precision: int = 3) -> str:
        """Format value with automatic scaling.

        Args:
            value: Value in base units
            base_unit: Base unit string
            precision: Number of significant digits ("g" format)

        Returns:
            Formatted string
        """
        scaled, unit = self.auto_scale(value, base_unit)
        return f"{scaled:.{precision}g} {unit}"
def convert_units(value: float, from_unit: str, to_unit: str) -> float:
    """Convert a value from one unit to another.

    Shortcut that builds a throwaway ``UnitConverter`` and delegates to
    its ``convert`` method.

    Args:
        value: Value to convert
        from_unit: Source unit
        to_unit: Target unit

    Returns:
        Converted value

    Example:
        >>> convert_units(1000, "mV", "V")
        1.0
        >>> convert_units(1, "MHz", "Hz")
        1000000.0

    References:
        API-018: Automatic Unit Conversion
    """
    converter = UnitConverter()
    return converter.convert(value, from_unit, to_unit)
387# =============================================================================
388# =============================================================================
class PipeableFunction:
    """Wrapper for making functions pipeable with >> operator.

    Holds a function together with extra arguments; the piped value is
    supplied as the function's first positional argument.

    Example:
        >>> @make_pipeable
        ... def lowpass(data, cutoff):
        ...     return filtered_data
        >>> result = data >> lowpass(cutoff=1e6) >> normalize()

    References:
        API-015: Pythonic Operators
    """

    # Opt out of NumPy's operator handling. Without this, ``ndarray >> obj``
    # is evaluated by ndarray.__rshift__ (element by element, or raising)
    # before Python ever tries __rrshift__. With it, NumPy returns
    # NotImplemented and the whole array reaches __rrshift__ below.
    __array_ufunc__ = None

    def __init__(self, func: Callable, *args: Any, **kwargs: Any):  # type: ignore[type-arg]
        """Initialize pipeable function.

        Args:
            func: Function to wrap
            *args: Positional arguments passed after the piped data
            **kwargs: Keyword arguments passed to the function
        """
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __call__(self, data: Any) -> Any:
        """Call function with data as first argument.

        Args:
            data: Input data

        Returns:
            Function result
        """
        return self._func(data, *self._args, **self._kwargs)

    def __rrshift__(self, other: Any) -> Any:
        """Enable data >> func() syntax.

        Args:
            other: Left operand (data)

        Returns:
            Function result
        """
        return self(other)
def make_pipeable(func: Callable) -> Callable:  # type: ignore[type-arg]
    """Turn a function into a factory of pipeable calls.

    Calling the returned wrapper does not run ``func``; it captures the
    given arguments in a ``PipeableFunction`` that is executed once data
    is piped into it with ``>>``.

    Args:
        func: Function to wrap

    Returns:
        Wrapper that returns PipeableFunction

    Example:
        >>> @make_pipeable
        ... def scale(data, factor):
        ...     return data * factor
        >>> result = data >> scale(factor=2)

    References:
        API-015: Pythonic Operators
    """

    def wrapper(*args: Any, **kwargs: Any) -> PipeableFunction:
        return PipeableFunction(func, *args, **kwargs)

    # Carry over the wrapped function's name and docstring
    for attribute in ("__name__", "__doc__"):
        setattr(wrapper, attribute, getattr(func, attribute))
    return wrapper
466# Create pipeable versions of common operations
@make_pipeable
def scale(data: NDArray[np.float64], factor: float) -> NDArray[np.float64]:
    """Multiply data by a factor."""
    scaled = data * factor
    return scaled
@make_pipeable
def offset(data: NDArray[np.float64], value: float) -> NDArray[np.float64]:
    """Shift data by adding a constant value."""
    shifted = data + value
    return shifted
@make_pipeable
def clip_values(data: NDArray[np.float64], low: float, high: float) -> NDArray[np.float64]:
    """Limit data to the range from low to high using ``np.clip``."""
    clipped = np.clip(data, low, high)
    return clipped
@make_pipeable
def normalize_data(data: NDArray[np.float64], method: str = "minmax") -> NDArray[np.float64]:
    """Normalize data.

    Args:
        data: Input array
        method: "minmax" rescales to the range 0..1; "zscore" subtracts
            the mean and divides by the standard deviation

    Returns:
        Normalized array. The input is returned unchanged when it is
        empty, when it has no spread (max equals min, or zero standard
        deviation), or when ``method`` is not one of the names above.
    """
    # Empty input: nothing to normalize. Also avoids the ValueError that
    # min()/max() raise on zero-size arrays.
    if data.size == 0:
        return data

    if method == "minmax":
        dmin, dmax = data.min(), data.max()
        span = dmax - dmin
        if span > 0:
            result: NDArray[np.float64] = (data - dmin) / span
            return result
    elif method == "zscore":
        std = data.std()
        if std > 0:
            result_z: NDArray[np.float64] = (data - data.mean()) / std
            return result_z

    # NOTE(review): unknown method names fall through silently here; kept
    # for backward compatibility.
    return data