Coverage for src/tracekit/streaming/progressive.py: 95%

149 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Progressive streaming analysis with incremental confidence. 

2 

3Enables real-time analysis with confidence that increases as more data 

4is processed, allowing early stopping when confidence is sufficient. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10from collections import deque 

11from collections.abc import Callable 

12from dataclasses import dataclass 

13from typing import TYPE_CHECKING, Any 

14 

15import numpy as np 

16 

17from ..quality.scoring import assess_data_quality 

18 

19if TYPE_CHECKING: 

20 from numpy.typing import NDArray 

21 

22logger = logging.getLogger(__name__) 

23 

24 

@dataclass
class StreamingProgress:
    """A snapshot of streaming-analysis state delivered to subscribers.

    Attributes:
        samples_processed: Number of samples processed so far.
        total_samples: Total samples expected, or None when unknown.
        confidence: Current confidence level in [0, 1].
        preliminary_results: Analysis results computed so far.
        is_complete: True once the analysis has been finalized.
        can_stop_early: True when confidence already meets the early-stop
            threshold.
        message: Human-readable status message.
    """

    samples_processed: int
    total_samples: int | None
    confidence: float
    preliminary_results: dict[str, Any]
    is_complete: bool = False
    can_stop_early: bool = False
    message: str = ""

    @property
    def progress_percent(self) -> float | None:
        """Percentage complete (0-100), or None when the total is unknown.

        A zero/None total is treated as "unknown" so no division occurs.
        """
        if not self.total_samples:
            return None
        return 100.0 * self.samples_processed / self.total_samples

57 

58 

@dataclass
class StreamingConfig:
    """Tunable parameters controlling streaming analysis behavior."""

    # Samples consumed per processing chunk.
    chunk_size: int = 1024
    # Overlap ratio between consecutive chunks (0-1).
    overlap: float = 0.25
    # Minimum samples required before preliminary results are produced.
    min_samples_for_result: int = 100
    # Confidence threshold at which early stopping becomes possible.
    early_stop_confidence: float = 0.9
    # Hard cap on the number of samples kept in the internal buffer.
    max_buffer_size: int = 100000
    # Approximate number of samples between subscriber notifications.
    update_interval_samples: int = 512

78 

79 

class ProgressiveAnalyzer:
    """Streaming analyzer with progressive confidence updates.

    Processes data in chunks and provides incremental updates with
    increasing confidence as more data is analyzed. Enables early
    stopping when confidence threshold is met.

    API-004: Real-time streaming analysis
    QUAL-001: Quality scoring with progressive confidence

    Example:
        >>> analyzer = ProgressiveAnalyzer(sample_rate=1000.0)
        >>> analyzer.subscribe(lambda p: print(f"Confidence: {p.confidence:.0%}"))
        >>> progress = analyzer.process_chunk(data_chunk)
        >>> if progress.can_stop_early:
        ...     final = analyzer.finalize()
    """

    def __init__(
        self,
        sample_rate: float = 1.0,
        config: StreamingConfig | None = None,
    ):
        """Initialize progressive analyzer.

        Args:
            sample_rate: Sample rate in Hz
            config: Streaming configuration (defaults used when None)
        """
        self.sample_rate = sample_rate
        self.config = config or StreamingConfig()

        # Internal state: bounded sample buffer plus latest results.
        self._buffer: deque[float] = deque(maxlen=self.config.max_buffer_size)
        self._samples_processed = 0
        self._current_results: dict[str, Any] = {}
        self._confidence = 0.0
        self._callbacks: list[Callable[[StreamingProgress], None]] = []

        # Running statistics (incremental calculation).
        self._sum = 0.0
        self._sum_sq = 0.0
        self._min_val = float("inf")
        self._max_val = float("-inf")

        # Frequency estimation state (mean-level crossing indices).
        self._zero_crossings: list[int] = []
        self._last_sign = 0

    def reset(self) -> None:
        """Reset analyzer state to initial conditions."""
        self._buffer.clear()
        self._samples_processed = 0
        self._current_results = {}
        self._confidence = 0.0
        self._sum = 0.0
        self._sum_sq = 0.0
        self._min_val = float("inf")
        self._max_val = float("-inf")
        self._zero_crossings = []
        self._last_sign = 0

    def subscribe(self, callback: Callable[[StreamingProgress], None]) -> None:
        """Subscribe to progress updates.

        Args:
            callback: Function called with StreamingProgress updates
        """
        self._callbacks.append(callback)

    def process_chunk(self, chunk: NDArray[np.float64]) -> StreamingProgress:
        """Process a chunk of data.

        Args:
            chunk: Data chunk to process

        Returns:
            StreamingProgress with current state
        """
        chunk_arr = np.asarray(chunk).flatten()

        # Empty chunks change no state; just report the current snapshot.
        if len(chunk_arr) == 0:
            return self._build_progress()

        # Buffer the raw samples (deque silently drops the oldest beyond
        # maxlen). extend() is the bulk form of the per-element append.
        self._buffer.extend(chunk_arr)

        # Order matters: statistics fold the chunk into the running sums,
        # frequency estimation then reads those sums, and only afterwards
        # is the processed-sample counter advanced.
        self._update_statistics(chunk_arr)
        self._update_frequency_estimation(chunk_arr)
        self._samples_processed += len(chunk_arr)

        # Refresh confidence and preliminary results.
        self._update_confidence()
        self._update_results()

        progress = self._build_progress()

        # Notify roughly every update_interval_samples: the modulo test
        # detects that the counter crossed an interval boundary somewhere
        # inside this chunk.
        if self._samples_processed % self.config.update_interval_samples < len(chunk_arr):
            self._notify(progress)

        return progress

    def _build_progress(self) -> StreamingProgress:
        """Snapshot the current (non-final) state as a StreamingProgress."""
        return StreamingProgress(
            samples_processed=self._samples_processed,
            total_samples=None,
            confidence=self._confidence,
            preliminary_results=self._current_results.copy(),
            is_complete=False,
            can_stop_early=self._confidence >= self.config.early_stop_confidence,
            message=self._get_status_message(),
        )

    def finalize(self) -> StreamingProgress:
        """Finalize analysis and return final results.

        Returns:
            Final StreamingProgress with complete results
        """
        # Do final calculations with all buffered data.
        if self._buffer:
            buffer_array = np.array(list(self._buffer))

            # Enhanced frequency detection with full buffer.
            self._current_results["frequency_final"] = self._detect_frequency_fft(buffer_array)

            # Final quality assessment.
            data_quality = assess_data_quality(buffer_array, self.sample_rate)
            self._current_results["data_quality"] = {
                "snr_db": data_quality.snr_db,
                "sample_count": data_quality.sample_count,
                "completeness": data_quality.completeness,
            }

        # Finalization bonus, capped at 1.0.
        self._confidence = min(1.0, self._confidence * 1.1)

        progress = StreamingProgress(
            samples_processed=self._samples_processed,
            total_samples=self._samples_processed,
            confidence=self._confidence,
            preliminary_results=self._current_results.copy(),
            is_complete=True,
            can_stop_early=False,
            message="Analysis complete",
        )

        self._notify(progress)
        return progress

    def _update_statistics(self, chunk: NDArray[np.float64]) -> None:
        """Fold a chunk into the running sum/sum-of-squares/min/max."""
        self._sum += float(np.sum(chunk))
        self._sum_sq += float(np.sum(chunk**2))
        self._min_val = min(self._min_val, float(np.min(chunk)))
        self._max_val = max(self._max_val, float(np.max(chunk)))

    def _update_frequency_estimation(self, chunk: NDArray[np.float64]) -> None:
        """Record mean-level crossings for frequency estimation.

        Must run after _update_statistics (so ``_sum`` already includes
        this chunk) and before ``_samples_processed`` is advanced — the
        chunk's first sample therefore has global index
        ``_samples_processed``.
        """
        chunk_start = self._samples_processed
        # Running mean over everything seen so far, including this chunk.
        # (Previously the denominator excluded the chunk while the
        # numerator included it, grossly skewing the first chunks.)
        mean_val = self._sum / max(1, chunk_start + len(chunk))

        for i, val in enumerate(chunk):
            current_sign = 1 if val > mean_val else -1
            if self._last_sign != 0 and current_sign != self._last_sign:
                # Absolute sample index of the crossing. (Previously the
                # index was computed as chunk_start - len(chunk) + i,
                # off by a whole chunk and negative on the first one.)
                self._zero_crossings.append(chunk_start + i)
            self._last_sign = current_sign

    def _update_confidence(self) -> None:
        """Update confidence based on data processed."""
        # Base confidence from sample count (logarithmic: saturates at
        # 10^4 samples).
        sample_factor = min(1.0, np.log10(max(1, self._samples_processed)) / 4.0)

        # Frequency confidence from consistency of crossing intervals
        # (low coefficient of variation => periodic signal).
        freq_factor = 0.5
        if len(self._zero_crossings) > 10:
            intervals = np.diff(self._zero_crossings)
            if len(intervals) > 5:
                cv = np.std(intervals) / (np.mean(intervals) + 1e-10)
                freq_factor = min(1.0, 1.0 - cv)

        # Combine factors - ensure monotonic increase by taking max with
        # the previous confidence.
        new_confidence = 0.4 * sample_factor + 0.4 * freq_factor + 0.2
        self._confidence = max(self._confidence, new_confidence)

    def _update_results(self) -> None:
        """Update preliminary results from the running statistics."""
        n = self._samples_processed
        if n < self.config.min_samples_for_result:
            return

        # Running mean and std (clamped variance guards float round-off).
        mean = self._sum / n
        variance = (self._sum_sq / n) - (mean**2)
        std = np.sqrt(max(0, variance))

        self._current_results.update(
            {
                "mean": mean,
                "std": std,
                "min": self._min_val,
                "max": self._max_val,
                "amplitude": self._max_val - self._min_val,
                "sample_count": n,
            }
        )

        # Frequency from mean-level crossings: consecutive crossings are
        # half a period apart.
        if len(self._zero_crossings) > 4:
            intervals = np.diff(self._zero_crossings)
            avg_half_period = np.mean(intervals) / self.sample_rate
            if avg_half_period > 0:
                self._current_results["frequency_estimate"] = 0.5 / avg_half_period

    def _detect_frequency_fft(self, data: NDArray[np.float64]) -> float | None:
        """Detect frequency using FFT on full buffer.

        Args:
            data: Signal data

        Returns:
            Dominant frequency in Hz or None if detection failed
        """
        try:
            # Remove DC so the bin-0 component does not dominate.
            data_ac = data - np.mean(data)
            fft_result = np.fft.rfft(data_ac)
            freqs = np.fft.rfftfreq(len(data_ac), d=1.0 / self.sample_rate)
            magnitude = np.abs(fft_result[1:])

            if len(magnitude) > 0:
                peak_idx = np.argmax(magnitude)
                return float(freqs[1:][peak_idx])
        except Exception as e:
            # Best-effort: degenerate input yields None, but leave a trace
            # instead of swallowing the error silently.
            logger.debug(f"FFT frequency detection failed: {e}")
        return None

    def _get_status_message(self) -> str:
        """Generate status message.

        Returns:
            Human-readable status string
        """
        if self._samples_processed < self.config.min_samples_for_result:
            return f"Collecting data... ({self._samples_processed} samples)"

        if self._confidence < 0.5:
            return f"Low confidence ({self._confidence:.0%}), collecting more data..."
        elif self._confidence < 0.8:
            return f"Medium confidence ({self._confidence:.0%}), analysis in progress"
        else:
            return f"High confidence ({self._confidence:.0%}), results reliable"

    def _notify(self, progress: StreamingProgress) -> None:
        """Notify all subscribers.

        Args:
            progress: Progress update to send
        """
        for callback in self._callbacks:
            try:
                callback(progress)
            except Exception as e:
                # A faulty subscriber must not break the analysis loop.
                logger.debug(f"Callback error: {e}")

357 

358 

def create_progressive_analyzer(
    sample_rate: float = 1.0,
    chunk_size: int = 1024,
    early_stop_confidence: float = 0.9,
) -> ProgressiveAnalyzer:
    """Build a ProgressiveAnalyzer pre-configured with common settings.

    Args:
        sample_rate: Sample rate in Hz
        chunk_size: Samples per chunk
        early_stop_confidence: Confidence threshold for early stopping

    Returns:
        Configured ProgressiveAnalyzer

    Example:
        >>> analyzer = create_progressive_analyzer(
        ...     sample_rate=1000.0,
        ...     chunk_size=512,
        ...     early_stop_confidence=0.85
        ... )
    """
    return ProgressiveAnalyzer(
        sample_rate,
        StreamingConfig(
            chunk_size=chunk_size,
            early_stop_confidence=early_stop_confidence,
        ),
    )

386 

387 

# Public API of this module.
__all__ = [
    "ProgressiveAnalyzer",
    "StreamingConfig",
    "StreamingProgress",
    "create_progressive_analyzer",
]