Coverage for src / tracekit / api / fluent.py: 100%

169 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Fluent interface for signal analysis. 

2 

3This module provides a fluent (method chaining) interface for 

4expressing signal analysis operations in a readable, intuitive way. 

5""" 

6 

7from __future__ import annotations 

8 

9from dataclasses import dataclass, field 

10from typing import TYPE_CHECKING, Any, TypeVar 

11 

12import numpy as np 

13 

14if TYPE_CHECKING: 

15 from collections.abc import Callable 

16 

17 from numpy.typing import NDArray 

18 

# Module-scope generic placeholder.
# NOTE(review): FluentResult declares its own type parameter via PEP 695
# syntax (``class FluentResult[T]``), which shadows this TypeVar, so it
# appears unused within this module — kept in case external code imports it.
T = TypeVar("T")

# Explicit public API of this module.
__all__ = [
    "FluentResult",
    "FluentTrace",
    "trace",
]

26 

27 

28@dataclass 

29class FluentResult[T]: 

30 """Result container with fluent interface. 

31 

32 Provides method chaining for result processing. 

33 

34 Attributes: 

35 value: The wrapped value 

36 metadata: Associated metadata 

37 

38 Example: 

39 >>> result = FluentResult(42.5) 

40 >>> result.format("The value is {:.2f}").print() 

41 The value is 42.50 

42 

43 References: 

44 API-019: Fluent Interface 

45 """ 

46 

47 value: T 

48 metadata: dict[str, Any] = field(default_factory=dict) 

49 

50 def get(self) -> T: 

51 """Get the raw value. 

52 

53 Returns: 

54 The wrapped value 

55 """ 

56 return self.value 

57 

58 def map(self, func: Callable[[T], Any]) -> FluentResult: # type: ignore[type-arg] 

59 """Apply function to value. 

60 

61 Args: 

62 func: Function to apply 

63 

64 Returns: 

65 New FluentResult with mapped value 

66 """ 

67 return FluentResult(func(self.value), self.metadata.copy()) 

68 

69 def filter(self, predicate: Callable[[T], bool]) -> FluentResult | None: # type: ignore[type-arg] 

70 """Filter based on predicate. 

71 

72 Args: 

73 predicate: Filter function 

74 

75 Returns: 

76 Self if predicate passes, None otherwise 

77 """ 

78 if predicate(self.value): 

79 return self 

80 return None 

81 

82 def format(self, fmt: str) -> FluentResult[str]: 

83 """Format value as string. 

84 

85 Args: 

86 fmt: Format string 

87 

88 Returns: 

89 New FluentResult with formatted string 

90 """ 

91 return FluentResult(fmt.format(self.value), self.metadata.copy()) 

92 

93 def print(self, prefix: str = "") -> FluentResult[T]: 

94 """Print value and return self. 

95 

96 Args: 

97 prefix: Optional prefix 

98 

99 Returns: 

100 Self (for chaining) 

101 """ 

102 print(f"{prefix}{self.value}") 

103 return self 

104 

105 def with_metadata(self, **kwargs: Any) -> FluentResult[T]: 

106 """Add metadata. 

107 

108 Args: 

109 **kwargs: Metadata key-value pairs 

110 

111 Returns: 

112 Self (for chaining) 

113 """ 

114 self.metadata.update(kwargs) 

115 return self 

116 

117 def __repr__(self) -> str: 

118 return f"FluentResult({self.value!r})" 

119 

120 

class FluentTrace:
    """Fluent interface wrapper for trace data.

    Provides method chaining for signal processing operations.  Mutating
    operations modify the wrapped array in place and return ``self``;
    measurement methods return a :class:`FluentResult`.

    Example:
        >>> result = (FluentTrace(data, sample_rate=1e9)
        ...           .lowpass(cutoff=1e6)
        ...           .normalize()
        ...           .fft(nfft=8192)
        ...           .magnitude()
        ...           .get())

    References:
        API-019: Fluent Interface
    """

    def __init__(self, data: NDArray[np.float64], sample_rate: float = 1.0, **metadata: Any):
        """Initialize fluent trace.

        Args:
            data: Trace data array
            sample_rate: Sample rate in Hz
            **metadata: Additional metadata
        """
        self._data = data
        self._sample_rate = sample_rate
        self._metadata = metadata
        # Human-readable log of the operations applied, in order.
        self._history: list[str] = []

    @property
    def data(self) -> NDArray[np.float64]:
        """Get current data."""
        return self._data

    @property
    def sample_rate(self) -> float:
        """Get sample rate."""
        return self._sample_rate

    def get(self) -> NDArray[np.float64]:
        """Get raw data array.

        Returns:
            Data array
        """
        return self._data

    def copy(self) -> FluentTrace:
        """Create copy of trace.

        Returns:
            New FluentTrace with copied data and metadata (history is reset)
        """
        return FluentTrace(self._data.copy(), self._sample_rate, **self._metadata.copy())

    # =========================================================================
    # Filtering Methods
    # =========================================================================

    def lowpass(self, cutoff: float, order: int = 4) -> FluentTrace:
        """Apply zero-phase low-pass Butterworth filter.

        Args:
            cutoff: Cutoff frequency in Hz
            order: Filter order

        Returns:
            Self (for chaining)
        """
        from scipy import signal

        nyq = self._sample_rate / 2
        # Clamp just below 1.0: butter() rejects normalized cutoffs >= Nyquist.
        normalized_cutoff = min(cutoff / nyq, 0.99)
        b, a = signal.butter(order, normalized_cutoff, btype="low")
        self._data = signal.filtfilt(b, a, self._data)
        self._history.append(f"lowpass(cutoff={cutoff})")
        return self

    def highpass(self, cutoff: float, order: int = 4) -> FluentTrace:
        """Apply zero-phase high-pass Butterworth filter.

        Args:
            cutoff: Cutoff frequency in Hz
            order: Filter order

        Returns:
            Self (for chaining)
        """
        from scipy import signal

        nyq = self._sample_rate / 2
        # Clamp just above 0.0: butter() rejects non-positive normalized cutoffs.
        normalized_cutoff = max(cutoff / nyq, 0.01)
        b, a = signal.butter(order, normalized_cutoff, btype="high")
        self._data = signal.filtfilt(b, a, self._data)
        self._history.append(f"highpass(cutoff={cutoff})")
        return self

    def bandpass(self, low: float, high: float, order: int = 4) -> FluentTrace:
        """Apply zero-phase band-pass Butterworth filter.

        Args:
            low: Low cutoff frequency in Hz (must satisfy 0 < low < high < Nyquist)
            high: High cutoff frequency in Hz
            order: Filter order

        Returns:
            Self (for chaining)
        """
        from scipy import signal

        nyq = self._sample_rate / 2
        b, a = signal.butter(order, [low / nyq, high / nyq], btype="band")
        self._data = signal.filtfilt(b, a, self._data)
        self._history.append(f"bandpass(low={low}, high={high})")
        return self

    def notch(self, freq: float, Q: float = 30.0) -> FluentTrace:
        """Apply zero-phase notch filter.

        Args:
            freq: Notch frequency in Hz
            Q: Quality factor

        Returns:
            Self (for chaining)
        """
        from scipy import signal

        nyq = self._sample_rate / 2
        # iirnotch takes the notch frequency normalized to Nyquist (fs=2 default).
        b, a = signal.iirnotch(freq / nyq, Q)
        self._data = signal.filtfilt(b, a, self._data)
        self._history.append(f"notch(freq={freq})")
        return self

    # =========================================================================
    # Transform Methods
    # =========================================================================

    def normalize(self, method: str = "minmax") -> FluentTrace:
        """Normalize data in place.

        Args:
            method: Normalization method: ``minmax`` maps to [0, 1],
                ``zscore`` to zero mean / unit std, ``peak`` divides by
                the maximum absolute value.

        Returns:
            Self (for chaining)

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method not in ("minmax", "zscore", "peak"):
            # Fail loudly instead of silently leaving the data untouched.
            raise ValueError(f"unknown normalization method: {method!r}")

        if method == "minmax":
            data_min = np.min(self._data)
            data_max = np.max(self._data)
            # Guard against division by zero on constant traces.
            if data_max - data_min > 0:
                self._data = (self._data - data_min) / (data_max - data_min)
        elif method == "zscore":
            std = np.std(self._data)
            if std > 0:
                self._data = (self._data - np.mean(self._data)) / std
        else:  # peak
            peak = np.max(np.abs(self._data))
            if peak > 0:
                self._data = self._data / peak

        self._history.append(f"normalize(method={method})")
        return self

    def scale(self, factor: float) -> FluentTrace:
        """Scale data by factor.

        Args:
            factor: Scale factor

        Returns:
            Self (for chaining)
        """
        self._data = self._data * factor
        self._history.append(f"scale(factor={factor})")
        return self

    def offset(self, value: float) -> FluentTrace:
        """Add offset to data.

        Args:
            value: Offset value

        Returns:
            Self (for chaining)
        """
        self._data = self._data + value
        self._history.append(f"offset(value={value})")
        return self

    def clip(self, low: float, high: float) -> FluentTrace:
        """Clip data to range.

        Args:
            low: Low limit
            high: High limit

        Returns:
            Self (for chaining)
        """
        self._data = np.clip(self._data, low, high)
        self._history.append(f"clip(low={low}, high={high})")
        return self

    def abs(self) -> FluentTrace:
        """Take absolute value.

        Returns:
            Self (for chaining)
        """
        self._data = np.abs(self._data)
        self._history.append("abs()")
        return self

    def diff(self) -> FluentTrace:
        """Differentiate data (first difference, length preserved).

        Returns:
            Self (for chaining)
        """
        # Prepend first sample so output length matches input length.
        self._data = np.diff(self._data, prepend=self._data[0])
        self._history.append("diff()")
        return self

    def integrate(self) -> FluentTrace:
        """Integrate data (cumulative sum scaled by sample period).

        Returns:
            Self (for chaining)
        """
        dt = 1.0 / self._sample_rate
        self._data = np.cumsum(self._data) * dt
        self._history.append("integrate()")
        return self

    # =========================================================================
    # Resampling Methods
    # =========================================================================

    def resample(self, new_length: int) -> FluentTrace:
        """Resample to new length, adjusting the sample rate accordingly.

        Args:
            new_length: New number of samples

        Returns:
            Self (for chaining)
        """
        from scipy import signal

        # BUG FIX: capture the length BEFORE resampling.  The new rate is
        # old_rate * new_length / old_length; the previous code read
        # len(self._data) after the assignment, which always equals
        # new_length and left the sample rate unchanged.
        old_length = len(self._data)
        self._data = signal.resample(self._data, new_length)
        self._sample_rate = self._sample_rate * new_length / old_length
        self._history.append(f"resample(new_length={new_length})")
        return self

    def decimate(self, factor: int) -> FluentTrace:
        """Decimate by factor (anti-alias filtered downsampling).

        Args:
            factor: Decimation factor

        Returns:
            Self (for chaining)
        """
        from scipy import signal

        self._data = signal.decimate(self._data, factor)
        self._sample_rate = self._sample_rate / factor
        self._history.append(f"decimate(factor={factor})")
        return self

    def slice(self, start: int = 0, end: int | None = None) -> FluentTrace:
        """Slice data.

        Note: the sample rate is unchanged; only the span is reduced.

        Args:
            start: Start index
            end: End index (None for end of data)

        Returns:
            Self (for chaining)
        """
        self._data = self._data[start:end]
        self._history.append(f"slice(start={start}, end={end})")
        return self

    # =========================================================================
    # Spectral Methods
    # =========================================================================

    def fft(self, nfft: int | None = None) -> FluentTrace:
        """Compute FFT.

        Args:
            nfft: FFT size (None uses the data length)

        Returns:
            Self (for chaining; data is now complex)
        """
        self._data = np.fft.fft(self._data, n=nfft)  # type: ignore[assignment]
        self._history.append(f"fft(nfft={nfft})")
        return self

    def magnitude(self) -> FluentTrace:
        """Compute magnitude of complex data.

        Returns:
            Self (for chaining)
        """
        self._data = np.abs(self._data)
        self._history.append("magnitude()")
        return self

    def phase(self) -> FluentTrace:
        """Compute phase of complex data (radians).

        Returns:
            Self (for chaining)
        """
        self._data = np.angle(self._data)
        self._history.append("phase()")
        return self

    def psd(self, nperseg: int = 256) -> FluentResult[tuple]:  # type: ignore[type-arg]
        """Compute power spectral density via Welch's method.

        Args:
            nperseg: Segment size

        Returns:
            FluentResult with (frequencies, psd) tuple
        """
        from scipy import signal

        f, psd = signal.welch(self._data, self._sample_rate, nperseg=nperseg)
        return FluentResult((f, psd))

    # =========================================================================
    # Measurement Methods
    # =========================================================================

    def mean(self) -> FluentResult[float]:
        """Compute mean.

        Returns:
            FluentResult with mean value
        """
        return FluentResult(float(np.mean(self._data)))

    def std(self) -> FluentResult[float]:
        """Compute standard deviation.

        Returns:
            FluentResult with std value
        """
        return FluentResult(float(np.std(self._data)))

    def rms(self) -> FluentResult[float]:
        """Compute RMS value.

        Returns:
            FluentResult with RMS value
        """
        return FluentResult(float(np.sqrt(np.mean(self._data**2))))

    def peak_to_peak(self) -> FluentResult[float]:
        """Compute peak-to-peak value.

        Returns:
            FluentResult with peak-to-peak value
        """
        return FluentResult(float(np.ptp(self._data)))

    def min(self) -> FluentResult[float]:
        """Get minimum value.

        Returns:
            FluentResult with min value
        """
        return FluentResult(float(np.min(self._data)))

    def max(self) -> FluentResult[float]:
        """Get maximum value.

        Returns:
            FluentResult with max value
        """
        return FluentResult(float(np.max(self._data)))

    # =========================================================================
    # Utility Methods
    # =========================================================================

    def print_history(self) -> FluentTrace:
        """Print operation history.

        Returns:
            Self (for chaining)
        """
        print("Operation history:")
        for op in self._history:
            print(f"  - {op}")
        return self

    def with_metadata(self, **kwargs: Any) -> FluentTrace:
        """Add metadata in place.

        Args:
            **kwargs: Metadata key-value pairs

        Returns:
            Self (for chaining)
        """
        self._metadata.update(kwargs)
        return self

    def __len__(self) -> int:
        return len(self._data)

    def __repr__(self) -> str:
        return (
            f"FluentTrace(samples={len(self._data)}, "
            f"sample_rate={self._sample_rate}, "
            f"operations={len(self._history)})"
        )

546 

547 

def trace(data: NDArray[np.float64], sample_rate: float = 1.0, **metadata: Any) -> FluentTrace:
    """Wrap a data array in a :class:`FluentTrace` for method chaining.

    Convenience factory — equivalent to constructing ``FluentTrace`` directly.

    Args:
        data: Trace data array
        sample_rate: Sample rate in Hz
        **metadata: Additional metadata

    Returns:
        FluentTrace instance

    Example:
        >>> result = (trace(data, sample_rate=1e9)
        ...           .lowpass(cutoff=1e6)
        ...           .normalize()
        ...           .mean()
        ...           .get())

    References:
        API-019: Fluent Interface
    """
    wrapper = FluentTrace(data, sample_rate, **metadata)
    return wrapper