Coverage for src / tracekit / api / operators.py: 99%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Pythonic operators and utilities for signal analysis. 

2 

3This module provides Pythonic operators, time-based indexing, 

4automatic unit conversion, and convenience utilities. 

5""" 

6 

7from __future__ import annotations 

8 

9import re 

10from dataclasses import dataclass 

11from typing import TYPE_CHECKING, Any 

12 

13import numpy as np 

14 

15if TYPE_CHECKING: 

16 from collections.abc import Callable 

17 

18 from numpy.typing import NDArray 

19 

20__all__ = [ 

21 "TimeIndex", 

22 "UnitConverter", 

23 "convert_units", 

24 "make_pipeable", 

25] 

26 

27 

28# ============================================================================= 

29# ============================================================================= 

30 

31 

32class TimeIndex: 

33 """Time-based indexing for trace data. 

34 

35 Allows slicing trace data using time values instead of sample indices. 

36 

37 Example: 

38 >>> ti = TimeIndex(data, sample_rate=1e9) 

39 >>> # Get first 1 millisecond 

40 >>> segment = ti["0ms":"1ms"] 

41 >>> # Get from 100us to 200us 

42 >>> segment = ti["100us":"200us"] 

43 

44 References: 

45 API-016: Time-Based Indexing 

46 """ 

47 

48 # Time unit multipliers to seconds 

49 TIME_UNITS = { # noqa: RUF012 

50 "s": 1.0, 

51 "ms": 1e-3, 

52 "us": 1e-6, 

53 "ns": 1e-9, 

54 "ps": 1e-12, 

55 } 

56 

57 def __init__(self, data: NDArray[np.float64], sample_rate: float, start_time: float = 0.0): 

58 """Initialize time indexer. 

59 

60 Args: 

61 data: Trace data array 

62 sample_rate: Sample rate in Hz 

63 start_time: Start time offset in seconds 

64 """ 

65 self._data = data 

66 self._sample_rate = sample_rate 

67 self._start_time = start_time 

68 

69 @property 

70 def duration(self) -> float: 

71 """Get trace duration in seconds.""" 

72 return len(self._data) / self._sample_rate 

73 

74 @property 

75 def time_axis(self) -> NDArray[np.float64]: 

76 """Get time axis array.""" 

77 return np.arange(len(self._data)) / self._sample_rate + self._start_time 

78 

79 def _parse_time(self, time_str: str) -> float: 

80 """Parse time string to seconds. 

81 

82 Args: 

83 time_str: Time string (e.g., "100ms", "1.5us") 

84 

85 Returns: 

86 Time in seconds 

87 

88 Raises: 

89 ValueError: If time format is invalid or unit is unknown. 

90 """ 

91 # Match number with optional unit 

92 match = re.match(r"([-+]?\d*\.?\d+)\s*([a-zA-Z]*)", time_str.strip()) 

93 if not match: 

94 raise ValueError(f"Invalid time format: {time_str}") 

95 

96 value = float(match.group(1)) 

97 unit = match.group(2).lower() or "s" 

98 

99 if unit not in self.TIME_UNITS: 

100 raise ValueError( 

101 f"Unknown time unit: {unit}. Valid units: {list(self.TIME_UNITS.keys())}" 

102 ) 

103 

104 return value * self.TIME_UNITS[unit] 

105 

106 def _time_to_index(self, time_seconds: float) -> int: 

107 """Convert time to sample index. 

108 

109 Args: 

110 time_seconds: Time in seconds 

111 

112 Returns: 

113 Sample index 

114 """ 

115 relative_time = time_seconds - self._start_time 

116 index = int(relative_time * self._sample_rate) 

117 return max(0, min(index, len(self._data) - 1)) 

118 

119 def at(self, time: str | float) -> float: 

120 """Get value at specific time. 

121 

122 Args: 

123 time: Time as string or float (seconds) 

124 

125 Returns: 

126 Value at that time 

127 """ 

128 if isinstance(time, str): 

129 time = self._parse_time(time) 

130 index = self._time_to_index(time) 

131 return float(self._data[index]) 

132 

133 def slice( 

134 self, start: str | float | None = None, end: str | float | None = None 

135 ) -> NDArray[np.float64]: 

136 """Slice data by time range. 

137 

138 Args: 

139 start: Start time 

140 end: End time 

141 

142 Returns: 

143 Sliced data array 

144 """ 

145 if start is not None: 

146 if isinstance(start, str): 

147 start = self._parse_time(start) 

148 start_idx = self._time_to_index(start) 

149 else: 

150 start_idx = 0 

151 

152 if end is not None: 

153 if isinstance(end, str): 

154 end = self._parse_time(end) 

155 end_idx = self._time_to_index(end) 

156 else: 

157 end_idx = len(self._data) 

158 

159 return self._data[start_idx:end_idx] 

160 

161 def __getitem__(self, key: slice | str | float) -> NDArray[np.float64] | float: # type: ignore[valid-type] 

162 """Enable bracket notation for time-based indexing. 

163 

164 Args: 

165 key: Slice with time strings, or single time 

166 

167 Returns: 

168 Sliced data or single value 

169 """ 

170 if isinstance(key, slice): 

171 return self.slice(key.start, key.stop) 

172 else: 

173 return self.at(key) 

174 

175 

176# ============================================================================= 

177# ============================================================================= 

178 

179 

180@dataclass 

181class Unit: 

182 """Unit definition with conversion factor. 

183 

184 Attributes: 

185 name: Unit name 

186 symbol: Unit symbol 

187 factor: Conversion factor to base unit 

188 base_unit: Base unit name 

189 """ 

190 

191 name: str 

192 symbol: str 

193 factor: float 

194 base_unit: str 

195 

196 

197class UnitConverter: 

198 """Automatic unit conversion for measurements. 

199 

200 Supports common electrical and signal analysis units with 

201 automatic prefix handling (mV, uV, MHz, etc.). 

202 

203 Example: 

204 >>> converter = UnitConverter() 

205 >>> converter.convert(1000, "mV", "V") 

206 1.0 

207 >>> converter.auto_scale(0.000001, "V") 

208 (1.0, "uV") 

209 

210 References: 

211 API-018: Automatic Unit Conversion 

212 """ 

213 

214 # SI prefixes 

215 SI_PREFIXES = { # noqa: RUF012 

216 "P": 1e15, # peta 

217 "T": 1e12, # tera 

218 "G": 1e9, # giga 

219 "M": 1e6, # mega 

220 "k": 1e3, # kilo 

221 "": 1.0, # base 

222 "m": 1e-3, # milli 

223 "u": 1e-6, # micro 

224 "n": 1e-9, # nano 

225 "p": 1e-12, # pico 

226 "f": 1e-15, # femto 

227 } 

228 

229 # Base units 

230 BASE_UNITS = { # noqa: RUF012 

231 "V": "voltage", 

232 "A": "current", 

233 "W": "power", 

234 "Hz": "frequency", 

235 "s": "time", 

236 "F": "capacitance", 

237 "H": "inductance", 

238 "Ohm": "resistance", 

239 "dB": "decibel", 

240 "dBm": "power_dbm", 

241 "dBV": "voltage_dbv", 

242 } 

243 

244 def __init__(self) -> None: 

245 """Initialize converter.""" 

246 self._custom_units: dict[str, Unit] = {} 

247 

248 def _parse_unit(self, unit_str: str) -> tuple[float, str]: 

249 """Parse unit string into prefix multiplier and base unit. 

250 

251 Args: 

252 unit_str: Unit string (e.g., "mV", "MHz") 

253 

254 Returns: 

255 Tuple of (multiplier, base_unit) 

256 """ 

257 # Check for dB-based units first 

258 for db_unit in ("dBm", "dBV", "dB"): 

259 if unit_str.endswith(db_unit): 

260 prefix = unit_str[: -len(db_unit)] 

261 multiplier = self.SI_PREFIXES.get(prefix, 1.0) 

262 return multiplier, db_unit 

263 

264 # Check for other base units 

265 for base in sorted(self.BASE_UNITS.keys(), key=len, reverse=True): 

266 if unit_str.endswith(base): 

267 prefix = unit_str[: -len(base)] 

268 multiplier = self.SI_PREFIXES.get(prefix, 1.0) 

269 return multiplier, base 

270 

271 # No recognized base unit 

272 return 1.0, unit_str 

273 

274 def convert(self, value: float, from_unit: str, to_unit: str) -> float: 

275 """Convert value between units. 

276 

277 Args: 

278 value: Value to convert 

279 from_unit: Source unit 

280 to_unit: Target unit 

281 

282 Returns: 

283 Converted value 

284 

285 Raises: 

286 ValueError: If units are incompatible 

287 """ 

288 from_mult, from_base = self._parse_unit(from_unit) 

289 to_mult, to_base = self._parse_unit(to_unit) 

290 

291 # Check compatibility 

292 if from_base != to_base: 

293 # Special handling for dB conversions 

294 if from_base == "dBm" and to_base == "W": 

295 return 10 ** ((value * from_mult - 30) / 10) / to_mult 

296 elif from_base == "W" and to_base == "dBm": 

297 return (10 * np.log10(value * from_mult) + 30) / to_mult # type: ignore[no-any-return] 

298 elif from_base == "dBV" and to_base == "V": 

299 return 10 ** ((value * from_mult) / 20) / to_mult 

300 elif from_base == "V" and to_base == "dBV": 

301 return 20 * np.log10(value * from_mult) / to_mult # type: ignore[no-any-return] 

302 else: 

303 raise ValueError(f"Cannot convert between {from_base} and {to_base}") 

304 

305 # Simple conversion 

306 return value * from_mult / to_mult 

307 

308 def auto_scale(self, value: float, base_unit: str) -> tuple[float, str]: 

309 """Automatically scale value to appropriate prefix. 

310 

311 Args: 

312 value: Value in base units 

313 base_unit: Base unit string 

314 

315 Returns: 

316 Tuple of (scaled_value, unit_string) 

317 

318 Example: 

319 >>> converter.auto_scale(0.000001, "V") 

320 (1.0, "uV") 

321 """ 

322 abs_value = abs(value) if value != 0 else 1 

323 

324 # Find appropriate prefix 

325 prefixes_ordered = [ 

326 ("P", 1e15), 

327 ("T", 1e12), 

328 ("G", 1e9), 

329 ("M", 1e6), 

330 ("k", 1e3), 

331 ("", 1.0), 

332 ("m", 1e-3), 

333 ("u", 1e-6), 

334 ("n", 1e-9), 

335 ("p", 1e-12), 

336 ("f", 1e-15), 

337 ] 

338 

339 for prefix, factor in prefixes_ordered: 339 ↛ 345line 339 didn't jump to line 345 because the loop on line 339 didn't complete

340 scaled = abs_value / factor 

341 if 1.0 <= scaled < 1000.0 or prefix == "f": 

342 return value / factor, f"{prefix}{base_unit}" 

343 

344 # Default to base unit 

345 return value, base_unit 

346 

347 def format_value(self, value: float, base_unit: str, precision: int = 3) -> str: 

348 """Format value with automatic scaling. 

349 

350 Args: 

351 value: Value in base units 

352 base_unit: Base unit string 

353 precision: Decimal precision 

354 

355 Returns: 

356 Formatted string 

357 """ 

358 scaled, unit = self.auto_scale(value, base_unit) 

359 return f"{scaled:.{precision}g} {unit}" 

360 

361 

362def convert_units(value: float, from_unit: str, to_unit: str) -> float: 

363 """Convert value between units. 

364 

365 Convenience function for unit conversion. 

366 

367 Args: 

368 value: Value to convert 

369 from_unit: Source unit 

370 to_unit: Target unit 

371 

372 Returns: 

373 Converted value 

374 

375 Example: 

376 >>> convert_units(1000, "mV", "V") 

377 1.0 

378 >>> convert_units(1, "MHz", "Hz") 

379 1000000.0 

380 

381 References: 

382 API-018: Automatic Unit Conversion 

383 """ 

384 return UnitConverter().convert(value, from_unit, to_unit) 

385 

386 

387# ============================================================================= 

388# ============================================================================= 

389 

390 

391class PipeableFunction: 

392 """Wrapper for making functions pipeable with >> operator. 

393 

394 Example: 

395 >>> @make_pipeable 

396 >>> def lowpass(data, cutoff): 

397 ... return filtered_data 

398 >>> result = data >> lowpass(cutoff=1e6) >> normalize() 

399 

400 References: 

401 API-015: Pythonic Operators 

402 """ 

403 

404 def __init__(self, func: Callable, *args: Any, **kwargs: Any): # type: ignore[type-arg] 

405 """Initialize pipeable function. 

406 

407 Args: 

408 func: Function to wrap 

409 *args: Positional arguments 

410 **kwargs: Keyword arguments 

411 """ 

412 self._func = func 

413 self._args = args 

414 self._kwargs = kwargs 

415 

416 def __call__(self, data: Any) -> Any: 

417 """Call function with data as first argument. 

418 

419 Args: 

420 data: Input data 

421 

422 Returns: 

423 Function result 

424 """ 

425 return self._func(data, *self._args, **self._kwargs) 

426 

427 def __rrshift__(self, other: Any) -> Any: 

428 """Enable data >> func() syntax. 

429 

430 Args: 

431 other: Left operand (data) 

432 

433 Returns: 

434 Function result 

435 """ 

436 return self(other) 

437 

438 

439def make_pipeable(func: Callable) -> Callable: # type: ignore[type-arg] 

440 """Decorator to make function pipeable with >> operator. 

441 

442 Args: 

443 func: Function to wrap 

444 

445 Returns: 

446 Wrapper that returns PipeableFunction 

447 

448 Example: 

449 >>> @make_pipeable 

450 >>> def scale(data, factor): 

451 ... return data * factor 

452 >>> result = data >> scale(factor=2) 

453 

454 References: 

455 API-015: Pythonic Operators 

456 """ 

457 

458 def wrapper(*args: Any, **kwargs: Any) -> PipeableFunction: 

459 return PipeableFunction(func, *args, **kwargs) 

460 

461 wrapper.__name__ = func.__name__ 

462 wrapper.__doc__ = func.__doc__ 

463 return wrapper 

464 

465 

466# Create pipeable versions of common operations 

467@make_pipeable 

468def scale(data: NDArray[np.float64], factor: float) -> NDArray[np.float64]: 

469 """Scale data by factor.""" 

470 return data * factor 

471 

472 

473@make_pipeable 

474def offset(data: NDArray[np.float64], value: float) -> NDArray[np.float64]: 

475 """Add offset to data.""" 

476 return data + value 

477 

478 

479@make_pipeable 

480def clip_values(data: NDArray[np.float64], low: float, high: float) -> NDArray[np.float64]: 

481 """Clip data to range.""" 

482 return np.clip(data, low, high) 

483 

484 

485@make_pipeable 

486def normalize_data(data: NDArray[np.float64], method: str = "minmax") -> NDArray[np.float64]: 

487 """Normalize data.""" 

488 if method == "minmax": 

489 dmin, dmax = data.min(), data.max() 

490 if dmax - dmin > 0: 

491 result: NDArray[np.float64] = (data - dmin) / (dmax - dmin) 

492 return result 

493 elif method == "zscore": 

494 std = data.std() 

495 if std > 0: 

496 result_z: NDArray[np.float64] = (data - data.mean()) / std 

497 return result_z 

498 return data