# Coverage-report residue: src/tracekit/streaming/realtime.py, 96% of 208 statements
# (coverage.py v7.13.1, created at 2026-01-11 23:04 +0000).
"""Real-time streaming APIs for live data acquisition and processing.

This module provides interfaces for real-time data capture, buffering, and
on-the-fly analysis of streaming waveforms. Supports pluggable input sources,
configurable sample buffers, and streaming statistics.
"""
from __future__ import annotations

import threading
import time
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

import numpy as np

from ..core.types import TraceMetadata, WaveformTrace

if TYPE_CHECKING:
    from collections.abc import Generator

    from numpy.typing import NDArray
@dataclass
class RealtimeConfig:
    """Configuration for real-time streaming."""

    sample_rate: float  # sample rate in Hz
    buffer_size: int = 10000  # size of the circular buffer in samples
    chunk_size: int = 1000  # number of samples to yield per chunk
    timeout: float = 10.0  # timeout in seconds for buffer operations
    window_size: int | None = None  # rolling-stats window; None -> buffer_size
    enable_validation: bool = True  # enable input validation

    def validate(self) -> None:
        """Validate configuration parameters.

        Raises:
            ValueError: If configuration is invalid.
        """
        # Check order mirrors field order; the first failing check raises.
        for name, value in (
            ("sample_rate", self.sample_rate),
            ("buffer_size", self.buffer_size),
            ("chunk_size", self.chunk_size),
        ):
            if value <= 0:
                raise ValueError(f"{name} must be positive")

        if self.chunk_size > self.buffer_size:
            raise ValueError("chunk_size cannot exceed buffer_size")

        if self.timeout <= 0:
            raise ValueError("timeout must be positive")

        if self.window_size is not None and self.window_size <= 0:
            raise ValueError("window_size must be positive")
class RealtimeBuffer:
    """Thread-safe circular buffer for real-time streaming.

    Maintains a fixed-size buffer of the most recent samples with
    thread-safe read/write operations and overflow handling. Reads
    consume samples from the oldest end of the buffer, so successive
    reads advance through the stream.

    Example:
        >>> buffer = RealtimeBuffer(config)
        >>> buffer.write(samples)
        >>> chunk = buffer.read(chunk_size)
    """

    def __init__(self, config: RealtimeConfig) -> None:
        """Initialize real-time buffer.

        Args:
            config: Realtime configuration.
        """
        config.validate()
        self.config = config

        # deque with maxlen silently evicts the oldest samples on overflow.
        self._buffer: deque[float] = deque(maxlen=config.buffer_size)
        self._lock = threading.RLock()
        self._not_empty = threading.Condition(self._lock)
        self._total_samples = 0
        self._overflow_count = 0

    def write(self, data: NDArray[np.floating[Any]]) -> int:
        """Write samples to buffer.

        Args:
            data: Array of samples to write. Float or complex; only the
                real part of complex data is stored.

        Returns:
            Number of samples written (capped at buffer capacity).

        Raises:
            TypeError: If data is not a float or complex numpy array.
        """
        if not isinstance(data, np.ndarray):
            raise TypeError("data must be numpy array")

        if data.dtype.kind not in "fc":  # float or complex
            raise TypeError("data must be float or complex array")

        # float() raises TypeError on complex values, so store the real part.
        values = data.real if data.dtype.kind == "c" else data

        with self._not_empty:
            initial_len = len(self._buffer)

            # Write samples to buffer; the deque evicts the oldest on overflow.
            for sample in values.flat:
                self._buffer.append(float(sample))

            # Track overflow: samples lost because the buffer filled up.
            if len(self._buffer) == self.config.buffer_size:
                written = self.config.buffer_size - initial_len
                if written < len(data):
                    self._overflow_count += len(data) - written
            else:
                written = len(data)

            self._total_samples += len(data)
            self._not_empty.notify_all()

        return written

    def read(self, n_samples: int, timeout: float | None = None) -> NDArray[np.float64]:
        """Read and consume samples from buffer (blocking).

        Args:
            n_samples: Number of samples to read.
            timeout: Timeout in seconds (None = use config timeout).

        Returns:
            Array of samples; may be shorter than ``n_samples`` if the
            timeout expires with only partial data available.

        Raises:
            ValueError: If n_samples is invalid.
            TimeoutError: If the timeout expires with no data at all.
        """
        if n_samples <= 0:
            raise ValueError("n_samples must be positive")

        if timeout is None:
            timeout = self.config.timeout

        with self._not_empty:
            # Wait for data with timeout; a partial result is acceptable.
            if not self._wait_for_data(n_samples, timeout):
                if len(self._buffer) == 0:
                    raise TimeoutError("No data available")

            # Consume the oldest samples so repeated reads advance through
            # the stream instead of returning the same data again.
            n_read = min(n_samples, len(self._buffer))
            data = np.array(
                [self._buffer.popleft() for _ in range(n_read)], dtype=np.float64
            )
            return data

    def _wait_for_data(self, n_samples: int, timeout: float) -> bool:
        """Wait until at least ``n_samples`` are buffered.

        Args:
            n_samples: Minimum samples to wait for.
            timeout: Timeout in seconds.

        Returns:
            True if sufficient data is available, False on timeout.
        """
        # Use a monotonic clock so wall-clock adjustments cannot skew the deadline.
        deadline = time.monotonic() + timeout
        while len(self._buffer) < n_samples:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Wake at least every 100 ms to re-check the deadline even if
            # no writer ever notifies.
            self._not_empty.wait(timeout=min(remaining, 0.1))
        return True

    def get_available(self) -> int:
        """Get number of available samples in buffer.

        Returns:
            Number of samples currently in buffer.
        """
        with self._lock:
            return len(self._buffer)

    def get_stats(self) -> dict[str, int]:
        """Get buffer statistics.

        Returns:
            Dictionary with buffer stats (total_samples, overflow_count, available).
        """
        with self._lock:
            return {
                "total_samples": self._total_samples,
                "overflow_count": self._overflow_count,
                "available": len(self._buffer),
            }

    def clear(self) -> None:
        """Clear buffer contents and reset counters.

        Example:
            >>> buffer.clear()
        """
        with self._lock:
            self._buffer.clear()
            self._total_samples = 0
            self._overflow_count = 0

    def close(self) -> None:
        """Close buffer and release resources.

        Example:
            >>> buffer.close()
        """
        self.clear()
class RealtimeSource:
    """Base class for real-time data sources.

    Subclass and override :meth:`acquire` to feed samples into the
    real-time pipeline. The ``start``/``stop`` hooks are optional and
    do nothing by default.

    Example:
        >>> class CustomSource(RealtimeSource):
        ...     def acquire(self) -> np.ndarray:
        ...         # Get data from hardware
        ...         return np.array([...])
    """

    def acquire(self) -> NDArray[np.floating[Any]]:
        """Acquire samples from the source.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError("Subclasses must implement acquire()")

    def start(self) -> None:
        """Start acquisition. No-op hook for subclasses to override."""

    def stop(self) -> None:
        """Stop acquisition. No-op hook for subclasses to override."""
class RealtimeAnalyzer:
    """Analyzer for real-time streaming data.

    Maintains rolling statistics over the most recent ``window_size``
    samples (mean, std, min, max) and updates them incrementally as
    chunks arrive.

    Example:
        >>> config = RealtimeConfig(sample_rate=1e6)
        >>> analyzer = RealtimeAnalyzer(config)
        >>> analyzer.accumulate(chunk)
        >>> stats = analyzer.get_statistics()
    """

    def __init__(self, config: RealtimeConfig) -> None:
        """Initialize real-time analyzer.

        Args:
            config: Realtime configuration.
        """
        config.validate()
        self.config = config

        self._window_size = config.window_size or config.buffer_size
        self._samples: deque[float] = deque(maxlen=self._window_size)
        self._sum = 0.0  # running sum of windowed samples
        self._sum_sq = 0.0  # running sum of squares of windowed samples
        self._min = float("inf")
        self._max = float("-inf")
        self._update_count = 0  # total samples ever accumulated

    def accumulate(self, data: NDArray[np.floating[Any]]) -> None:
        """Accumulate statistics from a data chunk.

        Args:
            data: Array of samples to process. Float or complex; only the
                real part of complex data contributes.

        Raises:
            TypeError: If data is not a float or complex numpy array.
        """
        if not isinstance(data, np.ndarray):
            raise TypeError("data must be numpy array")

        if data.dtype.kind not in "fc":
            raise TypeError("data must be float or complex array")

        # float() raises TypeError on complex values, so use the real part.
        values = data.real if data.dtype.kind == "c" else data

        for sample in values.flat:
            sample_float = float(sample)

            # Remove the oldest sample from the running sums when the
            # window is full; remember whether it held an extreme value.
            recompute_extremes = False
            if len(self._samples) == self._window_size:
                old_sample = self._samples[0]
                self._sum -= old_sample
                self._sum_sq -= old_sample**2
                recompute_extremes = (
                    old_sample == self._min or old_sample == self._max
                )

            # Add new sample (the deque evicts the oldest automatically).
            self._samples.append(sample_float)
            self._sum += sample_float
            self._sum_sq += sample_float**2

            if recompute_extremes:
                # The evicted sample may have been the window min/max, so
                # rescan the window to keep the extremes truly rolling.
                self._min = min(self._samples)
                self._max = max(self._samples)
            else:
                self._min = min(self._min, sample_float)
                self._max = max(self._max, sample_float)

            self._update_count += 1

    def get_statistics(self) -> dict[str, float]:
        """Get current rolling statistics.

        Returns:
            Dictionary with mean, std, min, max, peak_to_peak, n_samples.

        Raises:
            ValueError: If no data accumulated.
        """
        if len(self._samples) == 0:
            raise ValueError("No data accumulated yet")

        n = len(self._samples)
        mean = self._sum / n
        # Clamp to zero: floating-point cancellation can make the
        # incremental variance estimate slightly negative.
        variance = (self._sum_sq / n) - (mean**2)
        std = np.sqrt(max(0, variance))

        return {
            "mean": mean,
            "std": std,
            "min": self._min,
            "max": self._max,
            "peak_to_peak": self._max - self._min,
            "n_samples": n,
        }

    def reset(self) -> None:
        """Reset accumulated statistics to their initial state.

        Example:
            >>> analyzer.reset()
        """
        self._samples.clear()
        self._sum = 0.0
        self._sum_sq = 0.0
        self._min = float("inf")
        self._max = float("-inf")
        self._update_count = 0
class RealtimeStream:
    """High-level API for real-time data streaming and analysis.

    Manages a data source, circular buffer, and analyzer for streaming
    waveform processing. A background thread pulls data from the source
    into the buffer; :meth:`iter_chunks` drains the buffer in fixed-size
    chunks.

    Example:
        >>> config = RealtimeConfig(sample_rate=1e6)
        >>> source = CustomSource()
        >>> stream = RealtimeStream(config, source)
        >>> stream.start()
        >>> for chunk in stream.iter_chunks():
        ...     print(chunk.data.mean())
        >>> stream.stop()
    """

    def __init__(
        self,
        config: RealtimeConfig,
        source: RealtimeSource,
        on_chunk: Callable[[WaveformTrace], None] | None = None,
    ) -> None:
        """Initialize real-time stream.

        Args:
            config: Realtime configuration.
            source: Data source for acquisition.
            on_chunk: Optional callback invoked with each chunk acquired.
        """
        config.validate()
        self.config = config
        self.source = source
        self._on_chunk = on_chunk

        self._buffer = RealtimeBuffer(config)
        self._analyzer = RealtimeAnalyzer(config)
        self._is_running = False
        self._acquire_thread: threading.Thread | None = None
        self._chunk_count = 0

    def start(self) -> None:
        """Start the background acquisition thread (idempotent).

        Example:
            >>> stream.start()
        """
        if self._is_running:
            return

        self._is_running = True
        self.source.start()

        # Daemon thread so a forgotten stop() cannot hang interpreter exit.
        self._acquire_thread = threading.Thread(target=self._acquire_loop, daemon=True)
        self._acquire_thread.start()

    def stop(self) -> None:
        """Stop the acquisition thread (idempotent).

        Example:
            >>> stream.stop()
        """
        if not self._is_running:
            return

        self._is_running = False
        self.source.stop()

        if self._acquire_thread is not None:
            # Bounded join: the acquire loop polls _is_running frequently.
            self._acquire_thread.join(timeout=5.0)

    def iter_chunks(self) -> Generator[WaveformTrace, None, None]:
        """Iterate over data chunks as they arrive.

        Yields chunks of ``config.chunk_size`` samples as data becomes
        available; a chunk may be shorter when a read times out with only
        partial data.

        Yields:
            WaveformTrace chunks.

        Raises:
            RuntimeError: If stream not started.

        Example:
            >>> for chunk in stream.iter_chunks():
            ...     print(f"Got {len(chunk.data)} samples")
        """
        if not self._is_running:
            raise RuntimeError("Stream not started")

        while self._is_running:
            try:
                data = self._buffer.read(self.config.chunk_size)
            except TimeoutError:
                # _is_running may have been cleared while we were blocked.
                if not self._is_running:
                    break
                continue

            if len(data) == 0:
                continue

            # Fold the chunk into the rolling statistics.
            self._analyzer.accumulate(data)

            # Create trace chunk.
            metadata = TraceMetadata(
                sample_rate=self.config.sample_rate,
            )
            chunk = WaveformTrace(data=data, metadata=metadata)
            self._chunk_count += 1

            # Notify the optional per-chunk callback before yielding.
            if self._on_chunk is not None:
                self._on_chunk(chunk)

            yield chunk

    def get_statistics(self) -> dict[str, float]:
        """Get current statistics.

        Returns:
            Dictionary with stream statistics; all-zero values if no data
            has been accumulated yet.
        """
        try:
            return self._analyzer.get_statistics()
        except ValueError:
            # No data yet: report a zeroed-out statistics dict instead of
            # propagating the analyzer's ValueError.
            return {
                "mean": 0.0,
                "std": 0.0,
                "min": 0.0,
                "max": 0.0,
                "peak_to_peak": 0.0,
                "n_samples": 0,
            }

    def get_buffer_stats(self) -> dict[str, int]:
        """Get buffer statistics.

        Returns:
            Dictionary with buffer stats.
        """
        return self._buffer.get_stats()

    def get_chunk_count(self) -> int:
        """Get total number of chunks acquired.

        Returns:
            Number of chunks yielded so far.
        """
        return self._chunk_count

    def _acquire_loop(self) -> None:
        """Background thread: pull data from the source into the buffer."""
        while self._is_running:
            try:
                data = self.source.acquire()
                if data is not None and len(data) > 0:
                    self._buffer.write(data)
            except Exception:
                # Best-effort by design: a flaky source must not kill the
                # acquisition thread. Back off briefly to avoid spinning
                # on a persistent error.
                if self._is_running:
                    time.sleep(0.001)
# Public API of this module.
__all__ = [
    "RealtimeAnalyzer",
    "RealtimeBuffer",
    "RealtimeConfig",
    "RealtimeSource",
    "RealtimeStream",
]