Coverage for src/tracekit/streaming/realtime.py: 96%

208 statements  

coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Real-time streaming APIs for live data acquisition and processing. 

2 

3This module provides interfaces for real-time data capture, buffering, and 

4on-the-fly analysis of streaming waveforms. Supports pluggable input sources, 

5configurable sample buffers, and streaming statistics. 

6""" 

7 

8from __future__ import annotations 

9 

10import threading 

11import time 

12from collections import deque 

13from collections.abc import Callable 

14from dataclasses import dataclass 

15from typing import TYPE_CHECKING, Any 

16 

17import numpy as np 

18 

19from ..core.types import TraceMetadata, WaveformTrace 

20 

21if TYPE_CHECKING: 

22 from collections.abc import Generator 

23 

24 from numpy.typing import NDArray 

25 

26 

@dataclass
class RealtimeConfig:
    """Configuration for real-time streaming."""

    sample_rate: float
    """Sample rate in Hz."""
    buffer_size: int = 10000
    """Size of the circular buffer in samples."""
    chunk_size: int = 1000
    """Number of samples to yield per chunk."""
    timeout: float = 10.0
    """Timeout in seconds for buffer operations."""
    window_size: int | None = None
    """Window size for rolling statistics. If None, uses buffer_size."""
    enable_validation: bool = True
    """Enable input validation."""

    def validate(self) -> None:
        """Validate configuration parameters.

        Raises:
            ValueError: If configuration is invalid.
        """
        if self.sample_rate <= 0:
            raise ValueError("sample_rate must be positive")

        if self.buffer_size <= 0:
            raise ValueError("buffer_size must be positive")

        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be positive")

        if self.chunk_size > self.buffer_size:
            raise ValueError("chunk_size cannot exceed buffer_size")

        if self.timeout <= 0:
            raise ValueError("timeout must be positive")

        if self.window_size is not None and self.window_size <= 0:
            raise ValueError("window_size must be positive")


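# Usage sketch (illustrative only; this helper is not part of the public API,
# and the concrete numbers are assumptions chosen for the example): build and
# validate a configuration for a 1 MHz acquisition.
def _example_config() -> RealtimeConfig:
    config = RealtimeConfig(
        sample_rate=1e6,  # 1 MHz acquisition
        buffer_size=50_000,  # retain the most recent 50k samples (50 ms)
        chunk_size=5_000,  # yield 5 ms of data per chunk
    )
    config.validate()  # raises ValueError for non-positive or inconsistent values
    return config
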
class RealtimeBuffer:
    """Thread-safe circular buffer for real-time streaming.

    Maintains a fixed-size buffer of the most recent samples with
    thread-safe read/write operations and overflow handling.

    Example:
        >>> buffer = RealtimeBuffer(config)
        >>> buffer.write(samples)
        >>> chunk = buffer.read(chunk_size)
    """

    def __init__(self, config: RealtimeConfig) -> None:
        """Initialize real-time buffer.

        Args:
            config: Realtime configuration.
        """
        config.validate()
        self.config = config

        self._buffer: deque[float] = deque(maxlen=config.buffer_size)
        self._lock = threading.RLock()
        self._not_empty = threading.Condition(self._lock)
        self._total_samples = 0
        self._overflow_count = 0

    def write(self, data: NDArray[np.floating[Any]]) -> int:
        """Write samples to buffer.

        All samples are appended; once the buffer is full, the oldest
        samples are displaced and counted in ``overflow_count``.

        Args:
            data: Array of samples to write.

        Returns:
            Number of samples written.

        Raises:
            TypeError: If data is not a real floating-point array.
        """
        if not isinstance(data, np.ndarray):
            raise TypeError("data must be numpy array")

        if data.dtype.kind != "f":
            # Complex input is rejected: the buffer stores plain floats, and
            # float(sample) would fail for complex values.
            raise TypeError("data must be a real floating-point array")

        with self._not_empty:
            free = self.config.buffer_size - len(self._buffer)

            # Samples beyond the free space displace the oldest entries
            # (a deque with maxlen drops from the left); track them as overflow.
            if len(data) > free:
                self._overflow_count += len(data) - free

            self._buffer.extend(float(sample) for sample in data.flat)
            self._total_samples += len(data)
            self._not_empty.notify_all()

        return len(data)

    def read(self, n_samples: int, timeout: float | None = None) -> NDArray[np.float64]:
        """Read and consume samples from buffer (blocking).

        Args:
            n_samples: Number of samples to read.
            timeout: Timeout in seconds (None = use config timeout).

        Returns:
            Array of samples, may be shorter if timeout occurs.

        Raises:
            ValueError: If n_samples is invalid.
            TimeoutError: If timeout occurs without any data.
        """
        if n_samples <= 0:
            raise ValueError("n_samples must be positive")

        if timeout is None:
            timeout = self.config.timeout

        with self._not_empty:
            # Wait for data with timeout
            if not self._wait_for_data(n_samples, timeout):
                if len(self._buffer) == 0:
                    raise TimeoutError("No data available")

            # Consume the oldest available samples so successive reads
            # advance through the stream instead of re-reading the same data.
            n_read = min(n_samples, len(self._buffer))
            data = np.array(
                [self._buffer.popleft() for _ in range(n_read)], dtype=np.float64
            )
            return data

    def _wait_for_data(self, n_samples: int, timeout: float) -> bool:
        """Wait for minimum data in buffer.

        Args:
            n_samples: Minimum samples to wait for.
            timeout: Timeout in seconds.

        Returns:
            True if sufficient data available, False on timeout.
        """
        # Use a monotonic clock so the deadline is immune to wall-clock jumps.
        deadline = time.monotonic() + timeout
        while len(self._buffer) < n_samples:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Wake periodically to re-check the deadline even without a notify.
            self._not_empty.wait(timeout=min(remaining, 0.1))
        return True

    def get_available(self) -> int:
        """Get number of available samples in buffer.

        Returns:
            Number of samples currently in buffer.
        """
        with self._lock:
            return len(self._buffer)

    def get_stats(self) -> dict[str, int]:
        """Get buffer statistics.

        Returns:
            Dictionary with buffer stats (total_samples, overflow_count, available).
        """
        with self._lock:
            return {
                "total_samples": self._total_samples,
                "overflow_count": self._overflow_count,
                "available": len(self._buffer),
            }

    def clear(self) -> None:
        """Clear buffer contents.

        Example:
            >>> buffer.clear()
        """
        with self._lock:
            self._buffer.clear()
            self._total_samples = 0
            self._overflow_count = 0

    def close(self) -> None:
        """Close buffer and release resources.

        Example:
            >>> buffer.close()
        """
        self.clear()


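# Producer/consumer sketch (illustrative only; the helper and its parameter
# values are invented for the example): one thread writes synthetic samples
# while the caller blocks on a fixed-size read, showing the blocking read and
# overflow tracking together.
def _example_buffer_roundtrip() -> None:
    config = RealtimeConfig(sample_rate=1e6, buffer_size=8_000, chunk_size=1_000)
    buffer = RealtimeBuffer(config)

    def _producer() -> None:
        rng = np.random.default_rng(seed=0)
        for _ in range(10):
            buffer.write(rng.standard_normal(500))
            time.sleep(0.001)

    t = threading.Thread(target=_producer, daemon=True)
    t.start()
    chunk = buffer.read(1_000, timeout=1.0)  # blocks until 1000 samples arrive
    t.join()
    print(len(chunk), buffer.get_stats())
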
class RealtimeSource:
    """Base class for real-time data sources.

    Subclass to implement custom data sources that feed the real-time
    buffer. Must implement the acquire method.

    Example:
        >>> class CustomSource(RealtimeSource):
        ...     def acquire(self) -> np.ndarray:
        ...         # Get data from hardware
        ...         return np.array([...])
    """

    def acquire(self) -> NDArray[np.floating[Any]]:
        """Acquire samples from source.

        Raises:
            NotImplementedError: Subclasses must implement.
        """
        raise NotImplementedError("Subclasses must implement acquire()")

    def start(self) -> None:
        """Start acquisition (optional).

        Default implementation does nothing.
        """

    def stop(self) -> None:
        """Stop acquisition (optional).

        Default implementation does nothing.
        """


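# Concrete-source sketch (illustrative only; this class is an assumption for
# the example, not part of the module): a software sine generator that yields
# a fixed block of samples per acquire() call.
class _SineSource(RealtimeSource):
    def __init__(self, frequency: float, sample_rate: float, block: int = 500) -> None:
        self._frequency = frequency
        self._sample_rate = sample_rate
        self._block = block
        self._phase = 0  # running sample index so blocks stay phase-continuous

    def acquire(self) -> NDArray[np.float64]:
        n = np.arange(self._phase, self._phase + self._block)
        self._phase += self._block
        return np.sin(2 * np.pi * self._frequency * n / self._sample_rate)
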
class RealtimeAnalyzer:
    """Analyzer for real-time streaming data.

    Maintains rolling statistics and runs configurable analysis on
    incoming data chunks. Mean and standard deviation roll over the
    configured window; min and max are running extremes over all data
    seen since the last reset.

    Example:
        >>> config = RealtimeConfig(sample_rate=1e6)
        >>> analyzer = RealtimeAnalyzer(config)
        >>> analyzer.accumulate(chunk)
        >>> stats = analyzer.get_statistics()
    """

    def __init__(self, config: RealtimeConfig) -> None:
        """Initialize real-time analyzer.

        Args:
            config: Realtime configuration.
        """
        config.validate()
        self.config = config

        self._window_size = config.window_size or config.buffer_size
        self._samples: deque[float] = deque(maxlen=self._window_size)
        self._sum = 0.0
        self._sum_sq = 0.0
        self._min = float("inf")
        self._max = float("-inf")
        self._update_count = 0

    def accumulate(self, data: NDArray[np.floating[Any]]) -> None:
        """Accumulate statistics from data chunk.

        Args:
            data: Array of samples to process.

        Raises:
            TypeError: If data is not a real floating-point array.
        """
        if not isinstance(data, np.ndarray):
            raise TypeError("data must be numpy array")

        if data.dtype.kind != "f":
            # Complex input is rejected: the rolling sums are real-valued, and
            # float(sample) would fail for complex values.
            raise TypeError("data must be a real floating-point array")

        for sample in data.flat:
            sample_float = float(sample)

            # Remove the oldest sample from the rolling sums if the window is
            # full; the deque itself drops it on the append below.
            if len(self._samples) == self._window_size:
                old_sample = self._samples[0]
                self._sum -= old_sample
                self._sum_sq -= old_sample**2

            # Add new sample
            self._samples.append(sample_float)
            self._sum += sample_float
            self._sum_sq += sample_float**2
            self._min = min(self._min, sample_float)
            self._max = max(self._max, sample_float)
            self._update_count += 1

    def get_statistics(self) -> dict[str, float]:
        """Get current rolling statistics.

        Returns:
            Dictionary with mean, std, min, max, peak_to_peak.

        Raises:
            ValueError: If no data accumulated.
        """
        if len(self._samples) == 0:
            raise ValueError("No data accumulated yet")

        n = len(self._samples)
        mean = self._sum / n
        # Var(x) = E[x^2] - (E[x])^2; clamp at zero to absorb floating-point
        # cancellation in the running sums.
        variance = (self._sum_sq / n) - (mean**2)
        std = np.sqrt(max(0, variance))

        return {
            "mean": mean,
            "std": std,
            "min": self._min,
            "max": self._max,
            "peak_to_peak": self._max - self._min,
            "n_samples": n,
        }

    def reset(self) -> None:
        """Reset accumulated statistics.

        Example:
            >>> analyzer.reset()
        """
        self._samples.clear()
        self._sum = 0.0
        self._sum_sq = 0.0
        self._min = float("inf")
        self._max = float("-inf")
        self._update_count = 0


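# Rolling-statistics sketch (illustrative only; the numbers are invented for
# the example): accumulate two chunks and read back the windowed mean. With a
# window of 4 samples, only the last four values contribute to mean and std,
# while min/max remain running extremes.
def _example_rolling_stats() -> None:
    config = RealtimeConfig(sample_rate=1e3, window_size=4)
    analyzer = RealtimeAnalyzer(config)
    analyzer.accumulate(np.array([1.0, 2.0, 3.0, 4.0]))
    analyzer.accumulate(np.array([5.0, 6.0]))
    stats = analyzer.get_statistics()
    print(stats["mean"])  # mean of [3, 4, 5, 6] -> 4.5
    print(stats["max"])  # running extreme -> 6.0
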
class RealtimeStream:
    """High-level API for real-time data streaming and analysis.

    Manages a data source, circular buffer, and analyzer for streaming
    waveform processing.

    Example:
        >>> config = RealtimeConfig(sample_rate=1e6)
        >>> source = CustomSource()
        >>> stream = RealtimeStream(config, source)
        >>> stream.start()
        >>> for chunk in stream.iter_chunks():
        ...     print(chunk.data.mean())
        >>> stream.stop()
    """

    def __init__(
        self,
        config: RealtimeConfig,
        source: RealtimeSource,
        on_chunk: Callable[[WaveformTrace], None] | None = None,
    ) -> None:
        """Initialize real-time stream.

        Args:
            config: Realtime configuration.
            source: Data source for acquisition.
            on_chunk: Optional callback for each chunk acquired.
        """
        config.validate()
        self.config = config
        self.source = source
        self._on_chunk = on_chunk

        self._buffer = RealtimeBuffer(config)
        self._analyzer = RealtimeAnalyzer(config)
        self._is_running = False
        self._acquire_thread: threading.Thread | None = None
        self._chunk_count = 0

    def start(self) -> None:
        """Start acquisition thread.

        Example:
            >>> stream.start()
        """
        if self._is_running:
            return

        self._is_running = True
        self.source.start()

        self._acquire_thread = threading.Thread(target=self._acquire_loop, daemon=True)
        self._acquire_thread.start()

    def stop(self) -> None:
        """Stop acquisition thread.

        Example:
            >>> stream.stop()
        """
        if not self._is_running:
            return

        self._is_running = False
        self.source.stop()

        if self._acquire_thread is not None:
            self._acquire_thread.join(timeout=5.0)

    def iter_chunks(self) -> Generator[WaveformTrace, None, None]:
        """Iterate over data chunks as they arrive.

        Yields chunks of configured size as data becomes available.

        Yields:
            WaveformTrace chunks.

        Raises:
            RuntimeError: If stream not started.

        Example:
            >>> for chunk in stream.iter_chunks():
            ...     print(f"Got {len(chunk.data)} samples")
        """
        if not self._is_running:
            raise RuntimeError("Stream not started")

        while self._is_running:
            try:
                data = self._buffer.read(self.config.chunk_size)

                if len(data) > 0:
                    # Accumulate statistics
                    self._analyzer.accumulate(data)

                    # Create trace chunk
                    metadata = TraceMetadata(
                        sample_rate=self.config.sample_rate,
                    )

                    chunk = WaveformTrace(data=data, metadata=metadata)
                    self._chunk_count += 1

                    # Call callback if provided
                    if self._on_chunk is not None:
                        self._on_chunk(chunk)

                    yield chunk

            except TimeoutError:
                # Check if stopped during timeout - _is_running may change asynchronously
                if not self._is_running:
                    break  # type: ignore[unreachable]
                continue

    def get_statistics(self) -> dict[str, float]:
        """Get current statistics.

        Returns:
            Dictionary with stream statistics.
        """
        try:
            return self._analyzer.get_statistics()
        except ValueError:
            # No data accumulated yet; report empty statistics.
            return {
                "mean": 0.0,
                "std": 0.0,
                "min": 0.0,
                "max": 0.0,
                "peak_to_peak": 0.0,
                "n_samples": 0,
            }

    def get_buffer_stats(self) -> dict[str, int]:
        """Get buffer statistics.

        Returns:
            Dictionary with buffer stats.
        """
        return self._buffer.get_stats()

    def get_chunk_count(self) -> int:
        """Get total number of chunks acquired.

        Returns:
            Number of chunks yielded so far.
        """
        return self._chunk_count

    def _acquire_loop(self) -> None:
        """Background thread that acquires data from source."""
        while self._is_running:
            try:
                data = self.source.acquire()
                if data is not None and len(data) > 0:
                    self._buffer.write(data)
            except Exception:
                # Keep the acquisition loop alive on transient source errors;
                # back off briefly to avoid a tight spin.
                if self._is_running:
                    time.sleep(0.001)


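# Callback sketch (illustrative only; this function is an assumption for the
# example): on_chunk receives each WaveformTrace as it is yielded, which suits
# logging or live plotting without touching the iteration loop.
def _example_on_chunk(chunk: WaveformTrace) -> None:
    print(f"received {len(chunk.data)} samples")
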
__all__ = [
    "RealtimeAnalyzer",
    "RealtimeBuffer",
    "RealtimeConfig",
    "RealtimeSource",
    "RealtimeStream",
]
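
# End-to-end sketch (illustrative only; _SineSource is the example class
# defined above, and all parameter values are assumptions): wire a source
# into a stream, consume a few chunks, then stop.
if __name__ == "__main__":
    config = RealtimeConfig(sample_rate=10_000.0, buffer_size=20_000, chunk_size=2_000)
    source = _SineSource(frequency=50.0, sample_rate=config.sample_rate)
    stream = RealtimeStream(config, source)

    stream.start()
    for i, chunk in enumerate(stream.iter_chunks()):
        print(f"chunk {i}: {len(chunk.data)} samples, mean={chunk.data.mean():.4f}")
        if i >= 2:  # consume three chunks, then shut down
            break
    stream.stop()
    print(stream.get_statistics())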