Coverage for src/tracekit/streaming/progressive.py: 95% (149 statements)
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Progressive streaming analysis with incremental confidence.
3Enables real-time analysis with confidence that increases as more data
4is processed, allowing early stopping when confidence is sufficient.
5"""
7from __future__ import annotations
9import logging
10from collections import deque
11from collections.abc import Callable
12from dataclasses import dataclass
13from typing import TYPE_CHECKING, Any
15import numpy as np
17from ..quality.scoring import assess_data_quality
19if TYPE_CHECKING:
20 from numpy.typing import NDArray
22logger = logging.getLogger(__name__)
25@dataclass
26class StreamingProgress:
27 """Progress update from streaming analysis.
29 Attributes:
30 samples_processed: Number of samples processed so far
31 total_samples: Total samples expected (None if unknown)
32 confidence: Current confidence level (0-1)
33 preliminary_results: Current analysis results
34 is_complete: Whether analysis is complete
35 can_stop_early: Whether confidence is sufficient for early stopping
36 message: Human-readable status message
37 """
39 samples_processed: int
40 total_samples: int | None
41 confidence: float
42 preliminary_results: dict[str, Any]
43 is_complete: bool = False
44 can_stop_early: bool = False
45 message: str = ""
47 @property
48 def progress_percent(self) -> float | None:
49 """Get progress as percentage if total is known.
51 Returns:
52 Progress percentage (0-100) or None if total unknown
53 """
54 if self.total_samples:
55 return 100.0 * self.samples_processed / self.total_samples
56 return None
@dataclass
class StreamingConfig:
    """Tunable parameters for chunked streaming analysis.

    Attributes:
        chunk_size: Number of samples handled per processing chunk.
        overlap: Fractional overlap between consecutive chunks (0-1).
        min_samples_for_result: Samples required before results are produced.
        early_stop_confidence: Confidence level that permits early stopping.
        max_buffer_size: Upper bound on the retained sample buffer.
        update_interval_samples: Sample spacing between subscriber updates.
    """

    chunk_size: int = 1024
    overlap: float = 0.25
    min_samples_for_result: int = 100
    early_stop_confidence: float = 0.9
    max_buffer_size: int = 100000
    update_interval_samples: int = 512
class ProgressiveAnalyzer:
    """Streaming analyzer with progressive confidence updates.

    Processes data in chunks and provides incremental updates with
    increasing confidence as more data is analyzed. Enables early
    stopping when confidence threshold is met.

    API-004: Real-time streaming analysis
    QUAL-001: Quality scoring with progressive confidence

    Example:
        >>> analyzer = ProgressiveAnalyzer(sample_rate=1000.0)
        >>> analyzer.subscribe(lambda p: print(f"Confidence: {p.confidence:.0%}"))
        >>> progress = analyzer.process_chunk(data_chunk)
        >>> if progress.can_stop_early:
        ...     final = analyzer.finalize()
    """

    def __init__(
        self,
        sample_rate: float = 1.0,
        config: StreamingConfig | None = None,
    ):
        """Initialize progressive analyzer.

        Args:
            sample_rate: Sample rate in Hz
            config: Streaming configuration (defaults to StreamingConfig())
        """
        self.sample_rate = sample_rate
        self.config = config or StreamingConfig()

        # Internal state. The buffer is bounded so memory stays flat for
        # long-running streams; oldest samples are silently evicted.
        self._buffer: deque[float] = deque(maxlen=self.config.max_buffer_size)
        self._samples_processed = 0
        self._current_results: dict[str, Any] = {}
        self._confidence = 0.0
        self._callbacks: list[Callable[[StreamingProgress], None]] = []

        # Running statistics (incremental calculation, O(1) memory)
        self._sum = 0.0
        self._sum_sq = 0.0
        self._min_val = float("inf")
        self._max_val = float("-inf")

        # Frequency estimation state: absolute sample indices of mean
        # crossings, plus the sign of the previous sample relative to the
        # running mean.
        self._zero_crossings: list[int] = []
        self._last_sign = 0

    def reset(self) -> None:
        """Reset analyzer state to initial conditions."""
        self._buffer.clear()
        self._samples_processed = 0
        self._current_results = {}
        self._confidence = 0.0
        self._sum = 0.0
        self._sum_sq = 0.0
        self._min_val = float("inf")
        self._max_val = float("-inf")
        self._zero_crossings = []
        self._last_sign = 0

    def subscribe(self, callback: Callable[[StreamingProgress], None]) -> None:
        """Subscribe to progress updates.

        Args:
            callback: Function called with StreamingProgress updates
        """
        self._callbacks.append(callback)

    def process_chunk(self, chunk: NDArray[np.float64]) -> StreamingProgress:
        """Process a chunk of data.

        Args:
            chunk: Data chunk to process (flattened internally)

        Returns:
            StreamingProgress with current state
        """
        chunk_arr = np.asarray(chunk).flatten()

        # Empty chunks return a snapshot without mutating any state.
        if len(chunk_arr) == 0:
            return self._snapshot()

        self._buffer.extend(chunk_arr)

        # Update sums/min/max first so the running mean used by the
        # zero-crossing detector already includes this chunk.
        self._update_statistics(chunk_arr)

        # Bug fix: the sample counter must include this chunk *before*
        # zero-crossing detection runs. Previously the counter was bumped
        # afterwards, so crossing indices were offset by -len(chunk)
        # (negative on the first chunk) and the running mean divided a
        # chunk-inclusive sum by a stale count.
        self._samples_processed += len(chunk_arr)

        self._update_frequency_estimation(chunk_arr)

        # Refresh confidence and preliminary results.
        self._update_confidence()
        self._update_results()

        progress = self._snapshot()

        # Throttle notifications: fire only when this chunk crossed an
        # update_interval_samples boundary.
        if self._samples_processed % self.config.update_interval_samples < len(chunk_arr):
            self._notify(progress)

        return progress

    def finalize(self) -> StreamingProgress:
        """Finalize analysis and return final results.

        Runs full-buffer FFT frequency detection and a quality assessment,
        then emits a completed StreamingProgress to all subscribers.

        Returns:
            Final StreamingProgress with complete results
        """
        # Do final calculations with all buffered data
        if self._buffer:
            buffer_array = np.array(list(self._buffer))

            # Enhanced frequency detection with full buffer
            self._current_results["frequency_final"] = self._detect_frequency_fft(buffer_array)

            # Final quality assessment
            data_quality = assess_data_quality(buffer_array, self.sample_rate)
            self._current_results["data_quality"] = {
                "snr_db": data_quality.snr_db,
                "sample_count": data_quality.sample_count,
                "completeness": data_quality.completeness,
            }

        # Small boost reflects the extra full-buffer analysis; capped at 1.0.
        self._confidence = min(1.0, self._confidence * 1.1)

        progress = StreamingProgress(
            samples_processed=self._samples_processed,
            total_samples=self._samples_processed,
            confidence=self._confidence,
            preliminary_results=self._current_results.copy(),
            is_complete=True,
            can_stop_early=False,
            message="Analysis complete",
        )

        self._notify(progress)
        return progress

    def _snapshot(self) -> StreamingProgress:
        """Build a progress snapshot of the current (incomplete) state."""
        return StreamingProgress(
            samples_processed=self._samples_processed,
            total_samples=None,
            confidence=self._confidence,
            preliminary_results=self._current_results.copy(),
            is_complete=False,
            can_stop_early=self._confidence >= self.config.early_stop_confidence,
            message=self._get_status_message(),
        )

    def _update_statistics(self, chunk: NDArray[np.float64]) -> None:
        """Update running statistics (sum, sum of squares, min, max)."""
        self._sum += float(np.sum(chunk))
        self._sum_sq += float(np.sum(chunk**2))
        self._min_val = min(self._min_val, float(np.min(chunk)))
        self._max_val = max(self._max_val, float(np.max(chunk)))

    def _update_frequency_estimation(self, chunk: NDArray[np.float64]) -> None:
        """Record mean-crossings of this chunk for frequency estimation.

        Assumes ``_update_statistics`` has run and ``_samples_processed``
        already includes this chunk, so the running mean and the absolute
        indices below are consistent.
        """
        # Crossings are detected against the running mean (removes DC offset).
        mean_val = self._sum / max(1, self._samples_processed)
        # Absolute index of the first sample in this chunk.
        base = self._samples_processed - len(chunk)

        for i, val in enumerate(chunk):
            current_sign = 1 if val > mean_val else -1
            # _last_sign == 0 means "no previous sample yet" — skip.
            if self._last_sign != 0 and current_sign != self._last_sign:
                self._zero_crossings.append(base + i)
            self._last_sign = current_sign

    def _update_confidence(self) -> None:
        """Update confidence based on data processed (monotonically rising)."""
        # Base confidence from sample count (logarithmic: saturates at 10^4).
        sample_factor = min(1.0, np.log10(max(1, self._samples_processed)) / 4.0)

        # Frequency confidence from consistent zero-crossing spacing: a low
        # coefficient of variation of the intervals means a stable period.
        freq_factor = 0.5
        if len(self._zero_crossings) > 10:
            intervals = np.diff(self._zero_crossings)
            if len(intervals) > 5:
                cv = np.std(intervals) / (np.mean(intervals) + 1e-10)
                freq_factor = min(1.0, 1.0 - cv)

        # Combine factors; max() with the previous value keeps confidence
        # monotonically non-decreasing across chunks.
        new_confidence = 0.4 * sample_factor + 0.4 * freq_factor + 0.2
        self._confidence = max(self._confidence, new_confidence)

    def _update_results(self) -> None:
        """Update preliminary results from the running statistics."""
        n = self._samples_processed
        if n < self.config.min_samples_for_result:
            return

        # Running mean and std (variance clamped at 0 against float error).
        mean = self._sum / n
        variance = (self._sum_sq / n) - (mean**2)
        std = np.sqrt(max(0, variance))

        self._current_results.update(
            {
                "mean": mean,
                "std": std,
                "min": self._min_val,
                "max": self._max_val,
                "amplitude": self._max_val - self._min_val,
                "sample_count": n,
            }
        )

        # Frequency from zero crossings: consecutive crossings are half a
        # period apart, hence the 0.5 factor.
        if len(self._zero_crossings) > 4:
            intervals = np.diff(self._zero_crossings)
            avg_half_period = np.mean(intervals) / self.sample_rate
            if avg_half_period > 0:
                self._current_results["frequency_estimate"] = 0.5 / avg_half_period

    def _detect_frequency_fft(self, data: NDArray[np.float64]) -> float | None:
        """Detect frequency using FFT on the full buffer.

        Args:
            data: Signal data

        Returns:
            Dominant frequency in Hz or None if detection failed
        """
        try:
            # Remove DC component so bin 0 does not dominate; the DC bin is
            # additionally excluded from the peak search below.
            data_ac = data - np.mean(data)
            fft_result = np.fft.rfft(data_ac)
            freqs = np.fft.rfftfreq(len(data_ac), d=1.0 / self.sample_rate)
            magnitude = np.abs(fft_result[1:])

            if len(magnitude) > 0:
                peak_idx = np.argmax(magnitude)
                return float(freqs[1:][peak_idx])
        except Exception:
            # Best-effort: frequency detection is optional in the final
            # results, so any numerical failure degrades to None.
            pass
        return None

    def _get_status_message(self) -> str:
        """Generate status message.

        Returns:
            Human-readable status string
        """
        if self._samples_processed < self.config.min_samples_for_result:
            return f"Collecting data... ({self._samples_processed} samples)"

        if self._confidence < 0.5:
            return f"Low confidence ({self._confidence:.0%}), collecting more data..."
        elif self._confidence < 0.8:
            return f"Medium confidence ({self._confidence:.0%}), analysis in progress"
        else:
            return f"High confidence ({self._confidence:.0%}), results reliable"

    def _notify(self, progress: StreamingProgress) -> None:
        """Notify all subscribers, isolating the analyzer from callback bugs.

        Args:
            progress: Progress update to send
        """
        for callback in self._callbacks:
            try:
                callback(progress)
            except Exception as e:
                # A misbehaving subscriber must not break the analysis loop.
                logger.debug(f"Callback error: {e}")
def create_progressive_analyzer(
    sample_rate: float = 1.0,
    chunk_size: int = 1024,
    early_stop_confidence: float = 0.9,
) -> ProgressiveAnalyzer:
    """Build a ProgressiveAnalyzer from the most commonly tuned settings.

    Args:
        sample_rate: Sample rate in Hz
        chunk_size: Samples per chunk
        early_stop_confidence: Confidence threshold for early stopping

    Returns:
        Configured ProgressiveAnalyzer

    Example:
        >>> analyzer = create_progressive_analyzer(
        ...     sample_rate=1000.0,
        ...     chunk_size=512,
        ...     early_stop_confidence=0.85
        ... )
    """
    return ProgressiveAnalyzer(
        sample_rate,
        StreamingConfig(
            chunk_size=chunk_size,
            early_stop_confidence=early_stop_confidence,
        ),
    )
# Explicit public API of this module (kept alphabetically sorted).
__all__ = [
    "ProgressiveAnalyzer",
    "StreamingConfig",
    "StreamingProgress",
    "create_progressive_analyzer",
]